Kafka Offset with Spring Boot

In this article, you will learn how to manage Kafka consumer offsets with Spring Boot and Spring Kafka. The inspiration for preparing this article was the feedback I received after publishing the post describing concurrency with Kafka and Spring Boot. You were asking me questions related not only to concurrency but also to the consumer offset committing process. In the previous article, I focused mostly on showing that the way the app handles Kafka messages may impact the overall performance of our system. I didn’t consider things like message duplicates or losing messages on the consumer side. Today, we are going to discuss exactly those topics.

If you are interested in Kafka and Spring Boot, you can find several articles about them on my blog. Besides the already mentioned post about concurrency, you can read e.g. about Kafka transactions here. On the other hand, to read about microservices with Kafka and Spring Boot, refer to the following article.

Source Code

If you would like to try it out by yourself, you can always take a look at my source code. In order to do that, you need to clone my GitHub repository. After that, you should just follow my instructions. Let’s begin.

Introduction

Before we start, we need to clarify some important things related to committing offsets with Spring Kafka. First of all, by default, Spring Kafka sets the consumer enable.auto.commit property to false. It means that the framework, not the Kafka client, is responsible for committing the offset. Of course, we can change the default behavior by setting that property to true. By the way, it was the default approach before Spring Kafka 2.3.
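For illustration only, if you wanted to restore the pre-2.3 behavior and let the Kafka client commit offsets on its own, a minimal sketch of the relevant Spring Boot properties (not used anywhere in this article) could look like this:

spring:
  kafka:
    consumer:
      # hand the commit responsibility back to the Kafka client
      enable-auto-commit: true
      # how often the client commits in the background (auto.commit.interval.ms)
      auto-commit-interval: 5s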

As long as we keep Kafka auto-committing disabled, we can leverage seven different commit strategies provided by Spring Kafka. Today, we won’t analyze all of them, just the most significant ones. The default strategy is BATCH. In order to set a different strategy, we need to override the AckMode, e.g. by setting the spring.kafka.listener.ack-mode property in the Spring Boot application properties, as shown in the sketch below. However, first, let’s focus on the BATCH mode.
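Here is a minimal sketch of such an override in application.yml; the value has to match one of the AckMode enum constants:

spring:
  kafka:
    listener:
      # one of: RECORD, BATCH, TIME, COUNT, COUNT_TIME, MANUAL, MANUAL_IMMEDIATE
      ack-mode: RECORD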

Sample Spring Boot Kafka Apps

In order to test offset committing with Spring Kafka, we will create two simple apps: a producer and a consumer. The producer sends a defined number of messages to the topic, while the consumer receives and processes them. Here’s the producer @RestController implementation. It allows us to send a defined number of messages to the transactions topic on demand:

@RestController
public class TransactionsController {

   private static final Logger LOG = LoggerFactory
            .getLogger(TransactionsController.class);

   long id = 1;
   long groupId = 1;
   // the template is injected by Spring through the constructor
   private final KafkaTemplate<Long, Order> kafkaTemplate;

   public TransactionsController(KafkaTemplate<Long, Order> kafkaTemplate) {
      this.kafkaTemplate = kafkaTemplate;
   }

   @PostMapping("/transactions")
   public void generateAndSendMessages(@RequestBody InputParameters inputParameters) {
      for (long i = 0; i < inputParameters.getNumberOfMessages(); i++) {
         Order o = new Order(id++, i+1, i+2, 1000, "NEW", groupId);
         CompletableFuture<SendResult<Long, Order>> result =
                 kafkaTemplate.send("transactions", o.getId(), o);
         result.whenComplete((sr, ex) -> {
            // sr is null if the send failed, so check the exception first
            if (ex == null) {
               LOG.info("Sent({}): {}", sr.getProducerRecord().key(),
                       sr.getProducerRecord().value());
            } else {
               LOG.error("Error sending message", ex);
            }
         });
      }
      groupId++;
   }

}

Here are the producer app configuration properties. We need to set the address of the Kafka broker and the serializer classes for the key (Long) and the value (JSON format).

spring:
  application.name: producer
  kafka:
    bootstrap-servers: ${KAFKA_URL}
    producer:
      key-serializer: org.apache.kafka.common.serialization.LongSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

We can trigger the process of sending messages by calling the POST /transactions endpoint as shown below:

$ curl -X 'POST' 'http://localhost:8080/transactions' \
  -H 'Content-Type: application/json' \
  -d '{"numberOfMessages":10}'

Here’s the consumer app listener bean implementation. As you can see, it is very simple: it just receives and prints the messages. We are sleeping the thread for 10 seconds, just to be able to easily check the offset on the Kafka topic during the test.

@Service
public class Listener {

   private static final Logger LOG = LoggerFactory
           .getLogger(Listener.class);

   @KafkaListener(
          id = "transactions",
          topics = "transactions",
          groupId = "a"
   )
   public void listen(@Payload Order order,
                      @Header(KafkaHeaders.OFFSET) Long offset,
                      @Header(KafkaHeaders.RECEIVED_PARTITION) int partition) throws InterruptedException {
      LOG.info("[partition={},offset={}] Starting: {}", partition, offset, order);
      Thread.sleep(10000L);
      LOG.info("[partition={},offset={}] Finished: {}", partition, offset, order);
   }

}

In order to see what exactly happens in the consumer app, we need to increase the default logging level for Spring Kafka to DEBUG. There are also some other properties related to the serialization and deserialization of messages in the application properties. Here’s the whole application.yml file for the consumer app:

spring:
  application.name: consumer
  output.ansi.enabled: ALWAYS
  kafka:
    bootstrap-servers: ${KAFKA_URL:localhost}:9092
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.LongDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring.json.trusted.packages: "*"

logging.level:
  org.springframework.kafka: debug

Understanding How Spring Kafka Commits the Offset

Then, let’s start both our apps by executing the Maven commands visible below. I assume that you already have a Kafka broker running. Once the consumer is connected to the transactions topic, we can send 10 messages by calling the already-mentioned POST /transactions endpoint. After that, we will switch to the consumer app logs, where we can see all the significant information related to offset committing.

# build the whole project
$ mvn clean package

# run the consumer app
$ cd consumer
$ mvn spring-boot:run

# run the producer app
$ cd producer
$ mvn spring-boot:run

So, here are the logs from our test. I highlighted the most important parts. Of course, your results may differ slightly, but the rules are the same. First of all, the consumer receives a batch of messages. In this case, there are 2 messages, but, for example, it consumes 7 in one step from the next partition. Without detailed logs, you won’t even be aware of this behavior, since the message listener processes message after message. However, the offset commit action is performed only after processing all the consumed messages. That’s because we have the AckMode set to BATCH.

(Figure: spring-kafka-offset-batch – consumer logs with the BATCH ack mode)

Of course, it doesn’t have any impact on the app… as long as it is running. If a non-graceful restart or a crash occurs between the start of batch processing and the offset commit action, it may cause some problems. Don’t get me wrong – it’s a standard situation that results in message duplicates on the consumer side. So, now our app consumes 7 messages in a batch. Let’s stop it during batch processing as shown below. By the way, with a graceful shutdown, Spring Kafka waits until the last message in the batch is processed. Therefore, I simulated an immediate stop with SIGKILL for testing purposes.

(Figure: spring-kafka-offset-batch-kill – killing the consumer during batch processing)

The consumer offset has not been committed. We can verify it by checking the current value of the consumer offset on partition 1, and compare it with the values highlighted in the logs above.
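For example, assuming the standard Kafka CLI scripts are available, we can describe our consumer group (named a in the @KafkaListener annotation) to see the committed offsets:

$ kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --describe --group a

The CURRENT-OFFSET column shows the last committed offset per partition, while LAG shows how many records are still ahead of the committed offset.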

Then, let’s start our consumer app once again. The app starts reading messages from the latest committed offset for each partition. Consequently, it processes several messages that had already been processed before we killed the consumer instance. As you see, the consumer app is processing the orders with ids 3, 5, 6, 8, and 10 once again. We need to take such situations into account when implementing the business logic. After processing the last message in the batch, Spring Kafka commits the offset.

(Figure: spring-kafka-offset-batch-duplicates – the consumer reprocessing messages after the restart)

Finally, everything works fine. There is no consumer lag on any partition.

Using the RECORD Mode to Commit Offset

In the next step, we will compare a similar situation with the AckMode set to RECORD. According to the Spring Kafka docs, the RECORD mode “commits the offset when the listener returns after processing the record”. In order to enable the RECORD mode, we need to set the following in the application.yml file:

spring.kafka.listener.ack-mode: RECORD

Then, we have to restart the consumer app. After that, we can trigger the process of sending messages once again, by calling the POST /transactions endpoint exposed by the producer app:

$ curl -X 'POST' 'http://localhost:8080/transactions' \
  -H 'Content-Type: application/json' \
  -d '{\"numberOfMessages\":10}'

Let’s switch to the logs. As you see, each time the @KafkaListener method finishes processing a single record, Spring Kafka commits the offset. I guess that some of you assumed that this was the default behavior (not the BATCH mode) 🙂 That approach decreases the potential number of duplicate messages after a restart, but on the other hand, it impacts the overall performance of the consumer.

(Figure: spring-kafka-offset-record – consumer logs with the RECORD ack mode)

The latest committed consumer offset visible in the logs is 8. So, if we switch to the GUI client, we can verify that the current offset there has the same value.

Graceful Shutdown

Although our app commits the offset each time after processing a single record, during a graceful shutdown Spring Boot waits until the whole batch is processed. As you see, I initiated the shutdown procedure at 15:12:41, but the container performed the shutdown only after a 30-second timeout. That’s because I included 10 seconds of thread sleep in the processing method, which makes the total time of processing the batch of messages longer than 30 seconds.

(Figure: spring-kafka-offset-batch-shutdown – graceful shutdown waiting for the batch to finish)

However, we can change that behavior with the spring.kafka.listener.immediate-stop property. It decides whether the container stops after the current record is processed or after all the records from the previous poll are processed. Let’s set it to true:

spring.kafka.listener.immediate-stop: true

After restarting the consumer app, let’s take a look at the logs once again. This time, the Spring container starts the shutdown procedure just after committing the offset of the last processed record.

Spring Kafka Offset and Concurrency

Processing Messages with the Custom Thread Pool

Finally, the last scenario in our article. Let’s consider the case where we are using a custom thread pool to handle messages received by the @KafkaListener method. In order to do that, we can define an ExecutorService object. Once the listenAsync() method receives a message, it delegates processing to the Processor bean by calling its process() method through the ExecutorService object.

@Service
public class Listener {

   private static final Logger LOG = LoggerFactory
          .getLogger(Listener.class);

   ExecutorService executorService = Executors.newFixedThreadPool(30);

   @Autowired
   private Processor processor;

   @KafkaListener(
          id = "transactions-async",
          topics = "transactions-async",
          groupId = "a"
   )
   public void listenAsync(@Payload Order order,
                     @Header(KafkaHeaders.OFFSET) Long offset,
                     @Header(KafkaHeaders.RECEIVED_PARTITION) int partition) {
      LOG.info("[partition={},offset={}] Starting Async: {}", partition, offset, order);
      executorService.submit(() -> processor.process(order));
   }
}

In the Processor bean, we are sleeping the thread for 10 seconds for testing purposes. The process() method doesn’t do anything important; it just prints a log line at the start and before finishing.

@Service
public class Processor {

   private static final Logger LOG = LoggerFactory
           .getLogger(Processor.class);

   public void process(Order order) {
      LOG.info("Processing: {}", order.getId());
      try {
         Thread.sleep(10000L);
      } catch (InterruptedException e) {
         throw new RuntimeException(e);
      }
      LOG.info("Finished: {}", order.getId());
   }

}

Let’s analyze what will happen after sending some messages to such a consumer. This time we are using the transactions-async topic. By default, Spring Kafka commits the offset after processing the whole batch of 4 received messages. However, it happens almost immediately after receiving them, because we are delegating the further processing to another thread. The asynchronous method finishes processing after 10 seconds. If your app crashes during those 10 seconds, it will result in losing messages. They won’t be processed by the new instance of the app, because the offset was committed before the message processing finished.

Enable Manual Offset Commit

Once again, it is a normal situation that we can lose messages with a Kafka consumer. However, we can handle such cases slightly differently. Instead of relying on the container-managed offset commit, we can switch to the manual mode. First of all, let’s add the following property to the Spring Boot application.yml file:

spring.kafka.listener.ack-mode: MANUAL_IMMEDIATE

Then, we need to leverage the Acknowledgment interface to take control over the offset commit process inside the listener. As you see, we have to include that interface in the @KafkaListener method parameters. After that, we can pass it to the process() method running in a different thread.

@Service
public class Listener {

   private static final Logger LOG = LoggerFactory
          .getLogger(Listener.class);

   ExecutorService executorService = Executors.newFixedThreadPool(30);

   @Autowired
   private Processor processor;

   @KafkaListener(
          id = "transactions-async",           
          topics = "transactions-async",
          groupId = "a"
   )
   public void listenAsync(@Payload Order order,
                     Acknowledgment acknowledgment,
                     @Header(KafkaHeaders.OFFSET) Long offset,
                     @Header(KafkaHeaders.RECEIVED_PARTITION) int partition) {
      LOG.info("[partition={},offset={}] Starting Async: {}", partition, offset, order);
      executorService.submit(() -> processor.process(order, acknowledgment));
   }
}

With the acknowledge() method provided by the Acknowledgment interface, we can manually commit the offset at the selected location in the code. Here, we are making the commit at the end of the whole method.

@Service
public class Processor {

   private static final Logger LOG = LoggerFactory
           .getLogger(Processor.class);

   public void process(Order order, Acknowledgment acknowledgment) {
      LOG.info("Processing: {}", order.getId());
      try {
         Thread.sleep(10000L);
      } catch (InterruptedException e) {
         throw new RuntimeException(e);
      }
      LOG.info("Finished: {}", order.getId());
      acknowledgment.acknowledge();
   }

}

Let’s switch to the consumer app logs once again. As you see, the offset commit happens almost immediately after processing the message. By the way, the MANUAL AckMode (instead of MANUAL_IMMEDIATE) waits with the commit until all the records from the batch have been processed. Another thing worth mentioning here is the possibility of out-of-order commits, which is disabled by default for a Spring Boot app. If you want to test such a scenario by yourself, you can increase the number of messages sent by the producer with the numberOfMessages field, e.g. to 100, and then verify the consumer lag with and without this setting. In order to enable it, we need to set the spring.kafka.listener.async-acks property to true:
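spring.kafka.listener.async-acks: true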

Finally, let’s verify the current committed offset for all the partitions using the GUI client.

Final Thoughts

Kafka consumer offset is a very interesting topic. If you want to understand Kafka, you first need to understand how consumers commit the offset on partitions. In this article, I focused on showing you how to switch between different acknowledgment modes with Spring Kafka and how each of them impacts your app.
