Autoscaling on Kubernetes with KEDA and Kafka

In this article, you will learn how to use KEDA to autoscale an application that consumes messages from a Kafka topic. KEDA stands for Kubernetes Event-driven Autoscaling. In order to explain the idea behind it, I will create two simple services. The first of them sends events to a Kafka topic, while the second receives them. We will run both applications on Kubernetes. To simplify the exercise, we will use Spring Cloud Stream, which offers a smart integration with Kafka.

Architecture

Before we start, let’s take a moment to understand our scenario for today. We have a single Kafka topic used by both our applications to exchange events. This topic consists of 10 partitions. There is also a single instance of the producer that sends events at regular intervals. We are going to scale the number of consumer pods up and down. All instances of the consumer service are assigned to the same Kafka consumer group, which means that only a single instance within the group may receive a particular event.

Each consumer instance has only a single receiving thread. Therefore, we can easily simulate event processing time by sleeping that thread for 1 second. On the other hand, the producer will send events at a variable rate, and it will split the messages across all available partitions. Such behavior may result in consumer lag on the partitions, because Spring Cloud Stream commits the offset only after handling a message. In our case, the value of the lag depends on the producer rate and the number of running consumer instances. For example, if the producer sends 5 events per second and a single consumer processes 1 event per second, the total lag grows by roughly 4 events every second. To clarify, let’s take a look at the diagram below.

[Diagram keda-kafka-arch1: producer, 10-partition topic, and consumer instances in a single consumer group]

Our goal is very simple. We need to adjust the number of consumer instances to the traffic rate generated by the producer service, so that the offset lag does not exceed the desired threshold. If we increase the traffic rate on the producer side, KEDA should scale up the number of consumer instances. Conversely, if we decrease the producer traffic rate, it should scale them down. Here’s the diagram with our scenario.

[Diagram keda-kafka-arch2: KEDA scaling consumer instances based on consumer group lag]

Use Kafka with Spring Cloud Stream

In order to use Spring Cloud Stream for Kafka, we just need to include a single dependency in Maven pom.xml:

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>

After that, we can use the standard Spring Cloud Stream programming model. In the background, it integrates with Kafka through a dedicated binder implementation. I will not explain the details here, but if you are interested, please read the following article, which explains the basics using the example of RabbitMQ.

Both our applications are very simple. The producer just generates and sends events (by default in JSON format). The only thing we need to do in the code is to declare a Supplier bean. In the background, a single thread generates and sends a CallmeEvent every second, each time incrementing the id field inside the message:

import java.util.function.Supplier;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class ProducerApp {

   private static int id = 0;

   public static void main(String[] args) {
      SpringApplication.run(ProducerApp.class, args);
   }

   @Bean
   public Supplier<CallmeEvent> eventSupplier() {
      return () -> new CallmeEvent(++id, "Hello" + id, "PING");
   }

}
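
The CallmeEvent class itself is not shown in the original listings. Based on the constructor call above and the payload.id partition key expression used later, it could look more or less like this sketch (the message and eventType field names are assumptions):

public class CallmeEvent {

   private int id; // read later by the payload.id partition key expression
   private String message;   // assumed field name for "Hello" + id
   private String eventType; // assumed field name for "PING"

   // no-arg constructor for JSON deserialization on the consumer side
   public CallmeEvent() {
   }

   public CallmeEvent(int id, String message, String eventType) {
      this.id = id;
      this.message = message;
      this.eventType = eventType;
   }

   public int getId() {
      return id;
   }

   public String getMessage() {
      return message;
   }

   public String getEventType() {
      return eventType;
   }

}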

We can change the default fixed delay between the Supplier ticks with the following property. Let’s say we want to send an event every 100 ms:

spring.cloud.stream.poller.fixedDelay = 100

We should also provide some basic configuration, like the Kafka address, the topic name (if different from the name of the Supplier function), the number of partitions, and the partition key expression. Spring Cloud Stream automatically creates topics on application startup.

spring.cloud.stream.bindings.eventSupplier-out-0.destination = test-topic
spring.cloud.stream.bindings.eventSupplier-out-0.producer.partitionKeyExpression = payload.id
spring.cloud.stream.bindings.eventSupplier-out-0.producer.partitionCount = 10
spring.kafka.bootstrap-servers = one-node-cluster.redpanda:9092

Now, the consumer application. It is also not very complicated. As I mentioned before, we will sleep the receiving thread inside the handler method in order to simulate processing time.

import java.util.function.Consumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;

@SpringBootApplication
public class ConsumerApp {

   private static final Logger LOG = LoggerFactory.getLogger(ConsumerApp.class);

   public static void main(String[] args) {
      SpringApplication.run(ConsumerApp.class, args);
   }

   @Bean
   public Consumer<Message<CallmeEvent>> eventConsumer() {
      return event -> {
         LOG.info("Received: {}", event.getPayload());
         try {
            // simulate 1 second of processing time per message
            Thread.sleep(1000);
         } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
         }
      };
   }

}

Finally, the configuration on the consumer side. It is important to set the consumer group and enable partitioning.

spring.cloud.stream.bindings.eventConsumer-in-0.destination = test-topic
spring.cloud.stream.bindings.eventConsumer-in-0.group = a
spring.cloud.stream.bindings.eventConsumer-in-0.consumer.partitioned = true
spring.kafka.bootstrap-servers = one-node-cluster.redpanda:9092
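
Incidentally, a single receiving thread per binding is the Spring Cloud Stream default, which is what makes the 1-second sleep an effective throttle. Just for illustration, this is the consumer property that controls it, shown here with its default value:

# one receiving thread per consumer instance (the default)
spring.cloud.stream.bindings.eventConsumer-in-0.consumer.concurrency = 1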

Now, we should deploy both applications on Kubernetes. But before we do that, let’s install Kafka and KEDA on Kubernetes.

Install Kafka on Kubernetes

To perform this part, you need to have Helm installed. Instead of Kafka directly, we can install Redpanda, which is a Kafka API-compatible platform. However, the Redpanda operator requires cert-manager to create certificates for TLS communication, so let’s install it first. We will use the v1.6.1 release of cert-manager. It requires adding the CRDs first:

$ kubectl apply -f https://github.com/jetstack/cert-manager/releases/download/v1.6.1/cert-manager.crds.yaml

Then you need to add a new Helm repository:

$ helm repo add jetstack https://charts.jetstack.io

And finally, install cert-manager:

$ helm install cert-manager \
   --namespace cert-manager \
   --create-namespace \
   --version v1.6.1 \
   jetstack/cert-manager
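
You can confirm that cert-manager is up by listing the pods in its namespace:

$ kubectl get pod -n cert-manager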

Now, we can proceed to the Redpanda installation. The same as before, let’s add the Helm repository:

$ helm repo add redpanda https://charts.vectorized.io/
$ helm repo update

We can obtain the latest version of Redpanda:

$ export VERSION=$(curl -s https://api.github.com/repos/vectorizedio/redpanda/releases/latest | jq -r .tag_name)

Then, let’s apply the CRDs (note the -k flag, since the URL points at a kustomize directory):

$ kubectl apply -k https://github.com/vectorizedio/redpanda/src/go/k8s/config/crd

After that, we can finally install the Redpanda operator:

$ helm install \
   redpanda-operator \
   redpanda/redpanda-operator \
   --namespace redpanda-system \
   --create-namespace \
   --version $VERSION

We will install a single-node cluster in the redpanda namespace. To do that, we need to apply the following manifest:

apiVersion: redpanda.vectorized.io/v1alpha1
kind: Cluster
metadata:
  name: one-node-cluster
spec:
  image: "vectorized/redpanda"
  version: "latest"
  replicas: 1
  resources:
    requests:
      cpu: 1
      memory: 1.2Gi
    limits:
      cpu: 1
      memory: 1.2Gi
  configuration:
    rpcServer:
      port: 33145
    kafkaApi:
    - port: 9092
    pandaproxyApi:
    - port: 8082
    adminApi:
    - port: 9644
    developerMode: true
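
For example, assuming we save the manifest above as redpanda-cluster.yaml (the filename is just an assumption), we can create the namespace and apply it as follows:

$ kubectl create namespace redpanda
$ kubectl apply -f redpanda-cluster.yaml -n redpanda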

Once you have done that, you can verify the list of pods in the redpanda namespace:

$ kubectl get pod -n redpanda                      
NAME                 READY   STATUS    RESTARTS   AGE
one-node-cluster-0   1/1     Running   0          4s

As you may have noticed, I have already set the Kafka bootstrap server address in application.properties. For me, it is one-node-cluster.redpanda:9092. You can verify it using the following command:

$ kubectl get svc -n redpanda
NAME               TYPE        CLUSTER-IP   EXTERNAL-IP   PORT(S)                      AGE
one-node-cluster   ClusterIP   None         <none>        9644/TCP,9092/TCP,8082/TCP   23h

Install KEDA and integrate it with Kafka

The same as before, we will install KEDA on Kubernetes with Helm. Let’s add the following Helm repo:

$ helm repo add kedacore https://kedacore.github.io/charts

Don’t forget to update the repository with helm repo update. We will install the operator in the keda namespace. Let’s create the namespace first:

$ kubectl create namespace keda

Finally, we can install the operator:

$ helm install keda kedacore/keda --namespace keda
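
You can verify that the KEDA operator is running by listing the pods in its namespace:

$ kubectl get pod -n keda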

I will run both example applications in the default namespace, so I will also create the KEDA object in this namespace. The main object responsible for configuring autoscaling with KEDA is the ScaledObject. Here’s the definition:

apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name: consumer-scaled
spec:
  scaleTargetRef:
    name: consumer-deployment # (1)
  cooldownPeriod: 30 # (2)
  maxReplicaCount:  10 # (3)
  advanced:
    horizontalPodAutoscalerConfig: # (4)
      behavior:
        scaleDown:
          stabilizationWindowSeconds: 30
          policies:
            - type: Percent
              value: 50
              periodSeconds: 30
  triggers: # (5)
    - type: kafka
      metadata:
        bootstrapServers: one-node-cluster.redpanda:9092
        consumerGroup: a
        topic: test-topic
        lagThreshold: '5'

Let’s analyze the configuration in detail:

(1) We are setting the autoscaler for the consumer application, which is deployed under the consumer-deployment name (see the next section for the Deployment manifest)

(2) We decrease the default value of the cooldownPeriod parameter from 300 seconds to 30 in order to test the scale-to-zero mechanism

(3) The maximum number of running pods is 10 (the same as the number of partitions in the topic) instead of the default 100

(4) We can customize the behavior of the underlying Kubernetes HPA. Let’s do that for the scale-down operation (we could configure the scale-up operation in the same way). We allow removing at most 50% of the currently running replicas within each 30-second period.

(5) The last and most important part is the trigger configuration. We should set the address of the Kafka cluster, the name of the topic, and the consumer group used by our application. The lag threshold is 5. It sets the average target value of the offset lag that triggers scaling operations.

Before applying the manifest containing ScaledObject we need to deploy the consumer application. Let’s proceed to the next section.

Test Autoscaling with KEDA and Kafka

Let’s deploy the consumer application first. It is prepared to be deployed with Skaffold, so you can just run the command skaffold dev from the consumer directory. Anyway, here’s the Deployment manifest:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: consumer-deployment
spec:
  selector:
    matchLabels:
      app: consumer
  template:
    metadata:
      labels:
        app: consumer
    spec:
      containers:
      - name: consumer
        image: piomin/consumer
        ports:
        - containerPort: 8080

Once we have created it, we can also apply the KEDA ScaledObject. After creating it, let’s display its status with the kubectl get so command.
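
For example, assuming the ScaledObject manifest shown earlier is saved as consumer-scaledobject.yaml (the filename is just an assumption):

$ kubectl apply -f consumer-scaledobject.yaml
$ kubectl get so consumer-scaled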

[Screenshot keda-kafka-scaledobject: output of kubectl get so]

Ok, but… it is inactive. If you think about it, that’s logical, since there are no incoming events on the Kafka topic. Right? So, KEDA has performed a scale-to-zero operation on the consumer-deployment.

Now, let’s deploy the producer application. For now, DO NOT override the default value of the spring.cloud.stream.poller.fixedDelay parameter (1000 ms). The producer will send one event per second.

apiVersion: apps/v1
kind: Deployment
metadata:
  name: producer
spec:
  selector:
    matchLabels:
      app: producer
  template:
    metadata:
      labels:
        app: producer
    spec:
      containers:
        - name: producer
          image: piomin/producer
          ports:
            - containerPort: 8080

After some time, you can run kubectl get so once again. Now the status in the ACTIVE column should be true, and there is a single instance of the consumer application (1 event per second sent by the producer and received by the consumer).

We can also verify offsets and lags for the consumer group on the topic partitions. Just run the rpk group describe a command inside the Redpanda container.
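
For example, you can execute rpk through kubectl exec, using the pod name from the earlier listing:

$ kubectl exec -it one-node-cluster-0 -n redpanda -- rpk group describe a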

Now, we will change the traffic rate on the producer side. It will send 5 events per second instead of 1 as before. To do that, we have to set the following property in the application.properties file (a 200 ms delay means 5 events per second):

spring.cloud.stream.poller.fixedDelay = 200

Before KEDA performs autoscaling, we still have a single instance of the consumer application. Therefore, after some time, the lag on the partitions will exceed the desired threshold.

Once autoscaling occurs, we can display the list of deployments. As you can see, there are now 5 running pods of the consumer service.

[Screenshot keda-kafka-scaling: consumer-deployment scaled up to 5 replicas]
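
You can check the number of replicas yourself with a standard kubectl query:

$ kubectl get deployment consumer-deployment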

Once again, let’s verify the status of our Kafka consumer group. There are now 5 consumers assigned to the topic partitions.

Finally, just to check it out, let’s undeploy the producer application. What happened? The consumer-deployment has been scaled down to zero.

Final Thoughts

You can use KEDA not only with Kafka. There are a lot of other options available, including databases, different message brokers, or even cron. Here’s a full list of the supported scalers. In this article, I showed how to use Kafka consumer offset lag as a criterion for autoscaling with KEDA. I tried to explain this process in detail. I hope it helps you understand how exactly KEDA works 🙂

20 COMMENTS

amir choubani

Do you have link to github repo for source code ?

Pratik

If the number of consumers increases, then Kafka rebalancing will also happen… Then what is the need for this?

    piotr.minkowski

    Yes, rebalancing will always happen if you run a new instance in the consumer group, no matter whether you scale it manually or automatically. But what’s the problem with that?

Ramesh

Excellent

    piotr.minkowski

    Thanks 🙂

shruti

Wonder how “scale in” would work for long processing tasks. As KEDA will scale down or kill instances of services, even if a service is in the middle of doing a task.

    piotr.minkowski

    It is not a KEDA-specific case. The same rules apply to any application running on Kubernetes. Your application is shut down gracefully.

Shibanshu Ghosh

What will be the effect if multiple scalers are used to auto scale a service? For example, Kafka, Redis and Prometheus all together. Will there be a conflict?

    piotr.minkowski

    Well, it depends. What happens, for example, if the lag in Kafka decreases while the queue length in Redis increases? What would you like to achieve?

adry

nice post, but too bad that the example and command lines don’t work anymore

    piotr.minkowski

    Which commands do not work?

Marek

Hello Piotr! Great work. Thank you for sharing! I’ve been doing similar KEDA + Kafka experiments just based on [kedacore samples](https://github.com/kedacore/sample-python-kafka-azure-function) with custom consumer and producer written in Python, now thanks to your article I’ve also tried with Redpanda and it too works great.
BTW I needed to add “?ref=$VERSION” in `kubectl apply \
-k https://github.com/redpanda-data/redpanda/src/go/k8s/config/crd?ref=$VERSION` [found it in the redpanda docs](https://docs.redpanda.com/docs/quickstart/kubernetes-qs-cloud/). Cheers

Tim

I think rebalancing cost matters. Do you have any thoughts on that? ref https://github.com/kedacore/keda/issues/2401

    piotr.minkowski

    In my opinion, two different things are described there: the nature of how KEDA and HPA work and the nature of Kafka’s rebalancing. Yes, with Kafka scaling costs, due to the rebalancing mechanism. Currently, with KEDA we can just analyze the current consumer lag, and based on that value perform autoscaling.

Sanyam

Let’s say I have 6 partitions and 2 consumers (each consumer listening to 3 partitions). If lag increases to 6 on any of the partition and let’s assume using above KEDA, it increases consumers from 2 to 3. In that case will Kafka automatically re-map partitions (2 partitions per consumer) ?

    piotr.minkowski

    Rebalance, yes.

Alex

Good example, thanks.
My question is about the situation when consumer application is listening to 10 different topics. What is the correct approach to configure deployment? Should I create ScaledObjecs for each topic or I can combine the logic for all topics somehow, let say default config for all existing topics.

    piotr.minkowski

    Well, defining multiple autoscalers may result in conflicts, e.g. for one topic it will scale the app up, while for the second topic it will scale it down. So, it is not the best idea.
