Serverless on OpenShift with Knative, Quarkus and Kafka

In this article, you will learn how to build and run Quarkus serverless apps on OpenShift and integrate them through Knative Eventing. We will use Kafka to exchange messages between the apps. However, Knative supports various event sources. Kafka is just one of the available options. You can check out a full list of supported solutions in the Knative Eventing docs.

I have already published several articles about Knative on my blog. If you want a quick introduction, read my article about Knative basics and Spring Boot. There is also an article similar to this one, but focused on Kubernetes. Today, we will focus more on the OpenShift support for serverless features. Also, Knative is changing rapidly, so there are some significant differences in comparison to the version described in my previous articles.

Although I’m running my example apps on OpenShift, I’ll give you a recipe for how to do the same thing on vanilla Kubernetes. In order to run them on Kubernetes, you need to activate the kubernetes Maven profile instead of openshift during the build.

Source Code

If you would like to try it by yourself, you may always take a look at my source code. In order to do that you need to clone my GitHub repository. Then you should just follow my instructions.

How it works

During the exercise, we will deploy three Quarkus apps on OpenShift: order-service, stock-service and payment-service. Each app exposes a single HTTP POST endpoint for incoming events and uses the Quarkus REST client to send events to Kafka through Knative Eventing. The order-service app sends a single event that both stock-service and payment-service should receive. They process the event and send a response back to the order-service. All of this happens asynchronously by leveraging Kafka and the Knative Broker. However, that process is completely transparent to the apps, which just expose an HTTP endpoint and use an HTTP client to call the endpoint exposed by the KafkaSink object.

The diagram below illustrates the architecture of our solution. There are several Knative objects: KafkaSink, KafkaSource, Trigger, and Broker. The KafkaSink object eliminates the need to use a Kafka client on the app side. It receives HTTP requests in CloudEvent format and converts them into Kafka messages sent to a particular topic. The KafkaSource object receives messages from Kafka and sends them to the Knative Broker. Finally, we need to define a Trigger. The Trigger object filters the events inside the Broker and sends them to the target app by calling its HTTP endpoint.

openshift-serverless-arch

Prerequisites

Before we proceed to the Quarkus apps, we need to install and configure two operators on OpenShift: AMQ Streams (Kafka Strimzi) and OpenShift Serverless (Knative).

openshift-serverless-operators

In the configuration phase, we need to create four components: Kafka, KnativeServing, KnativeEventing and KnativeKafka. We can easily create them using the OpenShift Console. In all cases, we can leave the default settings. I create the Kafka instance in the kafka namespace. Just in case, here’s the YAML manifest for creating a 3-node Kafka cluster:

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
  namespace: kafka
spec:
  kafka:
    config:
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
      default.replication.factor: 3
      min.insync.replicas: 2
      inter.broker.protocol.version: '3.3'
    storage:
      type: ephemeral
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: true
    version: 3.3.1
    replicas: 3
  zookeeper:
    storage:
      type: ephemeral
    replicas: 3

The KnativeServing should be created in the knative-serving namespace, while KnativeEventing in the knative-eventing namespace.

kind: KnativeServing
apiVersion: operator.knative.dev/v1beta1
metadata:
  name: knative-serving
  namespace: knative-serving
spec: {}
---
kind: KnativeEventing
apiVersion: operator.knative.dev/v1beta1
metadata:
  name: knative-eventing
  namespace: knative-eventing
spec: {}

Or just “click” the create button in OpenShift Console.

Finally, the last required component – KnativeKafka. We should at least enable the sink and source to install KafkaSink and KafkaSource CRDs and controllers.

apiVersion: operator.serverless.openshift.io/v1alpha1
kind: KnativeKafka
metadata:
  name: knative-kafka
  namespace: knative-eventing
spec:
  logging:
    level: INFO
  sink:
    enabled: true
  source:
    enabled: true

Functions Support in Quarkus

Although we will implement event-driven architecture today, our Quarkus apps are just exposing and calling HTTP endpoints. In order to expose the method as the HTTP endpoint we need to include the Quarkus Funqy HTTP module. On the other hand, to call the HTTP endpoint exposed by another component we can leverage Quarkus declarative REST client. Our app is storing data in the in-memory H2 database and uses Panache ORM.

<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-funqy-http</artifactId>
</dependency>
<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-rest-client-jackson</artifactId>
</dependency>
<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-hibernate-orm-panache</artifactId>
</dependency>
<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-jdbc-h2</artifactId>
</dependency>

In order to expose the method as the HTTP endpoint we just need to annotate it with @Funq. Here’s the function implementation from the payment-service. It receives two types of orders: reservation and confirmation. For a reservation order (status=NEW), it reserves funds in the customer account. For the confirmation type, it accepts or rolls back the transaction depending on the order’s status. By default, a method annotated with @Funq is exposed under a path equal to its name. In our case, the address of the endpoint is POST /reserve.

public class OrderReserveFunction {

   private static final String SOURCE = "payment";
    
   Logger log;
   OrderReserveService orderReserveService;
   OrderConfirmService orderConfirmService;

   public OrderReserveFunction(Logger log,
                               OrderReserveService orderReserveService,
                               OrderConfirmService orderConfirmService) {
      this.log = log;
      this.orderReserveService = orderReserveService;
      this.orderConfirmService = orderConfirmService;
   }

   @Funq
   public Customer reserve(Order order) {
      log.infof("Received order: %s", order);
      if (order.getStatus() == OrderStatus.NEW) {
         return orderReserveService.doReserve(order);
      } else {
         return orderConfirmService.doConfirm(order);
      }
   }

}

Let’s take a look at the payment reservation implementation. We assume that multiple concurrent requests may target the same customer account, so we need to lock the entity during the transaction. Once the reservation is performed, we need to send a response back to the order-service. We are leveraging the Quarkus REST client for that.

@ApplicationScoped
public class OrderReserveService {

    private static final String SOURCE = "payment";

    Logger log;
    CustomerRepository repository;
    OrderSender sender;

    public OrderReserveService(Logger log,
                               CustomerRepository repository,
                               @RestClient OrderSender sender) {
        this.log = log;
        this.repository = repository;
        this.sender = sender;
    }

    @Transactional
    public Customer doReserve(Order order) {
        Customer customer = repository.findById(order.getCustomerId(), LockModeType.PESSIMISTIC_WRITE);
        if (customer == null)
            throw new NotFoundException();
        log.infof("Customer: %s", customer);
        if (order.getAmount() < customer.getAmountAvailable()) {
            order.setStatus(OrderStatus.IN_PROGRESS);
            customer.setAmountReserved(customer.getAmountReserved() + order.getAmount());
            customer.setAmountAvailable(customer.getAmountAvailable() - order.getAmount());
        } else {
            order.setStatus(OrderStatus.REJECTED);
        }
        order.setSource(SOURCE);
        repository.persist(customer);
        log.infof("Order reserved: %s", order);
        sender.send(order);
        return customer;
    }
}

Here’s the implementation of our REST client. It sends a message to the endpoint exposed by the KafkaSink object. The path of the endpoint corresponds to the name of the KafkaSink object. We also need to set HTTP headers to meet the CloudEvent format. Therefore we are registering the custom ClientHeadersFactory implementation.

@ApplicationScoped
@RegisterRestClient
@RegisterClientHeaders(CloudEventHeadersFactory.class)
public interface OrderSender {

    @POST
    @Path("/payment-sink")
    void send(Order order);

}

Our custom ClientHeadersFactory implementation sets some Ce-* (CloudEvent) headers. The most important headers are Ce-Type and Ce-Source, since we will later filter events based on their values.

@ApplicationScoped
public class CloudEventHeadersFactory implements ClientHeadersFactory {

    AtomicLong id = new AtomicLong();

    @Override
    public MultivaluedMap<String, String> update(MultivaluedMap<String, String> incoming,
                                                 MultivaluedMap<String, String> outgoing) {
        MultivaluedMap<String, String> result = new MultivaluedHashMap<>();
        result.add("Ce-Id", String.valueOf(id.incrementAndGet()));
        result.add("Ce-Specversion", "1.0");
        result.add("Ce-Type", "reserve-event");
        result.add("Ce-Source", "stock");
        return result;
    }

}
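
To make the shape of the outgoing request more concrete, here is a minimal sketch that uses only the JDK's java.net.http API instead of the Quarkus REST client. It builds (but does not send) a request with the same Ce-* headers in binary content mode. The class name CloudEventRequestSketch, the buildRequest helper, the JSON payload and the "payment" source value are illustrative assumptions, not code from the repository. The base URL assumes the sink ingress address pattern used in this article for the demo-eventing namespace.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.util.concurrent.atomic.AtomicLong;

class CloudEventRequestSketch {

    // Assumed base address of the KafkaSink ingress for the demo-eventing namespace
    static final String SINK_BASE_URL =
            "http://kafka-sink-ingress.knative-eventing.svc.cluster.local/demo-eventing";

    // Simple counter used to produce a distinct Ce-Id per request
    static final AtomicLong ID = new AtomicLong();

    // Builds a POST request that carries the event data in the body and the
    // CloudEvent context attributes in Ce-* headers (binary content mode).
    static HttpRequest buildRequest(String sinkName, String type, String source, String json) {
        return HttpRequest.newBuilder()
                .uri(URI.create(SINK_BASE_URL + "/" + sinkName))
                .header("Content-Type", "application/json")
                .header("Ce-Id", String.valueOf(ID.incrementAndGet()))
                .header("Ce-Specversion", "1.0")
                .header("Ce-Type", type)
                .header("Ce-Source", source)
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();
    }

    public static void main(String[] args) {
        // Hypothetical payload; the real Order class has more fields
        HttpRequest request = buildRequest("payment-sink", "reserve-event", "payment",
                "{\"id\":1,\"status\":\"IN_PROGRESS\"}");
        System.out.println(request.method() + " " + request.uri());
        request.headers().map()
                .forEach((name, values) -> System.out.println(name + ": " + values));
    }
}
```

The request is only printed here. Sending it would require a cluster where the sink address is resolvable.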

Finally, let’s take a look at the payment confirmation service:

@ApplicationScoped
public class OrderConfirmService {

    private static final String SOURCE = "payment";

    Logger log;
    CustomerRepository repository;

    public OrderConfirmService(Logger log, 
                               CustomerRepository repository) {
        this.log = log;
        this.repository = repository;
    }

    @Transactional
    public Customer doConfirm(Order order) {
        Customer customer = repository.findById(order.getCustomerId());
        if (customer == null)
            throw new NotFoundException();
        log.infof("Customer: %s", customer);
        if (order.getStatus() == OrderStatus.CONFIRMED) {
            customer.setAmountReserved(customer.getAmountReserved() - order.getAmount());
            repository.persist(customer);
        } else if (order.getStatus() == OrderStatus.ROLLBACK && !SOURCE.equals(order.getRejectedService())) {
            customer.setAmountReserved(customer.getAmountReserved() - order.getAmount());
            customer.setAmountAvailable(customer.getAmountAvailable() + order.getAmount());
            repository.persist(customer);
        }
        return customer;
    }
    
}
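
The balance changes performed by doReserve and doConfirm can be easier to follow without the persistence layer. The sketch below is a simplified, in-memory model of the same arithmetic. The ReservationSketch class, its nested Account type and the method signatures are hypothetical stand-ins for the Customer entity and the two services, and the starting balance is an arbitrary example value.

```java
class ReservationSketch {

    enum Status { NEW, IN_PROGRESS, REJECTED, CONFIRMED, ROLLBACK }

    // Simplified stand-in for the Customer entity: only the two balances
    static class Account {
        int available;
        int reserved;

        Account(int available, int reserved) {
            this.available = available;
            this.reserved = reserved;
        }
    }

    // Mirrors doReserve: move funds from "available" to "reserved" if there is enough money
    static Status reserve(Account account, int amount) {
        if (amount < account.available) {
            account.reserved += amount;
            account.available -= amount;
            return Status.IN_PROGRESS;
        }
        return Status.REJECTED;
    }

    // Mirrors doConfirm: release the reservation on CONFIRMED, or give the funds
    // back on ROLLBACK unless this service was the one that rejected the order
    static void confirm(Account account, int amount, Status status,
                        String rejectedService, String self) {
        if (status == Status.CONFIRMED) {
            account.reserved -= amount;
        } else if (status == Status.ROLLBACK && !self.equals(rejectedService)) {
            account.reserved -= amount;
            account.available += amount;
        }
    }

    public static void main(String[] args) {
        Account account = new Account(1000, 0);

        // Successful order: reserved funds are finally consumed
        reserve(account, 100);
        confirm(account, 100, Status.CONFIRMED, null, "payment");
        System.out.println(account.available + " / " + account.reserved); // 900 / 0

        // Order rejected by another service: reserved funds are returned
        reserve(account, 200);
        confirm(account, 200, Status.ROLLBACK, "stock", "payment");
        System.out.println(account.available + " / " + account.reserved); // 900 / 0
    }
}
```

When the rollback names this service as the one that rejected the order, nothing is changed, because in that case no funds were reserved in the first place.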

Also, let’s take a look at the implementation of the function in the order-service.

public class OrderConfirmFunction {

    private final Logger log;
    private final OrderService orderService;

    public OrderConfirmFunction(Logger log, OrderService orderService) {
        this.log = log;
        this.orderService = orderService;
    }

    @Funq
    public void confirm(Order order) {
        log.infof("Accepted order: %s", order);
        orderService.doConfirm(order);
    }

}

Here’s the function implementation for the stock-service:

public class OrderReserveFunction {

    private static final String SOURCE = "stock";

    private final OrderReserveService orderReserveService;
    private final OrderConfirmService orderConfirmService;
    private final Logger log;

    public OrderReserveFunction(OrderReserveService orderReserveService,
                                OrderConfirmService orderConfirmService,
                                Logger log) {
        this.orderReserveService = orderReserveService;
        this.orderConfirmService = orderConfirmService;
        this.log = log;
    }

    @Funq
    public void reserve(Order order) {
        log.infof("Received order: %s", order);
        if (order.getStatus() == OrderStatus.NEW) {
            orderReserveService.doReserve(order);
        } else {
            orderConfirmService.doConfirm(order);
        }
    }

}

Configure Knative Eventing

Now that the app logic is implemented, we can proceed to the configuration of OpenShift Serverless and Knative components. If you are using my GitHub repository you don’t have to manually apply any YAML manifests. All the required configuration is applied during the Maven build. It is possible thanks to the Quarkus Kubernetes extension and its support for Knative. We just need to place all the required YAML manifests inside the src/main/kubernetes/knative.yml file and the extension applies them for us.

However, in order to understand what happens, let’s walk through the Knative configuration step by step. In the first step, we need to create the KafkaSink objects. A KafkaSink exposes an HTTP endpoint that accepts a CloudEvent as input. Then it sends that event to a particular topic in the Kafka cluster. Here’s the KafkaSink definition for the payment-service:

apiVersion: eventing.knative.dev/v1alpha1
kind: KafkaSink
metadata:
  name: payment-sink
  namespace: demo-eventing
spec:
  bootstrapServers:
    - my-cluster-kafka-bootstrap.kafka:9092
  topic: reserve-events
  numPartitions: 1

Both payment-service and stock-service send messages to the same reserve-events topic. Therefore, we could also create a single KafkaSink shared by those two services (I created two sinks, each of them dedicated to a single app). On the other hand, the order-service app sends messages to the order-events topic, so we have to create a separate KafkaSink:

apiVersion: eventing.knative.dev/v1alpha1
kind: KafkaSink
metadata:
  name: order-sink
  namespace: demo-eventing
spec:
  bootstrapServers:
    - my-cluster-kafka-bootstrap.kafka:9092
  topic: order-events
  numPartitions: 1

After that, let’s print the list of sinks in our cluster:

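Each sink is reachable at an address of the form http://kafka-sink-ingress.knative-eventing.svc.cluster.local/<namespace>/<sink-name>. The part of that address up to the namespace should be set in the application properties as the REST client URL, since the sink name is appended by the @Path annotation on the client interface. Here’s the fragment of the Quarkus application.properties: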

%openshift.quarkus.rest-client."pl.piomin.samples.quarkus.serverless.order.client.OrderSender".url = http://kafka-sink-ingress.knative-eventing.svc.cluster.local/demo-eventing

With KafkaSink we are able to send messages to the Kafka cluster. In order to receive them on the target apps side we need to create other objects. In the first step, we will create Knative Broker and KafkaSource object. The broker may be easily created using kn CLI:

$ kn broker create default

The KafkaSource object connects to the Kafka cluster and receives messages from the defined list of topics. In our case, these are order-events and reserve-events. The output of the KafkaSource object is the already-created default broker. It means that all the messages exchanged between our three apps are delivered to the Knative Broker.

apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
  name: kafka-source-to-broker
spec:
  bootstrapServers:
    - my-cluster-kafka-bootstrap.kafka:9092
  topics:
    - order-events
    - reserve-events
  sink:
    ref:
      apiVersion: eventing.knative.dev/v1
      kind: Broker
      name: default

In the final step, we need to configure the mechanism responsible for getting messages from the Knative Broker and sending them to the target services. In order to do that, we have to create Trigger objects. A trigger can filter messages by CloudEvent attributes. CloudEvent attributes correspond to the Ce-* HTTP headers of the request. For example, the payment-service app receives only messages sent by the order-service and containing the order-event type.

apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: payment-trigger
spec:
  broker: default
  filter:
    attributes:
      source: order
      type: order-event
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: payment-service
    uri: /reserve

The stock-trigger object is very similar. It connects to the default Broker and gets only messages with the source=order and type=order-event. Finally, it calls the POST /reserve endpoint exposed by the stock-service.

apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: stock-trigger
spec:
  broker: default
  filter:
    attributes:
      source: order
      type: order-event
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: stock-service
    uri: /reserve

On the other hand, the order-service app should receive events from both stock-service and payment-service. Therefore, we are filtering messages just by the type attribute. The target endpoint of the order-service is POST /confirm.

apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: order-trigger
spec:
  broker: default
  filter:
    attributes:
      type: reserve-event
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: order-service
    uri: /confirm
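
To check which service receives which event, the attribute filters of the three triggers above can be modeled in a few lines of plain Java. This is only a simplified sketch of exact-match filtering on CloudEvent attributes, not Knative's implementation. The TriggerFilterSketch class and its matches and route helpers are hypothetical names, and the "payment" source value in the second example is an assumed value.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class TriggerFilterSketch {

    // Trigger name -> required attribute values, copied from the manifests above
    static final Map<String, Map<String, String>> TRIGGERS = new LinkedHashMap<>();

    static {
        TRIGGERS.put("payment-trigger", Map.of("source", "order", "type", "order-event"));
        TRIGGERS.put("stock-trigger", Map.of("source", "order", "type", "order-event"));
        TRIGGERS.put("order-trigger", Map.of("type", "reserve-event"));
    }

    // An event passes a filter when every attribute listed in the filter
    // has exactly the same value in the event
    static boolean matches(Map<String, String> filter, Map<String, String> event) {
        return filter.entrySet().stream()
                .allMatch(e -> e.getValue().equals(event.get(e.getKey())));
    }

    // Returns the names of all triggers whose filter accepts the event
    static List<String> route(Map<String, String> event) {
        List<String> matched = new ArrayList<>();
        TRIGGERS.forEach((name, filter) -> {
            if (matches(filter, event)) {
                matched.add(name);
            }
        });
        return matched;
    }

    public static void main(String[] args) {
        // Event sent by the order-service
        System.out.println(route(Map.of("source", "order", "type", "order-event")));
        // prints [payment-trigger, stock-trigger]

        // Response event; the source is ignored by order-trigger
        System.out.println(route(Map.of("source", "payment", "type", "reserve-event")));
        // prints [order-trigger]
    }
}
```

Because order-trigger lists only the type attribute, a response with any source value is delivered to the order-service as long as its type is reserve-event.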

Run Quarkus Apps on Knative

We can leverage the Quarkus Kubernetes/OpenShift extension to run the app as a Knative service. In order to do that we need to include the quarkus-openshift dependency in Maven pom.xml. We would like to use that module during the build only if we need to deploy the app on the cluster. Therefore, we will create a custom Maven profile openshift. Besides including the quarkus-openshift dependency it also enables deployment by setting the quarkus.kubernetes.deploy property to true and activates the custom Quarkus profile openshift.

<profiles>
  <profile>
    <id>openshift</id>
    <activation>
      <property>
        <name>openshift</name>
      </property>
    </activation>
    <dependencies>
      <dependency>
        <groupId>io.quarkus</groupId>
        <artifactId>quarkus-openshift</artifactId>
      </dependency>
    </dependencies>
    <properties>
      <quarkus.kubernetes.deploy>true</quarkus.kubernetes.deploy>
      <quarkus.profile>openshift</quarkus.profile>
    </properties>
  </profile>
</profiles>

Once we include the quarkus-openshift module, we may use Quarkus configuration properties to customize the deployment process. Firstly, we need to set the quarkus.kubernetes.deployment-target property to knative. Thanks to that, Quarkus will automatically generate the YAML manifest with a Knative Service instead of a standard Kubernetes Deployment. We can also override default autoscaling settings with the quarkus.knative.revision-auto-scaling.* properties. The whole build process runs on the cluster with S2I (Source-to-Image), so we can use the internal OpenShift registry (the quarkus.container-image.registry property). Here’s the fragment of the application.properties file for the order-service.

quarkus.kubernetes-client.trust-certs = true
quarkus.kubernetes.deployment-target = knative
quarkus.knative.env.vars.tick-timeout = 10000
quarkus.knative.revision-auto-scaling.metric = rps
quarkus.knative.revision-auto-scaling.target = 50

%openshift.quarkus.container-image.group = demo-eventing
%openshift.quarkus.container-image.registry = image-registry.openshift-image-registry.svc:5000
%openshift.app.orders.timeout = ${TICK_TIMEOUT}

Finally, we just need to activate the openshift profile during the build, and the apps will be deployed to the target OpenShift cluster. You can deploy a single app by running the following command in that app’s directory, or all the apps by running it in the repository root directory:

$ mvn clean package -Popenshift

Once the apps are deployed, we can display the list of Knative services.

openshift-serverless-knative-svc

We can also verify if all the triggers have been configured properly.

Also, let’s take a look at the “Topology” view on OpenShift which illustrates our serverless architecture.

openshift-serverless-topology

Testing Services

It is also worth creating some automated tests to verify the basic functionality before deployment. Since we have simple HTTP apps and an in-memory H2 database we can create standard tests. The only thing we need to do is to mock the HTTP client.

<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-junit5</artifactId>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>io.rest-assured</groupId>
  <artifactId>rest-assured</artifactId>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-junit5-mockito</artifactId>
  <scope>test</scope>
</dependency>

Here’s the JUnit test for the payment-service. We are verifying both the reservation and confirmation processes for the same order.

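import static io.restassured.RestAssured.given;
import static org.junit.jupiter.api.Assertions.assertEquals;

@QuarkusTest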
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
public class OrderReserveFunctionTests {

    private static int amount;
    private CustomerRepository repository;
    @InjectMock
    @RestClient
    OrderSender sender;

    public OrderReserveFunctionTests(CustomerRepository repository) {
        this.repository = repository;
    }

    @Test
    @org.junit.jupiter.api.Order(1)
    void reserve() {
        given().contentType("application/json").body(createTestOrder(OrderStatus.NEW)).post("/reserve")
                .then()
                .statusCode(204);

        Customer c = repository.findById(1L);
        amount = c.getAmountAvailable();
        assertEquals(100, c.getAmountReserved());
    }

    @Test
    @org.junit.jupiter.api.Order(2)
    void confirm() {
        given().contentType("application/json").body(createTestOrder(OrderStatus.CONFIRMED)).post("/reserve")
                .then()
                .statusCode(204);

        Customer c = repository.findById(1L);
        assertEquals(0, c.getAmountReserved());
        assertEquals(amount, c.getAmountAvailable());
    }

    private Order createTestOrder(OrderStatus status) {
        Order o = new Order();
        o.setId(1L);
        o.setSource("test");
        o.setStatus(status);
        o.setAmount(100);
        o.setCustomerId(1L);
        return o;
    }
}

Also, let’s take a look at the logs of apps running on OpenShift. As you see, the order-service receives events from both stock-service and payment-service. After that, it confirms the order and sends a confirmation message to both services.

Here are the logs from the payment-service. As you see, it receives the CloudEvent generated by the order-service (the Ce-Source header equals order).

Final Thoughts

With OpenShift Serverless and Knative Eventing, you can easily build an event-driven architecture for simple HTTP-based apps. The app does not need to know which medium stores the events or how they are routed. The only thing it needs to do is to prepare a request according to the CloudEvent specification. OpenShift Serverless brings several features to simplify development. We can also leverage the Quarkus Kubernetes extension to easily build and deploy our apps on OpenShift as Knative services.
