Introduction to ksqlDB on Kubernetes with Spring Boot
In this article, you will learn how to run ksqlDB on Kubernetes and use it with Spring Boot. You will also see how to run Kafka on Kubernetes with the Strimzi operator. To integrate Spring Boot with the ksqlDB server, we are going to use the lightweight Java client provided by ksqlDB. This client supports pull and push queries. It also provides an API for inserting rows and creating tables or streams. You can read more about it in the ksqlDB documentation.
Our sample Spring Boot application is very simple. We will use a Spring Cloud Stream Supplier bean to generate events and send them to the Kafka topic. For more information about Kafka with Spring Cloud Stream, please refer to the following article. In turn, our application reads data from the Kafka topic using ksqlDB queries. It also creates a ksqlDB table (KTable) on startup.
Let’s take a look at our architecture.
Source Code
If you would like to try it by yourself, you may always take a look at my source code. To do that, you need to clone my GitHub repository. Then go to the transactions-service directory. After that, just follow my instructions. Let’s begin.
Prerequisites
We will use several tools. You need to have:
- Kubernetes cluster – it may be a single-node, local cluster like Minikube or Kind. Personally, I’m using Kubernetes on Docker Desktop
- kubectl CLI – to interact with the cluster
- Helm – we will use it to install the ksqlDB server on Kubernetes. If you don’t have Helm, you will have to install it
Run Kafka on Kubernetes with Strimzi
Of course, we need a Kafka instance for this exercise. There are several ways to run Kafka on Kubernetes. I’ll show you the operator-based approach. First, you need to install OLM (Operator Lifecycle Manager) on your cluster. To do that, you can execute the following commands against your current Kubernetes context:
$ curl -L https://github.com/operator-framework/operator-lifecycle-manager/releases/download/v0.21.2/install.sh -o install.sh
$ chmod +x install.sh
$ ./install.sh v0.21.2
Then, you can proceed to the Strimzi operator installation. That’s just a single command.
$ kubectl create -f https://operatorhub.io/install/stable/strimzi-kafka-operator.yaml
Now, we can create a Kafka cluster on Kubernetes. Let’s begin with a dedicated namespace for our exercise:
$ kubectl create ns kafka
I assume you have a single-node Kubernetes cluster, so we will also create a single-node Kafka cluster. Here’s the YAML manifest with the Kafka custom resource. You can find it in the repository under the path k8s/cluster.yaml.
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
spec:
  entityOperator:
    topicOperator: {}
    userOperator: {}
  kafka:
    config:
      default.replication.factor: 1
      inter.broker.protocol.version: "3.2"
      min.insync.replicas: 1
      offsets.topic.replication.factor: 1
      transaction.state.log.min.isr: 1
      transaction.state.log.replication.factor: 1
    listeners:
      - name: plain
        port: 9092
        tls: false
        type: internal
      - name: tls
        port: 9093
        tls: true
        type: internal
    replicas: 1
    storage:
      type: jbod
      volumes:
        - id: 0
          type: persistent-claim
          size: 30Gi
          deleteClaim: true
    version: 3.2.0
  zookeeper:
    replicas: 1
    storage:
      type: persistent-claim
      size: 10Gi
      deleteClaim: true
Let’s apply it to Kubernetes in the kafka namespace:
$ kubectl apply -f k8s/cluster.yaml -n kafka
You should see a single Kafka broker and a single ZooKeeper instance. If the pods are running, Kafka is up on Kubernetes.
$ kubectl get pod -n kafka
NAME                                          READY   STATUS    RESTARTS   AGE
my-cluster-entity-operator-68cc6bc4d9-qs88p   3/3     Running   0          46m
my-cluster-kafka-0                            1/1     Running   0          48m
my-cluster-zookeeper-0                        1/1     Running   0          48m
Kafka is available inside the cluster under the name my-cluster-kafka-bootstrap on port 9092.
$ kubectl get svc -n kafka
NAME                          TYPE        CLUSTER-IP       EXTERNAL-IP   PORT(S)                               AGE
my-cluster-kafka-bootstrap    ClusterIP   10.108.109.255   <none>        9091/TCP,9092/TCP,9093/TCP            47m
my-cluster-kafka-brokers      ClusterIP   None             <none>        9090/TCP,9091/TCP,9092/TCP,9093/TCP   47m
my-cluster-zookeeper-client   ClusterIP   10.102.10.251    <none>        2181/TCP                              47m
my-cluster-zookeeper-nodes    ClusterIP   None             <none>        2181/TCP,2888/TCP,3888/TCP            47m
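Strimzi names the bootstrap Service after the cluster, appending `-kafka-bootstrap` to the name from `metadata.name`, which is why the address above is `my-cluster-kafka-bootstrap:9092`. That short name only resolves from pods in the same namespace; pods in other namespaces need the namespace-qualified form `<service>.<namespace>.svc`. The helper below is a hypothetical sketch (the `StrimziBootstrap` class is not part of Strimzi) that builds both forms for this article's cluster.

```java
/**
 * Hypothetical helper that builds the Kafka bootstrap address of a Strimzi cluster.
 * Strimzi names the bootstrap Service "<cluster-name>-kafka-bootstrap".
 */
public final class StrimziBootstrap {

    private StrimziBootstrap() {
    }

    /** Address usable from pods running in the same namespace as the Kafka cluster. */
    public static String sameNamespace(String clusterName, int port) {
        return clusterName + "-kafka-bootstrap:" + port;
    }

    /** Namespace-qualified address, usable from pods in any namespace. */
    public static String crossNamespace(String clusterName, String namespace, int port) {
        return clusterName + "-kafka-bootstrap." + namespace + ".svc:" + port;
    }

    public static void main(String[] args) {
        // Values from this article: cluster "my-cluster" in namespace "kafka", plain listener on 9092
        System.out.println(sameNamespace("my-cluster", 9092));           // my-cluster-kafka-bootstrap:9092
        System.out.println(crossNamespace("my-cluster", "kafka", 9092)); // my-cluster-kafka-bootstrap.kafka.svc:9092
    }
}
```

Since our Spring Boot app will run in the kafka namespace, the short form is enough here.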
Run ksqlDB Server on Kubernetes
ksqlDB Server is part of the Confluent Platform. Since we are not installing the whole Confluent Platform on Kubernetes, but only an open-source Kafka cluster, we need to install ksqlDB Server separately. Let’s do it with Helm. There is no “official” Helm chart for the KSQL server. Therefore, we will go directly to the Confluent Helm charts repository on GitHub:
$ git clone https://github.com/confluentinc/cp-helm-charts.git
$ cd cp-helm-charts
In this repository, you can find separate Helm charts for every Confluent component, e.g. Control Center or KSQL Server. Our chart is located under charts/cp-ksql-server. We need to override some default settings during installation. First of all, we have to disable headless mode. In headless mode, KSQL Server does not expose the HTTP endpoint and loads queries from an input script instead. Our Spring Boot app will connect to the server over HTTP. Next, we should override the default address of the Kafka cluster and the default KSQL Server version, which is still 6.1.0 in the chart. We will use the latest version, 7.1.1. Here’s the helm command you should run on your Kubernetes cluster:
$ helm install cp-ksql-server \
--set ksql.headless=false \
--set kafka.bootstrapServers=my-cluster-kafka-bootstrap:9092 \
--set imageTag=7.1.1 \
charts/cp-ksql-server -n kafka
Here’s the result:
Let’s verify if KSQL is running on the cluster:
$ kubectl get pod -n kafka | grep ksql
cp-ksql-server-679fc98889-hldfv 2/2 Running 0 2m11s
The HTTP endpoint is available to other applications under the name cp-ksql-server on port 8088:
$ kubectl get svc -n kafka | grep ksql
cp-ksql-server ClusterIP 10.109.189.36 <none> 8088/TCP,5556/TCP 3m25s
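Before wiring up the application, you may want to confirm that the server really answers over HTTP. ksqlDB's REST API exposes a `GET /info` endpoint that returns basic server information. The plain-JDK sketch below (Java 11+ `java.net.http`) assumes you have first forwarded the Service port to your machine, e.g. with `kubectl port-forward service/cp-ksql-server 8088:8088 -n kafka`; the `KsqlInfoCheck` class and the `localhost:8088` target are assumptions for illustration.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;

/** Hypothetical connectivity check against the ksqlDB server's REST API. */
public final class KsqlInfoCheck {

    private KsqlInfoCheck() {
    }

    /** Builds a GET request for the /info endpoint of a ksqlDB server. */
    public static HttpRequest infoRequest(String host, int port) {
        return HttpRequest.newBuilder()
                .uri(URI.create("http://" + host + ":" + port + "/info"))
                .timeout(Duration.ofSeconds(5))
                .GET()
                .build();
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        HttpClient client = HttpClient.newHttpClient();
        // Assumes the cp-ksql-server Service port has been forwarded to localhost:8088
        HttpResponse<String> response = client.send(
                infoRequest("localhost", 8088), HttpResponse.BodyHandlers.ofString());
        System.out.println("HTTP " + response.statusCode());
        System.out.println(response.body());
    }
}
```

A 200 status with a JSON body means the server is reachable; a connection error usually means the port-forward is not running or headless mode is still enabled.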
Now, we have all the required components running on our Kubernetes cluster. Therefore, we can proceed to the Spring Boot app implementation.
Integrate Spring Boot with ksqlDB
I didn’t find any out-of-the-box integration between Spring Boot and ksqlDB. Therefore, we will use the ksqldb-api-client directly. First, we need to include the ksqlDB Maven repository and some dependencies:
<dependencies>
  ...
  <dependency>
    <groupId>io.confluent.ksql</groupId>
    <artifactId>ksqldb-api-client</artifactId>
    <version>0.26.0</version>
  </dependency>
  <dependency>
    <groupId>io.confluent.ksql</groupId>
    <artifactId>ksqldb-udf</artifactId>
    <version>0.26.0</version>
  </dependency>
  <dependency>
    <groupId>io.confluent.ksql</groupId>
    <artifactId>ksqldb-common</artifactId>
    <version>0.26.0</version>
  </dependency>
</dependencies>
<repositories>
  <repository>
    <id>ksqlDB</id>
    <name>ksqlDB</name>
    <url>https://ksqldb-maven.s3.amazonaws.com/maven/</url>
  </repository>
</repositories>
After that, we can define a Spring @Bean returning the ksqlDB Client implementation. Since we will run our application in the same namespace as the KSQL Server, we can use the Kubernetes Service name as the host name.
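import io.confluent.ksql.api.client.Client;
import io.confluent.ksql.api.client.ClientOptions;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class KSQLClientProducer {

    // The host is the Kubernetes Service name of the ksqlDB server (same namespace as the app)
    @Bean
    Client ksqlClient() {
        ClientOptions options = ClientOptions.create()
                .setHost("cp-ksql-server")
                .setPort(8088);
        return Client.create(options);
    }
}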
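Hard-coding `cp-ksql-server` and `8088` works for this exercise. If you would rather pass them in at deployment time, the same way the application receives `KAFKA_URL`, one option is to resolve them from environment variables with fallbacks. The sketch below is hypothetical: the `KsqlEndpoint` record and the `KSQL_HOST`/`KSQL_PORT` variable names are assumptions, and the resolved values would then be handed to `ClientOptions.setHost` and `setPort`.

```java
import java.util.Map;

/** Hypothetical holder for the ksqlDB server address, resolved from environment variables. */
public record KsqlEndpoint(String host, int port) {

    // Defaults match the Service created by the cp-ksql-server Helm chart in this article
    static final String DEFAULT_HOST = "cp-ksql-server";
    static final int DEFAULT_PORT = 8088;

    /** Reads KSQL_HOST and KSQL_PORT (assumed names), falling back to the defaults. */
    public static KsqlEndpoint fromEnv(Map<String, String> env) {
        String host = env.getOrDefault("KSQL_HOST", DEFAULT_HOST);
        String rawPort = env.get("KSQL_PORT");
        int port = (rawPort == null || rawPort.isBlank())
                ? DEFAULT_PORT
                : Integer.parseInt(rawPort.trim());
        return new KsqlEndpoint(host, port);
    }

    public static void main(String[] args) {
        KsqlEndpoint endpoint = fromEnv(System.getenv());
        System.out.println(endpoint.host() + ":" + endpoint.port());
    }
}
```

In a Spring application the same effect is more commonly achieved with property placeholders such as `@Value("${KSQL_HOST:cp-ksql-server}")`, which Spring also resolves from environment variables.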
Our application interacts with KSQL Server through its HTTP endpoint. It creates a single table (KTable) on startup. To do that, we need to invoke the executeStatement method on the KSQL Client bean. We are creating a SOURCE table to enable running pull queries on it. The table gets data from the transactions topic and expects the incoming events in JSON format.
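import io.confluent.ksql.api.client.Client;
import io.confluent.ksql.api.client.ExecuteStatementResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Component;

import java.util.concurrent.ExecutionException;

// @Component registers the listener, so the table is created once the context has started
@Component
public class KTableCreateListener implements ApplicationListener<ContextRefreshedEvent> {

    private static final Logger LOG = LoggerFactory.getLogger(KTableCreateListener.class);
    private final Client ksqlClient;

    public KTableCreateListener(Client ksqlClient) {
        this.ksqlClient = ksqlClient;
    }

    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        // SOURCE table over the "transactions" topic; record values are expected as JSON
        String sql = """
                CREATE SOURCE TABLE IF NOT EXISTS transactions_view (
                  id BIGINT PRIMARY KEY,
                  sourceAccountId BIGINT,
                  targetAccountId BIGINT,
                  amount INT
                ) WITH (
                  kafka_topic='transactions',
                  value_format='JSON'
                );
                """;
        try {
            ExecuteStatementResult result = ksqlClient.executeStatement(sql).get();
            LOG.info("Result: {}", result.queryId().orElse(null));
        } catch (ExecutionException e) {
            LOG.error("Error: ", e);
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of swallowing it
            Thread.currentThread().interrupt();
            LOG.error("Interrupted while creating the table", e);
        }
    }
}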
After creating the table, we can run some queries on it. These are pretty simple queries: we are looking for all transactions and for all transactions related to a particular target account.
import io.confluent.ksql.api.client.Client;
import io.confluent.ksql.api.client.Row;
import io.confluent.ksql.api.client.StreamedQueryResult;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;

@RestController
@RequestMapping("/transactions")
public class TransactionResource {

    private final Client ksqlClient;

    public TransactionResource(Client ksqlClient) {
        this.ksqlClient = ksqlClient;
    }

    @GetMapping
    public List<Transaction> getTransactions() throws ExecutionException, InterruptedException {
        return runPullQuery("SELECT * FROM transactions_view;");
    }

    @GetMapping("/target/{accountId}")
    public List<Transaction> getTransactionsByTargetAccountId(@PathVariable("accountId") Long accountId)
            throws ExecutionException, InterruptedException {
        // Filter on the target account, matching the endpoint path
        return runPullQuery("SELECT * FROM transactions_view WHERE targetAccountId=" + accountId + ";");
    }

    // Pull query: poll() blocks until a row arrives and returns null once the query has completed
    private List<Transaction> runPullQuery(String sql) throws ExecutionException, InterruptedException {
        StreamedQueryResult sqr = ksqlClient.streamQuery(sql).get();
        List<Transaction> transactions = new ArrayList<>();
        Row row;
        while ((row = sqr.poll()) != null) {
            transactions.add(mapRowToTransaction(row));
        }
        return transactions;
    }

    // ksqlDB upper-cases unquoted column names, hence "SOURCEACCOUNTID" and so on
    private Transaction mapRowToTransaction(Row row) {
        Transaction t = new Transaction();
        t.setId(row.getLong("ID"));
        t.setSourceAccountId(row.getLong("SOURCEACCOUNTID"));
        t.setTargetAccountId(row.getLong("TARGETACCOUNTID"));
        t.setAmount(row.getInteger("AMOUNT"));
        return t;
    }
}
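The listings map rows to a `Transaction` object whose class is not shown in this article. A minimal version, inferred from the setters used above and not necessarily identical to the one in the repository, could look like this. The boxed `Long` and `Integer` fields line up with what `Row.getLong` and `Row.getInteger` return.

```java
/** Minimal Transaction model inferred from the setters used in this article (hypothetical shape). */
public class Transaction {

    private Long id;
    private Long sourceAccountId;
    private Long targetAccountId;
    private Integer amount;

    // A public no-argument constructor plus getters/setters keep this a plain JavaBean,
    // which JSON mappers such as Jackson can serialize and deserialize
    public Transaction() {
    }

    public Long getId() { return id; }
    public void setId(Long id) { this.id = id; }

    public Long getSourceAccountId() { return sourceAccountId; }
    public void setSourceAccountId(Long sourceAccountId) { this.sourceAccountId = sourceAccountId; }

    public Long getTargetAccountId() { return targetAccountId; }
    public void setTargetAccountId(Long targetAccountId) { this.targetAccountId = targetAccountId; }

    public Integer getAmount() { return amount; }
    public void setAmount(Integer amount) { this.amount = amount; }

    @Override
    public String toString() {
        return "Transaction{id=" + id + ", sourceAccountId=" + sourceAccountId
                + ", targetAccountId=" + targetAccountId + ", amount=" + amount + "}";
    }
}
```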
Sending events to the topic with Spring Cloud Stream
Finally, we can proceed to the last part of our exercise. We need to generate test data and send it to the Kafka transactions topic. The simplest way to achieve it is with the Spring Cloud Stream Kafka module. First, let’s add the following Maven dependency:
<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
Then, we may create a producer based on a Supplier bean. Spring Cloud Stream polls the Supplier bean and sends every new event it returns to the target channel. By default, it does so once per second.
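import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

import java.util.Random;
import java.util.function.Supplier;

@Configuration
public class KafkaEventProducer {

    private static long transactionId = 0;
    private static final Random r = new Random();

    // Polled by Spring Cloud Stream (once per second by default); each call emits one event
    @Bean
    public Supplier<Message<Transaction>> transactionsSupplier() {
        return () -> {
            Transaction t = new Transaction();
            t.setId(++transactionId);
            t.setSourceAccountId(r.nextLong(1, 100)); // Java 17+, bound is exclusive: 1..99
            t.setTargetAccountId(r.nextLong(1, 100));
            t.setAmount(r.nextInt(1, 10000));
            // The Kafka record key is set through a message header
            // (MESSAGE_KEY is the Spring for Apache Kafka 2.x name; 3.x renamed it to KafkaHeaders.KEY)
            return MessageBuilder
                    .withPayload(t)
                    .setHeader(KafkaHeaders.MESSAGE_KEY, new TransactionKey(t.getId()))
                    .build();
        };
    }
}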
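One detail worth knowing about the generator: `Random.nextLong(origin, bound)` and `Random.nextInt(origin, bound)` (available since Java 17) include the origin but exclude the bound, so account IDs fall between 1 and 99 and amounts between 1 and 9999. The plain-JDK sketch below, built around a hypothetical `RandomTransactionValues` class with a fixed seed, draws values exactly as the Supplier bean does and reports the observed ranges.

```java
import java.util.Random;

/** Hypothetical sketch: draws values the same way as KafkaEventProducer and reports their ranges. */
public final class RandomTransactionValues {

    private RandomTransactionValues() {
    }

    /** Returns {sourceAccountId, targetAccountId, amount}, drawn as in the Supplier bean. */
    public static long[] draw(Random r) {
        long source = r.nextLong(1, 100);  // 1..99, the bound 100 is exclusive
        long target = r.nextLong(1, 100);  // 1..99
        int amount = r.nextInt(1, 10000);  // 1..9999
        return new long[] {source, target, amount};
    }

    public static void main(String[] args) {
        Random r = new Random(42); // fixed seed so the run is repeatable
        long minAccount = Long.MAX_VALUE, maxAccount = Long.MIN_VALUE;
        long minAmount = Long.MAX_VALUE, maxAmount = Long.MIN_VALUE;
        for (int i = 0; i < 100_000; i++) {
            long[] v = draw(r);
            minAccount = Math.min(minAccount, Math.min(v[0], v[1]));
            maxAccount = Math.max(maxAccount, Math.max(v[0], v[1]));
            minAmount = Math.min(minAmount, v[2]);
            maxAmount = Math.max(maxAmount, v[2]);
        }
        System.out.println("accounts: " + minAccount + ".." + maxAccount);
        System.out.println("amounts:  " + minAmount + ".." + maxAmount);
    }
}
```

With only 99 possible account IDs, each target account accumulates transactions quickly, which makes the /transactions/target/{accountId} endpoint return results soon after startup.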
Of course, we also need to provide the address of our Kafka cluster and the name of the target topic for the channel. The address of Kafka is injected at deployment time through the KAFKA_URL environment variable.
spring.kafka.bootstrap-servers = ${KAFKA_URL}
spring.cloud.stream.bindings.transactionsSupplier-out-0.destination = transactions
Finally, let’s deploy our Spring Boot app on Kubernetes. Here’s the YAML manifest containing the Kubernetes Deployment and Service definitions:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: transactions
spec:
  selector:
    matchLabels:
      app: transactions
  template:
    metadata:
      labels:
        app: transactions
    spec:
      containers:
        - name: transactions
          image: piomin/transactions-service
          env:
            - name: KAFKA_URL
              value: my-cluster-kafka-bootstrap:9092
          ports:
            - containerPort: 8080
---
apiVersion: v1
kind: Service
metadata:
  name: transactions
spec:
  type: ClusterIP
  selector:
    app: transactions
  ports:
    - port: 8080
Let’s deploy the app in the kafka namespace:
$ kubectl apply -f k8s/deployment.yaml -n kafka
Testing ksqlDB on Kubernetes
Once the app is deployed on Kubernetes, let’s enable port forwarding to test it on a local port:
$ kubectl port-forward service/transactions 8080:8080 -n kafka
Now, we can test our two HTTP endpoints. Let’s start with the endpoint for searching all transactions:
$ curl http://localhost:8080/transactions
Then, you can call the endpoint for searching all transactions related to a given targetAccountId, e.g.:
$ curl http://localhost:8080/transactions/target/10
Final Thoughts
In this article, I wanted to show how you can get started with ksqlDB on Kubernetes. We used Spring Boot and Spring Cloud Stream to interact with Kafka and ksqlDB. You could also see how to run a Kafka cluster on Kubernetes using the Strimzi operator and how to deploy KSQL Server with the Helm chart from Confluent’s GitHub repository.