Apache Kafka on Kubernetes with Strimzi
In this article, you will learn how to install and manage Apache Kafka on Kubernetes with Strimzi. The Strimzi operator lets us declaratively define and configure Kafka clusters and several other components, such as Kafka Connect, MirrorMaker, or Cruise Control. Of course, it's not the only way to install Kafka on Kubernetes. As an alternative, we can use the Bitnami Helm chart available here. Compared to that approach, Strimzi simplifies the creation of additional components. We will illustrate this using the Cruise Control tool as an example.
You can find many other articles about Apache Kafka on my blog. For example, to read about concurrency with Spring Kafka, please refer to the following post. There is also an article about Kafka transactions available here.
Prerequisites
In order to proceed with the exercise, you need a Kubernetes cluster. This cluster should have at least three worker nodes, since I'm going to show you the approach with Kafka brokers spread across several nodes. We can easily simulate multiple Kubernetes nodes locally with Kind. You need to install the kind CLI tool and start Docker on your laptop. Here's the Kind configuration manifest containing a definition of a single control plane and four worker nodes:
kind: Cluster
apiVersion: kind.x-k8s.io/v1alpha4
nodes:
- role: control-plane
- role: worker
- role: worker
- role: worker
- role: worker
Then, we need to create the Kubernetes cluster based on the manifest above with the following kind command:
$ kind create cluster --name c1 --config cluster.yaml
The name of our Kind cluster is c1. It corresponds to the kind-c1 Kubernetes context, which is automatically set as the default after creating the cluster. After that, we can display a list of Kubernetes nodes using the following kubectl command:
$ kubectl get node
NAME               STATUS   ROLES           AGE   VERSION
c1-control-plane   Ready    control-plane   1m    v1.27.3
c1-worker          Ready    <none>          1m    v1.27.3
c1-worker2         Ready    <none>          1m    v1.27.3
c1-worker3         Ready    <none>          1m    v1.27.3
c1-worker4         Ready    <none>          1m    v1.27.3
Source Code
If you would like to try it by yourself, you can always take a look at my source code. To do that, clone my GitHub repository and go to the kafka directory. There are two Spring Boot apps inside the producer and consumer directories. The required Kubernetes manifests are available inside the k8s directory. You can apply them with kubectl or with the Skaffold CLI tool. The repository is already configured to work with Skaffold and Kind. To proceed with the exercise, just follow my instructions in the next sections.
Architecture
Let's analyze our main goals in this exercise. Of course, we want to run a Kafka cluster on Kubernetes as simply as possible. There are several requirements for the cluster:
- It should automatically expose broker metrics in the Prometheus format. Then, Prometheus will scrape the metrics and store them for visualization.
- It should consist of at least 3 brokers. Each broker has to run on a different Kubernetes worker node.
- Our Kafka cluster needs to work in the ZooKeeper-less mode. Therefore, we need to enable the KRaft protocol between the brokers.
- Once we scale up the Kafka cluster, we must automatically rebalance it to reassign partition replicas to the new broker. In order to do that, we will use the Cruise Control support in Strimzi.
Here's the diagram that visualizes the described architecture. We will also run two simple Spring Boot apps on Kubernetes that connect to the Kafka cluster and use it to send and receive messages.
1. Install Monitoring Stack on Kubernetes
In the first step, we will install the monitoring stack on our Kubernetes cluster. We are going to use the kube-prometheus-stack Helm chart for that. It provides preconfigured instances of Prometheus and Grafana. It also comes with several CRDs that allow us to easily customize monitoring mechanisms according to our needs. Let's add the following Helm repository:
$ helm repo add prometheus-community \
https://prometheus-community.github.io/helm-charts
Then, we can install the chart in the monitoring namespace. We can leave the default configuration.
$ helm install kube-prometheus-stack \
prometheus-community/kube-prometheus-stack \
--version 52.1.0 -n monitoring --create-namespace
2. Install Strimzi Operator on Kubernetes
In the next step, we will install the Strimzi operator on Kubernetes using its Helm chart. As before, we need to add the Helm repository:
$ helm repo add strimzi https://strimzi.io/charts
Then, we can proceed to the installation. This time we will override some configuration settings. The Strimzi Helm chart comes with a set of Grafana dashboards to visualize metrics exported by Kafka brokers and other components managed by Strimzi. We place those dashboards inside the monitoring namespace. By default, the Strimzi chart doesn't add the dashboards, so we need to enable that feature in the values YAML file. That's not all. Because we want to run Kafka in the KRaft mode, we need to enable it using feature gates. Enabling the UseKRaft feature gate requires the KafkaNodePools feature gate to be enabled as well. Then, when we deploy a Kafka cluster in KRaft mode, we must also use KafkaNodePool resources. Here's the full list of overridden Helm chart values (saved as strimzi-values.yaml):
dashboards:
  enabled: true
  namespace: monitoring
featureGates: +UseKRaft,+KafkaNodePools,+UnidirectionalTopicOperator
Finally, let's install the operator in the strimzi namespace using the following command:
$ helm install strimzi-kafka-operator strimzi/strimzi-kafka-operator \
--version 0.38.0 \
-n strimzi --create-namespace \
-f strimzi-values.yaml
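The featureGates value is a comma-separated list in which a + prefix enables a gate and a - prefix disables it. As a minimal sketch, the hypothetical Java helper below (not part of Strimzi) parses such a string and checks the dependency described above: UseKRaft is only accepted together with KafkaNodePools. It treats gates that are not listed as disabled, which is a simplifying assumption; the operator performs its own validation.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper illustrating the "+Gate,-Gate" syntax of Strimzi feature gates. */
public final class FeatureGatesSketch {

    /** Parses e.g. "+UseKRaft,+KafkaNodePools" into gate name -> enabled flag. */
    static Map<String, Boolean> parse(String value) {
        Map<String, Boolean> gates = new HashMap<>();
        for (String raw : value.split(",")) {
            String gate = raw.trim();
            if (gate.isEmpty()) {
                continue; // tolerate empty entries, e.g. a trailing comma
            }
            char sign = gate.charAt(0);
            if (sign != '+' && sign != '-') {
                throw new IllegalArgumentException("Gate must start with + or -: " + gate);
            }
            gates.put(gate.substring(1), sign == '+');
        }
        return gates;
    }

    /** The dependency from the text: UseKRaft needs KafkaNodePools to be enabled too. */
    static boolean isValid(Map<String, Boolean> gates) {
        boolean kraft = gates.getOrDefault("UseKRaft", false);         // unlisted = disabled (assumption)
        boolean nodePools = gates.getOrDefault("KafkaNodePools", false);
        return !kraft || nodePools;
    }

    public static void main(String[] args) {
        Map<String, Boolean> gates =
                parse("+UseKRaft,+KafkaNodePools,+UnidirectionalTopicOperator");
        System.out.println("UseKRaft enabled: " + gates.get("UseKRaft") + ", valid: " + isValid(gates));
        System.out.println("Only +UseKRaft valid: " + isValid(parse("+UseKRaft")));
    }
}
```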
3. Run Kafka in the KRaft Mode
In the current version of Strimzi (0.38.0), KRaft mode support is still in the alpha phase. This will probably change soon, but for now we have to deal with some inconveniences. In the previous section, we enabled the three feature gates required to run Kafka in KRaft mode. Thanks to that, we can finally define our Kafka cluster. First, we need to create a node pool. This new Strimzi object is responsible for configuring brokers and controllers in the cluster. Controllers are responsible for coordinating operations and maintaining the cluster's state. Fortunately, a single node in the pool can act as a controller and a broker at the same time.
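The controllers keep the cluster metadata in a Raft-based quorum, so a metadata change is committed once a majority of the controller voters have it. The small Java sketch below spells out that arithmetic for the dual-role pool used in this article, where each of the three nodes is also a controller: a majority is two, so the quorum survives one controller failure. The class and method names are illustrative only.

```java
/** Illustrative majority arithmetic for a Raft-style controller quorum such as KRaft's. */
public final class QuorumMath {

    /** Smallest number of voters that forms a majority. */
    static int majority(int voters) {
        if (voters < 1) {
            throw new IllegalArgumentException("need at least one voter");
        }
        return voters / 2 + 1;
    }

    /** Number of voters that may be unavailable while a majority remains. */
    static int toleratedFailures(int voters) {
        return voters - majority(voters);
    }

    public static void main(String[] args) {
        for (int voters = 1; voters <= 5; voters++) {
            System.out.printf("voters=%d majority=%d tolerated failures=%d%n",
                    voters, majority(voters), toleratedFailures(voters));
        }
    }
}
```

This is also why quorums usually have an odd size: going from three to four voters raises the majority to three without tolerating an additional failure.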
Let's create the KafkaNodePool object for our cluster. As you can see, it defines two roles: broker and controller (1). We can also configure storage for the cluster members (2). One of our goals is to avoid sharing the same Kubernetes node between Kafka brokers. Therefore, we will define the podAntiAffinity section (3). Setting the topologyKey to kubernetes.io/hostname ensures that the selected pods are not scheduled on nodes with the same hostname (4).
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaNodePool
metadata:
  name: dual-role
  namespace: strimzi
  labels:
    strimzi.io/cluster: my-cluster
spec:
  replicas: 3
  roles: # (1)
    - controller
    - broker
  storage: # (2)
    type: jbod
    volumes:
      - id: 0
        type: persistent-claim
        size: 20Gi
        deleteClaim: false
  template:
    pod:
      affinity:
        podAntiAffinity: # (3)
          requiredDuringSchedulingIgnoredDuringExecution:
            - labelSelector:
                matchExpressions:
                  - key: strimzi.io/name
                    operator: In
                    values:
                      - my-cluster-kafka
              topologyKey: "kubernetes.io/hostname" # (4)
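A required podAntiAffinity rule is a hard constraint: the scheduler never places two matching pods on nodes with the same kubernetes.io/hostname value, and a pod that cannot satisfy the rule stays Pending. The Java sketch below models only that one rule (it ignores resources, taints, and node scoring) for the four Kind workers from the kubectl get node output. The "first free node wins" placement is a simplification, and the helper names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Toy model of a required pod anti-affinity rule with topologyKey kubernetes.io/hostname. */
public final class AntiAffinitySketch {

    /** Places each pod on the first node not yet hosting a matching pod; otherwise "Pending". */
    static Map<String, String> schedule(List<String> pods, List<String> nodes) {
        Map<String, String> placement = new LinkedHashMap<>();
        Set<String> occupied = new HashSet<>();
        for (String pod : pods) {
            String chosen = "Pending";
            for (String node : nodes) {
                if (occupied.add(node)) { // add() returns false if the node already hosts a broker
                    chosen = node;
                    break;
                }
            }
            placement.put(pod, chosen);
        }
        return placement;
    }

    /** Pod names following the my-cluster-dual-role-N pattern seen later in the kcat output. */
    static List<String> pods(int replicas) {
        List<String> names = new ArrayList<>();
        for (int i = 0; i < replicas; i++) {
            names.add("my-cluster-dual-role-" + i);
        }
        return names;
    }

    public static void main(String[] args) {
        List<String> workers = List.of("c1-worker", "c1-worker2", "c1-worker3", "c1-worker4");
        System.out.println(schedule(pods(3), workers));
        System.out.println(schedule(pods(5), workers)); // the fifth broker cannot be placed
    }
}
```

With four workers, the pool can grow to four brokers (as we will do later); a fifth replica would remain Pending under this rule.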
Once we create a node pool, we can proceed to the Kafka object creation. We need to enable KRaft mode and node pools for the particular cluster by annotating it with strimzi.io/kraft and strimzi.io/node-pools (1). Sections like storage (2) or zookeeper (5) are not used in the KRaft mode but are still required by the CRD. We should also configure the metrics exporter for the brokers (3) and enable the Cruise Control component (4). Clients connect to the cluster through the plain listener on port 9092 (a TLS listener is also available on port 9093).
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
  namespace: strimzi
  annotations: # (1)
    strimzi.io/node-pools: enabled
    strimzi.io/kraft: enabled
spec:
  kafka:
    config:
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
      default.replication.factor: 3
      min.insync.replicas: 2
      inter.broker.protocol.version: '3.6'
    storage: # (2)
      type: persistent-claim
      size: 5Gi
      deleteClaim: true
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: true
    version: 3.6.0
    replicas: 3
    metricsConfig: # (3)
      type: jmxPrometheusExporter
      valueFrom:
        configMapKeyRef:
          name: kafka-metrics
          key: kafka-metrics-config.yml
  entityOperator:
    topicOperator: {}
    userOperator: {}
  cruiseControl: {} # (4)
  # (5)
  zookeeper:
    storage:
      type: persistent-claim
      deleteClaim: true
      size: 2Gi
    replicas: 3
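The config section sets a replication factor of 3 together with min.insync.replicas: 2. For a producer that uses acks=all, a write to a partition is acknowledged only while at least min.insync.replicas of its replicas are in sync, so this combination tolerates one unavailable replica per partition without rejecting writes. A minimal Java sketch of that arithmetic (illustrative names, not a Kafka API):

```java
/** Arithmetic behind replication.factor and min.insync.replicas for acks=all producers. */
public final class InSyncReplicasMath {

    /** True if an acks=all write can be acknowledged with the given number of in-sync replicas. */
    static boolean acceptsAcksAllWrite(int inSyncReplicas, int minInSyncReplicas) {
        return inSyncReplicas >= minInSyncReplicas;
    }

    /** Replicas of one partition that may be unavailable before acks=all writes are rejected. */
    static int toleratedReplicaFailures(int replicationFactor, int minInSyncReplicas) {
        if (minInSyncReplicas > replicationFactor) {
            throw new IllegalArgumentException("min.insync.replicas exceeds the replication factor");
        }
        return replicationFactor - minInSyncReplicas;
    }

    public static void main(String[] args) {
        int rf = 3;     // default.replication.factor in the Kafka CR
        int minIsr = 2; // min.insync.replicas in the Kafka CR
        for (int down = 0; down <= rf; down++) {
            System.out.printf("replicas down=%d, acks=all write accepted=%b%n",
                    down, acceptsAcksAllWrite(rf - down, minIsr));
        }
        System.out.println("tolerated replica failures: " + toleratedReplicaFailures(rf, minIsr));
    }
}
```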
The metricsConfig section in the Kafka object uses a ConfigMap as its configuration source. This ConfigMap contains a single kafka-metrics-config.yml entry with the Prometheus rule definitions.
kind: ConfigMap
apiVersion: v1
metadata:
  name: kafka-metrics
  namespace: strimzi
  labels:
    app: strimzi
data:
  kafka-metrics-config.yml: |
    lowercaseOutputName: true
    rules:
    - pattern: kafka.server<type=(.+), name=(.+), clientId=(.+), topic=(.+), partition=(.*)><>Value
      name: kafka_server_$1_$2
      type: GAUGE
      labels:
        clientId: "$3"
        topic: "$4"
        partition: "$5"
    - pattern: kafka.server<type=(.+), name=(.+), clientId=(.+), brokerHost=(.+), brokerPort=(.+)><>Value
      name: kafka_server_$1_$2
      type: GAUGE
      labels:
        clientId: "$3"
        broker: "$4:$5"
    - pattern: kafka.server<type=(.+), cipher=(.+), protocol=(.+), listener=(.+), networkProcessor=(.+)><>connections
      name: kafka_server_$1_connections_tls_info
      type: GAUGE
      labels:
        cipher: "$2"
        protocol: "$3"
        listener: "$4"
        networkProcessor: "$5"
    - pattern: kafka.server<type=(.+), clientSoftwareName=(.+), clientSoftwareVersion=(.+), listener=(.+), networkProcessor=(.+)><>connections
      name: kafka_server_$1_connections_software
      type: GAUGE
      labels:
        clientSoftwareName: "$2"
        clientSoftwareVersion: "$3"
        listener: "$4"
        networkProcessor: "$5"
    - pattern: "kafka.server<type=(.+), listener=(.+), networkProcessor=(.+)><>(.+):"
      name: kafka_server_$1_$4
      type: GAUGE
      labels:
        listener: "$2"
        networkProcessor: "$3"
    - pattern: kafka.server<type=(.+), listener=(.+), networkProcessor=(.+)><>(.+)
      name: kafka_server_$1_$4
      type: GAUGE
      labels:
        listener: "$2"
        networkProcessor: "$3"
    - pattern: kafka.(\w+)<type=(.+), name=(.+)Percent\w*><>MeanRate
      name: kafka_$1_$2_$3_percent
      type: GAUGE
    - pattern: kafka.(\w+)<type=(.+), name=(.+)Percent\w*><>Value
      name: kafka_$1_$2_$3_percent
      type: GAUGE
    - pattern: kafka.(\w+)<type=(.+), name=(.+)Percent\w*, (.+)=(.+)><>Value
      name: kafka_$1_$2_$3_percent
      type: GAUGE
      labels:
        "$4": "$5"
    - pattern: kafka.(\w+)<type=(.+), name=(.+)PerSec\w*, (.+)=(.+), (.+)=(.+)><>Count
      name: kafka_$1_$2_$3_total
      type: COUNTER
      labels:
        "$4": "$5"
        "$6": "$7"
    - pattern: kafka.(\w+)<type=(.+), name=(.+)PerSec\w*, (.+)=(.+)><>Count
      name: kafka_$1_$2_$3_total
      type: COUNTER
      labels:
        "$4": "$5"
    - pattern: kafka.(\w+)<type=(.+), name=(.+)PerSec\w*><>Count
      name: kafka_$1_$2_$3_total
      type: COUNTER
    - pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.+), (.+)=(.+)><>Value
      name: kafka_$1_$2_$3
      type: GAUGE
      labels:
        "$4": "$5"
        "$6": "$7"
    - pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.+)><>Value
      name: kafka_$1_$2_$3
      type: GAUGE
      labels:
        "$4": "$5"
    - pattern: kafka.(\w+)<type=(.+), name=(.+)><>Value
      name: kafka_$1_$2_$3
      type: GAUGE
    - pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.+), (.+)=(.+)><>Count
      name: kafka_$1_$2_$3_count
      type: COUNTER
      labels:
        "$4": "$5"
        "$6": "$7"
    - pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.*), (.+)=(.+)><>(\d+)thPercentile
      name: kafka_$1_$2_$3
      type: GAUGE
      labels:
        "$4": "$5"
        "$6": "$7"
        quantile: "0.$8"
    - pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.+)><>Count
      name: kafka_$1_$2_$3_count
      type: COUNTER
      labels:
        "$4": "$5"
    - pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.*)><>(\d+)thPercentile
      name: kafka_$1_$2_$3
      type: GAUGE
      labels:
        "$4": "$5"
        quantile: "0.$6"
    - pattern: kafka.(\w+)<type=(.+), name=(.+)><>Count
      name: kafka_$1_$2_$3_count
      type: COUNTER
    - pattern: kafka.(\w+)<type=(.+), name=(.+)><>(\d+)thPercentile
      name: kafka_$1_$2_$3
      type: GAUGE
      labels:
        quantile: "0.$4"
    - pattern: "kafka.server<type=raft-metrics><>(.+-total|.+-max):"
      name: kafka_server_raftmetrics_$1
      type: COUNTER
    - pattern: "kafka.server<type=raft-metrics><>(.+):"
      name: kafka_server_raftmetrics_$1
      type: GAUGE
    - pattern: "kafka.server<type=raft-channel-metrics><>(.+-total|.+-max):"
      name: kafka_server_raftchannelmetrics_$1
      type: COUNTER
    - pattern: "kafka.server<type=raft-channel-metrics><>(.+):"
      name: kafka_server_raftchannelmetrics_$1
      type: GAUGE
    - pattern: "kafka.server<type=broker-metadata-metrics><>(.+):"
      name: kafka_server_brokermetadatametrics_$1
      type: GAUGE
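Each rule maps an MBean attribute to a Prometheus metric: the pattern's capture groups fill the $N placeholders in name and labels, and lowercaseOutputName: true lowercases the resulting name. The Java sketch below is a simplified model of that mechanism, not the JMX exporter itself (the real exporter matches against a richer string that also carries the attribute value, and applies further sanitization). It uses three of the PerSec rules from the ConfigMap in their original order, so the first matching rule wins, and a hand-written MBean string for the test-1 topic.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Simplified model of how JMX-exporter-style rules turn an MBean attribute into a metric. */
public final class JmxRuleSketch {

    record Rule(Pattern pattern, String name, Map<String, String> labels) {}

    record Metric(String name, Map<String, String> labels) {}

    private static final Pattern PLACEHOLDER = Pattern.compile("\\$(\\d+)");

    /** Replaces every $N in the template with capture group N of the match. */
    static String expand(String template, Matcher m) {
        Matcher p = PLACEHOLDER.matcher(template);
        StringBuilder out = new StringBuilder();
        while (p.find()) {
            String group = m.group(Integer.parseInt(p.group(1)));
            p.appendReplacement(out, Matcher.quoteReplacement(group));
        }
        p.appendTail(out);
        return out.toString();
    }

    /** The first rule whose pattern is found in the input wins; returns null if none matches. */
    static Metric apply(List<Rule> rules, String mbeanAttribute) {
        for (Rule rule : rules) {
            Matcher m = rule.pattern().matcher(mbeanAttribute);
            if (m.find()) {
                Map<String, String> labels = new LinkedHashMap<>();
                rule.labels().forEach((k, v) -> labels.put(expand(k, m), expand(v, m)));
                // lowercaseOutputName: true lowercases only the metric name
                return new Metric(expand(rule.name(), m).toLowerCase(Locale.ROOT), labels);
            }
        }
        return null;
    }

    // Three PerSec rules copied from the kafka-metrics ConfigMap, in the same order.
    static final List<Rule> RULES = List.of(
            new Rule(Pattern.compile(
                    "kafka.(\\w+)<type=(.+), name=(.+)PerSec\\w*, (.+)=(.+), (.+)=(.+)><>Count"),
                    "kafka_$1_$2_$3_total", Map.of("$4", "$5", "$6", "$7")),
            new Rule(Pattern.compile("kafka.(\\w+)<type=(.+), name=(.+)PerSec\\w*, (.+)=(.+)><>Count"),
                    "kafka_$1_$2_$3_total", Map.of("$4", "$5")),
            new Rule(Pattern.compile("kafka.(\\w+)<type=(.+), name=(.+)PerSec\\w*><>Count"),
                    "kafka_$1_$2_$3_total", Map.of()));

    public static void main(String[] args) {
        System.out.println(apply(RULES,
                "kafka.server<type=BrokerTopicMetrics, name=MessagesInPerSec, topic=test-1><>Count"));
    }
}
```

For the sample string, the second rule matches and yields kafka_server_brokertopicmetrics_messagesin_total with the label topic=test-1.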
4. Interacting with Kafka on Kubernetes
Once we apply the KafkaNodePool and Kafka objects to the Kubernetes cluster, Strimzi starts provisioning. As a result, you should see the broker pods, a single pod related to Cruise Control, and the Entity Operator pod (running the Topic and User Operators). Each Kafka broker pod runs on a different Kubernetes node.
Clients can connect to Kafka through the my-cluster-kafka-bootstrap Service on port 9092:
$ kubectl get svc -n strimzi
NAME                         TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)                                        AGE
my-cluster-cruise-control    ClusterIP   10.96.108.204   <none>        9090/TCP                                       4m10s
my-cluster-kafka-bootstrap   ClusterIP   10.96.155.136   <none>        9091/TCP,9092/TCP,9093/TCP                     4m59s
my-cluster-kafka-brokers     ClusterIP   None            <none>        9090/TCP,9091/TCP,8443/TCP,9092/TCP,9093/TCP   4m59s
In the next step, we will deploy our two apps for producing and consuming messages. The producer app sends one message per second to the target topic:
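import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;

@SpringBootApplication
@EnableScheduling
public class KafkaProducer {

    private static final Logger LOG = LoggerFactory.getLogger(KafkaProducer.class);

    public static void main(String[] args) {
        SpringApplication.run(KafkaProducer.class, args);
    }

    AtomicLong id = new AtomicLong();
    @Autowired
    KafkaTemplate<Long, Info> template;

    @Value("${POD:kafka-producer}")
    private String pod;
    @Value("${NAMESPACE:empty}")
    private String namespace;
    @Value("${CLUSTER:localhost}")
    private String cluster;
    @Value("${TOPIC:test}")
    private String topic;

    // Runs every 1000 ms, i.e. one message per second
    @Scheduled(fixedRate = 1000)
    public void send() {
        Info info = new Info(id.incrementAndGet(),
                pod, namespace, cluster, "HELLO");
        CompletableFuture<SendResult<Long, Info>> result = template
                .send(topic, info.getId(), info);
        // sr is null when the send fails, so check the exception first
        result.whenComplete((sr, ex) -> {
            if (ex != null) {
                LOG.error("Send failed for id={}", info.getId(), ex);
            } else {
                LOG.info("Sent({}): {}", sr.getProducerRecord().key(),
                        sr.getProducerRecord().value());
            }
        });
    }
}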
Here's the Deployment manifest for the producer app:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: producer
spec:
  selector:
    matchLabels:
      app: producer
  template:
    metadata:
      labels:
        app: producer
    spec:
      containers:
        - name: producer
          image: piomin/producer
          resources:
            requests:
              memory: 200Mi
              cpu: 100m
          ports:
            - containerPort: 8080
          env:
            - name: KAFKA_URL
              value: my-cluster-kafka-bootstrap
            - name: CLUSTER
              value: c1
            - name: TOPIC
              value: test-1
            - name: POD
              valueFrom:
                fieldRef:
                  fieldPath: metadata.name
            - name: NAMESPACE
              valueFrom:
                fieldRef:
                  fieldPath: metadata.namespace
Before running the app, we can create the test-1 topic with the Strimzi KafkaTopic CRD:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: test-1
  labels:
    strimzi.io/cluster: my-cluster
spec:
  partitions: 12
  replicas: 3
  config:
    retention.ms: 7200000
    segment.bytes: 1000000
The consumer app listens for incoming messages. Here's the bean responsible for receiving and logging messages:
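import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;

@SpringBootApplication
@EnableKafka
public class KafkaConsumer {

    private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumer.class);

    public static void main(String[] args) {
        SpringApplication.run(KafkaConsumer.class, args);
    }

    @Value("${app.in.topic}")
    private String topic;

    // Logs each payload with its record key and the partition it was read from
    @KafkaListener(id = "info", topics = "${app.in.topic}")
    public void onMessage(@Payload Info info,
            @Header(name = KafkaHeaders.RECEIVED_KEY, required = false) Long key,
            @Header(KafkaHeaders.RECEIVED_PARTITION) int partition) {
        LOG.info("Received(key={}, partition={}): {}", key, partition, info);
    }
}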
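The consumer logs the partition of every record. Because the producer sends each record with a key (the incrementing id), the partition is derived from that key: for keyed records, the Kafka Java client's default behavior is to hash the serialized key with murmur2, clear the sign bit, and take the result modulo the number of partitions. The plain-Java sketch below re-implements that formula. It assumes the key is serialized as 8 big-endian bytes, as Kafka's LongSerializer does; the apps' actual serializer settings live in the repository and are not shown here.

```java
import java.nio.ByteBuffer;

/** Plain-Java re-implementation of the default partition choice for keyed Kafka records. */
public final class KeyPartitionSketch {

    /** 32-bit murmur2 hash, following the variant used by the Kafka Java client. */
    static int murmur2(byte[] data) {
        int length = data.length;
        int seed = 0x9747b28c;
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = seed ^ length;
        int length4 = length / 4;
        for (int i = 0; i < length4; i++) {
            int i4 = i * 4;
            int k = (data[i4] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        // Mix in the remaining 1-3 bytes (intentional switch fall-through).
        switch (length % 4) {
            case 3:
                h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2:
                h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1:
                h ^= data[length & ~3] & 0xff;
                h *= m;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    /** Partition for a Long key serialized as 8 big-endian bytes (assumed LongSerializer). */
    static int partitionFor(long key, int numPartitions) {
        byte[] keyBytes = ByteBuffer.allocate(Long.BYTES).putLong(key).array();
        return (murmur2(keyBytes) & 0x7fffffff) % numPartitions; // mask keeps the hash non-negative
    }

    public static void main(String[] args) {
        for (long id = 1; id <= 5; id++) {
            System.out.println("key " + id + " -> partition " + partitionFor(id, 12));
        }
    }
}
```

The same id always maps to the same partition as long as test-1 keeps its 12 partitions; adding partitions later changes the modulo and therefore the mapping.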
Here's the Deployment manifest for the consumer app:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: consumer
spec:
  selector:
    matchLabels:
      app: consumer
  template:
    metadata:
      labels:
        app: consumer
    spec:
      containers:
        - name: consumer
          image: piomin/consumer
          resources:
            requests:
              memory: 200Mi
              cpu: 100m
          ports:
            - containerPort: 8080
          env:
            - name: TOPIC
              value: test-1
            - name: KAFKA_URL
              value: my-cluster-kafka-bootstrap
We can run both Spring Boot apps using Skaffold. First, we need to go to the kafka directory in our repository. Then, let's run the following command:
$ skaffold run -n strimzi --tail
Finally, we can verify the logs printed by our apps. As you can see, all the messages sent by the producer app are received by the consumer app.
5. Kafka Metrics in Prometheus
Since we installed the Strimzi Helm chart with dashboards.enabled=true and dashboards.namespace=monitoring, several Grafana dashboard manifests are placed in the monitoring namespace. Each dashboard is represented as a ConfigMap. Let's display a list of the ConfigMaps installed by the Strimzi Helm chart:
$ kubectl get cm -n monitoring | grep strimzi
strimzi-cruise-control         1      2m
strimzi-kafka                  1      2m
strimzi-kafka-bridge           1      2m
strimzi-kafka-connect          1      2m
strimzi-kafka-exporter         1      2m
strimzi-kafka-mirror-maker-2   1      2m
strimzi-kafka-oauth            1      2m
strimzi-kraft                  1      2m
strimzi-operators              1      2m
strimzi-zookeeper              1      2m
Since Grafana is also installed in the monitoring namespace, it automatically imports all the dashboards from ConfigMaps labeled with grafana_dashboard. Consequently, after logging into Grafana (admin / prom-operator), we can easily switch between all the Kafka-related dashboards.
The only problem is that Prometheus doesn't scrape the metrics exposed by the Kafka pods yet. Since we have already configured metrics exporting in the Strimzi Kafka CRD, Kafka pods expose the /metrics endpoint for Prometheus on port 9404. Let's take a look at the Kafka broker pod details.
To make Prometheus scrape metrics from Kafka pods, we need to create a PodMonitor object. We should place it in the monitoring namespace (1) and set the release=kube-prometheus-stack label (2), because the Prometheus instance installed by the kube-prometheus-stack chart, by default, only selects PodMonitor objects that carry its release label. The PodMonitor object selects all the pods from the strimzi namespace (3) that contain the strimzi.io/kind label with one of the following values: Kafka, KafkaConnect, KafkaMirrorMaker, or KafkaMirrorMaker2 (4). It queries the /metrics endpoint on the port named tcp-prometheus (5).
apiVersion: monitoring.coreos.com/v1
kind: PodMonitor
metadata:
  name: kafka-resources-metrics
  namespace: monitoring # (1)
  labels:
    app: strimzi
    release: kube-prometheus-stack # (2)
spec:
  selector:
    matchExpressions:
      - key: "strimzi.io/kind" # (4)
        operator: In
        values: ["Kafka", "KafkaConnect", "KafkaMirrorMaker", "KafkaMirrorMaker2"]
  namespaceSelector: # (3)
    matchNames:
      - strimzi
  podMetricsEndpoints:
    - path: /metrics # (5)
      port: tcp-prometheus
      relabelings:
        - separator: ;
          regex: __meta_kubernetes_pod_label_(strimzi_io_.+)
          replacement: $1
          action: labelmap
        - sourceLabels: [__meta_kubernetes_namespace]
          separator: ;
          regex: (.*)
          targetLabel: namespace
          replacement: $1
          action: replace
        - sourceLabels: [__meta_kubernetes_pod_name]
          separator: ;
          regex: (.*)
          targetLabel: kubernetes_pod_name
          replacement: $1
          action: replace
        - sourceLabels: [__meta_kubernetes_pod_node_name]
          separator: ;
          regex: (.*)
          targetLabel: node_name
          replacement: $1
          action: replace
        - sourceLabels: [__meta_kubernetes_pod_host_ip]
          separator: ;
          regex: (.*)
          targetLabel: node_ip
          replacement: $1
          action: replace
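The relabelings rewrite the target labels before samples are stored. The labelmap rule copies every pod label whose sanitized name starts with strimzi_io_ (strimzi.io/cluster, for example, becomes strimzi_io_cluster), and each replace rule copies one meta label into a shorter target label. The Java sketch below mimics only these two actions on a hand-written label set; Prometheus supports many more options, the method names are illustrative, and the sample label values are made up.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Minimal model of the Prometheus "labelmap" and "replace" relabel actions used in the PodMonitor. */
public final class RelabelSketch {

    /** labelmap: each label whose name fully matches regex is copied under the expanded replacement. */
    static void labelMap(Map<String, String> labels, String regex, String replacement) {
        Pattern pattern = Pattern.compile(regex);
        Map<String, String> copies = new LinkedHashMap<>();
        for (Map.Entry<String, String> label : labels.entrySet()) {
            Matcher m = pattern.matcher(label.getKey());
            if (m.matches()) { // Prometheus anchors relabel regexes at both ends
                copies.put(m.replaceFirst(replacement), label.getValue());
            }
        }
        labels.putAll(copies);
    }

    /** replace: join the source label values, match the regex, and write the target label. */
    static void replace(Map<String, String> labels, List<String> sources, String separator,
                        String regex, String target, String replacement) {
        StringBuilder joined = new StringBuilder();
        for (int i = 0; i < sources.size(); i++) {
            if (i > 0) {
                joined.append(separator);
            }
            joined.append(labels.getOrDefault(sources.get(i), ""));
        }
        Matcher m = Pattern.compile(regex).matcher(joined);
        if (m.matches()) {
            labels.put(target, m.replaceFirst(replacement));
        }
    }

    /** Applies the relabelings from the PodMonitor above to a copy of the discovered labels. */
    static Map<String, String> applyPodMonitorRules(Map<String, String> discovered) {
        Map<String, String> labels = new LinkedHashMap<>(discovered);
        labelMap(labels, "__meta_kubernetes_pod_label_(strimzi_io_.+)", "$1");
        replace(labels, List.of("__meta_kubernetes_namespace"), ";", "(.*)", "namespace", "$1");
        replace(labels, List.of("__meta_kubernetes_pod_name"), ";", "(.*)", "kubernetes_pod_name", "$1");
        replace(labels, List.of("__meta_kubernetes_pod_node_name"), ";", "(.*)", "node_name", "$1");
        replace(labels, List.of("__meta_kubernetes_pod_host_ip"), ";", "(.*)", "node_ip", "$1");
        // Labels starting with "__" are dropped once relabeling has finished.
        labels.keySet().removeIf(name -> name.startsWith("__"));
        return labels;
    }

    public static void main(String[] args) {
        // Hypothetical discovered labels for one broker pod (values made up for illustration).
        Map<String, String> discovered = new LinkedHashMap<>();
        discovered.put("__meta_kubernetes_pod_label_strimzi_io_cluster", "my-cluster");
        discovered.put("__meta_kubernetes_pod_label_strimzi_io_kind", "Kafka");
        discovered.put("__meta_kubernetes_namespace", "strimzi");
        discovered.put("__meta_kubernetes_pod_name", "my-cluster-dual-role-0");
        discovered.put("__meta_kubernetes_pod_node_name", "c1-worker");
        discovered.put("__meta_kubernetes_pod_host_ip", "172.18.0.3");
        System.out.println(applyPodMonitorRules(discovered));
    }
}
```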
Finally, we can display the Grafana dashboard with Kafka metrics visualization. Let’s choose the dashboard with the “Strimzi Kafka” name. Here’s the general view:
There are several other diagrams available. For example, we can take a look at the statistics related to the incoming and outgoing messages.
6. Rebalancing Kafka with Cruise Control
Let's analyze a typical Kafka scenario: increasing the number of brokers in the cluster. Before we do it, we will generate more incoming traffic to the test-1 topic. To do that, we can use the Grafana k6 tool. The k6 tool provides several extensions for load testing, including a Kafka plugin. Here's the manifest with a ConfigMap containing the test script and a Deployment that runs k6 with the Kafka extension on Kubernetes:
kind: ConfigMap
apiVersion: v1
metadata:
  name: load-test-cm
  namespace: strimzi
data:
  load-test.js: |
    import {
      Writer,
      SchemaRegistry,
      SCHEMA_TYPE_JSON,
    } from "k6/x/kafka";
    const writer = new Writer({
      brokers: ["my-cluster-kafka-bootstrap.strimzi:9092"],
      topic: "test-1",
    });
    const schemaRegistry = new SchemaRegistry();
    export default function () {
      writer.produce({
        messages: [
          {
            value: schemaRegistry.serialize({
              data: {
                id: 1,
                source: "test",
                space: "strimzi",
                cluster: "c1",
                message: "HELLO"
              },
              schemaType: SCHEMA_TYPE_JSON,
            }),
          },
        ],
      });
    }
    export function teardown(data) {
      writer.close();
    }
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: k6-test
  namespace: strimzi
spec:
  selector:
    matchLabels:
      app.kubernetes.io/name: k6-test
  template:
    metadata:
      labels:
        app.kubernetes.io/name: k6-test
    spec:
      containers:
        - image: mostafamoradian/xk6-kafka:latest
          name: xk6-kafka
          command:
            - "k6"
            - "run"
            - "--vus"
            - "1"
            - "--duration"
            - "720s"
            - "/tests/load-test.js"
          env:
            - name: KAFKA_URL
              value: my-cluster-kafka-bootstrap
            - name: CLUSTER
              value: c1
            - name: TOPIC
              value: test-1
            - name: POD
              valueFrom:
                fieldRef:
                  fieldPath: metadata.name
            - name: NAMESPACE
              valueFrom:
                fieldRef:
                  fieldPath: metadata.namespace
          volumeMounts:
            - mountPath: /tests
              name: test
      volumes:
        - name: test
          configMap:
            name: load-test-cm
Let's apply the manifest to the strimzi namespace with the following command:
$ kubectl apply -f k8s/k6.yaml
After that, we can take a look at the k6 Pod logs. As you can see, it generates and sends a lot of messages to the test-1 topic on our Kafka cluster.
Now, let's increase the number of Kafka brokers in our cluster. We can do it by changing the value of the replicas field in the KafkaNodePool object, for example, with kubectl scale:
$ kubectl scale kafkanodepool dual-role --replicas=4 -n strimzi
After a while, Strimzi will start a new pod with another Kafka broker. Although we have a new member of the Kafka cluster, all the partitions are still distributed only across the three previous brokers. Newly created topics could place replicas on the new broker, but partitions of existing topics are not migrated to it automatically. Let's verify the current partition structure of the test-1 topic with the kcat CLI (I'm exposing the Kafka API locally with kubectl port-forward):
$ kcat -b localhost:9092 -L -t test-1
Metadata for test-1 (from broker -1: localhost:9092/bootstrap):
4 brokers:
broker 0 at my-cluster-dual-role-0.my-cluster-kafka-brokers.strimzi.svc:9092
broker 1 at my-cluster-dual-role-1.my-cluster-kafka-brokers.strimzi.svc:9092
broker 2 at my-cluster-dual-role-2.my-cluster-kafka-brokers.strimzi.svc:9092
broker 3 at my-cluster-dual-role-3.my-cluster-kafka-brokers.strimzi.svc:9092 (controller)
1 topics:
topic "test-1" with 12 partitions:
partition 0, leader 0, replicas: 0,1,2, isrs: 1,0,2
partition 1, leader 1, replicas: 1,2,0, isrs: 1,0,2
partition 2, leader 2, replicas: 2,0,1, isrs: 1,0,2
partition 3, leader 0, replicas: 0,1,2, isrs: 1,0,2
partition 4, leader 1, replicas: 1,2,0, isrs: 1,0,2
partition 5, leader 2, replicas: 2,0,1, isrs: 1,0,2
partition 6, leader 0, replicas: 0,1,2, isrs: 1,0,2
partition 7, leader 1, replicas: 1,2,0, isrs: 1,0,2
partition 8, leader 2, replicas: 2,0,1, isrs: 1,0,2
partition 9, leader 0, replicas: 0,2,1, isrs: 1,0,2
partition 10, leader 2, replicas: 2,1,0, isrs: 1,0,2
partition 11, leader 1, replicas: 1,0,2, isrs: 1,0,2
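To quantify the imbalance, the replica lists in this output can be tallied per broker. The small Java sketch below parses lines in the format printed above, relying only on the "replicas: a,b,c," fragment; the class and method names are illustrative. Applied to this listing, brokers 0, 1, and 2 each hold 12 replicas of test-1, while broker 3 holds none.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Counts partition replicas per broker from "kcat -L" partition lines. */
public final class ReplicaTally {

    private static final Pattern REPLICAS = Pattern.compile("replicas: ([0-9,]+),");

    static Map<Integer, Integer> replicasPerBroker(List<Integer> brokerIds, List<String> kcatLines) {
        Map<Integer, Integer> counts = new TreeMap<>();
        for (int id : brokerIds) {
            counts.put(id, 0); // list brokers that hold no replicas as well
        }
        for (String line : kcatLines) {
            Matcher m = REPLICAS.matcher(line);
            if (!m.find()) {
                continue; // broker and topic header lines carry no replica list
            }
            for (String id : m.group(1).split(",")) {
                counts.merge(Integer.parseInt(id), 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> lines = List.of(
                "partition 0, leader 0, replicas: 0,1,2, isrs: 1,0,2",
                "partition 1, leader 1, replicas: 1,2,0, isrs: 1,0,2",
                "partition 2, leader 2, replicas: 2,0,1, isrs: 1,0,2");
        System.out.println(replicasPerBroker(List.of(0, 1, 2, 3), lines)); // prints {0=3, 1=3, 2=3, 3=0}
    }
}
```

Running the same tally over the listing produced after the rebalance gives 10, 9, 9, and 8 replicas for brokers 0 to 3.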
Here comes Cruise Control. Cruise Control makes managing and operating Kafka much easier. For example, it allows us to move partitions across brokers after scaling up the cluster. Let's see how it works. We have already enabled Cruise Control in the Strimzi Kafka CRD. To begin a rebalancing procedure, we should create the KafkaRebalance object. This object is responsible for asking Cruise Control to generate an optimization proposal.
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaRebalance
metadata:
  name: my-rebalance
  labels:
    strimzi.io/cluster: my-cluster
spec: {}
When the optimization proposal is ready, you will see the ProposalReady value in the Status.Conditions.Type field. I won't go into the details of Cruise Control here. In my case, it suggested moving 58 partition replicas between brokers in the cluster.
Let's approve the proposal by annotating the KafkaRebalance object with strimzi.io/rebalance=approve:
$ kubectl annotate kafkarebalance my-rebalance \
strimzi.io/rebalance=approve -n strimzi
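After approval, the KafkaRebalance status moves on from ProposalReady while Cruise Control reassigns the replicas. The Java sketch below is a simplified, illustrative model of that lifecycle, not Strimzi code: the state names mirror a subset of the status values Strimzi reports, and the events stand for the operator's own progress and for the approve, refresh, and stop values of the strimzi.io/rebalance annotation. In this model an annotation that does not apply to the current state is simply ignored; Strimzi's real handling of such edge cases may differ.

```java
import java.util.List;

/** Simplified, illustrative model of the KafkaRebalance status flow used in this section. */
public final class RebalanceFlow {

    // Named like the status values Strimzi reports (subset only).
    enum State { New, PendingProposal, ProposalReady, Rebalancing, Ready, Stopped }

    /** Operator progress, or a strimzi.io/rebalance annotation value set by the user. */
    enum Event { OPERATOR_STEP, APPROVE, REFRESH, STOP }

    static State next(State s, Event e) {
        return switch (e) {
            case OPERATOR_STEP -> switch (s) {
                case New -> State.PendingProposal;             // request sent to Cruise Control
                case PendingProposal -> State.ProposalReady;   // proposal computed
                case Rebalancing -> State.Ready;               // replicas moved
                default -> s;                                  // waiting for the user, or finished
            };
            case APPROVE -> s == State.ProposalReady ? State.Rebalancing : s;
            case REFRESH -> s == State.ProposalReady ? State.PendingProposal : s;
            case STOP -> s == State.Rebalancing ? State.Stopped : s;
        };
    }

    public static void main(String[] args) {
        State s = State.New;
        for (Event e : List.of(Event.OPERATOR_STEP, Event.OPERATOR_STEP, Event.APPROVE, Event.OPERATOR_STEP)) {
            s = next(s, e);
            System.out.println(e + " -> " + s);
        }
    }
}
```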
Finally, we can run the kcat command on the test-1 topic once again. As you can see, partition replicas are now spread across all four brokers.
$ kcat -b localhost:9092 -L -t test-1
Metadata for test-1 (from broker -1: localhost:9092/bootstrap):
4 brokers:
broker 0 at my-cluster-dual-role-0.my-cluster-kafka-brokers.strimzi.svc:9092
broker 1 at my-cluster-dual-role-1.my-cluster-kafka-brokers.strimzi.svc:9092
broker 2 at my-cluster-dual-role-2.my-cluster-kafka-brokers.strimzi.svc:9092
broker 3 at my-cluster-dual-role-3.my-cluster-kafka-brokers.strimzi.svc:9092 (controller)
1 topics:
topic "test-1" with 12 partitions:
partition 0, leader 2, replicas: 2,1,3, isrs: 1,2,3
partition 1, leader 1, replicas: 1,2,0, isrs: 1,0,2
partition 2, leader 2, replicas: 0,2,1, isrs: 1,0,2
partition 3, leader 0, replicas: 0,2,3, isrs: 0,2,3
partition 4, leader 1, replicas: 3,2,1, isrs: 1,2,3
partition 5, leader 2, replicas: 2,3,0, isrs: 0,2,3
partition 6, leader 0, replicas: 0,1,2, isrs: 1,0,2
partition 7, leader 1, replicas: 3,1,0, isrs: 1,0,3
partition 8, leader 2, replicas: 2,0,1, isrs: 1,0,2
partition 9, leader 0, replicas: 0,3,1, isrs: 1,0,3
partition 10, leader 2, replicas: 2,3,0, isrs: 0,2,3
partition 11, leader 1, replicas: 1,0,3, isrs: 1,0,3
Final Thoughts
Strimzi allows us to install and manage not only Kafka but also the whole ecosystem around it. In this article, I showed how to export metrics to Prometheus and use the Cruise Control tool to rebalance a cluster after scaling up. We also ran Kafka in KRaft mode and then connected two simple Java apps to the cluster through a Kubernetes Service.