Introduction to ksqlDB on Kubernetes with Spring Boot

In this article, you will learn how to run ksqlDB on Kubernetes and use it with Spring Boot. You will also see how to run Kafka on Kubernetes based on the Strimzi operator. In order to integrate Spring Boot with the ksqlDB server, we are going to leverage a lightweight Java client provided by ksqlDB. This client supports pull and push queries. It also provides an API for inserting rows and creating tables or streams. You can read more about it in the ksqlDB documentation here.

Our sample Spring Boot application is very simple. We will use a Spring Cloud Stream Supplier bean to generate and send events to the Kafka topic. For more information about Kafka with Spring Cloud Stream, please refer to the following article. On the other hand, our application reads data from the Kafka topic using ksqlDB queries. It also creates a KTable on startup.

Let’s take a look at our architecture.

[Figure: ksqldb-kubernetes-arch]

Source Code

If you would like to try it by yourself, you may always take a look at my source code. In order to do that you need to clone my GitHub repository. Then go to the transactions-service directory. After that, you should just follow my instructions. Let’s begin.

Prerequisites

We will use several tools. You need to have:

  1. Kubernetes cluster – it may be a single-node, local cluster like Minikube or Kind. Personally, I’m using Kubernetes on Docker Desktop.
  2. kubectl CLI – to interact with the cluster.
  3. Helm – we will use it to install the ksqlDB server on Kubernetes. If you don’t have Helm, you will have to install it.

Run Kafka on Kubernetes with Strimzi

Of course, we need an instance of Kafka to perform our exercise. There are several ways to run Kafka on Kubernetes. I’ll show you how to do it with the operator-based approach. In the first step, you need to install OLM (Operator Lifecycle Manager) on your cluster. In order to do that, you can just execute the following command on your Kubernetes context:

$ curl -L https://github.com/operator-framework/operator-lifecycle-manager/releases/download/v0.21.2/install.sh -o install.sh
$ chmod +x install.sh
$ ./install.sh v0.21.2

Then, you can proceed to the Strimzi operator installation. That’s just a single command.

$ kubectl create -f https://operatorhub.io/install/stable/strimzi-kafka-operator.yaml

Now, we can create a Kafka cluster on Kubernetes. Let’s begin with a dedicated namespace for our exercise:

$ kubectl create ns kafka

I assume you have a single-node Kubernetes cluster, so we also create a single-node Kafka. Here’s the YAML manifest with the Kafka custom resource. You can find it in the repository under the path k8s/cluster.yaml.

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
spec:
  entityOperator:
    topicOperator: {}
    userOperator: {}
  kafka:
    config:
      default.replication.factor: 1
      inter.broker.protocol.version: "3.2"
      min.insync.replicas: 1
      offsets.topic.replication.factor: 1
      transaction.state.log.min.isr: 1
      transaction.state.log.replication.factor: 1
    listeners:
      - name: plain
        port: 9092
        tls: false
        type: internal
      - name: tls
        port: 9093
        tls: true
        type: internal
    replicas: 1
    storage:
      type: jbod
      volumes:
        - id: 0
          type: persistent-claim
          size: 30Gi
          deleteClaim: true
    version: 3.2.0
  zookeeper:
    replicas: 1
    storage:
      type: persistent-claim
      size: 10Gi
      deleteClaim: true

Let’s apply it to Kubernetes in the kafka namespace:

$ kubectl apply -f k8s/cluster.yaml -n kafka

You should see a single instance of Kafka and also a single instance of Zookeeper. If the pods are running, it means you have Kafka on Kubernetes.

$ kubectl get pod -n kafka
NAME                                          READY   STATUS    RESTARTS  AGE
my-cluster-entity-operator-68cc6bc4d9-qs88p   3/3     Running   0         46m
my-cluster-kafka-0                            1/1     Running   0         48m
my-cluster-zookeeper-0                        1/1     Running   0         48m

Kafka is available inside the cluster under the name my-cluster-kafka-bootstrap and port 9092.

$ kubectl get svc -n kafka
NAME                          TYPE        CLUSTER-IP       EXTERNAL-IP   PORT(S)                               AGE
my-cluster-kafka-bootstrap    ClusterIP   10.108.109.255   <none>        9091/TCP,9092/TCP,9093/TCP            47m
my-cluster-kafka-brokers      ClusterIP   None             <none>        9090/TCP,9091/TCP,9092/TCP,9093/TCP   47m
my-cluster-zookeeper-client   ClusterIP   10.102.10.251    <none>        2181/TCP                              47m
my-cluster-zookeeper-nodes    ClusterIP   None             <none>        2181/TCP,2888/TCP,3888/TCP            47m

Run ksqlDB Server on Kubernetes

The ksqlDB Server is a part of the Confluent Platform. Since we are not installing the whole Confluent Platform on Kubernetes, but just an open-source Kafka cluster, we need to install the ksqlDB Server separately. Let’s do it with Helm. There is no “official” Helm chart for the KSQL server. Therefore, we should go directly to the Confluent Helm repository on GitHub:

$ git clone https://github.com/confluentinc/cp-helm-charts.git
$ cd cp-helm-charts

In this repository, you can find separate Helm charts for every Confluent component, including e.g. Control Center or KSQL Server. The location of our chart inside the repository is charts/cp-ksql-server. We need to override some default settings during installation. First of all, we have to disable the headless mode. In headless mode, KSQL Server does not expose the HTTP endpoint and loads queries from an input script. Our Spring Boot app will connect to the server through HTTP. Next, we should override the default address of the Kafka cluster and the default version of the KSQL Server, which is still 6.1.0 in the chart. We will use version 7.1.1, the latest one at the time of writing. Here’s the helm command you should run on your Kubernetes cluster:

$ helm install cp-ksql-server \
    --set ksql.headless=false \
    --set kafka.bootstrapServers=my-cluster-kafka-bootstrap:9092 \
    --set imageTag=7.1.1 \
  charts/cp-ksql-server -n kafka

Let’s verify if KSQL is running on the cluster:

$ kubectl get pod -n kafka | grep ksql
cp-ksql-server-679fc98889-hldfv               2/2     Running   0               2m11s

The HTTP endpoint is available for other applications under the name cp-ksql-server and port 8088:

$ kubectl get svc -n kafka | grep ksql
cp-ksql-server                ClusterIP   10.109.189.36    <none>        8088/TCP,5556/TCP                     3m25s
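
Before wiring the Spring Boot app, it can be handy to confirm that the HTTP endpoint really responds. The sketch below uses the JDK's built-in HttpClient to call the server's /info endpoint. The class name KsqlInfoCheck is hypothetical and not part of the repository. The host name cp-ksql-server resolves only from inside the cluster, so from a workstation you would assume a port-forward and use localhost instead.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical helper, not part of the article's repository.
class KsqlInfoCheck {

    // Builds the URI of the ksqlDB server info endpoint.
    static URI infoUri(String host, int port) {
        return URI.create("http://" + host + ":" + port + "/info");
    }

    // Sends GET /info and returns the raw response body.
    // Throws IllegalStateException if the server does not answer with HTTP 200.
    static String fetchInfo(String host, int port) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(infoUri(host, port)).GET().build();
        HttpResponse<String> response =
                client.send(request, HttpResponse.BodyHandlers.ofString());
        if (response.statusCode() != 200) {
            throw new IllegalStateException("Unexpected status: " + response.statusCode());
        }
        return response.body();
    }
}
```

Called from a pod in the kafka namespace, KsqlInfoCheck.fetchInfo("cp-ksql-server", 8088) should return a JSON document describing the server if the installation succeeded.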

Now, we have all the required stuff running on our Kubernetes cluster. Therefore, we can proceed to the Spring Boot app implementation.

Integrate Spring Boot with ksqlDB

I didn’t find any out-of-the-box integration between Spring Boot and ksqlDB. Therefore, we will use the ksqldb-api-client directly. First, we need to include the ksqlDB Maven repository and some dependencies:

<dependencies>
        ...

  <dependency>
    <groupId>io.confluent.ksql</groupId>
    <artifactId>ksqldb-api-client</artifactId>
    <version>0.26.0</version>
  </dependency>
  <dependency>
    <groupId>io.confluent.ksql</groupId>
    <artifactId>ksqldb-udf</artifactId>
    <version>0.26.0</version>
  </dependency>
  <dependency>
    <groupId>io.confluent.ksql</groupId>
    <artifactId>ksqldb-common</artifactId>
    <version>0.26.0</version>
  </dependency>
</dependencies>

<repositories>
  <repository>
    <id>ksqlDB</id>
    <name>ksqlDB</name>
    <url>https://ksqldb-maven.s3.amazonaws.com/maven/</url>
  </repository>
</repositories>

After that, we can define a Spring @Bean returning the ksqlDB Client implementation. Since we will run our application in the same namespace as the KSQL Server, we need to provide the Kubernetes Service name as the host name.

@Configuration
public class KSQLClientProducer {

    @Bean
    Client ksqlClient() {
        ClientOptions options = ClientOptions.create()
                .setHost("cp-ksql-server")
                .setPort(8088);
        return Client.create(options);
    }
}
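
The host and port are hardcoded in the bean above. If you prefer to inject them at deployment time, the same way the Kafka address is injected later in this article, one option is to parse a single "host:port" value. The following is only a sketch: the KsqlEndpoint record and the KSQLDB_URL variable name are my assumptions, not something defined in the repository.

```java
// Hypothetical value type holding the ksqlDB server address.
record KsqlEndpoint(String host, int port) {

    // Port used when the value contains only a host name.
    static final int DEFAULT_PORT = 8088;

    // Parses "host" or "host:port"; rejects null, blank or host-less values.
    static KsqlEndpoint parse(String value) {
        if (value == null || value.isBlank()) {
            throw new IllegalArgumentException("ksqlDB address must not be empty");
        }
        String trimmed = value.trim();
        int idx = trimmed.lastIndexOf(':');
        if (idx < 0) {
            return new KsqlEndpoint(trimmed, DEFAULT_PORT);
        }
        if (idx == 0) {
            throw new IllegalArgumentException("Missing host in: " + trimmed);
        }
        int port = Integer.parseInt(trimmed.substring(idx + 1));
        return new KsqlEndpoint(trimmed.substring(0, idx), port);
    }
}
```

Inside the @Bean method you could then call KsqlEndpoint.parse(System.getenv("KSQLDB_URL")) and pass host() and port() to setHost and setPort on ClientOptions.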

Our application interacts with KSQL Server through an HTTP endpoint. It creates a single KTable on startup. To do that, we need to invoke the executeStatement method on the instance of the KSQL Client bean. We are creating the SOURCE table to enable running pull queries on it. The table gets data from the transactions topic. It expects JSON format in the incoming events.

@Component // registers the listener as a Spring bean so it is invoked on context refresh
public class KTableCreateListener implements ApplicationListener<ContextRefreshedEvent> {

   private static final Logger LOG = LoggerFactory.getLogger(KTableCreateListener.class);
   private Client ksqlClient;

   public KTableCreateListener(Client ksqlClient) {
      this.ksqlClient = ksqlClient;
   }

   @Override
   public void onApplicationEvent(ContextRefreshedEvent event) {
      try {
         String sql = """
                 CREATE SOURCE TABLE IF NOT EXISTS transactions_view (
                   id BIGINT PRIMARY KEY,
                   sourceAccountId BIGINT,
                   targetAccountId BIGINT,
                   amount INT
                 ) WITH (
                   kafka_topic='transactions',
                   value_format='JSON'
                 );
                 """;
         ExecuteStatementResult result = ksqlClient.executeStatement(sql).get();
         LOG.info("Result: {}", result.queryId().orElse(null));
      } catch (ExecutionException | InterruptedException e) {
         LOG.error("Error: ", e);
      }
   }
}

After creating the table, we can run some queries on it. These are pretty simple queries. We are trying to find all transactions and all transactions related to a particular target account.

@RestController
@RequestMapping("/transactions")
public class TransactionResource {

   private static final Logger LOG = LoggerFactory.getLogger(TransactionResource.class);
   Client ksqlClient;

   public TransactionResource(Client ksqlClient) {
      this.ksqlClient = ksqlClient;
   }

   @GetMapping
   public List<Transaction> getTransactions() throws ExecutionException, InterruptedException {
      StreamedQueryResult sqr = ksqlClient
            .streamQuery("SELECT * FROM transactions_view;")
            .get();
      Row row;
      List<Transaction> l = new ArrayList<>();
      while ((row = sqr.poll()) != null) {
         l.add(mapRowToTransaction(row));
      }
      return l;
   }

   @GetMapping("/target/{accountId}")
   public List<Transaction> getTransactionsByTargetAccountId(@PathVariable("accountId") Long accountId)
            throws ExecutionException, InterruptedException {
      StreamedQueryResult sqr = ksqlClient
            .streamQuery("SELECT * FROM transactions_view WHERE targetAccountId=" + accountId + ";")
            .get();
      Row row;
      List<Transaction> l = new ArrayList<>();
      while ((row = sqr.poll()) != null) {
         l.add(mapRowToTransaction(row));
      }
      return l;
   }

   private Transaction mapRowToTransaction(Row row) {
      Transaction t = new Transaction();
      t.setId(row.getLong("ID"));
      t.setSourceAccountId(row.getLong("SOURCEACCOUNTID"));
      t.setTargetAccountId(row.getLong("TARGETACCOUNTID"));
      t.setAmount(row.getInteger("AMOUNT"));
      return t;
   }

}
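
Both endpoints repeat the same loop: poll the result until it returns null and map each row. A minimal sketch of how that loop could be extracted into a reusable helper is shown below. The RowDrainer class is hypothetical. It depends only on the JDK, so it can be tried without a running ksqlDB server.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical helper, not part of the article's repository.
class RowDrainer {

    // Calls poll until it returns null and maps every element on the way.
    // The returned list keeps the order in which elements were polled.
    static <R, T> List<T> drain(Supplier<R> poll, Function<R, T> mapper) {
        List<T> result = new ArrayList<>();
        R row;
        while ((row = poll.get()) != null) {
            result.add(mapper.apply(row));
        }
        return result;
    }
}
```

In the controller, each method body could then shrink to something like RowDrainer.drain(sqr::poll, this::mapRowToTransaction), assuming the no-argument poll method used in the listing above.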

Sending events to the topic with Spring Cloud Stream

Finally, we can proceed to the last part of our exercise. We need to generate test data and send it to the Kafka transactions topic. The simplest way to achieve it is with the Spring Cloud Stream Kafka module. Firstly, let’s add the following Maven dependency:

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>

Then, we may create a producer based on the Spring Supplier bean. The Supplier bean continuously generates and sends new events to the target channel. By default, it repeats the action once per second.

@Configuration
public class KafkaEventProducer {

   private static long transactionId = 0;
   private static final Random r = new Random();

   @Bean
   public Supplier<Message<Transaction>> transactionsSupplier() {
      return () -> {
          Transaction t = new Transaction();
          t.setId(++transactionId);
          t.setSourceAccountId(r.nextLong(1, 100));
          t.setTargetAccountId(r.nextLong(1, 100));
          t.setAmount(r.nextInt(1, 10000));
          Message<Transaction> o = MessageBuilder
                .withPayload(t)
                .setHeader(KafkaHeaders.MESSAGE_KEY, new TransactionKey(t.getId()))
                .build();
          return o;
      };
   }
}
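
The Transaction and TransactionKey classes are used above but not listed in this article. A minimal sketch of what they might look like follows, assuming plain POJOs whose field names mirror the columns of transactions_view. The actual classes in the repository may differ.

```java
// Hypothetical sketch of the event payload; fields mirror the table columns.
class Transaction {
    private Long id;
    private Long sourceAccountId;
    private Long targetAccountId;
    private Integer amount;

    public Long getId() { return id; }
    public void setId(Long id) { this.id = id; }

    public Long getSourceAccountId() { return sourceAccountId; }
    public void setSourceAccountId(Long sourceAccountId) { this.sourceAccountId = sourceAccountId; }

    public Long getTargetAccountId() { return targetAccountId; }
    public void setTargetAccountId(Long targetAccountId) { this.targetAccountId = targetAccountId; }

    public Integer getAmount() { return amount; }
    public void setAmount(Integer amount) { this.amount = amount; }
}

// Hypothetical sketch of the message key wrapping the transaction id.
class TransactionKey {
    private Long id;

    // No-argument constructor, assumed to be needed by JSON (de)serializers.
    public TransactionKey() { }

    public TransactionKey(Long id) { this.id = id; }

    public Long getId() { return id; }
    public void setId(Long id) { this.id = id; }
}
```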

Of course, we also need to provide the address of our Kafka cluster and the name of a target topic for the channel. The address of Kafka is injected at the deployment phase.

spring.kafka.bootstrap-servers = ${KAFKA_URL}
spring.cloud.stream.bindings.transactionsSupplier-out-0.destination = transactions
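
The binding name transactionsSupplier-out-0 is not arbitrary. Spring Cloud Stream derives it from the name of the functional bean, the direction and an index. The tiny sketch below only illustrates that naming convention; the BindingNames class is hypothetical and is not needed by the application.

```java
// Hypothetical illustration of the functional binding naming convention.
class BindingNames {

    // Output bindings follow the pattern <functionName>-out-<index>.
    static String output(String functionName, int index) {
        return functionName + "-out-" + index;
    }

    // Full property key that sets the destination (topic) of a binding.
    static String destinationProperty(String bindingName) {
        return "spring.cloud.stream.bindings." + bindingName + ".destination";
    }
}
```

For the transactionsSupplier bean and index 0, this yields exactly the property key used in the configuration above.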

Finally, let’s deploy our Spring Boot app on Kubernetes. Here’s the YAML manifest containing Kubernetes Deployment and Service definitions:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: transactions
spec:
  selector:
    matchLabels:
      app: transactions
  template:
    metadata:
      labels:
        app: transactions
    spec:
      containers:
      - name: transactions
        image: piomin/transactions-service
        env:
          - name: KAFKA_URL
            value: my-cluster-kafka-bootstrap:9092
        ports:
        - containerPort: 8080
---
apiVersion: v1
kind: Service
metadata:
  name: transactions
spec:
  type: ClusterIP
  selector:
    app: transactions
  ports:
    - port: 8080

Let’s deploy the app in the kafka namespace:

$ kubectl apply -f k8s/deployment.yaml -n kafka

Testing ksqlDB on Kubernetes

Once the app is deployed on Kubernetes, let’s enable port-forward to test it on the local port:

$ kubectl port-forward service/transactions 8080:8080 -n kafka

Now, we can test our two HTTP endpoints. Let’s start with the endpoint for searching all transactions:

$ curl http://localhost:8080/transactions

Then, you can call the endpoint for searching all transactions related to the targetAccountId, e.g.:

$ curl http://localhost:8080/transactions/target/10

Final Thoughts

In this article, I wanted to show how you can start with ksqlDB on Kubernetes. We used Spring Boot and Spring Cloud Stream to interact with Kafka and ksqlDB. You could see how to run the Kafka cluster on Kubernetes using the Strimzi operator and how to deploy KSQL Server directly from the Helm repository.

12 COMMENTS

Oscar (@oscarsalvatore)

Great post. Thanks!

    piotr.minkowski

    🙂

Marcin

Is my understanding correct that queries created this way have linear complexity and looking for a value is similar to iterating through a linked list as you have to read each message from kafka sequentially even when querying for a key? On the other hands using kafka streams and state store allows u to read it all on application startup and build the local rocksDB instance for the queries to be efficient. Correct?

    piotr.minkowski

    Yes 🙂

Nosheen

very helpful. especially the dependency issue is resolved by this article.

    piotr.minkowski

    Thanks!

Priyank

Confluent Platform is a paid product with time-limited trial. Wish the article covered the community edition of ksqldb. From what I hear, the starting price is USD 90k for the license.

    piotr.minkowski

    It covers a free version of ksqldb. I’m not mentioning the Confluent Platform in that article. Even for running Kafka on k8s I’m using strimzi

anhvh2610gmailcom

Hi!
How to enable authentication in Ksqldb

wackazong

You can skip cloning the git repo for the ksql server installation. Use the whole repo and disable all other services using the enabled flag, see the values.yaml.

    piotr.minkowski

    Ok, I’ll take a look on it. Thanks for info.
