Apache Kafka on Kubernetes with Strimzi

Apache Kafka on Kubernetes with Strimzi

In this article, you will learn how to install and manage Apache Kafka on Kubernetes with Strimzi. The Strimzi operator lets us declaratively define and configure Kafka clusters, and several other components like Kafka Connect, Mirror Maker, or Cruise Control. Of course, it’s not the only way to install Kafka on Kubernetes. As an alternative, we can use the Bitnami Helm chart available here. In comparison to that approach, Strimzi simplifies the creation of additional components. We will analyze it on the example of the Cruise Control tool.

You can find many other articles about Apache Kafka on my blog. For example, to read about concurrency with Spring Kafka please refer to the following post. There is also an article about Kafka transactions available here.

Prerequisites

In order to proceed with the exercise, you need to have a Kubernetes cluster. This cluster should have at least three worker nodes since I’m going to show you the approach with Kafka brokers spread across several nodes. We can easily simulate multiple Kubernetes nodes locally with Kind. You need to install the kind CLI tool and start Docker on your laptop. Here’s the Kind configuration manifest containing a definition of a single control plane and 4 worker nodes:

kind: Cluster
apiVersion: kind.x-k8s.io/v1alpha4
nodes:
- role: control-plane
- role: worker
- role: worker
- role: worker
- role: worker

Then, we need to create the Kubernetes cluster based on the manifest visible above with the following kind command:

$ kind create cluster --name c1 --config cluster.yaml

The name of our Kind cluster is c1. It corresponds to the kind-c1 Kubernetes context, which is automatically set as default after creating the cluster. After that, we can display a list of Kubernetes nodes using the following kubectl command:

$ kubectl get node
NAME               STATUS   ROLES           AGE  VERSION
c1-control-plane   Ready    control-plane   1m   v1.27.3
c1-worker          Ready    <none>          1m   v1.27.3
c1-worker2         Ready    <none>          1m   v1.27.3
c1-worker3         Ready    <none>          1m   v1.27.3
c1-worker4         Ready    <none>          1m   v1.27.3

Source Code

If you would like to try it by yourself, you may always take a look at my source code. In order to do that you need to clone my GitHub repository. After that, go to the kafka directory. There are two Spring Boot apps inside the producer and consumer directories. The required Kubernetes manifests are available inside the k8s directory. You can apply them with kubectl or using the Skaffold CLI tool. The repository is already configured to work with Skaffold and Kind. To proceed with the exercise just follow my instructions in the next sections.

Architecture

Let’s analyze our main goals in this exercise. Of course, we want to run a Kafka cluster on Kubernetes as simple as possible. There are several requirements for the cluster:

  1. It should automatically expose broker metrics in the Prometheus format. Then we will use Prometheus mechanisms to get the metrics and store them for visualization.
  2. It should consist of at least 3 brokers. Each broker has to run on a different Kubernetes worker node.
  3. Our Kafka needs to work in the Zookeeper-less mode. Therefore, we need to enable the KRaft protocol between the brokers.
  4. Once we scale up the Kafka cluster, we must automatically rebalance it to reassign partition replicas to the new broker. In order to do that, we will use the Cruise Control support in Strimzi.

Here’s the diagram that visualizes the described architecture. We will also run two simple Spring Boot apps on Kubernetes that connect the Kafka cluster and use it to send/receive messages.

kafka-on-kubernetes-arch

1. Install Monitoring Stack on Kubernetes

In the first step, we will install the monitoring on our Kubernetes cluster. We are going to use the kube-prometheus-stack Helm chart for that. It provides preconfigured instances of Prometheus and Grafana. It also comes with several CRD objects that allow us to easily customize monitoring mechanisms according to our needs. Let’s add the following Helm repository:

$ helm repo add prometheus-community \
    https://prometheus-community.github.io/helm-charts

Then, we can install the chart in the monitoring namespace. We can leave the default configuration.

$ helm install kube-prometheus-stack \
    prometheus-community/kube-prometheus-stack \
    --version 52.1.0 -n monitoring --create-namespace

2. Install Strimzi Operator on Kubernetes

In the next step, we will install the Strimzi operator on Kubernetes using Helm chart. The same as before we need to add the Helm repository:

$ helm repo add strimzi https://strimzi.io/charts

Then, we can proceed to the installation. This time we will override some configuration settings. The Strimzi Helm chart comes with a set of Grafana dashboards to visualize metrics exported by Kafka brokers and some other components managed by Strimzi. We place those dashboards inside the monitoring namespace. By default, the Strimzi chart doesn’t add the dashboards, so we also need to enable that feature in the values YAML file. That’s not all. Because we want to run Kafka in the KRaft mode, we need to enable it using feature gates. Enabling the UseKRaft feature gate requires the KafkaNodePools feature gate to be enabled as well. Then when we deploy a Kafka cluster in KRaft mode, we also must use the KafkaNodePool resources. Here’s the full list of overridden Helm chart values:

dashboards:
  enabled: true
  namespace: monitoring
featureGates: +UseKRaft,+KafkaNodePools,+UnidirectionalTopicOperator

Finally, let’s install the operator in the strimzi namespace using the following command:

$ helm install strimzi-kafka-operator strimzi/strimzi-kafka-operator \
    --version 0.38.0 \
    -n strimzi --create-namespace \
    -f strimzi-values.yaml

3. Run Kafka in the KRaft Mode

In the current version of Strimzi KRaft mode support is still in the alpha phase. This will probably change soon but for now, we have to deal with some inconveniences. In the previous section, we enabled three feature gates required to run Kafka in KRaft mode. Thanks to that we can finally define our Kafka cluster. In the first step, we need to create a node pool. This new Strimzi object is responsible for configuring brokers and controllers in the cluster. Controllers are responsible for coordinating operations and maintaining the cluster’s state. Fortunately, a single node in the poll can act as a controller and a broker at the same time.

Let’s create the KafkaNodePool object for our cluster. As you see it defines two roles: broker and controller (1). We can also configure storage for the cluster members (2). One of our goals is to avoid sharing the same Kubernetes node between Kafka brokers. Therefore, we will define the podAntiAffinity section (3). Setting the topologyKey to kubernetes.io/hostname indicates that the selected pods are not scheduled on nodes with the same hostname (4).

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaNodePool
metadata:
  name: dual-role
  namespace: strimzi
  labels:
    strimzi.io/cluster: my-cluster
spec:
  replicas: 3
  roles: # (1)
    - controller
    - broker
  storage: # (2)
    type: jbod
    volumes:
      - id: 0
        type: persistent-claim
        size: 20Gi
        deleteClaim: false
  template:
    pod:
      affinity:
        podAntiAffinity: # (3)
          requiredDuringSchedulingIgnoredDuringExecution:
            - labelSelector:
                matchExpressions:
                  - key: strimzi.io/name
                    operator: In
                    values:
                      - my-cluster-kafka
              topologyKey: "kubernetes.io/hostname" # (4)

Once we create a node pool, we can proceed to the Kafka object creation. We need to enable Kraft mode and node pools for the particular cluster by annotating it with strimzi.io/kraft and strimzi.io/node-pools (1). The sections like storage (2) or zookeeper (5) are not used in the KRaft mode but are still required by the CRD. We should also configure the cluster metrics exporter (3) and enable the Cruise Control component (4). Of course, our cluster is exposing API for the client connection under the 9092 port.

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
  namespace: strimzi
  annotations: # (1)
    strimzi.io/node-pools: enabled
    strimzi.io/kraft: enabled
spec:
  kafka:
    config:
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
      default.replication.factor: 3
      min.insync.replicas: 2
      inter.broker.protocol.version: '3.6'
    storage: # (2)
      type: persistent-claim
      size: 5Gi
      deleteClaim: true
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: true
    version: 3.6.0
    replicas: 3
    metricsConfig: # (3)
      type: jmxPrometheusExporter
      valueFrom:
        configMapKeyRef:
          name: kafka-metrics
          key: kafka-metrics-config.yml
  entityOperator:
    topicOperator: {}
    userOperator: {}
  cruiseControl: {} # (4)
  # (5)
  zookeeper:
    storage:
      type: persistent-claim
      deleteClaim: true
      size: 2Gi
    replicas: 3

The metricsConfig section in the Kafka object took the ConfigMap as the configuration source. This ConfigMap contains a single kafka-metrics-config.yml entry with the Prometheus rules definition.

kind: ConfigMap
apiVersion: v1
metadata:
  name: kafka-metrics
  namespace: strimzi
  labels:
    app: strimzi
data:
  kafka-metrics-config.yml: |
    lowercaseOutputName: true
    rules:
    - pattern: kafka.server<type=(.+), name=(.+), clientId=(.+), topic=(.+), partition=(.*)><>Value
      name: kafka_server_$1_$2
      type: GAUGE
      labels:
        clientId: "$3"
        topic: "$4"
        partition: "$5"
    - pattern: kafka.server<type=(.+), name=(.+), clientId=(.+), brokerHost=(.+), brokerPort=(.+)><>Value
      name: kafka_server_$1_$2
      type: GAUGE
      labels:
        clientId: "$3"
        broker: "$4:$5"
    - pattern: kafka.server<type=(.+), cipher=(.+), protocol=(.+), listener=(.+), networkProcessor=(.+)><>connections
      name: kafka_server_$1_connections_tls_info
      type: GAUGE
      labels:
        cipher: "$2"
        protocol: "$3"
        listener: "$4"
        networkProcessor: "$5"
    - pattern: kafka.server<type=(.+), clientSoftwareName=(.+), clientSoftwareVersion=(.+), listener=(.+), networkProcessor=(.+)><>connections
      name: kafka_server_$1_connections_software
      type: GAUGE
      labels:
        clientSoftwareName: "$2"
        clientSoftwareVersion: "$3"
        listener: "$4"
        networkProcessor: "$5"
    - pattern: "kafka.server<type=(.+), listener=(.+), networkProcessor=(.+)><>(.+):"
      name: kafka_server_$1_$4
      type: GAUGE
      labels:
        listener: "$2"
        networkProcessor: "$3"
    - pattern: kafka.server<type=(.+), listener=(.+), networkProcessor=(.+)><>(.+)
      name: kafka_server_$1_$4
      type: GAUGE
      labels:
        listener: "$2"
        networkProcessor: "$3"
    - pattern: kafka.(\w+)<type=(.+), name=(.+)Percent\w*><>MeanRate
      name: kafka_$1_$2_$3_percent
      type: GAUGE
    - pattern: kafka.(\w+)<type=(.+), name=(.+)Percent\w*><>Value
      name: kafka_$1_$2_$3_percent
      type: GAUGE
    - pattern: kafka.(\w+)<type=(.+), name=(.+)Percent\w*, (.+)=(.+)><>Value
      name: kafka_$1_$2_$3_percent
      type: GAUGE
      labels:
        "$4": "$5"
    - pattern: kafka.(\w+)<type=(.+), name=(.+)PerSec\w*, (.+)=(.+), (.+)=(.+)><>Count
      name: kafka_$1_$2_$3_total
      type: COUNTER
      labels:
        "$4": "$5"
        "$6": "$7"
    - pattern: kafka.(\w+)<type=(.+), name=(.+)PerSec\w*, (.+)=(.+)><>Count
      name: kafka_$1_$2_$3_total
      type: COUNTER
      labels:
        "$4": "$5"
    - pattern: kafka.(\w+)<type=(.+), name=(.+)PerSec\w*><>Count
      name: kafka_$1_$2_$3_total
      type: COUNTER
    - pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.+), (.+)=(.+)><>Value
      name: kafka_$1_$2_$3
      type: GAUGE
      labels:
        "$4": "$5"
        "$6": "$7"
    - pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.+)><>Value
      name: kafka_$1_$2_$3
      type: GAUGE
      labels:
        "$4": "$5"
    - pattern: kafka.(\w+)<type=(.+), name=(.+)><>Value
      name: kafka_$1_$2_$3
      type: GAUGE
    - pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.+), (.+)=(.+)><>Count
      name: kafka_$1_$2_$3_count
      type: COUNTER
      labels:
        "$4": "$5"
        "$6": "$7"
    - pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.*), (.+)=(.+)><>(\d+)thPercentile
      name: kafka_$1_$2_$3
      type: GAUGE
      labels:
        "$4": "$5"
        "$6": "$7"
        quantile: "0.$8"
    - pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.+)><>Count
      name: kafka_$1_$2_$3_count
      type: COUNTER
      labels:
        "$4": "$5"
    - pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.*)><>(\d+)thPercentile
      name: kafka_$1_$2_$3
      type: GAUGE
      labels:
        "$4": "$5"
        quantile: "0.$6"
    - pattern: kafka.(\w+)<type=(.+), name=(.+)><>Count
      name: kafka_$1_$2_$3_count
      type: COUNTER
    - pattern: kafka.(\w+)<type=(.+), name=(.+)><>(\d+)thPercentile
      name: kafka_$1_$2_$3
      type: GAUGE
      labels:
        quantile: "0.$4"
    - pattern: "kafka.server<type=raft-metrics><>(.+-total|.+-max):"
      name: kafka_server_raftmetrics_$1
      type: COUNTER
    - pattern: "kafka.server<type=raft-metrics><>(.+):"
      name: kafka_server_raftmetrics_$1
      type: GAUGE
    - pattern: "kafka.server<type=raft-channel-metrics><>(.+-total|.+-max):"
      name: kafka_server_raftchannelmetrics_$1
      type: COUNTER
    - pattern: "kafka.server<type=raft-channel-metrics><>(.+):"
      name: kafka_server_raftchannelmetrics_$1
      type: GAUGE
    - pattern: "kafka.server<type=broker-metadata-metrics><>(.+):"
      name: kafka_server_brokermetadatametrics_$1
      type: GAUGE

4. Interacting with Kafka on Kubernetes

Once we apply the KafkaNodePool and Kafka objects to the Kubernetes cluster, Strimzi starts provisioning. As a result, you should see the broker pods, a single pod related to Cruise Control, and a metrics exporter pod. Each Kafka broker pod is running on a different Kubernetes node:

Clients can connect Kafka using the my-cluster-kafka-bootstrap Service under the 9092 port:

$ kubectl get svc
NAME                         TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)                                        AGE
my-cluster-cruise-control    ClusterIP   10.96.108.204   <none>        9090/TCP                                       4m10s
my-cluster-kafka-bootstrap   ClusterIP   10.96.155.136   <none>        9091/TCP,9092/TCP,9093/TCP                     4m59s
my-cluster-kafka-brokers     ClusterIP   None            <none>        9090/TCP,9091/TCP,8443/TCP,9092/TCP,9093/TCP   4m59s

In the next step, we will deploy our two apps for producing and consuming messages. The producer app sends one message per second to the target topic:

@SpringBootApplication
@EnableScheduling
public class KafkaProducer {

   private static final Logger LOG = LoggerFactory
      .getLogger(KafkaProducer.class);

   public static void main(String[] args) {
      SpringApplication.run(KafkaProducer.class, args);
   }

   AtomicLong id = new AtomicLong();
   @Autowired
   KafkaTemplate<Long, Info> template;

   @Value("${POD:kafka-producer}")
   private String pod;
   @Value("${NAMESPACE:empty}")
   private String namespace;
   @Value("${CLUSTER:localhost}")
   private String cluster;
   @Value("${TOPIC:test}")
   private String topic;

   @Scheduled(fixedRate = 1000)
   public void send() {
      Info info = new Info(id.incrementAndGet(), 
                           pod, namespace, cluster, "HELLO");
      CompletableFuture<SendResult<Long, Info>> result = template
         .send(topic, info.getId(), info);
      result.whenComplete((sr, ex) ->
         LOG.info("Sent({}): {}", sr.getProducerRecord().key(), 
         sr.getProducerRecord().value()));
   }
}

Here’s the Deployment manifest for the producer app:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: producer
spec:
  selector:
    matchLabels:
      app: producer
  template:
    metadata:
      labels:
        app: producer
    spec:
      containers:
      - name: producer
        image: piomin/producer
        resources:
          requests:
            memory: 200Mi
            cpu: 100m
        ports:
        - containerPort: 8080
        env:
          - name: KAFKA_URL
            value: my-cluster-kafka-bootstrap
          - name: CLUSTER
            value: c1
          - name: TOPIC
            value: test-1
          - name: POD
            valueFrom:
              fieldRef:
                fieldPath: metadata.name
          - name: NAMESPACE
            valueFrom:
              fieldRef:
                fieldPath: metadata.namespace

Before running the app we can create the test-1 topic with the Strimzi CRD:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: test-1
  labels:
    strimzi.io/cluster: my-cluster
spec:
  partitions: 12
  replicas: 3
  config:
    retention.ms: 7200000
    segment.bytes: 1000000

The consumer app is listening for incoming messages. Here’s the bean responsible for receiving and logging messages:

@SpringBootApplication
@EnableKafka
public class KafkaConsumer {

   private static final Logger LOG = LoggerFactory
      .getLogger(KafkaConsumer.class);

   public static void main(String[] args) {
      SpringApplication.run(KafkaConsumer.class, args);
   }

   @Value("${app.in.topic}")
   private String topic;

   @KafkaListener(id = "info", topics = "${app.in.topic}")
   public void onMessage(@Payload Info info,
      @Header(name = KafkaHeaders.RECEIVED_KEY, required = false) Long key,
      @Header(KafkaHeaders.RECEIVED_PARTITION) int partition) {
      LOG.info("Received(key={}, partition={}): {}", key, partition, info);
   }
}

Here’s the Deployment manifest for the consumer app:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: consumer
spec:
  selector:
    matchLabels:
      app: consumer
  template:
    metadata:
      labels:
        app: consumer
    spec:
      containers:
      - name: consumer
        image: piomin/consumer
        resources:
          requests:
            memory: 200Mi
            cpu: 100m
        ports:
        - containerPort: 8080
        env:
          - name: TOPIC
            value: test-1
          - name: KAFKA_URL
            value: my-cluster-kafka-bootstrap

We can run both Spring Boot apps using Skaffold. Firstly, we need to go to the kafka directory in our repository. Then let’s run the following command:

$ skaffold run -n strimzi --tail

Finally, we can verify the logs printed by our apps. As you see, all the messages sent by the producer app are received by the consumer app.

kafka-on-kubernetes-logs

5. Kafka Metrics in Prometheus

Once we installed the Strimzi Helm chart with the dashboard.enabled=true and dashboard.namespace=monitoring, we have several Grafana dashboard manifests placed in the monitoring namespace. Each dashboard is represented as a ConfigMap. Let’s display a list of ConfigMaps installed by the Strimzi Helm chart:

$ kubectl get cm -n monitoring | grep strimzi
strimzi-cruise-control                                    1      2m
strimzi-kafka                                             1      2m
strimzi-kafka-bridge                                      1      2m
strimzi-kafka-connect                                     1      2m
strimzi-kafka-exporter                                    1      2m
strimzi-kafka-mirror-maker-2                              1      2m
strimzi-kafka-oauth                                       1      2m
strimzi-kraft                                             1      2m
strimzi-operators                                         1      2m
strimzi-zookeeper                                         1      2m

Since Grafana is also installed in the monitoring namespace, it automatically imports all the dashboards from ConfigMaps annotated with grafana_dashboard. Consequently, after logging into Grafana (admin / prom-operator), we can easily switch between all the Kafka-related dashboards.

The only problem is that Prometheus doesn’t scrape the metrics exposed by the Kafka pods. Since we have already configured metrics exporting on the Strimzi Kafka CRD, Kafka pods expose the /metric endpoint for Prometheus under the 9404 port. Let’s take a look at the Kafka broker pod details:

In order to force Prometheus to scrape metrics from Kafka pods, we need to create the PodMonitor object. We should place it in the monitoring (1) namespace and set the release=kube-prometheus-stack label (2). The PodMonitor object filters all the pods from the strimzi namespace (3) that contains the strimzi.io/kind label having one of the values: Kafka, KafkaConnect, KafkaMirrorMaker, KafkaMirrorMaker2 (4). Also, it has to query the /metrics endpoint under the port with the tcp-prometheus name (5).

apiVersion: monitoring.coreos.com/v1
kind: PodMonitor
metadata:
  name: kafka-resources-metrics
  namespace: monitoring
  labels:
    app: strimzi
    release: kube-prometheus-stack
spec:
  selector:
    matchExpressions:
      - key: "strimzi.io/kind"
        operator: In
        values: ["Kafka", "KafkaConnect", "KafkaMirrorMaker", "KafkaMirrorMaker2"]
  namespaceSelector:
    matchNames:
      - strimzi
  podMetricsEndpoints:
  - path: /metrics
    port: tcp-prometheus
    relabelings:
    - separator: ;
      regex: __meta_kubernetes_pod_label_(strimzi_io_.+)
      replacement: $1
      action: labelmap
    - sourceLabels: [__meta_kubernetes_namespace]
      separator: ;
      regex: (.*)
      targetLabel: namespace
      replacement: $1
      action: replace
    - sourceLabels: [__meta_kubernetes_pod_name]
      separator: ;
      regex: (.*)
      targetLabel: kubernetes_pod_name
      replacement: $1
      action: replace
    - sourceLabels: [__meta_kubernetes_pod_node_name]
      separator: ;
      regex: (.*)
      targetLabel: node_name
      replacement: $1
      action: replace
    - sourceLabels: [__meta_kubernetes_pod_host_ip]
      separator: ;
      regex: (.*)
      targetLabel: node_ip
      replacement: $1
      action: replace

Finally, we can display the Grafana dashboard with Kafka metrics visualization. Let’s choose the dashboard with the “Strimzi Kafka” name. Here’s the general view:

kafka-on-kubernetes-metrics

There are several other diagrams available. For example, we can take a look at the statistics related to the incoming and outgoing messages.

6. Rebalancing Kafka with Cruise Control

Let’s analyze the typical scenario around Kafka related to increasing the number of brokers in the cluster. Before we do it, we will generate more incoming traffic to the test-1 topic. In order to do it, we can use the Grafana k6 tool. The k6 tool provides several extensions for load testing – including the Kafka plugin. Here’s the Deployment manifest that runs k6 with the Kafka extension on Kubernetes.

kind: ConfigMap
apiVersion: v1
metadata:
  name: load-test-cm
  namespace: strimzi
data:
  load-test.js: |
    import {
      Writer,
      SchemaRegistry,
      SCHEMA_TYPE_JSON,
    } from "k6/x/kafka";
    const writer = new Writer({
      brokers: ["my-cluster-kafka-bootstrap.strimzi:9092"],
      topic: "test-1",
    });
    const schemaRegistry = new SchemaRegistry();
    export default function () {
      writer.produce({
        messages: [
          {
            value: schemaRegistry.serialize({
              data: {
                id: 1,
                source: "test",
                space: "strimzi",
                cluster: "c1",
                message: "HELLO"
              },
              schemaType: SCHEMA_TYPE_JSON,
            }),
          },
        ],
      });
    }
    
    export function teardown(data) {
      writer.close();
    }
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: k6-test
  namespace: strimzi
spec:
  selector:
    matchLabels:
      app.kubernetes.io/name: k6-test
  template:
    metadata:
      labels:
        app.kubernetes.io/name: k6-test
    spec:
      containers:
        - image: mostafamoradian/xk6-kafka:latest
          name: xk6-kafka
          command:
            - "k6"
            - "run"
            - "--vus"
            - "1"
            - "--duration"
            - "720s"
            - "/tests/load-test.js"
          env:
            - name: KAFKA_URL
              value: my-cluster-kafka-bootstrap
            - name: CLUSTER
              value: c1
            - name: TOPIC
              value: test-1
            - name: POD
              valueFrom:
                fieldRef:
                  fieldPath: metadata.name
            - name: NAMESPACE
              valueFrom:
                fieldRef:
                  fieldPath: metadata.namespace
          volumeMounts:
            - mountPath: /tests
              name: test
      volumes:
        - name: test
          configMap:
            name: load-test-cm

Let’s apply the manifest to the strimzi namespace with the following command:

$ kubectl apply -f k8s/k6.yaml

After that, we can take a look at the k6 Pod logs. As you see, it generates and sends a lot of messages to the test-1 topic on our Kafka cluster:

Now, let’s increase the number of Kafka brokers in our cluster. We can do it by changing the value of the replicas field in the KafkaNodePool object:

$ kubectl scale kafkanodepool dual-role --replicas=4 -n strimzi

After a while, Strimzi will start a new pod with another Kafka broker. Although we have a new member of the Kafka cluster, all the partitions are still distributed only across three previous brokers. The situation would be different for the new topic. However, the partitions related to the existing topics won’t be automatically migrated to the new broker instance. Let’s verify the current partition structure for the test-1 topic with kcat CLI (I’m exposing Kafka API locally with kubectl port-forward):

$ kcat -b localhost:9092 -L -t test-1
Metadata for test-1 (from broker -1: localhost:9092/bootstrap):
 4 brokers:
  broker 0 at my-cluster-dual-role-0.my-cluster-kafka-brokers.strimzi.svc:9092
  broker 1 at my-cluster-dual-role-1.my-cluster-kafka-brokers.strimzi.svc:9092
  broker 2 at my-cluster-dual-role-2.my-cluster-kafka-brokers.strimzi.svc:9092
  broker 3 at my-cluster-dual-role-3.my-cluster-kafka-brokers.strimzi.svc:9092 (controller)
 1 topics:
  topic "test-1" with 12 partitions:
    partition 0, leader 0, replicas: 0,1,2, isrs: 1,0,2
    partition 1, leader 1, replicas: 1,2,0, isrs: 1,0,2
    partition 2, leader 2, replicas: 2,0,1, isrs: 1,0,2
    partition 3, leader 0, replicas: 0,1,2, isrs: 1,0,2
    partition 4, leader 1, replicas: 1,2,0, isrs: 1,0,2
    partition 5, leader 2, replicas: 2,0,1, isrs: 1,0,2
    partition 6, leader 0, replicas: 0,1,2, isrs: 1,0,2
    partition 7, leader 1, replicas: 1,2,0, isrs: 1,0,2
    partition 8, leader 2, replicas: 2,0,1, isrs: 1,0,2
    partition 9, leader 0, replicas: 0,2,1, isrs: 1,0,2
    partition 10, leader 2, replicas: 2,1,0, isrs: 1,0,2
    partition 11, leader 1, replicas: 1,0,2, isrs: 1,0,2

Here comes Cruise Control. Cruise Control makes managing and operating Kafka much easier. For example, it allows us to move partitions across brokers after scaling up the cluster. Let’s see how it works. We have already enabled Cruise Control in the Strimzi Kafka CRD. In order to begin a rebalancing procedure, we should create the KafkaRebalance object. This object is responsible for asking Cruise Control to generate an optimization proposal.

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaRebalance
metadata:
  name: my-rebalance
  labels:
    strimzi.io/cluster: my-cluster
spec: {}

If the optimization proposal is ready you will see the ProposalReady value in the Status.Conditions.Type field. I won’t get into the details of Cruise Control. It suggested moving 58 partition replicas between separate brokers in the cluster.

Let’s accept the proposal by annotating the KafkaRebalance object with strimzi.io/rebalance=approve:

$ kubectl annotate kafkarebalance my-rebalance \   
    strimzi.io/rebalance=approve -n strimzi

Finally, we can run the kcat command on the test-1 topic once again. Now, as you see, partition replicas are spread across all the brokers.

$ kcat -b localhost:9092 -L -t test-1
Metadata for test-1 (from broker -1: localhost:9092/bootstrap):
 4 brokers:
  broker 0 at my-cluster-dual-role-0.my-cluster-kafka-brokers.strimzi.svc:9092
  broker 1 at my-cluster-dual-role-1.my-cluster-kafka-brokers.strimzi.svc:9092
  broker 2 at my-cluster-dual-role-2.my-cluster-kafka-brokers.strimzi.svc:9092
  broker 3 at my-cluster-dual-role-3.my-cluster-kafka-brokers.strimzi.svc:9092 (controller)
 1 topics:
  topic "test-1" with 12 partitions:
    partition 0, leader 2, replicas: 2,1,3, isrs: 1,2,3
    partition 1, leader 1, replicas: 1,2,0, isrs: 1,0,2
    partition 2, leader 2, replicas: 0,2,1, isrs: 1,0,2
    partition 3, leader 0, replicas: 0,2,3, isrs: 0,2,3
    partition 4, leader 1, replicas: 3,2,1, isrs: 1,2,3
    partition 5, leader 2, replicas: 2,3,0, isrs: 0,2,3
    partition 6, leader 0, replicas: 0,1,2, isrs: 1,0,2
    partition 7, leader 1, replicas: 3,1,0, isrs: 1,0,3
    partition 8, leader 2, replicas: 2,0,1, isrs: 1,0,2
    partition 9, leader 0, replicas: 0,3,1, isrs: 1,0,3
    partition 10, leader 2, replicas: 2,3,0, isrs: 0,2,3
    partition 11, leader 1, replicas: 1,0,3, isrs: 1,0,3

Final Thoughts

Strimzi allows us not only to install and manage Kafka but also the whole ecosystem around it. In this article, I showed how to export metrics to Prometheus and use the Cruise Control tool to rebalance a cluster after scale-up. We also ran Kafka in KRaft mode and then connected two simple Java apps with the cluster through Kubernetes Service.

6 COMMENTS

comments user
Konstantin

Could you please make a post about opentelemtry and tracing Kafka messages going to different topics (topic1 -> topic2 – topic3) using strimzi springboot and open telemetry with gui like jager

comments user
ishanrakitha

Thank you very much for the article. This is very informative.

    comments user
    piotr.minkowski

    You are welcome

comments user
Ivan

Thanks for the article, but Im encountering a problem on my side. I use Macbook pro 2 with minkube and when I install the operator and the node pools I receive the following error:
mkdir: cannot create directory ‘/var/lib/kafka/data-0/kafka-log0’: Permission denied
Do you have any idea if this is caused by the arm cpu or Im missing something because from permissions perspective I don’t have any issues.

    comments user
    piotr.minkowski

    No, I didn’t have. Do you run according to my instructions? did you try to run it in the ephemeral mode?

Leave a Reply