Serverless on OpenShift with Knative, Quarkus and Kafka
In this article, you will learn how to build and run Quarkus serverless apps on OpenShift and integrate them through Knative Eventing. We will use Kafka to exchange messages between the apps. However, Knative supports various event sources, and Kafka is just one of the available options. You can check out the full list of supported solutions in the Knative Eventing docs.
I have already published several articles about Knative on my blog. If you want a brief introduction, read my article about Knative basics and Spring Boot. There is also an article similar to this one that focuses more on Kubernetes. Today, we will concentrate on the OpenShift support for the serverless features. Also, Knative is evolving quickly, so there are some significant differences in comparison to the version described in my previous articles.
Although I'm running my example apps on OpenShift, I'll give you a recipe for how to do the same thing on vanilla Kubernetes. In order to run the apps on Kubernetes, you need to activate the kubernetes Maven profile instead of openshift during the build, as shown below.
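Assuming the kubernetes profile is activated the same way as the openshift profile described later in this article, the build command for vanilla Kubernetes would look like this:

$ mvn clean package -Pkubernetes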
Source Code
If you would like to try it out yourself, you can always take a look at my source code. In order to do that, clone my GitHub repository and follow my instructions.
How it works
During the exercise, we will deploy three Quarkus apps on OpenShift: order-service, stock-service and payment-service. All these apps expose a single HTTP POST endpoint for incoming events. They also use the Quarkus REST client to send events to Kafka through Knative Eventing. The order-service app sends a single event that both stock-service and payment-service should receive. They then process the event and send a response back to the order-service. All of this happens asynchronously by leveraging Kafka and the Knative Broker. However, the process is completely transparent to the apps, which just expose an HTTP endpoint and use an HTTP client to call the endpoint exposed by the KafkaSink object.
The diagram below illustrates the architecture of our solution. There are several Knative objects involved: KafkaSink, KafkaSource, Trigger, and Broker. The KafkaSink object eliminates the need to use a Kafka client on the app side. It receives HTTP requests in the CloudEvent format and converts them into Kafka messages sent to a particular topic. The KafkaSource object receives messages from Kafka and sends them to the Knative Broker. Finally, we need to define Triggers. A Trigger object filters the events inside the Broker and sends them to the target app by calling its HTTP endpoint.
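To get a feel for the contract before diving in, here is a minimal sketch of such a request in the CloudEvents HTTP binary mode, in this case the event emitted by the order-service to its sink. The URL, sink name (order-sink), namespace (demo-eventing) and JSON fields are illustrative and only appear later in the article; the Ce-* headers follow the CloudEvents HTTP binding and match the trigger filters we will define:

$ curl -X POST http://kafka-sink-ingress.knative-eventing.svc.cluster.local/demo-eventing/order-sink \
    -H "Content-Type: application/json" \
    -H "Ce-Id: 1" \
    -H "Ce-Specversion: 1.0" \
    -H "Ce-Type: order-event" \
    -H "Ce-Source: order" \
    -d '{"id":1,"customerId":1,"productId":1,"amount":100,"status":"NEW"}'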
Prerequisites
Before we proceed to the Quarkus apps, we need to install and configure two operators on OpenShift: AMQ Streams (Kafka Strimzi) and OpenShift Serverless (Knative).
As part of the configuration phase, we need to create four custom resources: Kafka, KnativeServing, KnativeEventing and KnativeKafka. We can easily create them using the OpenShift Console. In all cases, we can leave the default settings. I create the Kafka instance in the kafka namespace. Just in case, here's the YAML manifest for creating a 3-node Kafka cluster:
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
  namespace: kafka
spec:
  kafka:
    config:
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
      default.replication.factor: 3
      min.insync.replicas: 2
      inter.broker.protocol.version: '3.3'
    storage:
      type: ephemeral
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: true
    version: 3.3.1
    replicas: 3
  zookeeper:
    storage:
      type: ephemeral
    replicas: 3
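Before moving on, it is worth waiting until the cluster reports the Ready condition. A quick way to do that (assuming the manifest above) is:

$ oc wait kafka/my-cluster --for=condition=Ready --timeout=300s -n kafka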
The KnativeServing object should be created in the knative-serving namespace, while KnativeEventing in the knative-eventing namespace.
kind: KnativeServing
apiVersion: operator.knative.dev/v1beta1
metadata:
  name: knative-serving
  namespace: knative-serving
spec: {}
---
kind: KnativeEventing
apiVersion: operator.knative.dev/v1beta1
metadata:
  name: knative-eventing
  namespace: knative-eventing
spec: {}
Or just click the "Create" button in the OpenShift Console.
Finally, we need the last required component, KnativeKafka. We should enable at least the sink and source components in order to install the KafkaSink and KafkaSource CRDs and controllers.
apiVersion: operator.serverless.openshift.io/v1alpha1
kind: KnativeKafka
metadata:
  name: knative-kafka
  namespace: knative-eventing
spec:
  logging:
    level: INFO
  sink:
    enabled: true
  source:
    enabled: true
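Once this resource is ready, the Kafka sink and source data planes should be running in the knative-eventing namespace. A simple way to check (illustrative) is to list the pods there:

$ oc get pods -n knative-eventing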
Functions Support in Quarkus
Although we are implementing an event-driven architecture today, our Quarkus apps just expose and call HTTP endpoints. In order to expose a method as an HTTP endpoint, we need to include the Quarkus Funqy HTTP module. On the other hand, to call an HTTP endpoint exposed by another component, we can leverage the Quarkus declarative REST client. The apps store data in an in-memory H2 database and use Panache ORM.
<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-funqy-http</artifactId>
</dependency>
<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-rest-client-jackson</artifactId>
</dependency>
<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-hibernate-orm-panache</artifactId>
</dependency>
<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-jdbc-h2</artifactId>
</dependency>
In order to expose a method as an HTTP endpoint, we just need to annotate it with @Funq. Here's the function implementation from the payment-service. It receives two types of orders: reservation and confirmation. For a reservation order (status=NEW) it reserves funds in the customer account. For the confirmation type, it accepts or rolls back the transaction depending on the order's status. By default, a method annotated with @Funq is exposed under the same path as its name, so in our case the address of the endpoint is POST /reserve.
public class OrderReserveFunction {

    private static final String SOURCE = "payment";

    Logger log;
    OrderReserveService orderReserveService;
    OrderConfirmService orderConfirmService;

    public OrderReserveFunction(Logger log,
                                OrderReserveService orderReserveService,
                                OrderConfirmService orderConfirmService) {
        this.log = log;
        this.orderReserveService = orderReserveService;
        this.orderConfirmService = orderConfirmService;
    }

    @Funq
    public Customer reserve(Order order) {
        log.infof("Received order: %s", order);
        if (order.getStatus() == OrderStatus.NEW) {
            return orderReserveService.doReserve(order);
        } else {
            return orderConfirmService.doConfirm(order);
        }
    }
}
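The Order object exchanged between the services is not listed here. Below is a simplified sketch with the fields inferred from the getters and setters used throughout the code; treat it as an approximation of the class in the repository (the productId field is my assumption for the stock-service):

public class Order {

    private Long id;
    private Long customerId;
    private Long productId; // assumption: used by the stock-service for stock reservation
    private int amount;
    private OrderStatus status; // e.g. NEW, IN_PROGRESS, CONFIRMED, REJECTED, ROLLBACK
    private String source; // which service produced the event
    private String rejectedService; // which service rejected the order (checked on rollback)

    // standard getters and setters omitted for brevity
}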
Let's take a look at the payment reservation implementation. We assume that multiple incoming requests may concern the same customer concurrently, so we need to lock the entity during the transaction. Once the reservation is performed, we need to send a response back to the order-service. We leverage the Quarkus REST client for that.
@ApplicationScoped
public class OrderReserveService {

    private static final String SOURCE = "payment";

    Logger log;
    CustomerRepository repository;
    OrderSender sender;

    public OrderReserveService(Logger log,
                               CustomerRepository repository,
                               @RestClient OrderSender sender) {
        this.log = log;
        this.repository = repository;
        this.sender = sender;
    }

    @Transactional
    public Customer doReserve(Order order) {
        Customer customer = repository.findById(order.getCustomerId(), LockModeType.PESSIMISTIC_WRITE);
        if (customer == null)
            throw new NotFoundException();
        log.infof("Customer: %s", customer);
        if (order.getAmount() < customer.getAmountAvailable()) {
            order.setStatus(OrderStatus.IN_PROGRESS);
            customer.setAmountReserved(customer.getAmountReserved() + order.getAmount());
            customer.setAmountAvailable(customer.getAmountAvailable() - order.getAmount());
        } else {
            order.setStatus(OrderStatus.REJECTED);
        }
        order.setSource(SOURCE);
        repository.persist(customer);
        log.infof("Order reserved: %s", order);
        sender.send(order);
        return customer;
    }
}
Here's the implementation of our REST client. It sends a message to the endpoint exposed by the KafkaSink object. The path of the endpoint corresponds to the name of the KafkaSink object. We also need to set HTTP headers to meet the CloudEvent format. Therefore, we register a custom ClientHeadersFactory implementation.
@ApplicationScoped
@RegisterRestClient
@RegisterClientHeaders(CloudEventHeadersFactory.class)
public interface OrderSender {

    @POST
    @Path("/payment-sink")
    void send(Order order);
}
Our custom ClientHeadersFactory implementation sets the Ce-* (CloudEvent) headers. The most important ones are Ce-Type and Ce-Source, since we will filter events based on those values later.
@ApplicationScoped
public class CloudEventHeadersFactory implements ClientHeadersFactory {

    AtomicLong id = new AtomicLong();

    @Override
    public MultivaluedMap<String, String> update(MultivaluedMap<String, String> incoming,
                                                 MultivaluedMap<String, String> outgoing) {
        MultivaluedMap<String, String> result = new MultivaluedHashMap<>();
        result.add("Ce-Id", String.valueOf(id.incrementAndGet()));
        result.add("Ce-Specversion", "1.0");
        result.add("Ce-Type", "reserve-event");
        result.add("Ce-Source", "stock");
        return result;
    }
}
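The order-service sends its events in a very similar way. Here is a hedged sketch of its client; the path order-sink is my assumption based on the sink names used later, the headers factory name is hypothetical, and the interface's fully qualified name appears in the application.properties fragment further below:

// Hypothetical sketch of the order-service REST client. Its own headers factory
// would set Ce-Type: order-event and Ce-Source: order, matching the trigger
// filters defined later in this article.
@ApplicationScoped
@RegisterRestClient
@RegisterClientHeaders(OrderCloudEventHeadersFactory.class)
public interface OrderSender {

    @POST
    @Path("/order-sink")
    void send(Order order);
}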
Finally, let’s take a look at the payment confirmation service:
@ApplicationScoped
public class OrderConfirmService {

    private static final String SOURCE = "payment";

    Logger log;
    CustomerRepository repository;

    public OrderConfirmService(Logger log,
                               CustomerRepository repository) {
        this.log = log;
        this.repository = repository;
    }

    @Transactional
    public Customer doConfirm(Order order) {
        Customer customer = repository.findById(order.getCustomerId());
        if (customer == null)
            throw new NotFoundException();
        log.infof("Customer: %s", customer);
        if (order.getStatus() == OrderStatus.CONFIRMED) {
            customer.setAmountReserved(customer.getAmountReserved() - order.getAmount());
            repository.persist(customer);
        } else if (order.getStatus() == OrderStatus.ROLLBACK && !order.getRejectedService().equals(SOURCE)) {
            customer.setAmountReserved(customer.getAmountReserved() - order.getAmount());
            customer.setAmountAvailable(customer.getAmountAvailable() + order.getAmount());
            repository.persist(customer);
        }
        return customer;
    }
}
Also, let's take a look at the implementation of the function in the order-service.
public class OrderConfirmFunction {

    private final Logger log;
    private final OrderService orderService;

    public OrderConfirmFunction(Logger log, OrderService orderService) {
        this.log = log;
        this.orderService = orderService;
    }

    @Funq
    public void confirm(Order order) {
        log.infof("Accepted order: %s", order);
        orderService.doConfirm(order);
    }
}
Here's the function implementation for the stock-service:
public class OrderReserveFunction {

    private static final String SOURCE = "stock";

    private final OrderReserveService orderReserveService;
    private final OrderConfirmService orderConfirmService;
    private final Logger log;

    public OrderReserveFunction(OrderReserveService orderReserveService,
                                OrderConfirmService orderConfirmService,
                                Logger log) {
        this.orderReserveService = orderReserveService;
        this.orderConfirmService = orderConfirmService;
        this.log = log;
    }

    @Funq
    public void reserve(Order order) {
        log.infof("Received order: %s", order);
        if (order.getStatus() == OrderStatus.NEW) {
            orderReserveService.doReserve(order);
        } else {
            orderConfirmService.doConfirm(order);
        }
    }
}
Configure Knative Eventing
After we have finished the implementation of the app logic, we can proceed to the configuration of the OpenShift Serverless and Knative components. If you are using my GitHub repository, you don't have to apply any YAML manifests manually. All the required configuration is applied during the Maven build. This is possible thanks to the Quarkus Kubernetes extension and its support for Knative. We just need to place all the required YAML manifests inside src/main/kubernetes/knative.yml, and the magic happens by itself.
However, in order to understand what happens, let's discuss the Knative configuration step by step. In the first step, we need to create the KafkaSink objects. A KafkaSink exposes an HTTP endpoint and takes a CloudEvent as input. It then sends that event to a particular topic in the Kafka cluster. Here's the KafkaSink definition for the payment-service:
apiVersion: eventing.knative.dev/v1alpha1
kind: KafkaSink
metadata:
  name: payment-sink
  namespace: demo-eventing
spec:
  bootstrapServers:
    - my-cluster-kafka-bootstrap.kafka:9092
  topic: reserve-events
  numPartitions: 1
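I created an analogous sink for the stock-service. Here is a sketch of it; the name stock-sink is my assumption, so check the repository for the exact manifest:

apiVersion: eventing.knative.dev/v1alpha1
kind: KafkaSink
metadata:
  name: stock-sink
  namespace: demo-eventing
spec:
  bootstrapServers:
    - my-cluster-kafka-bootstrap.kafka:9092
  topic: reserve-events
  numPartitions: 1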
Both payment-service and stock-service send messages to the same reserve-events topic. Therefore, we could also create a single KafkaSink for those two services (I created two sinks, each dedicated to a single app). On the other hand, the order-service app sends messages to the order-events topic, so we have to create a separate KafkaSink:
apiVersion: eventing.knative.dev/v1alpha1
kind: KafkaSink
metadata:
  name: order-sink
  namespace: demo-eventing
spec:
  bootstrapServers:
    - my-cluster-kafka-bootstrap.kafka:9092
  topic: order-events
  numPartitions: 1
After that, let’s print the list of sinks in our cluster:
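A quick way to do that is with the oc CLI (a minimal sketch; the status of each sink contains the address of its HTTP ingress):

$ oc get kafkasinks -n demo-eventing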
Now, this URL address (the address of the KafkaSink ingress) should be set in the application properties as the base URL of the REST client. Here's the fragment of the Quarkus application.properties:
%openshift.quarkus.rest-client."pl.piomin.samples.quarkus.serverless.order.client.OrderSender".url = http://kafka-sink-ingress.knative-eventing.svc.cluster.local/demo-eventing
With KafkaSink we are able to send messages to the Kafka cluster. In order to receive them on the target app side, we need to create some other objects. In the first step, we will create a Knative Broker and a KafkaSource object. The broker can easily be created using the kn CLI:
$ kn broker create default
The KafkaSource object connects to the Kafka cluster and receives messages from the defined list of topics. In our case, these are order-events and reserve-events. The output of the KafkaSource object is the already-created default broker. It means that all the messages exchanged between our three apps are delivered to the Knative Broker.
apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
  name: kafka-source-to-broker
spec:
  bootstrapServers:
    - my-cluster-kafka-bootstrap.kafka:9092
  topics:
    - order-events
    - reserve-events
  sink:
    ref:
      apiVersion: eventing.knative.dev/v1
      kind: Broker
      name: default
In the final step, we need to configure the mechanism responsible for getting messages from the Knative Broker and sending them to the target services. In order to do that, we have to create Trigger objects. A trigger can filter messages by CloudEvent attributes, which correspond to the Ce-* HTTP headers of the request. For example, the payment-service receives only messages sent by the order-service that contain the order-event type.
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: payment-trigger
spec:
  broker: default
  filter:
    attributes:
      source: order
      type: order-event
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: payment-service
    uri: /reserve
The stock-trigger object is very similar. It connects to the default Broker and gets only messages with source=order and type=order-event. Finally, it calls the POST /reserve endpoint exposed by the stock-service.
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: stock-trigger
spec:
  broker: default
  filter:
    attributes:
      source: order
      type: order-event
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: stock-service
    uri: /reserve
On the other hand, the order-service app should receive events from both stock-service and payment-service. Therefore, we filter messages just by the type attribute. The target endpoint of the order-service is POST /confirm.
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: order-trigger
spec:
  broker: default
  filter:
    attributes:
      type: reserve-event
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: order-service
    uri: /confirm
Run Quarkus Apps on Knative
We can leverage the Quarkus Kubernetes/OpenShift extension to run the apps as Knative services. In order to do that, we need to include the quarkus-openshift dependency in the Maven pom.xml. We would like to use that module only when we actually deploy the apps to the cluster. Therefore, we create a custom Maven profile named openshift. Besides including the quarkus-openshift dependency, it also enables deployment by setting the quarkus.kubernetes.deploy property to true and activates the custom Quarkus profile openshift.
<profiles>
  <profile>
    <id>openshift</id>
    <activation>
      <property>
        <name>openshift</name>
      </property>
    </activation>
    <dependencies>
      <dependency>
        <groupId>io.quarkus</groupId>
        <artifactId>quarkus-openshift</artifactId>
      </dependency>
    </dependencies>
    <properties>
      <quarkus.kubernetes.deploy>true</quarkus.kubernetes.deploy>
      <quarkus.profile>openshift</quarkus.profile>
    </properties>
  </profile>
</profiles>
Once we include the quarkus-openshift module, we can use Quarkus configuration properties to customize the deployment process. Firstly, we need to set the quarkus.kubernetes.deployment-target property to knative. Thanks to that, Quarkus will automatically generate a YAML manifest with a Knative Service instead of a standard Kubernetes Deployment. We can also override the default autoscaling settings with the quarkus.knative.revision-auto-scaling.* properties. The whole build process runs on the cluster with S2I (Source-to-Image), so we can use the internal OpenShift registry (the quarkus.container-image.registry property). Here's the fragment of the application.properties file for the order-service:
quarkus.kubernetes-client.trust-certs = true
quarkus.kubernetes.deployment-target = knative
quarkus.knative.env.vars.tick-timeout = 10000
quarkus.knative.revision-auto-scaling.metric = rps
quarkus.knative.revision-auto-scaling.target = 50
%openshift.quarkus.container-image.group = demo-eventing
%openshift.quarkus.container-image.registry = image-registry.openshift-image-registry.svc:5000
%openshift.app.orders.timeout = ${TICK_TIMEOUT}
Finally, we just need to activate the openshift profile during the build, and all the apps will be deployed to the target OpenShift cluster. You can deploy a single app or all of them by running the following command in the repository root directory:
$ mvn clean package -Popenshift
Once we deploy our apps, we can display a list of Knative services.
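For example, with the kn CLI (the demo-eventing namespace matches the manifests above):

$ kn service list -n demo-eventing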
We can also verify if all the triggers have been configured properly.
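Again, the kn CLI can help here (illustrative):

$ kn trigger list -n demo-eventing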
Also, let's take a look at the "Topology" view in the OpenShift Console, which illustrates our serverless architecture.
Testing Services
It is also worth creating some automated tests to verify the basic functionality before deployment. Since we have simple HTTP apps and an in-memory H2 database, we can write standard tests. The only thing we need to do is mock the HTTP client.
<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-junit5</artifactId>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>io.rest-assured</groupId>
  <artifactId>rest-assured</artifactId>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-junit5-mockito</artifactId>
  <scope>test</scope>
</dependency>
Here's the JUnit test for the payment-service. We are verifying both the reservation and confirmation processes for the same order.
@QuarkusTest
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
public class OrderReserveFunctionTests {

    private static int amount;

    private CustomerRepository repository;

    @InjectMock
    @RestClient
    OrderSender sender;

    public OrderReserveFunctionTests(CustomerRepository repository) {
        this.repository = repository;
    }

    @Test
    @org.junit.jupiter.api.Order(1)
    void reserve() {
        given().contentType("application/json").body(createTestOrder(OrderStatus.NEW)).post("/reserve")
                .then()
                .statusCode(204);
        Customer c = repository.findById(1L);
        amount = c.getAmountAvailable();
        assertEquals(100, c.getAmountReserved());
    }

    @Test
    @org.junit.jupiter.api.Order(2)
    void confirm() {
        given().contentType("application/json").body(createTestOrder(OrderStatus.CONFIRMED)).post("/reserve")
                .then()
                .statusCode(204);
        Customer c = repository.findById(1L);
        assertEquals(0, c.getAmountReserved());
        assertEquals(amount, c.getAmountAvailable());
    }

    private Order createTestOrder(OrderStatus status) {
        Order o = new Order();
        o.setId(1L);
        o.setSource("test");
        o.setStatus(status);
        o.setAmount(100);
        o.setCustomerId(1L);
        return o;
    }
}
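The tests run as part of the standard Maven build; to execute them on their own, just run:

$ mvn test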
Also, let's take a look at the logs of the apps running on OpenShift. As you can see, the order-service receives events from both stock-service and payment-service. After that, it confirms the order and sends a confirmation message to both services.
Here are the logs from the payment-service. As you can see, it receives the CloudEvent generated by the order-service (the Ce-Source header equals order).
Final Thoughts
With OpenShift Serverless and Knative Eventing, you can easily build an event-driven architecture for simple HTTP-based apps. Which medium is used to store events, and how they are routed, is completely transparent to the app. The only thing it needs to do is prepare a request according to the CloudEvent specification. OpenShift Serverless brings several features that simplify development. We can also leverage the Quarkus Kubernetes extension to easily build and deploy our apps on OpenShift as Knative services.