Knative Eventing with Quarkus, Kafka and Camel

In this article, you will learn how to use Quarkus with Camel to create applications that send messages to Kafka and receive CloudEvents from Knative Eventing. We will build a system very similar to the one described in my previous article Knative Eventing with Kafka and Quarkus. However, this time we will use Apache Camel instead of several Quarkus extensions including Kafka support.

Source Code

If you would like to try it by yourself, you may always take a look at my source code. In order to do that you need to clone my GitHub repository. Then you should just follow my instructions.

First, you should go to the saga directory. It contains two applications built on top of Quarkus and Apache Camel. Today we will implement an eventual consistency pattern (also known as a SAGA pattern). It will use the Knative Eventing model for exchanging events between our applications.

1. Prerequisites

Before we start, we need to configure some components like Kafka, Knative or the Kafka Eventing Broker. Let’s go through these steps.

1.1. Install Apache Kafka cluster

Firstly, let’s create our kafka namespace.

$ kubectl create namespace kafka

Then we apply the installation files, including ClusterRoles, ClusterRoleBindings and other required Kubernetes CustomResourceDefinitions (CRD).

$ kubectl create -f 'https://strimzi.io/install/latest?namespace=kafka' -n kafka

After that, we can create a single-node persistent Apache Kafka cluster. We use an example custom resource for the Strimzi operator.

$ kubectl apply -f https://strimzi.io/examples/latest/kafka/kafka-persistent-single.yaml -n kafka

Finally, we may verify our installation. You should see the same result as shown below.

1.2. Install Knative Serving and Eventing

You can install Knative on your Kubernetes cluster using YAML manifests or the operator. At the time of writing, the current version of Knative is 0.23. The minimal list of steps required for the YAML-based installation is shown below. For more details, you may refer to the documentation. I have placed all the required commands here to simplify the process for you. Let’s install Knative Serving.

$ kubectl apply -f https://github.com/knative/serving/releases/download/v0.23.0/serving-crds.yaml
$ kubectl apply -f https://github.com/knative/serving/releases/download/v0.23.0/serving-core.yaml
$ kubectl apply -f https://github.com/knative/net-kourier/releases/download/v0.23.0/kourier.yaml
$ kubectl patch configmap/config-network \
  --namespace knative-serving \
  --type merge \
  --patch '{"data":{"ingress.class":"kourier.ingress.networking.knative.dev"}}'

Then let’s install Knative Eventing.

$ kubectl apply -f https://github.com/knative/eventing/releases/download/v0.23.0/eventing-crds.yaml
$ kubectl apply -f https://github.com/knative/eventing/releases/download/v0.23.0/eventing-core.yaml

1.3. Install Knative Kafka Eventing

The following commands install the Apache Kafka broker, and run event routing in a system namespace, knative-eventing, by default.

$ kubectl apply -f https://github.com/knative-sandbox/eventing-kafka-broker/releases/download/v0.23.0/eventing-kafka-controller.yaml
$ kubectl apply -f https://github.com/knative-sandbox/eventing-kafka-broker/releases/download/v0.23.0/eventing-kafka-broker.yaml

Then, we should install the CRDs for KafkaBinding and KafkaSource.

$ kubectl apply -f https://storage.googleapis.com/knative-releases/eventing-contrib/latest/kafka-source.yaml

Finally, let’s just create a broker.

apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
  annotations:
    eventing.knative.dev/broker.class: Kafka
  name: default
  namespace: default
spec:
  config:
    apiVersion: v1
    kind: ConfigMap
    name: kafka-broker-config
    namespace: knative-eventing

1.4. Install Apache Camel K operator (optional)

If you would like to use Apache Camel K to run the Quarkus application on Knative, you first need to install its operator. After downloading the Camel K CLI, you just need to run the following command.

$ kamel install

2. Run the application with Apache Camel K

In order to deploy and run an application on Knative, we may execute the kamel run command. Some parameters might be set in the source file. In the command visible below, I’m setting the name of the Knative service, the source file location, and the Quarkus properties.

$ kamel run --name order-saga --dev OrderRoute.java \
    -p quarkus.datasource.db-kind=h2 \
    -p quarkus.datasource.jdbc.url=jdbc:h2:mem:testdb \
    -p quarkus.hibernate-orm.packages=com.github.piomin.entity.model.order

For more details about deploying Quarkus with Camel K on Kubernetes you may refer to my article Apache Camel K and Quarkus on Kubernetes.

Unfortunately, I was not able to deploy this exact application on Knative with Camel K. That’s because it didn’t see the JPA entities that are included in the application through the external library. However, the application is still prepared for deployment with Camel K. The whole source code is a single Java file, and there are some Camel K modeline hooks in it.

3. Integrate Quarkus with Apache Camel

We can easily integrate Apache Camel routes with Quarkus. Camel Quarkus provides extensions for many of the Camel components. We need to include those components in our Maven pom.xml. What type of components do we need? The full list is visible below. However, first, let’s discuss them a little bit more.

Our application uses JPA to store entities in the H2 database. So we will include the Camel Quarkus JPA extension to provide the JPA implementation with Hibernate. For a single persistence unit, this extension automatically creates EntityManagerFactory and TransactionManager. In order to integrate with Apache Kafka, we need to include the Camel Quarkus Kafka extension. In order to receive events from Knative, we should expose the HTTP POST endpoint. That’s why we need several extensions like Platform HTTP or Jackson. Here’s my list of Maven dependencies.

<dependency>
  <groupId>org.apache.camel.quarkus</groupId>
  <artifactId>camel-quarkus-core</artifactId>
</dependency>
<dependency>
  <groupId>org.apache.camel.quarkus</groupId>
  <artifactId>camel-quarkus-platform-http</artifactId>
</dependency>
<dependency>
  <groupId>org.apache.camel.quarkus</groupId>
  <artifactId>camel-quarkus-bean</artifactId>
</dependency>
<dependency>
  <groupId>org.apache.camel.quarkus</groupId>
  <artifactId>camel-quarkus-timer</artifactId>
</dependency>
<dependency>
  <groupId>org.apache.camel.quarkus</groupId>
  <artifactId>camel-quarkus-kafka</artifactId>
</dependency>
<dependency>
  <groupId>org.apache.camel.quarkus</groupId>
  <artifactId>camel-quarkus-jpa</artifactId>
</dependency>
<dependency>
  <groupId>org.apache.camel.quarkus</groupId>
  <artifactId>camel-quarkus-rest</artifactId>
</dependency>
<dependency>
  <groupId>org.apache.camel.quarkus</groupId>
  <artifactId>camel-quarkus-jackson</artifactId>
</dependency>

Then, we just need to create a class that extends RouteBuilder. The routes have to be defined inside the configure() method. Before we get into the details let’s analyze our domain model classes.

public class CustomerRoute extends RouteBuilder {
   @Override
   public void configure() throws Exception { 
      ...
   }
}

4. Domain model for Quarkus JPA and Kafka

I created a separate project for entity model classes. The repository is available on GitHub: https://github.com/piomin/entity-model.git. Thanks to that, I have a typical serverless application that consists of a single class. It is also possible to easily deploy it on Knative with Camel K. Here’s a model entity for order-saga.

@Entity
@Table(name = "orders")
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@ToString
public class Order implements Serializable {
    @Id
    @GeneratedValue
    private Long id;
    private Integer customerId;
    private Integer productId;
    private int amount;
    private int productCount;
    @Enumerated
    private OrderStatus status = OrderStatus.NEW;
}

Just to simplify, I’m using the same class when sending events to Kafka. We can also take a look at a model entity for customer-saga.

@Entity
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@ToString
public class Customer implements Serializable {
    @Id
    @GeneratedValue
    private Long id;
    private String name;
    private int amountAvailable;
    private int amountReserved;
}

5. Building Camel routes with Quarkus and Kafka extension

In the first step, we are going to generate orders and send them to the Kafka topic. Before that, we will store them in the H2 database using the Camel JPA extension. This part of logic is implemented inside order-saga.

from("timer:tick?period=10000")
   .setBody(exchange -> 
      new Order(null, r.nextInt(10) + 1, r.nextInt(10) + 1, 100, 1, OrderStatus.NEW))
   .to("jpa:" + Order.class.getName())
   .marshal().json(JsonLibrary.Jackson)
   .log("New Order: ${body}")
   .toD("kafka:order-events?brokers=${env.KAFKA_BOOTSTRAP_SERVERS}");

Some things need to be clarified here. Before sending a message to the Kafka topic, we need to serialize it to the JSON format. The application does not know anything about the Kafka address. This address is injected into the container by the KafkaBinding object. It is available to the Camel route as the environment variable KAFKA_BOOTSTRAP_SERVERS.
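In the route, the ${env.KAFKA_BOOTSTRAP_SERVERS} placeholder is resolved at runtime by the dynamic endpoint (toD). The plain Java sketch below shows roughly the same lookup outside of Camel. The class name, the method name and the localhost fallback are assumptions made only for this illustration; they are not part of the sample applications.

```java
import java.util.Map;

// Sketch: build the Camel Kafka endpoint URI from the KAFKA_BOOTSTRAP_SERVERS
// variable injected by KafkaBinding. KafkaEndpoints, kafkaUri and the fallback
// address are hypothetical names/values used for this example only.
final class KafkaEndpoints {

    // Assumed default for running outside of Knative (for example in local tests).
    static final String FALLBACK_BROKERS = "localhost:9092";

    // The environment is passed in as a Map so the logic can be exercised
    // without changing the real process environment.
    static String kafkaUri(String topic, Map<String, String> env) {
        String brokers = env.get("KAFKA_BOOTSTRAP_SERVERS");
        if (brokers == null || brokers.isBlank()) {
            brokers = FALLBACK_BROKERS;
        }
        return "kafka:" + topic + "?brokers=" + brokers;
    }

    public static void main(String[] args) {
        // Uses the real environment and prints the URI the route would target.
        System.out.println(kafkaUri("order-events", System.getenv()));
    }
}
```

Keeping the address out of the code is what allows the same application to run against a different Kafka cluster just by changing the KafkaBinding object.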

Now, let’s switch to the customer-saga application. In order to receive an event from the Knative broker, we should expose an HTTP POST endpoint. This endpoint takes Order as an input. Then, if the order’s status equals NEW, it performs a reservation on the customer account. Before that, it sends back a response to the reserve-events topic.
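It may help to see what such a POST request can carry besides the JSON body. In the binary content mode of the CloudEvents HTTP binding, the event attributes travel as HTTP headers prefixed with ce- (for example ce-type or ce-source). The route in this article only reads the body, so the sketch below is just an illustration of how those headers could be collected. I am assuming here that the broker delivers events in binary mode; the class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;

// Sketch: extract CloudEvent attributes from HTTP headers (binary content mode).
// CloudEventHeaders and attributes() are hypothetical names for this example.
final class CloudEventHeaders {

    static final String PREFIX = "ce-";

    static Map<String, String> attributes(Map<String, String> httpHeaders) {
        Map<String, String> attrs = new TreeMap<>();
        for (Map.Entry<String, String> header : httpHeaders.entrySet()) {
            // HTTP header names are case-insensitive, so normalize them first.
            String name = header.getKey().toLowerCase(Locale.ROOT);
            if (name.startsWith(PREFIX)) {
                attrs.put(name.substring(PREFIX.length()), header.getValue());
            }
        }
        return attrs;
    }

    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<>();
        headers.put("Content-Type", "application/json");
        headers.put("Ce-Specversion", "1.0");
        headers.put("Ce-Type", "dev.knative.kafka.event");
        // Prints only the ce-* headers, with the prefix stripped.
        System.out.println(attributes(headers));
    }
}
```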

Also, let’s take a look at the fragment responsible for searching the customer in the database and performing an update. We use Quarkus Camel JPA extension. First, we need to define a JPQL query to retrieve an entity. Then, we update the Customer entity depending on the order status.

rest("/customers")
   .post("/reserve").consumes("application/json")
   .route()
      .log("Order received: ${body}")
      .unmarshal().json(JsonLibrary.Jackson, Order.class)
      .choice()
         .when().simple("${body.status.toString()} == 'NEW'")
            .setBody(exchange -> {
               Order order = exchange.getIn().getBody(Order.class);
               order.setStatus(OrderStatus.IN_PROGRESS);
               return order;
            })
            .marshal().json(JsonLibrary.Jackson)
            .log("Reservation sent: ${body}")
            .toD("kafka:reserve-events?brokers=${env.KAFKA_BOOTSTRAP_SERVERS}")
      .end()
      .unmarshal().json(JsonLibrary.Jackson, Order.class)
      .setProperty("orderAmount", simple("${body.amount}", Integer.class))
      .setProperty("orderStatus", simple("${body.status}", OrderStatus.class))
      .toD("jpa:" + Customer.class.getName() + 
         "?query=select c from Customer c where c.id= ${body.customerId}")
      .choice()
         .when().simple("${exchangeProperty.orderStatus} == 'IN_PROGRESS'")
            .setBody(exchange -> {
               Customer customer = (Customer) exchange.getIn().getBody(List.class).get(0);
               customer.setAmountReserved(customer.getAmountReserved() + 
                  exchange.getProperty("orderAmount", Integer.class));
               customer.setAmountAvailable(customer.getAmountAvailable() - 
                  exchange.getProperty("orderAmount", Integer.class));
               return customer;
            })
         .otherwise()
            .setBody(exchange -> {
               Customer customer = (Customer) exchange.getIn().getBody(List.class).get(0);
               customer.setAmountReserved(customer.getAmountReserved() - 
                  exchange.getProperty("orderAmount", Integer.class));
               return customer;
            })
      .end()
      .log("Current customer: ${body}")
      .to("jpa:" + Customer.class.getName() + "?useExecuteUpdate=true")
      .setHeader(Exchange.HTTP_RESPONSE_CODE, constant(201)).setBody(constant(null))
.endRest();
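The account arithmetic performed by the two setBody branches above can be sketched in plain Java, without Camel or JPA. This is only an illustration of the logic as I read it from the route: an order marked IN_PROGRESS moves its amount from the available balance to the reserved balance, and any other status releases the reserved amount. The Account class and the CONFIRMED status are assumptions introduced for this example.

```java
// Sketch of the reservation arithmetic from the customer-saga route.
// Account and the CONFIRMED constant are hypothetical; the real application
// uses the Customer entity and its own OrderStatus enum.
final class ReservationSketch {

    enum OrderStatus { NEW, IN_PROGRESS, CONFIRMED }

    static final class Account {
        int amountAvailable;
        int amountReserved;

        Account(int amountAvailable, int amountReserved) {
            this.amountAvailable = amountAvailable;
            this.amountReserved = amountReserved;
        }
    }

    static Account apply(Account account, OrderStatus status, int orderAmount) {
        if (status == OrderStatus.IN_PROGRESS) {
            // Reserve: move funds from "available" to "reserved".
            account.amountReserved += orderAmount;
            account.amountAvailable -= orderAmount;
        } else {
            // Any other status: release the previously reserved funds.
            account.amountReserved -= orderAmount;
        }
        return account;
    }

    public static void main(String[] args) {
        Account account = new Account(1000, 0);
        apply(account, OrderStatus.IN_PROGRESS, 100);
        System.out.println(account.amountAvailable + " / " + account.amountReserved);
        apply(account, OrderStatus.CONFIRMED, 100);
        System.out.println(account.amountAvailable + " / " + account.amountReserved);
    }
}
```

For an account with 1000 available and 0 reserved, an order of 100 first leaves 900 available and 100 reserved. When the follow-up event arrives, the reserved amount drops back to 0 while the available amount stays at 900.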

We can also generate some test data in customer-saga using a Camel route. It runs once, just after the Quarkus application starts.

from("timer://runOnce?repeatCount=1&delay=100")
   .loop(10)
      .setBody(exchange -> new Customer(null, "Test"+(++i), r.nextInt(50000), 0))
      .to("jpa:" + Customer.class.getName())
      .log("Add: ${body}")
   .end();

6. Configure Knative Eventing with Kafka broker

6.1. Architecture with Quarkus, Camel and Kafka

We have already created two applications built on top of Quarkus and Apache Camel. Both of these applications expose HTTP POST endpoints and send events to the Kafka topics. Now, we need to create some Kubernetes objects to orchestrate the process. So far, we just send the events to topics, they have not been routed to the target applications. Let’s take a look at the architecture of our system. There are two topics on Kafka that receive events: order-events and reserve-events. The messages from those topics are not automatically sent to the Knative broker. So, first, we need to create the KafkaSource object to get messages from these topics and send them to the broker.

[Figure: architecture of the system with Quarkus, Camel and Kafka]

6.2. Configure Knative eventing

The KafkaSource object takes the list of input topics and a target application. In that case, the target application is the Knative broker. We may create a single KafkaSource for both topics or a separate source for each topic. Here’s the KafkaSource definition that takes messages from the reserve-events topic.

apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
  name: kafka-source-reserve-order
spec:
  bootstrapServers:
    - my-cluster-kafka-bootstrap.kafka:9092
  topics:
    - reserve-events
  sink:
    ref:
      apiVersion: eventing.knative.dev/v1
      kind: Broker
      name: default

Let’s create both sources. After creating them, we may verify if everything went well by executing the command kubectl get sources.

[Figure: output of the kubectl get sources command]

Before running our application on Knative we should create KafkaBinding objects. This object is responsible for injecting the address of the Kafka cluster into the application containers. The address of a broker will be available for the application under the KAFKA_BOOTSTRAP_SERVERS environment variable.

apiVersion: bindings.knative.dev/v1beta1
kind: KafkaBinding
metadata:
  name: kafka-binding-customer-saga
spec:
  subject:
    apiVersion: serving.knative.dev/v1
    kind: Service
    name: customer-saga
  bootstrapServers:
    - my-cluster-kafka-bootstrap.kafka:9092

Let’s create both KafkaBinding objects. Here’s the list of available bindings after running the kubectl get bindings command.

Finally, we may proceed to the last step of our configuration. We will create triggers. A trigger represents a desire to subscribe to events from a specific broker. Moreover, we may apply a simple filtering mechanism using the Trigger object. For example, if we want to send only the events from the order-events topic and with the type dev.knative.kafka.event we should create a definition as shown below.

apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: customer-saga-trigger
spec:
  broker: default
  filter:
    attributes:
      type: dev.knative.kafka.event
      source: /apis/v1/namespaces/default/kafkasources/kafka-source-orders-customer#order-events
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: customer-saga
    uri: /customers/reserve

Similarly, we should create a trigger that sends messages to the order-saga POST endpoint. It gets messages from the reserve-events source and sends them to the /orders/confirm endpoint.

apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: order-saga-trigger
spec:
  broker: default
  filter:
    attributes:
      type: dev.knative.kafka.event
      source: /apis/v1/namespaces/default/kafkasources/kafka-source-reserve-order#reserve-events
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: order-saga
    uri: /orders/confirm
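To make the filtering rule more tangible, here is a small plain Java sketch of attribute matching as I understand it: an event passes the filter only if every attribute listed in the filter has exactly the same value in the event. The source value follows the pattern visible in the two Trigger definitions above. The class and method names are hypothetical, and the real matching is of course done by the Knative broker, not by application code.

```java
import java.util.Map;

// Sketch of exact-match attribute filtering, similar in spirit to the
// filter.attributes section of a Trigger. All names here are hypothetical.
final class TriggerFilterSketch {

    // Builds a source value in the same shape as the one used in the Trigger YAML.
    static String kafkaSourceUri(String namespace, String sourceName, String topic) {
        return "/apis/v1/namespaces/" + namespace + "/kafkasources/" + sourceName + "#" + topic;
    }

    // An event matches when all filter attributes are present with equal values.
    static boolean matches(Map<String, String> filter, Map<String, String> eventAttributes) {
        for (Map.Entry<String, String> expected : filter.entrySet()) {
            if (!expected.getValue().equals(eventAttributes.get(expected.getKey()))) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<String, String> filter = Map.of(
            "type", "dev.knative.kafka.event",
            "source", kafkaSourceUri("default", "kafka-source-reserve-order", "reserve-events"));
        Map<String, String> event = Map.of(
            "type", "dev.knative.kafka.event",
            "source", kafkaSourceUri("default", "kafka-source-reserve-order", "reserve-events"),
            "id", "partition:0/offset:1");
        System.out.println(matches(filter, event));
    }
}
```

Because both triggers use the same event type, the source attribute is what keeps events from order-events and reserve-events apart.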

Finally, we can display a list of active triggers by executing the command kubectl get trigger.

[Figure: output of the kubectl get trigger command]

7. Deploy Quarkus application on Knative

Once we have finished the development of our sample applications, we may deploy them on Knative. One of the possible deployment options is Apache Camel K. In case of any problems with this type of deployment, we may also use the Quarkus Kubernetes module. Firstly, let’s include two required modules. We will also leverage the Jib Maven plugin.

<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-kubernetes</artifactId>
</dependency>
<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-container-image-jib</artifactId>
</dependency>

The rest of the configuration should be provided inside the application properties file. In the first step, we need to enable automatic deployment on Kubernetes by setting the property quarkus.kubernetes.deploy to true. By default, Quarkus creates a standard Kubernetes Deployment. Therefore, we should set the quarkus.kubernetes.deployment-target to knative. In that case, it will generate a Knative Service YAML. Finally, we have to change the name of the image group to dev.local. Of course, it is required just if we run our applications on the local Kubernetes cluster like me.

quarkus.kubernetes.deploy = true
quarkus.kubernetes.deployment-target = knative
quarkus.container-image.group = dev.local

Now, if we run the build with the mvn clean package command, our application will be automatically deployed on Knative. After that, let’s verify the list of Knative services.

Once the order-saga application is started, it generates one order every 10 seconds and then sends it to the Kafka topic order-events. We can easily verify that it works properly by checking the list of active topics, as shown below.

We can also verify a list of Knative events exchanged by the applications.

$ kubectl get eventtype

Final Thoughts

Quarkus and Apache Camel seem to be a perfect combination when creating serverless applications on Knative. We can easily implement the whole logic within a single source code file. We can also use Camel K to deploy our applications on Kubernetes or Knative. You can compare the approach described in this article with the one based on Quarkus and its extensions to Kafka or Knative available in this repository.
