Kafka Offset with Spring Boot
In this article, you will learn how to manage the Kafka consumer offset with Spring Boot and Spring Kafka. The inspiration for this article was the feedback I received after publishing the post describing concurrency with Kafka and Spring Boot. You were asking me questions related not only to concurrency but also to the consumer offset committing process. In the previous article, I focused mostly on showing that the way the app handles Kafka messages may impact the overall performance of our system. I didn’t consider things like message duplicates or losing messages on the consumer side. Today, we are going to discuss exactly those topics.
If you are interested in Kafka and Spring Boot, you can find several articles about it on my blog. Besides the already mentioned post about concurrency, you can read e.g. about Kafka transactions here. On the other hand, to read about microservices with Kafka and Spring Boot, refer to the following article.
Source Code
If you would like to try it yourself, you can always take a look at my source code. In order to do that, you need to clone my GitHub repository. After that, you should just follow my instructions. Let’s begin.
Introduction
Before we start, we need to clarify some important things related to committing offsets with Spring Kafka. First of all, by default, Spring Kafka sets the consumer enable.auto.commit property to false. It means that the framework, not Kafka, is responsible for committing an offset. Of course, we can change the default behavior by setting that property to true. By the way, it was the default approach before Spring Kafka 2.3.
As long as Kafka auto-committing stays disabled, we can leverage 7 different commit strategies provided by Spring Kafka. Today, we won’t analyze all of them, just the most significant ones. The default strategy is BATCH. In order to set a different strategy, we need to override the AckMode, e.g. by setting the spring.kafka.listener.ack-mode property in the Spring Boot application properties. However, let’s first focus on the BATCH mode.
Sample Spring Boot Kafka Apps
In order to test offset committing with Spring Kafka, we will create two simple apps: producer and consumer. The producer sends a defined number of messages to the topic, while the consumer receives and processes them. Here’s the producer @RestController implementation. It allows us to send a defined number of messages to the transactions topic on demand:
@RestController
public class TransactionsController {

    private static final Logger LOG = LoggerFactory
        .getLogger(TransactionsController.class);

    long id = 1;
    long groupId = 1;
    @Autowired
    KafkaTemplate<Long, Order> kafkaTemplate;

    @PostMapping("/transactions")
    public void generateAndSendMessages(@RequestBody InputParameters inputParameters) {
        for (long i = 0; i < inputParameters.getNumberOfMessages(); i++) {
            Order o = new Order(id++, i+1, i+2, 1000, "NEW", groupId);
            CompletableFuture<SendResult<Long, Order>> result =
                kafkaTemplate.send("transactions", o.getId(), o);
            // sr is null when the send fails, so check the exception first
            result.whenComplete((sr, ex) -> {
                if (ex != null) {
                    LOG.error("Send failed", ex);
                } else {
                    LOG.info("Sent({}): {}", sr.getProducerRecord().key(), sr.getProducerRecord().value());
                }
            });
        }
        groupId++;
    }
}
Here are the producer app configuration properties. We need to set the address of the Kafka broker and the serializer classes for the key (Long) and the value (JSON format).
spring:
  application.name: producer
  kafka:
    bootstrap-servers: ${KAFKA_URL}
    producer:
      key-serializer: org.apache.kafka.common.serialization.LongSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
We can trigger the process of sending messages by calling the POST /transactions endpoint as shown below:
$ curl -X 'POST' 'http://localhost:8080/transactions' \
-H 'Content-Type: application/json' \
-d '{"numberOfMessages":10}'
Here’s the consumer app listener bean implementation. As you can see, it is very simple. It just receives and prints the messages. We are sleeping the thread for 10 seconds, just to be able to easily check the offset on the Kafka topic during the test.
@Service
public class Listener {

    // the logger is bound to this class, so log lines carry the right name
    private static final Logger LOG = LoggerFactory
        .getLogger(Listener.class);

    @KafkaListener(
        id = "transactions",
        topics = "transactions",
        groupId = "a"
    )
    public void listen(@Payload Order order,
                       @Header(KafkaHeaders.OFFSET) Long offset,
                       @Header(KafkaHeaders.RECEIVED_PARTITION) int partition) throws InterruptedException {
        LOG.info("[partition={},offset={}] Starting: {}", partition, offset, order);
        Thread.sleep(10000L); // simulate slow processing
        LOG.info("[partition={},offset={}] Finished: {}", partition, offset, order);
    }
}
In order to see what exactly happens in the consumer app, we need to increase the default logging level for Spring Kafka to DEBUG. There are also some other properties related to the serialization and deserialization of messages in the application properties. Here’s the whole application.yml file for the consumer app:
spring:
  application.name: consumer
  output.ansi.enabled: ALWAYS
  kafka:
    bootstrap-servers: ${KAFKA_URL:localhost}:9092
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.LongDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring.json.trusted.packages: "*"

logging.level:
  org.springframework.kafka: debug
Understanding How Spring Kafka Commits the Offset
Then, let’s start both our apps by executing the Maven commands shown below. I assume that you already have the Kafka broker running. Once the consumer is connected to the transactions topic, we can send 10 messages by calling the already-mentioned POST /transactions endpoint. After that, we will switch to the consumer app logs, where we can see all the significant information related to offset committing.
# build the whole project
$ mvn clean package
# run the consumer app
$ cd consumer
$ mvn spring-boot:run
# run the producer app (in a second terminal, from the project root)
$ cd producer
$ mvn spring-boot:run
So, here are the logs from our test. I highlighted the most important parts. Of course, your results may differ slightly, but the rules are the same. First of all, the consumer receives a batch of messages. In this case, there are 2 messages, but it consumes, for example, 7 in one step from the next partition. Without detailed logs, you wouldn’t even be aware of this behavior, since the message listener processes one message after another. However, the offset commit is performed only after processing all the consumed messages. That’s because we have the AckMode set to BATCH.
Of course, it doesn’t have any impact on the app… as long as it is running. If a non-graceful restart or a crash occurs between the start of batch processing and the offset commit, it may cause some problems. Don’t get me wrong: it’s a standard situation that results in message duplicates on the consumer side. So, now our app consumes 7 messages in a batch. Let’s stop it during batch processing as shown below. By the way, with a graceful shutdown, Spring Kafka waits until the last message in the batch is processed. Therefore, I simulated an immediate stop by sending the SIGKILL signal for testing purposes.
The consumer offset has not been committed. We can verify it by checking the current value of the consumer offset on partition 1. You can compare that value with the values highlighted in the logs above.
Then, let’s start our consumer app once again. The app starts reading messages from the latest committed offset for each partition. Consequently, it processes several messages that were already processed before the consumer instance was killed. As you can see, the consumer app is processing the orders with the ids 3, 5, 6, 8, and 10 once again. We need to take such situations into account when implementing the business logic. After processing the last message in the batch, Spring Kafka commits the offset.
Finally, everything works fine. There is no consumer lag on any partition.
Using the RECORD Mode to Commit Offset
In the next step, we will compare a similar situation with the AckMode set to RECORD. According to the Spring Kafka docs, the RECORD mode “commits the offset when the listener returns after processing the record”. In order to enable the RECORD mode, we need to set the following in the application.yml file:
spring.kafka.listener.ack-mode: RECORD
Then, we have to restart the consumer app. After that, we can trigger the process of sending messages once again by calling the POST /transactions endpoint exposed by the producer app:
$ curl -X 'POST' 'http://localhost:8080/transactions' \
-H 'Content-Type: application/json' \
-d '{"numberOfMessages":10}'
Let’s switch to the logs. As you can see, Spring Kafka commits the offset each time the @KafkaListener method finishes processing a single record. I guess that some of you assumed that this was the default behavior (not the BATCH mode) 🙂 That approach decreases the potential number of duplicate messages after a restart, but on the other hand, it impacts the overall performance of the consumer, since every record requires a separate commit.
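The cost side of that trade-off can be sketched with a few lines of plain Java. This is a simplified model under stated assumptions, not a measurement: it assumes BATCH issues one commit per poll that returned records and RECORD issues one commit per record. The class CommitCountDemo and the poll sizes are hypothetical (2 and 7 match the batches mentioned earlier, the final 1 is made up to reach 10 messages).

```java
public class CommitCountDemo {

    /** BATCH model: one commit per poll that returned at least one record. */
    public static int batchCommits(int[] recordsPerPoll) {
        int commits = 0;
        for (int records : recordsPerPoll) {
            if (records > 0) {
                commits++;
            }
        }
        return commits;
    }

    /** RECORD model: one commit after every processed record. */
    public static int recordCommits(int[] recordsPerPoll) {
        int commits = 0;
        for (int records : recordsPerPoll) {
            commits += records;
        }
        return commits;
    }

    public static void main(String[] args) {
        int[] polls = {2, 7, 1}; // hypothetical poll sizes for 10 messages
        System.out.println("BATCH commits:  " + batchCommits(polls));
        System.out.println("RECORD commits: " + recordCommits(polls));
    }
}
```

Under these assumptions, the same 10 messages need 3 commits in the BATCH model and 10 in the RECORD model, while the number of records that can be replayed after a crash shrinks from a whole batch to roughly a single record.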
The latest committed consumer offset visible in the logs is 8. So, if we switch to the GUI client, we can verify that the current offset there has the same value.
Graceful Shutdown
Although our app commits the offset each time after processing a single record, during a graceful shutdown Spring Boot waits until the whole batch is processed. As you can see, I initiated the shutdown procedure at 15:12:41, but the container performed the shutdown only after a 30-second timeout. That’s because I included 10 seconds of thread sleep in the processing method. As a result, the total time of processing the batch of messages is longer than 30 seconds.
However, we can change that behavior. We need to set the spring.kafka.listener.immediate-stop property to true. That property decides whether the container stops after the current record is processed or after all the records from the previous poll are processed.
spring.kafka.listener.immediate-stop: true
After restarting the consumer app we need to take a look at the logs once again. The Spring container starts a shutdown procedure just after committing the offset after processing the last record.
Spring Kafka Offset and Concurrency
Processing Messages with the Custom Thread Pool
Finally, the last scenario in our article. Let’s consider the case when we are using a custom thread pool to handle messages received by the @KafkaListener method. In order to do that, we can define an ExecutorService object. Once the listenAsync() method receives a message, it delegates processing to the Processor bean by submitting a call to its process() method to the ExecutorService.
@Service
public class Listener {

    private static final Logger LOG = LoggerFactory
        .getLogger(Listener.class);

    ExecutorService executorService = Executors.newFixedThreadPool(30);

    @Autowired
    private Processor processor;

    @KafkaListener(
        id = "transactions-async",
        topics = "transactions-async",
        groupId = "a"
    )
    public void listenAsync(@Payload Order order,
                            @Header(KafkaHeaders.OFFSET) Long offset,
                            @Header(KafkaHeaders.RECEIVED_PARTITION) int partition) {
        LOG.info("[partition={},offset={}] Starting Async: {}", partition, offset, order);
        executorService.submit(() -> processor.process(order));
    }
}
In the Processor bean, we are sleeping the thread for 10 seconds for testing purposes. The process() method doesn’t do anything important; it just prints a log at the start and before finishing.
@Service
public class Processor {

    // the logger is bound to this class, not to the Listener
    private static final Logger LOG = LoggerFactory
        .getLogger(Processor.class);

    public void process(Order order) {
        LOG.info("Processing: {}", order.getId());
        try {
            Thread.sleep(10000L); // simulate slow processing
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        LOG.info("Finished: {}", order.getId());
    }
}
Let’s analyze what will happen after sending some messages to such a consumer. This time we are using the transactions-async topic. By default, Spring Kafka commits the offset after processing the whole batch of 4 received messages. However, it happens almost immediately after receiving the messages, because we are delegating further processing to another thread. The asynchronous method finishes processing after 10 seconds. If your app crashes during those 10 seconds, it will result in losing messages. They won’t be processed by the new instance of the app, because the offset was already committed before message processing finished.
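The loss scenario can be reproduced without a broker in a small plain-Java sketch. It is an illustration under stated assumptions rather than Spring Kafka code: the class AsyncLossDemo and its method are hypothetical, a latch that is never released stands in for the 10-second sleep, and shutdownNow() stands in for the crash. The "commit" is modeled as a simple counter that covers the whole batch as soon as all listener calls have returned.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class AsyncLossDemo {

    /** Returns how many records were committed but never finished processing. */
    public static int committedButUnprocessed(int batchSize) {
        ExecutorService pool = Executors.newFixedThreadPool(batchSize);
        CountDownLatch slowWork = new CountDownLatch(1); // never released: work stays "in progress"
        AtomicInteger finished = new AtomicInteger();

        // The listener only submits the work and returns immediately.
        for (int i = 0; i < batchSize; i++) {
            pool.submit(() -> {
                try {
                    slowWork.await();
                    finished.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // killed mid-processing
                }
            });
        }
        // All listener calls have returned, so a BATCH-style commit covers the whole batch.
        int committed = batchSize;

        pool.shutdownNow(); // simulated crash while the tasks are still running
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return committed - finished.get();
    }

    public static void main(String[] args) {
        System.out.println("Lost records: " + committedButUnprocessed(4));
    }
}
```

For a batch of 4, all 4 records end up committed but unprocessed, which is exactly the gap between "the listener method returned" and "the work is done" described above.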
Enable Manual Offset Commit
Once again, it is a normal situation that we can lose messages with the Kafka consumer. However, we can handle such cases slightly differently. Instead of relying on container-managed offset commits, we can switch to the manual mode. First of all, let’s add the following property to the Spring Boot application.yml file:
spring.kafka.listener.ack-mode: MANUAL_IMMEDIATE
Then we need to leverage the Acknowledgment interface to take control of the offset commit process inside the listener. As you can see, we have to add a parameter of that type to the @KafkaListener method. After that, we can pass it to the process() method running in a different thread.
@Service
public class Listener {

    private static final Logger LOG = LoggerFactory
        .getLogger(Listener.class);

    ExecutorService executorService = Executors.newFixedThreadPool(30);

    @Autowired
    private Processor processor;

    @KafkaListener(
        id = "transactions-async",
        topics = "transactions-async",
        groupId = "a"
    )
    public void listenAsync(@Payload Order order,
                            Acknowledgment acknowledgment,
                            @Header(KafkaHeaders.OFFSET) Long offset,
                            @Header(KafkaHeaders.RECEIVED_PARTITION) int partition) {
        LOG.info("[partition={},offset={}] Starting Async: {}", partition, offset, order);
        executorService.submit(() -> processor.process(order, acknowledgment));
    }
}
With the acknowledge() method provided by the Acknowledgment interface, we can manually commit the offset at a selected place in the code. Here, we are making the commit at the end of the method.
@Service
public class Processor {

    // the logger is bound to this class, not to the Listener
    private static final Logger LOG = LoggerFactory
        .getLogger(Processor.class);

    public void process(Order order, Acknowledgment acknowledgment) {
        LOG.info("Processing: {}", order.getId());
        try {
            Thread.sleep(10000L); // simulate slow processing
        } catch (InterruptedException e) {
            // no acknowledge() here, so the offset is not committed
            throw new RuntimeException(e);
        }
        LOG.info("Finished: {}", order.getId());
        acknowledgment.acknowledge(); // commit only after the work is done
    }
}
Let’s switch to the consumer app logs once again. As you can see, the offset commit happens almost immediately after processing the message. By the way, the MANUAL (instead of MANUAL_IMMEDIATE) AckMode will wait with the commit until all the records in the batch are processed. Another thing worth mentioning here is the possibility of out-of-order commits. It is disabled by default for the Spring Boot app. In order to enable it, we need to set the spring.kafka.listener.async-acks property to true. If you want to test such a scenario by yourself, you can increase the number of messages sent by the producer with the numberOfMessages field, e.g. to 100. Then verify the consumer lag with and without the asyncAcks property.
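The general idea behind tolerating out-of-order acknowledgments can be sketched as a small offset tracker: an offset is only safe to commit once every offset before it has been acknowledged too. The following plain-Java class is a simplified illustration of that idea, not the actual Spring Kafka implementation. The name OutOfOrderAcks and its API are hypothetical, and it follows the Kafka convention that the committed value is the next offset to read.

```java
import java.util.TreeSet;

public class OutOfOrderAcks {

    private final TreeSet<Long> pending = new TreeSet<>(); // acked, but not yet committable
    private long nextToCommit;                             // next offset to read after a restart

    public OutOfOrderAcks(long startOffset) {
        this.nextToCommit = startOffset;
    }

    /** Records an acknowledgment and returns the offset that is now safe to commit. */
    public long acknowledge(long offset) {
        pending.add(offset);
        // Advance only while there is no gap in the acknowledged offsets.
        while (pending.remove(nextToCommit)) {
            nextToCommit++;
        }
        return nextToCommit;
    }

    public static void main(String[] args) {
        OutOfOrderAcks tracker = new OutOfOrderAcks(0);
        System.out.println(tracker.acknowledge(2)); // offsets 0 and 1 are still missing
        System.out.println(tracker.acknowledge(0)); // offset 1 is still missing
        System.out.println(tracker.acknowledge(1)); // the gap is closed
    }
}
```

In this model, acknowledging offset 2 first does not move the committable position at all; it advances only once offsets 0 and 1 have been acknowledged as well, so a crash in between cannot skip an unfinished record.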
Finally, let’s verify the current committed offset for all the partitions using the GUI client.
Final Thoughts
Kafka consumer offset is a very interesting topic. If you want to understand Kafka, you first need to understand how consumers commit the offset on partitions. In this article, I focused on showing you how to switch between different acknowledgment modes with Spring Kafka and how that choice impacts your app.