Knative Eventing with Quarkus, Kafka and Camel
In this article, you will learn how to use Quarkus with Camel to create applications that send messages to Kafka and receive CloudEvents from Knative Eventing. We will build a system very similar to the one described in my previous article, Knative Eventing with Kafka and Quarkus. However, this time we will use Apache Camel instead of several Quarkus extensions, including Kafka support.
Source Code
If you would like to try it by yourself, you may always take a look at my source code. In order to do that you need to clone my GitHub repository. Then you should just follow my instructions.
First, you should go to the saga directory. It contains two applications built on top of Quarkus and Apache Camel. Today we will implement an eventual consistency pattern (also known as the SAGA pattern). It will use the Knative Eventing model for exchanging events between our applications.
1. Prerequisites
Before we start, we need to configure some components: Kafka, Knative, and the Kafka Eventing Broker. Let’s go through these steps.
1.1. Install Apache Kafka cluster
Firstly, let’s create our kafka namespace.
$ kubectl create namespace kafka
Then we apply the installation files, including ClusterRoles, ClusterRoleBindings and other required Kubernetes CustomResourceDefinitions (CRDs).
$ kubectl create -f 'https://strimzi.io/install/latest?namespace=kafka' -n kafka
After that, we can create a single-node persistent Apache Kafka cluster. We use an example custom resource for the Strimzi operator.
$ kubectl apply -f https://strimzi.io/examples/latest/kafka/kafka-persistent-single.yaml -n kafka
Finally, we may verify our installation. You should see the same result as shown below.
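One way to verify the installation is to wait until the Kafka custom resource reports readiness and then list the running pods. The cluster name my-cluster comes from the Strimzi example resource applied above.

```shell
# Wait until the Strimzi operator reports the Kafka cluster as Ready
$ kubectl wait kafka/my-cluster --for=condition=Ready --timeout=300s -n kafka
# List the pods created for the cluster (Kafka, ZooKeeper, entity operator)
$ kubectl get pods -n kafka
```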
1.2. Install Knative Serving and Eventing
You can install Knative on your Kubernetes cluster using YAML manifests or the operator. The current version of Knative is 0.23. A minimal list of steps required for the YAML-based installation is visible below. For more details, you may refer to the documentation. I have placed all the required commands here to simplify the process for you. Let’s install Knative Serving.
$ kubectl apply -f https://github.com/knative/serving/releases/download/v0.23.0/serving-crds.yaml
$ kubectl apply -f https://github.com/knative/serving/releases/download/v0.23.0/serving-core.yaml
$ kubectl apply -f https://github.com/knative/net-kourier/releases/download/v0.23.0/kourier.yaml
$ kubectl patch configmap/config-network \
--namespace knative-serving \
--type merge \
--patch '{"data":{"ingress.class":"kourier.ingress.networking.knative.dev"}}'
Then let’s install Knative Eventing.
$ kubectl apply -f https://github.com/knative/eventing/releases/download/v0.23.0/eventing-crds.yaml
$ kubectl apply -f https://github.com/knative/eventing/releases/download/v0.23.0/eventing-core.yaml
1.3. Install Knative Kafka Eventing
The following commands install the Apache Kafka broker and run event routing in a system namespace, knative-eventing, by default.
$ kubectl apply -f https://github.com/knative-sandbox/eventing-kafka-broker/releases/download/v0.23.0/eventing-kafka-controller.yaml
$ kubectl apply -f https://github.com/knative-sandbox/eventing-kafka-broker/releases/download/v0.23.0/eventing-kafka-broker.yaml
Then, we should install the CRDs with KafkaBinding and KafkaSource.
$ kubectl apply -f https://storage.googleapis.com/knative-releases/eventing-contrib/latest/kafka-source.yaml
Finally, let’s just create a broker.
apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
  annotations:
    eventing.knative.dev/broker.class: Kafka
  name: default
  namespace: default
spec:
  config:
    apiVersion: v1
    kind: ConfigMap
    name: kafka-broker-config
    namespace: knative-eventing
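The definition above can be saved to a file (broker.yaml is an assumed name) and applied with kubectl. Afterwards, the broker should appear on the list with READY set to True.

```shell
# Apply the Broker definition and check its status
$ kubectl apply -f broker.yaml
$ kubectl get broker default
```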
1.4. Install Apache Camel K operator (optional)
If you would like to use Apache Camel K to run the Quarkus application on Knative, you first need to install its operator. After downloading the Camel K CLI, you just need to run the following command.
$ kamel install
2. Run the application with Apache Camel K
In order to deploy and run an application on Knative, we may execute the kamel run command. Some parameters might be set in the source file. In the command visible below, I’m setting the name of the Knative service, the source file location, and some Quarkus properties.
$ kamel run --name order-saga --dev OrderRoute.java \
-p quarkus.datasource.db-kind=h2 \
-p quarkus.datasource.jdbc.url=jdbc:h2:mem:testdb \
-p quarkus.hibernate-orm.packages=com.github.piomin.entity.model.order
For more details about deploying Quarkus with Camel K on Kubernetes you may refer to my article Apache Camel K and Quarkus on Kubernetes.
Unfortunately, I was not able to deploy exactly this application on Knative with Camel K, because it did not see the JPA entities included in the application from an external library. However, the application is also prepared for deployment with Camel K: the whole source code is a single Java file, and there are some Camel K modeline hooks in it.
3. Integrate Quarkus with Apache Camel
We can easily integrate Apache Camel routes with Quarkus. Camel Quarkus provides extensions for many of the Camel components. We need to include those components in our Maven pom.xml. What type of components do we need? The full list is visible below. However, first, let’s discuss them a little bit more.
Our application uses JPA to store entities in the H2 database. So we will include the Camel Quarkus JPA extension to provide the JPA implementation with Hibernate. For a single persistence unit, this extension automatically creates EntityManagerFactory and TransactionManager. In order to integrate with Apache Kafka, we need to include the Camel Quarkus Kafka extension. In order to receive events from Knative, we should expose an HTTP POST endpoint. That’s why we need several extensions like Platform HTTP or Jackson. Here’s my list of Maven dependencies.
<dependency>
  <groupId>org.apache.camel.quarkus</groupId>
  <artifactId>camel-quarkus-core</artifactId>
</dependency>
<dependency>
  <groupId>org.apache.camel.quarkus</groupId>
  <artifactId>camel-quarkus-platform-http</artifactId>
</dependency>
<dependency>
  <groupId>org.apache.camel.quarkus</groupId>
  <artifactId>camel-quarkus-bean</artifactId>
</dependency>
<dependency>
  <groupId>org.apache.camel.quarkus</groupId>
  <artifactId>camel-quarkus-timer</artifactId>
</dependency>
<dependency>
  <groupId>org.apache.camel.quarkus</groupId>
  <artifactId>camel-quarkus-kafka</artifactId>
</dependency>
<dependency>
  <groupId>org.apache.camel.quarkus</groupId>
  <artifactId>camel-quarkus-jpa</artifactId>
</dependency>
<dependency>
  <groupId>org.apache.camel.quarkus</groupId>
  <artifactId>camel-quarkus-rest</artifactId>
</dependency>
<dependency>
  <groupId>org.apache.camel.quarkus</groupId>
  <artifactId>camel-quarkus-jackson</artifactId>
</dependency>
Then, we just need to create a class that extends RouteBuilder. The routes have to be defined inside the configure() method. Before we get into the details, let’s analyze our domain model classes.
public class CustomerRoute extends RouteBuilder {

    @Override
    public void configure() throws Exception {
        ...
    }
}
4. Domain model for Quarkus JPA and Kafka
I created a separate project for the entity model classes. The repository is available on GitHub: https://github.com/piomin/entity-model.git. Thanks to that, I have a typical serverless application which consists of a single class. It is also possible to easily deploy it on Knative with Camel K. Here’s the model entity for order-saga.
@Entity
@Table(name = "orders")
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@ToString
public class Order implements Serializable {

    @Id
    @GeneratedValue
    private Long id;
    private Integer customerId;
    private Integer productId;
    private int amount;
    private int productCount;
    @Enumerated
    private OrderStatus status = OrderStatus.NEW;
}
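The entity references an OrderStatus enum that is not shown in the article. Here's a minimal sketch of what it might look like; only NEW and IN_PROGRESS appear in the routes, so the remaining values are assumptions. Note that @Enumerated without arguments uses the ORDINAL strategy, so the declaration order determines the integers persisted in the database.

```java
// Hypothetical sketch of the OrderStatus enum: the article shows only NEW and
// IN_PROGRESS; CONFIRMED and REJECTED are assumed for the rest of the saga flow.
// With @Enumerated (default EnumType.ORDINAL), the declaration order below maps
// directly to the values stored in the database, so keep it stable and only
// append new values at the end.
enum OrderStatus {
    NEW,          // order created, reservation not yet attempted
    IN_PROGRESS,  // reservation sent, awaiting confirmation
    CONFIRMED,    // reservation accepted (assumed value)
    REJECTED      // reservation declined (assumed value)
}
```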
Just to simplify, I’m using the same class when sending events to Kafka. We can also take a look at the model entity for customer-saga.
@Entity
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@ToString
public class Customer implements Serializable {

    @Id
    @GeneratedValue
    private Long id;
    private String name;
    private int amountAvailable;
    private int amountReserved;
}
5. Building Camel routes with Quarkus and Kafka extension
In the first step, we are going to generate orders and send them to the Kafka topic. Before that, we will store them in the H2 database using the Camel JPA extension. This part of the logic is implemented inside order-saga.
from("timer:tick?period=10000")
    .setBody(exchange ->
        new Order(null, r.nextInt(10) + 1, r.nextInt(10) + 1, 100, 1, OrderStatus.NEW))
    .to("jpa:" + Order.class.getName())
    .marshal().json(JsonLibrary.Jackson)
    .log("New Order: ${body}")
    .toD("kafka:order-events?brokers=${env.KAFKA_BOOTSTRAP_SERVERS}");
Some things need to be clarified here. Before sending a message to the Kafka topic, we need to serialize it to JSON format. The application does not know anything about the Kafka address. This address is injected into the container by the KafkaBinding object and is available to the Camel route as the environment variable KAFKA_BOOTSTRAP_SERVERS.
Now, let’s switch to the customer-saga application. In order to receive events from the Knative broker, we should expose an HTTP POST endpoint. This endpoint takes an Order as input. Then, if the order’s status equals NEW, it performs a reservation on the customer account. Before that, it sends back a response to the reserve-events topic.
Also, let’s take a look at the fragment responsible for searching for the customer in the database and performing an update. We use the Camel Quarkus JPA extension. First, we need to define a JPQL query to retrieve an entity. Then, we update the Customer entity depending on the order status.
rest("/customers")
    .post("/reserve").consumes("application/json")
    .route()
        .log("Order received: ${body}")
        .unmarshal().json(JsonLibrary.Jackson, Order.class)
        .choice()
            .when().simple("${body.status.toString()} == 'NEW'")
                .setBody(exchange -> {
                    Order order = exchange.getIn().getBody(Order.class);
                    order.setStatus(OrderStatus.IN_PROGRESS);
                    return order;
                })
                .marshal().json(JsonLibrary.Jackson)
                .log("Reservation sent: ${body}")
                .toD("kafka:reserve-events?brokers=${env.KAFKA_BOOTSTRAP_SERVERS}")
        .end()
        .unmarshal().json(JsonLibrary.Jackson, Order.class)
        .setProperty("orderAmount", simple("${body.amount}", Integer.class))
        .setProperty("orderStatus", simple("${body.status}", OrderStatus.class))
        .toD("jpa:" + Customer.class.getName() +
            "?query=select c from Customer c where c.id= ${body.customerId}")
        .choice()
            .when().simple("${exchangeProperty.orderStatus} == 'IN_PROGRESS'")
                .setBody(exchange -> {
                    Customer customer = (Customer) exchange.getIn().getBody(List.class).get(0);
                    customer.setAmountReserved(customer.getAmountReserved() +
                        exchange.getProperty("orderAmount", Integer.class));
                    customer.setAmountAvailable(customer.getAmountAvailable() -
                        exchange.getProperty("orderAmount", Integer.class));
                    return customer;
                })
            .otherwise()
                .setBody(exchange -> {
                    Customer customer = (Customer) exchange.getIn().getBody(List.class).get(0);
                    customer.setAmountReserved(customer.getAmountReserved() -
                        exchange.getProperty("orderAmount", Integer.class));
                    return customer;
                })
        .end()
        .log("Current customer: ${body}")
        .to("jpa:" + Customer.class.getName() + "?useExecuteUpdate=true")
        .setHeader(Exchange.HTTP_RESPONSE_CODE, constant(201)).setBody(constant(null))
    .endRest();
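Assuming the customer-saga application is reachable directly (for example locally on port 8080 in Quarkus dev mode, which is an assumption here), the endpoint could be exercised with curl. The payload values are illustrative.

```shell
# Illustrative request against an assumed local instance of customer-saga
$ curl -X POST http://localhost:8080/customers/reserve \
    -H 'Content-Type: application/json' \
    -d '{"id":1,"customerId":1,"productId":1,"amount":100,"productCount":1,"status":"NEW"}'
```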
We can also generate some test data in customer-saga using a Camel route. It runs once, just after the Quarkus application starts.
from("timer://runOnce?repeatCount=1&delay=100")
    .loop(10)
        .setBody(exchange -> new Customer(null, "Test" + (++i), r.nextInt(50000), 0))
        .to("jpa:" + Customer.class.getName())
        .log("Add: ${body}")
    .end();
6. Configure Knative Eventing with Kafka broker
6.1. Architecture with Quarkus, Camel and Kafka
We have already created two applications built on top of Quarkus and Apache Camel. Both of these applications expose HTTP POST endpoints and send events to Kafka topics. Now, we need to create some Kubernetes objects to orchestrate the process. So far, we have just sent the events to topics; they have not been routed to the target applications. Let’s take a look at the architecture of our system. There are two Kafka topics that receive events: order-events and reserve-events. The messages from those topics are not automatically sent to the Knative broker. So, first, we need to create KafkaSource objects to get messages from these topics and send them to the broker.
6.2. Configure Knative Eventing
The KafkaSource object takes a list of input topics and a target application. In that case, the target application is the Knative broker. We may create a single KafkaSource for both topics, or one source per topic. Here’s the KafkaSource definition that takes messages from the reserve-events topic.
apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
  name: kafka-source-reserve-order
spec:
  bootstrapServers:
    - my-cluster-kafka-bootstrap.kafka:9092
  topics:
    - reserve-events
  sink:
    ref:
      apiVersion: eventing.knative.dev/v1
      kind: Broker
      name: default
Let’s create both sources. After creating them, we may verify if everything went well by executing the command kubectl get sources.
Before running our application on Knative, we should create KafkaBinding objects. This object is responsible for injecting the address of the Kafka cluster into the application containers. The address of the broker will be available to the application under the KAFKA_BOOTSTRAP_SERVERS environment variable.
apiVersion: bindings.knative.dev/v1beta1
kind: KafkaBinding
metadata:
  name: kafka-binding-customer-saga
spec:
  subject:
    apiVersion: serving.knative.dev/v1
    kind: Service
    name: customer-saga
  bootstrapServers:
    - my-cluster-kafka-bootstrap.kafka:9092
Let’s create both KafkaBinding objects. Here’s the list of available bindings after running the kubectl get bindings command.
Finally, we may proceed to the last step of our configuration. We will create triggers. A trigger represents a desire to subscribe to events from a specific broker. Moreover, we may apply a simple filtering mechanism using the Trigger object. For example, if we want to send only the events from the order-events topic with the type dev.knative.kafka.event, we should create a definition as shown below.
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: customer-saga-trigger
spec:
  broker: default
  filter:
    attributes:
      type: dev.knative.kafka.event
      source: /apis/v1/namespaces/default/kafkasources/kafka-source-orders-customer#order-events
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: customer-saga
    uri: /customers/reserve
Similarly, we should create a trigger that sends messages to the order-saga POST endpoint. It gets messages from the reserve-events source and sends them to the /orders/confirm endpoint.
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: order-saga-trigger
spec:
  broker: default
  filter:
    attributes:
      type: dev.knative.kafka.event
      source: /apis/v1/namespaces/default/kafkasources/kafka-source-reserve-order#reserve-events
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: order-saga
    uri: /orders/confirm
Finally, we can display a list of active triggers by executing the command kubectl get trigger.
7. Deploy Quarkus application on Knative
Once we have finished the development of our sample applications, we may deploy them on Knative. One of the possible deployment options is Apache Camel K. In case of any problems with this type of deployment, we may also use the Quarkus Kubernetes module. Firstly, let’s include the two required modules. We will also leverage the Jib Maven plugin.
<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-kubernetes</artifactId>
</dependency>
<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-container-image-jib</artifactId>
</dependency>
The rest of the configuration should be provided inside the application properties file. In the first step, we need to enable automatic deployment on Kubernetes by setting the property quarkus.kubernetes.deploy to true. By default, Quarkus creates a standard Kubernetes Deployment. Therefore, we should set quarkus.kubernetes.deployment-target to knative. In that case, it generates a Knative Service YAML manifest. Finally, we have to change the name of the image group to dev.local. Of course, this is required only if you run your applications on a local Kubernetes cluster, as I do.
quarkus.kubernetes.deploy = true
quarkus.kubernetes.deployment-target = knative
quarkus.container-image.group = dev.local
Now, if we run the build with the mvn clean package command, our application will be automatically deployed on Knative. After that, let’s verify the list of Knative services.
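Assuming kubectl access to the cluster, the deployed Knative services can be listed with the following command.

```shell
# List Knative services (ksvc) in the current namespace
$ kubectl get ksvc
```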
Once the order-saga application is started, it generates one order every 10 seconds and then sends it to the Kafka topic order-events. We can easily verify that it works properly by checking the list of active topics as shown below.
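One way to list the topics is to run a one-off pod with the Kafka CLI tools inside the cluster. The image tag below is an assumption and should match your Strimzi/Kafka version.

```shell
# Run a throwaway pod with Kafka tools to list topics (image tag is illustrative)
$ kubectl -n kafka run kafka-topics -ti --rm=true --restart=Never \
    --image=quay.io/strimzi/kafka:latest-kafka-2.8.0 \
    -- bin/kafka-topics.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --list
```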
We can also verify a list of Knative events exchanged by the applications.
$ kubectl get eventtype
Final Thoughts
Quarkus and Apache Camel seem to be a perfect combination for creating serverless applications on Knative. We can easily implement the whole logic within a single source code file. We can also use Camel K to deploy our applications on Kubernetes or Knative. You can compare the approach described in this article with the one based on Quarkus and its extensions for Kafka and Knative, available in this repository.