Knative Eventing with Kafka and Quarkus
In this article, you will learn how to run eventing applications on Knative using Kafka and Quarkus. Previously, I described the same approach for Kafka and Spring Cloud. If you want to compare the two, read my article Knative Eventing with Kafka and Spring Cloud. We will deploy exactly the same architecture. However, instead of Spring Cloud Function we will use Quarkus Funqy, and Spring Cloud Stream will be replaced with the Quarkus Kafka extension. Before we start, let’s clarify a few things.
Knative and Quarkus Concepts
Quarkus supports Knative in several ways. First of all, we may use the Quarkus Kubernetes module to simplify deployment on Knative. We can also use the Quarkus Funqy Knative Events extension to route and process cloud events within functions. That’s not all. Quarkus supports a serverless functional style: with the Quarkus Funqy module, we can write functions deployable to various FaaS platforms (including Knative). These functions can be invoked through HTTP. Finally, we may integrate our application with Kafka topics using annotations from the Quarkus Kafka extension.
The Quarkus Funqy Knative Events module is based on the Knative broker and triggers. Since we will use a Kafka source instead of a broker and trigger, we won’t include that module. However, we can still take advantage of Quarkus Funqy and its HTTP binding.
Source Code
If you would like to try it yourself, you can always take a look at my source code. To do that, you need to clone my GitHub repository. Then just follow my instructions.
As I mentioned before, we will use the same architecture and scenario as in my previous article about Knative Eventing. Let’s briefly describe it.
Today we will implement the eventual consistency pattern (also known as the SAGA pattern). How does it work? The sample system consists of three services. The order-service creates a new order related to customers and products. That order is sent to a Kafka topic. Then, our two other applications, customer-service and product-service, receive the order event. After that, they perform a reservation: the customer-service reserves the order’s amount on the customer’s account, while the product-service reserves the number of products specified in the order. Both services send a response to the order-service through the Kafka topic. If the order-service receives positive reservations from both services, it confirms the order and sends an event with that information. Both customer-service and product-service receive the event and confirm their reservations. You can verify it in the picture below.
Prerequisites
There are several requirements we need to meet before starting. I described them all in my previous article about Knative Eventing. Here’s just a brief reminder:
- A Kubernetes cluster in at least version 1.17. I’m using a local cluster. If you use a remote cluster, replace dev.local in the image name with your Docker account name.
- Knative Serving and Eventing installed on your cluster. You may find the detailed installation instructions here.
- The Kafka Eventing broker installed. Here’s the link to the releases site. You don’t need everything – we will use the KafkaSource and KafkaBinding CRDs.
- A Kafka cluster installed with the Strimzi operator. I installed it in the kafka namespace. The name of my cluster is my-cluster.
Step 1. Installing Kafka Knative components
Assuming you have already installed all the elements required to run Knative Eventing on your Kubernetes cluster, we may create some components dedicated to the applications. You may find YAML manifests with the object declarations in the k8s directory inside each application directory. First, let’s create a KafkaBinding. It is responsible for injecting the address of the Kafka cluster into the application container. Thanks to KafkaBinding, that address is visible inside the container as the KAFKA_BOOTSTRAP_SERVERS environment variable. Here’s an example YAML declaration for the customer-saga application. We should create similar objects for the two other applications.
apiVersion: bindings.knative.dev/v1beta1
kind: KafkaBinding
metadata:
  name: kafka-binding-customer-saga
spec:
  subject:
    apiVersion: serving.knative.dev/v1
    kind: Service
    name: customer-saga
  bootstrapServers:
    - my-cluster-kafka-bootstrap.kafka:9092
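The binding can be applied like any other Kubernetes manifest. A minimal sketch (the file path below is an assumption based on the k8s directory layout mentioned above; adjust it to your repository):

```shell
# Apply the KafkaBinding manifest for customer-saga
# (file path is illustrative -- adjust to your repo layout)
kubectl apply -f customer-saga/k8s/kafka-binding.yaml

# Verify that the binding object exists
kubectl get kafkabindings
```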
In the next step, we create the KafkaSource object. It reads events from a particular topic and passes them to the consumer by calling an HTTP POST endpoint exposed by the application. We can override the default context path of that endpoint. For the customer-saga, the target URL is /reserve. It should receive events from the order-events topic. Because both customer-saga and product-saga listen for events from the order-events topic, we need to create a similar KafkaSource object for product-saga as well.
apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
  name: kafka-source-orders-customer
spec:
  bootstrapServers:
    - my-cluster-kafka-bootstrap.kafka:9092
  topics:
    - order-events
  sink:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: customer-saga
    uri: /reserve
On the other hand, the order-saga listens for events on the reserve-events topic. If you want to verify our scenario once again, please refer to the diagram in the Source Code section. This time the target URL is /confirm.
apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
  name: kafka-source-orders-confirm
spec:
  bootstrapServers:
    - my-cluster-kafka-bootstrap.kafka:9092
  topics:
    - reserve-events
  sink:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: order-saga
    uri: /confirm
Let’s verify the list of Kafka sources. In our case there is a single KafkaSource per application. Until we deploy our Quarkus applications on Knative, the Kafka sources won’t reach the Ready state.
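We can list the sources and check their readiness with kubectl (KafkaSource is a CRD, so it appears under its own resource type):

```shell
# List KafkaSource objects and their readiness in the current namespace
kubectl get kafkasources
```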
Step 2. Integrating Quarkus with Kafka
In order to integrate Quarkus with Apache Kafka, we may use the SmallRye Reactive Messaging library. Thanks to it, we can define an input and output topic for each method using annotations. Messages are serialized to JSON. We can also automatically expose the Kafka connection status in health checks. Here’s the list of dependencies we need to include in Maven pom.xml.
<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-jackson</artifactId>
</dependency>
<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-smallrye-reactive-messaging-kafka</artifactId>
</dependency>
<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-smallrye-health</artifactId>
</dependency>
Before we start with the source code, we need to provide some configuration settings in the application.properties file. Of course, Kafka requires the connection URL of the cluster. We use the environment variable injected by the KafkaBinding object. The output topic name should also be configured. Here’s the list of required properties for the order-saga application.
kafka.bootstrap.servers = ${KAFKA_BOOTSTRAP_SERVERS}
mp.messaging.outgoing.order-events.connector = smallrye-kafka
mp.messaging.outgoing.order-events.topic = order-events
mp.messaging.outgoing.order-events.value.serializer = io.quarkus.kafka.client.serialization.ObjectMapperSerializer
Finally, we may switch to the code. Let’s start with order-saga. It continuously sends orders to the order-events topic. Those events are received by both the customer-saga and product-saga applications. The method responsible for generating and sending events returns a reactive stream using Mutiny Multi. It emits an event every second. We need to annotate the method with @Outgoing, passing the name of the output defined in the application properties. The @Broadcast annotation indicates that the event is dispatched to all subscribers. Before sending, every order is persisted in a database (we use an H2 in-memory database).
@ApplicationScoped
@Slf4j
public class OrderPublisher {

   private static int num = 0;

   @Inject
   private OrderRepository repository;
   @Inject
   private UserTransaction transaction;

   @Outgoing("order-events")
   @Broadcast
   public Multi<Order> orderEventsPublish() {
      return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
            .map(tick -> {
               Order o = new Order(++num, num % 10 + 1, num % 10 + 1, 100, 1, OrderStatus.NEW);
               try {
                  transaction.begin();
                  repository.persist(o);
                  transaction.commit();
               } catch (Exception e) {
                  log.error("Error in transaction", e);
               }
               log.info("Order published: {}", o);
               return o;
            });
   }
}
Step 3. Handling Knative events with Quarkus Funqy
OK, in the previous step we implemented the part of the code responsible for sending events to the Kafka topic. We also have a KafkaSource that dispatches events from the Kafka topic to the application’s HTTP endpoint. Now we just need to handle them. It is very simple with Quarkus Funqy, which allows us to create functions according to the serverless FaaS approach. We can also easily bind each function to an HTTP endpoint with the Quarkus Funqy HTTP extension. Let’s include it in our dependencies.
<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-funqy-http</artifactId>
</dependency>
In order to create a function with Quarkus Funqy, we just need to annotate a particular method with @Funq. The name of the method is reserve, so it is automatically bound to the HTTP endpoint POST /reserve. It takes a single input parameter, which represents the incoming order and is automatically deserialized from JSON.
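For a quick smoke test, we could invoke the endpoint directly with an HTTP POST. A sketch (the host below is a placeholder for your Knative service URL; the JSON field names are inferred from the getters used in the code later in this article):

```shell
# Invoke the Funqy 'reserve' function over HTTP.
# Replace the host with your actual Knative service URL (placeholder below).
curl -X POST http://customer-saga.default.example.com/reserve \
  -H "Content-Type: application/json" \
  -d '{"id":1,"customerId":1,"productId":1,"amount":100,"productsCount":1,"status":"NEW"}'
```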
In the fragment of code visible below, we implement order handling in the customer-saga application. Once it receives an order, it performs a reservation on the customer account. Then it needs to send a response to the order-saga. To do that, we may use Quarkus reactive messaging support once again. We define an Emitter object that allows us to send a single event to a topic. We can use it inside a method that does not itself return output to be sent to a topic (with @Outgoing). The Emitter bean should be annotated with @Channel, which works similarly to @Outgoing. We also need to define an output topic related to the name of the channel.
@Slf4j
public class OrderReserveFunction {

   @Inject
   private CustomerRepository repository;
   @Inject
   @Channel("reserve-events")
   Emitter<Order> orderEmitter;

   @Funq
   public void reserve(Order order) {
      log.info("Received order: {}", order);
      doReserve(order);
   }

   private void doReserve(Order order) {
      Customer customer = repository.findById(order.getCustomerId());
      log.info("Customer: {}", customer);
      if (order.getStatus() == OrderStatus.NEW) {
         customer.setAmountReserved(customer.getAmountReserved() + order.getAmount());
         customer.setAmountAvailable(customer.getAmountAvailable() - order.getAmount());
         order.setStatus(OrderStatus.IN_PROGRESS);
         log.info("Order reserved: {}", order);
         orderEmitter.send(order);
      } else if (order.getStatus() == OrderStatus.CONFIRMED) {
         customer.setAmountReserved(customer.getAmountReserved() - order.getAmount());
      }
      repository.persist(customer);
   }
}
Here are the configuration properties for the integration between Kafka and the Emitter. The same configuration properties should be created for both customer-saga and product-saga.
kafka.bootstrap.servers = ${KAFKA_BOOTSTRAP_SERVERS}
mp.messaging.outgoing.reserve-events.connector = smallrye-kafka
mp.messaging.outgoing.reserve-events.topic = reserve-events
mp.messaging.outgoing.reserve-events.value.serializer = io.quarkus.kafka.client.serialization.ObjectMapperSerializer
Finally, let’s take a look at the implementation of the Quarkus function inside the product-saga application. It handles incoming orders and performs a reservation for the requested number of products. It also sends a response to the reserve-events topic using the Emitter object.
@Slf4j
public class OrderReserveFunction {

   @Inject
   private ProductRepository repository;
   @Inject
   @Channel("reserve-events")
   Emitter<Order> orderEmitter;

   @Funq
   public void reserve(Order order) {
      log.info("Received order: {}", order);
      doReserve(order);
   }

   private void doReserve(Order order) {
      Product product = repository.findById(order.getProductId());
      log.info("Product: {}", product);
      if (order.getStatus() == OrderStatus.NEW) {
         product.setReservedItems(product.getReservedItems() + order.getProductsCount());
         product.setAvailableItems(product.getAvailableItems() - order.getProductsCount());
         order.setStatus(OrderStatus.IN_PROGRESS);
         orderEmitter.send(order);
      } else if (order.getStatus() == OrderStatus.CONFIRMED) {
         product.setReservedItems(product.getReservedItems() - order.getProductsCount());
      }
      repository.persist(product);
   }
}
Step 4. Deploying Quarkus applications on Knative
Finally, we can deploy all our applications on Knative. To simplify that process, we may use Quarkus Kubernetes support. It can automatically generate deployment manifests based on the source code and application properties. Quarkus also supports building images with Jib. So first, let’s add the following dependencies to Maven pom.xml.
<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-kubernetes</artifactId>
</dependency>
<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-container-image-jib</artifactId>
</dependency>
In the next step, we need to add some configuration settings to the application.properties file. To enable automatic deployment on Kubernetes, the property quarkus.kubernetes.deploy must be set to true. Then we should change the target platform to Knative. Thanks to that, Quarkus will generate a Knative Service instead of a standard Kubernetes Deployment. The last property, quarkus.container-image.group, sets the name of the image owner group. For local development with Knative, we should set it to dev.local.
quarkus.kubernetes.deploy = true
quarkus.kubernetes.deployment-target = knative
quarkus.container-image.group = dev.local
After setting all the values visible above, we just need to execute the Maven build to deploy the application.
$ mvn clean package
After running the Maven build for all the applications, let’s verify the list of Knative Services.
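One way to list them (ksvc is the registered short name for Knative Services):

```shell
# List Knative Services; READY should show True once the revisions are up
kubectl get ksvc
```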
Once the order-saga application starts, it begins sending orders continuously. It also receives order events sent back by customer-saga and product-saga. Those events are processed by the Quarkus function. Here are the logs printed by order-saga.
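To follow those logs yourself, a sketch (the serving.knative.dev/service label is set by Knative on revision pods, and user-container is Knative’s default container name):

```shell
# Tail logs from all pods backing the order-saga Knative Service
kubectl logs -l serving.knative.dev/service=order-saga -c user-container -f
```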
Final Thoughts
As you can see, we can easily implement and deploy Quarkus applications on Knative. Quarkus provides several extensions that simplify integration with the Knative Eventing model and the Kafka broker. We can use Quarkus Funqy to implement the serverless FaaS approach, or SmallRye Reactive Messaging to integrate with Apache Kafka. You can compare this Quarkus support with Spring Boot in my previous article: Knative Eventing with Kafka and Spring Cloud.