Knative Eventing with Kafka and Spring Cloud
In this article, you will learn how to run eventing applications on Knative using Kafka and Spring Cloud. I’ll explain what Knative Eventing is and how to integrate it with the Kafka broker. We will build our applications on top of Spring Cloud Function and Spring Cloud Stream. All these solutions seem to be a perfect match. Why? Let me invite you to read the article.
However, before we proceed you need some basic knowledge of Knative concepts. Therefore, I suggest you read more about it first. You can start with these two articles: Spring Boot on Knative and Microservices on Knative with GraalVM and Spring Boot. Of course, you can also refer to the Knative documentation.
Source Code
If you would like to try it yourself, you may always take a look at my source code. To do that, you need to clone my GitHub repository. Then you should just follow my instructions.
Today we will base our example on a simple architecture that complies with the eventual consistency pattern, also known as the SAGA pattern. What exactly is that? The sample system consists of three services. The order-service creates a new order related to customers and products. That order is sent to a Kafka topic. Then, our two other applications, customer-service and product-service, receive the order event. After that, they perform a reservation. The customer-service reserves the order’s amount on the customer’s account, while the product-service reserves the number of products specified in the order. Both services send a response to the order-service through the Kafka topic. If the order-service receives positive reservations from both services, it confirms the order and sends an event with that information. Both customer-service and product-service receive the event and confirm their reservations. You can verify it in the picture below.
Prerequisites
Before we start, we first need to install Knative on the Kubernetes cluster. I’m using a local instance of Kubernetes, but you may as well use any remote cluster like GKE. Note that the latest version of Knative requires Kubernetes v1.17 or later. Of course, we need to install both the Serving and Eventing components. You may find the detailed installation instructions here.
That’s not all. We also need to install the Kafka Eventing Broker. Here’s the link to the releases site. It includes several deployments and CRDs. You should pay special attention to the KafkaSource and KafkaBinding CRDs, since we will use them later.
Finally, we need to install a Kafka cluster on Kubernetes. The recommended way to do that is with the Strimzi operator. Strimzi provides container images and operators for running Kafka on Kubernetes. It also comes with a set of CRDs for managing the Kafka cluster. Once you install it, you may proceed to the next steps. I installed it in the kafka namespace. Here’s the list of running pods.
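If you installed Strimzi into the kafka namespace as I did, you can check the running pods with a standard query:
$ kubectl get pods -n kafka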
Step 1: Create and configure Knative Kafka Broker
In the first step, we are going to create a Kafka cluster using the Strimzi Kafka CRD. To keep things simple, we won’t use any advanced configuration settings. For example, I used ephemeral storage, which is not recommended in production. I set three instances of Zookeeper. Kafka is finally planning to remove its dependency on Zookeeper, but the current version still relies on it.
apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    replicas: 1
    listeners:
      plain: {}
      tls: {}
    config:
      offsets.topic.replication.factor: 1
      transaction.state.log.replication.factor: 1
      transaction.state.log.min.isr: 1
      log.message.format.version: "2.4"
    storage:
      type: ephemeral
  zookeeper:
    replicas: 3
    storage:
      type: ephemeral
  entityOperator:
    topicOperator: {}
    userOperator: {}
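Assuming you saved the manifest above as kafka-cluster.yaml (the file name is just my assumption), you can apply it and wait until Strimzi reports the cluster as ready:
$ kubectl apply -f kafka-cluster.yaml -n kafka
$ kubectl wait kafka/my-cluster --for=condition=Ready --timeout=300s -n kafka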
The Knative broker allows us to route events to different event sinks or consumers. We may use different broker providers. When an event is sent to the broker, all request metadata other than the CloudEvent data and context attributes are stripped away. The event delivery mechanism hides the details of event routing from the event producer and consumer. The default broker class is MTChannelBasedBroker. We will change it to Kafka.
apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
  annotations:
    eventing.knative.dev/broker.class: Kafka
  name: default
spec:
  config:
    apiVersion: v1
    kind: ConfigMap
    name: kafka-broker-config
    namespace: knative-eventing
In this article, we won’t use the Kafka broker directly. Instead, we will use the KafkaSource object that takes events from a particular topic and sends them to the subscriber. If you want to use the Broker, you need to define a Knative Trigger that refers to it.
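Just for illustration, a Trigger referring to the default broker could look like the sketch below. The Trigger name and the subscriber service are my assumptions, not part of the sample system.
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: order-trigger
spec:
  broker: default
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: customer-saga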
The broker refers to the ConfigMap kafka-broker-config. The most important thing there is to set the address of the Kafka cluster. If you didn’t change anything in the default Kafka installation files, it is ${KAFKA_CLUSTER_NAME}-kafka-bootstrap on port 9092.
apiVersion: v1
kind: ConfigMap
metadata:
  name: kafka-broker-config
  namespace: knative-eventing
data:
  default.topic.partitions: "10"
  default.topic.replication.factor: "1"
  bootstrap.servers: "my-cluster-kafka-bootstrap.kafka:9092"
Step 2: Create an application with Spring Cloud Stream
Let’s start with dependencies. Each of our applications uses an in-memory H2 database. They integrate with the database using the Spring Data JPA repository pattern. However, the most important thing is that they are all built on top of Spring Cloud Stream to interact with Kafka topics. Spring Cloud Stream requires adding a concrete binder implementation to the classpath. That’s why we add the spring-cloud-starter-stream-kafka starter. For some time now, the Spring Cloud Stream programming model has been built on top of Spring Cloud Function. Fortunately, we may easily export functions as HTTP endpoints. This feature will be useful for us later. For now, let’s just take a look at the list of included dependencies.
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-function-web</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
<dependency>
  <groupId>org.projectlombok</groupId>
  <artifactId>lombok</artifactId>
  <version>1.18.16</version>
</dependency>
<dependency>
  <groupId>com.h2database</groupId>
  <artifactId>h2</artifactId>
  <scope>runtime</scope>
</dependency>
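The Spring Cloud artifacts above do not declare versions, so I assume they are managed by the Spring Cloud BOM in the dependencyManagement section, roughly like this (the version property is just a placeholder):
<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-dependencies</artifactId>
      <version>${spring-cloud.version}</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>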
Here’s the model class for the order-service. Once the order is created and saved in the database, the order-service sends it to the output Kafka topic.
@Entity
@Table(name = "orders")
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@ToString
public class Order {

    @Id
    private Integer id;
    private Integer customerId;
    private Integer productId;
    private int amount;
    private int productCount;
    @Enumerated
    private OrderStatus status = OrderStatus.NEW;

}
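The OrderStatus enum is not shown in the listing above. Based on the statuses used throughout the sample code, a minimal version could look like this:
public enum OrderStatus {
    NEW, IN_PROGRESS, CONFIRMED // other statuses may exist in the original source
}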
We have three functions inside the order-service application main class. Two of them continuously send events to the output destination. The third one, confirm(), waits for incoming events. We will discuss it later. The orderEventSupplier function represents the first step in our scenario. It creates a new order with test data and saves it in the database before sending it.
@SpringBootApplication
@Slf4j
public class OrderSagaApplication {

    public static void main(String[] args) {
        SpringApplication.run(OrderSagaApplication.class, args);
    }

    private static int num = 0;
    private BlockingQueue<Order> queue = new LinkedBlockingQueue<>();

    @Autowired
    OrderRepository repository;

    @Bean
    public Supplier<Order> orderEventSupplier() {
        return () -> repository.save(new Order(++num, num % 10 + 1, num % 10 + 1, 100, 1, OrderStatus.NEW));
    }

    @Bean
    public Supplier<Order> orderConfirmSupplier() {
        return () -> queue.poll();
    }

    @Bean
    public Consumer<Message<Order>> confirm() {
        return this::doConfirm;
    }

    private void doConfirm(Message<Order> msg) {
        Order o = msg.getPayload();
        log.info("Order received: {}", o);
        Order order = repository.findById(o.getId()).orElseThrow();
        if (order.getStatus() == OrderStatus.NEW) {
            order.setStatus(OrderStatus.IN_PROGRESS);
        } else if (order.getStatus() == OrderStatus.IN_PROGRESS) {
            order.setStatus(OrderStatus.CONFIRMED);
            log.info("Order confirmed : {}", order);
            queue.offer(order);
        }
        repository.save(order);
    }

}
The name of the output Kafka topic is order-events. We set it for both Supplier functions using the Spring Cloud Stream bindings pattern. On the other hand, the Consumer function will not receive events directly from the Kafka topic. Why? Because it is a part of the Knative Eventing process, which I will explain later in this step. For now, it is important to note that only the suppliers are bound to the external destination, which is why only they are declared in the spring.cloud.function.definition property.
spring.application.name: order-saga
spring.cloud.stream.bindings.orderEventSupplier-out-0.destination: order-events
spring.cloud.stream.bindings.orderConfirmSupplier-out-0.destination: order-events
spring.kafka.bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS}
spring.cloud.function.definition: orderEventSupplier;orderConfirmSupplier
Finally, we need to create the KafkaBinding that will inject Kafka bootstrap information into the application container (through the Knative Service). Then, the application can access it as the KAFKA_BOOTSTRAP_SERVERS environment variable.
apiVersion: bindings.knative.dev/v1beta1
kind: KafkaBinding
metadata:
  name: kafka-binding-order-saga
spec:
  subject:
    apiVersion: serving.knative.dev/v1
    kind: Service
    name: order-saga
  bootstrapServers:
    - my-cluster-kafka-bootstrap.kafka:9092
Step 3: Create Kafka sources and Spring Cloud Function endpoints
Ok, we have already created a function responsible for generating and sending orders to the Kafka topic inside the order-service. So, now our goal is to receive and handle them on the customer-service and product-service side. Our applications won’t listen for incoming events directly on the Kafka topic. To clarify, the basic Knative Eventing assumption is that the application doesn’t care how the events are published. It just receives them as HTTP POST requests. And here comes the KafkaSource object. It takes a list of input topics and a destination sink as parameters. In our case, it gets messages from order-events and sends them as HTTP POST requests to the /customers/reserve endpoint of the customer-saga Knative Service.
apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
  name: kafka-source
spec:
  bootstrapServers:
    - my-cluster-kafka-bootstrap.kafka:9092
  topics:
    - order-events
  sink:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: customer-saga
    uri: /customers/reserve
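The source for product-saga is analogous. Here’s a sketch, assuming its reserve endpoint is exposed under /products/reserve (the resource name and the path are my assumptions, not taken from the listing above):
apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
  name: kafka-source-product
spec:
  bootstrapServers:
    - my-cluster-kafka-bootstrap.kafka:9092
  topics:
    - order-events
  sink:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: product-saga
    uri: /products/reserve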
Here’s an implementation of the customer-saga application. Thanks to Spring Cloud Function Web, it automatically exports the reserve function as an HTTP endpoint with the path /reserve. Once the consumer receives the event, it performs the rest of the business logic. If the input order has the NEW status, the customer-saga creates a reservation for a particular amount on the customer’s account. Then it sends an event response to the order-saga. In other words, it first puts the event into a BlockingQueue. We also use a Supplier function for sending events to the Kafka topic. This time the supplier function takes Order objects from the BlockingQueue. Finally, if our application receives a confirmation order from the order-saga, it commits the whole transaction by removing the reserved amount.
@SpringBootApplication
@Slf4j
public class CustomerSagaApplication {

    public static void main(String[] args) {
        SpringApplication.run(CustomerSagaApplication.class, args);
    }

    private BlockingQueue<Order> queue = new LinkedBlockingQueue<>();

    @Autowired
    private CustomerRepository repository;

    @Bean
    public Supplier<Order> orderEventSupplier() {
        return () -> queue.poll();
    }

    @Bean
    public Consumer<Message<Order>> reserve() {
        return this::doReserve;
    }

    private void doReserve(Message<Order> msg) {
        Order order = msg.getPayload();
        log.info("Body: {}", order);
        Customer customer = repository.findById(order.getCustomerId()).orElseThrow();
        log.info("Customer: {}", customer);
        if (order.getStatus() == OrderStatus.NEW) {
            customer.setAmountReserved(customer.getAmountReserved() + order.getAmount());
            customer.setAmountAvailable(customer.getAmountAvailable() - order.getAmount());
            order.setStatus(OrderStatus.IN_PROGRESS);
            queue.offer(order);
        } else if (order.getStatus() == OrderStatus.CONFIRMED) {
            customer.setAmountReserved(customer.getAmountReserved() - order.getAmount());
        }
        repository.save(customer);
    }

}
We can also set the base context path for HTTP endpoints using the spring.cloud.function.web.path property. So, the final path of our target endpoint is /customers/reserve. It is the same as the address defined in the KafkaSource definition.
spring.cloud.function.web.path: /customers
Here’s the configuration for the customer-saga inside the application.yml file.
spring.application.name: customer-saga
spring.cloud.function.web.path: /customers
spring.cloud.stream.bindings.orderEventSupplier-out-0.destination: reserve-events
spring.kafka.bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS}
spring.cloud.function.definition: orderEventSupplier
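The reservation responses published by customer-saga and product-saga to the reserve-events topic still have to reach the confirm() function in the order-saga. A third KafkaSource can handle that. Here’s a minimal sketch, assuming the consumer is exposed under the /confirm path (the resource name and URI are my assumptions):
apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
  name: kafka-source-order
spec:
  bootstrapServers:
    - my-cluster-kafka-bootstrap.kafka:9092
  topics:
    - reserve-events
  sink:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: order-saga
    uri: /confirm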
The implementation of the business logic inside the product-saga is pretty similar to the customer-saga. There is a single Consumer function that receives orders, and a single Supplier responsible for sending a response to the order-saga.
@SpringBootApplication
@Slf4j
public class ProductSagaApplication {

    public static void main(String[] args) {
        SpringApplication.run(ProductSagaApplication.class, args);
    }

    @Autowired
    private ProductRepository repository;

    private BlockingQueue<Order> queue = new LinkedBlockingQueue<>();

    @Bean
    public Supplier<Order> orderEventSupplier() {
        return () -> queue.poll();
    }

    @Bean
    public Consumer<Message<Order>> reserve() {
        return this::doReserve;
    }

    private void doReserve(Message<Order> msg) {
        Order order = msg.getPayload();
        log.info("Body: {}", order);
        Product product = repository.findById(order.getProductId()).orElseThrow();
        log.info("Product: {}", product);
        if (order.getStatus() == OrderStatus.NEW) {
            product.setReservedItems(product.getReservedItems() + order.getProductsCount());
            product.setAvailableItems(product.getAvailableItems() - order.getProductsCount());
            order.setStatus(OrderStatus.IN_PROGRESS);
            queue.offer(order);
        } else if (order.getStatus() == OrderStatus.CONFIRMED) {
            product.setReservedItems(product.getReservedItems() - order.getProductsCount());
        }
        repository.save(product);
    }

}
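The product-saga configuration file is not shown here. By analogy to the customer-saga, I assume it looks roughly like this:
spring.application.name: product-saga
spring.cloud.function.web.path: /products
spring.cloud.stream.bindings.orderEventSupplier-out-0.destination: reserve-events
spring.kafka.bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS}
spring.cloud.function.definition: orderEventSupplier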
Step 4: Run applications on Knative Eventing and Kafka
Here’s a typical definition of the Knative Service for our applications. I’m using the dev.local option, but if you run a remote cluster, you may replace it with your Docker username or any other repository account you have.
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: order-saga
spec:
  template:
    spec:
      containers:
        - image: dev.local/order-saga
I use Skaffold together with the Jib Maven Plugin for building and deploying the applications on Knative. My target namespace is serverless. With the tail option, you may observe the logs after deployment. Of course, you may as well use the skaffold dev command.
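For reference, a minimal skaffold.yaml using the Jib builder could look like the sketch below; the schema version and manifest paths are my assumptions, so check the repository for the real file.
apiVersion: skaffold/v2beta11
kind: Config
build:
  artifacts:
    - image: dev.local/order-saga
      jib: {}
deploy:
  kubectl:
    manifests:
      - k8s/*.yaml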
$ skaffold run --tail -n serverless
After running all our applications on Knative Eventing with Kafka, we may verify the list of services using the kn CLI.
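$ kn service list -n serverless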
Then, we may verify that all KafkaBindings have been created. To do that, let’s just execute the following kubectl command (I assume here that the resources live in the serverless namespace).
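$ kubectl get kafkabindings -n serverless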
The next important component is the KafkaSource. We have already created three sources, one per application.
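You can list them in the same way:
$ kubectl get kafkasources -n serverless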
After starting, the order-saga application continuously generates and sends a new order every second. Both product-saga and customer-saga receive the events and send responses. Thanks to that, the traffic is exchanged without any interruption. Besides the application pods, we have three pods related to the Kafka sources.
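You can observe them with a standard pod listing (namespace assumed):
$ kubectl get pods -n serverless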
Let’s just take a look at the application logs. Here are the logs from the order-saga. As you can see, it receives the order reservations from both customer-saga and product-saga. After that, it confirms the order and sends a response back to the order-events topic on Kafka. Basically, that’s what we wanted to achieve.
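If you are not tailing the logs with Skaffold, you can fetch them directly from the pod; I assume here the standard Knative revision label and container name:
$ kubectl logs -n serverless -l serving.knative.dev/service=order-saga -c user-container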
Final Thoughts
I hope you enjoyed this article. Knative is still a relatively new solution, so I think we may expect some new and interesting features in the near future. With Knative Eventing you may use other event sources than Kafka. Personally, I’m waiting for the integration with RabbitMQ, which is currently under development. For a full list of available solutions, you may refer to this site.
It is my third article about Knative and Spring Boot. You may expect more articles about Knative soon! Next time, I’m going to show you an example with another popular Java framework – Quarkus.