Partitioning with Apache Kafka and Vert.x

In this article, you will learn how to implement partitioning with Apache Kafka and the Vert.x toolkit. Apache Kafka is a distributed streaming platform. It may also act as a messaging system in your architecture. Traditional message brokers provide two models of communication: queuing and publish-subscribe (topics). Queues are used for point-to-point messaging, while topics allow you to broadcast data to multiple target consumers. Kafka does not provide a queuing mechanism directly. However, it introduces the consumer group concept, which generalizes both the queuing and publish-subscribe models. The consumer group mechanism guarantees that a single message will be processed by only one consumer belonging to the given group. It is especially useful when you have more than one instance of a service listening for messages incoming to the topic. That feature makes your consumers behave like queuing clients within the same group.

Eclipse Vert.x is a lightweight and fast toolkit for building reactive applications on the JVM. I have already introduced it in some of my previous posts, for example Asynchronous Microservices with Vert.x. Vert.x does not force you to implement a reactive application. You may create a standard service that processes HTTP requests asynchronously, in accordance with the asynchronous I/O concept.

The purpose of this article

The main purpose of this article is to show you one of the key features of Apache Kafka, partitioning, which may be useful when creating applications that consume messages. The choice of Java client library is not a key point here. However, in my opinion, Vert.x is an asynchronous, high-performance toolkit that perfectly matches Apache Kafka. It provides the Vert.x Kafka client, which allows you to read and send messages from/to a Kafka cluster. Before we proceed to the sample, let's first dive into the core abstraction of Kafka.

Kafka topic

I assume you already know what a topic is and what its main role is: every message incoming to the topic goes to every subscriber. What is the main difference between Kafka and the standard topics provided by other message brokers? A Kafka topic is partitioned. Each partition is an ordered, immutable sequence of records. Every record can be uniquely identified within the partition by a sequential id number called the offset. The Kafka cluster retains all published records according to the configured retention period.

Consumers may subscribe to the whole topic or only to a selected partition. A consumer also controls the offset from which it starts processing data. For example, it is able to reset the offset in order to reprocess data from the past, or skip ahead to the most recent record to consume only messages currently being sent to the topic. Here's a figure that illustrates a single partition structure with producers and consumers listening for the incoming data.

[Figure: apache-kafka-partitioning-1 - a single partition with producers appending records and consumers reading from their own offsets]
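
To illustrate that offset control with the Vert.x Kafka client used later in this article, here is a minimal sketch. It assumes a KafkaConsumer named consumer and a LOGGER created elsewhere, and uses partition 0 of the sample orders-out topic described below:

// rewind to the beginning of the partition to reprocess historical records
TopicPartition partition = new TopicPartition().setTopic("orders-out").setPartition(0);
consumer.seekToBeginning(partition, done -> {
   if (done.succeeded())
      LOGGER.info("Offset reset to the beginning of the partition");
});
// or jump to the end of the partition to consume only newly arriving records
consumer.seekToEnd(partition, done -> {
   if (done.succeeded())
      LOGGER.info("Offset moved to the end of the partition");
});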

Sample architecture

Let me say a few words about the sample system architecture. Its source code is available on GitHub (https://github.com/piomin/sample-vertx-kafka-messaging.git). In accordance with the principle that a picture is worth a thousand words, a diagram illustrating the architecture of our system is shown below. We have one topic created on the Kafka platform that consists of two partitions. There is one client application, order-service, which exposes a REST API for sending orders into the system and then forwards them to the topic. The target partition is calculated based on the order type; we may create orders with the types SINGLE and MULTIPLE. There are also some applications that consume data from the topic. The first of them, single-order-processor, reads data from partition 0, the second, multiple-order-processor, from partition 1, and the last, all-order-processor, does not choose any particular partition.

[Figure: apache-kafka-partitioning-2 - sample architecture: order-service publishing to the two partitions of the orders-out topic, consumed by single-order-processor, multiple-order-processor, and all-order-processor]
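
The article does not list the Order model itself. Based on the fields referenced throughout the samples, it looks roughly like the sketch below (a simplified guess; the class in the repository may contain additional fields or annotations):

public enum OrderType { SINGLE, MULTIPLE }   // ordinal() returns 0 or 1 - used later as the partition number
public enum OrderStatus { NEW, PROCESSING, DONE, REJECTED }

public class Order {
   private Long id;              // set to the record offset by the producer
   private OrderType type;
   private OrderStatus status;
   private int price;
   private Long relatedOrderId;  // optional reference to a previously sent order
   // getters and setters omitted
}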

Running Apache Kafka

To run Apache Kafka on the local machine, we may use its Docker image. The image shared by Spotify also starts the ZooKeeper server, which is used by Kafka. If you run Docker on Windows, the default address of its virtual machine is 192.168.99.100.

$ docker run -d --name kafka -p 2181:2181 -p 9092:9092 --env ADVERTISED_HOST=192.168.99.100 --env ADVERTISED_PORT=9092 spotify/kafka

However, that option assumes the topics will be created automatically during application startup, and I had some problems with it while creating a multi-partition topic. There is also another image, ches/kafka, which requires starting ZooKeeper separately but provides a Kafka client interface. Some of the commands below use a user-defined Docker network called kafka-net, which has to be created first with docker network create kafka-net.

$ docker run -d --name zookeeper -p 2181:2181 zookeeper
$ docker run -d --name kafka -p 9092:9092 -p 7203:7203 --network kafka-net --env KAFKA_ADVERTISED_HOST_NAME=192.168.99.100 --env ZOOKEEPER_IP=192.168.99.100 ches/kafka

Finally, we can run the ches/kafka image in client mode and create the orders-out topic with two partitions.

$ docker run --rm --network kafka-net ches/kafka kafka-topics.sh --create --topic orders-out --replication-factor 1 --partitions 2 --zookeeper 192.168.99.100:2181
Created topic "orders-out".

Building the producer application with Vert.x

First, we need to include the Maven dependencies that enable the Vert.x toolkit in the application (all of them under the io.vertx group). If the application exposes a RESTful HTTP API, you should include vertx-web. The vertx-kafka-client library has to be included in all the sample modules.

To start Vert.x as a Java application, we have to create a verticle by extending AbstractVerticle. Then the verticle needs to be deployed in the main method using the Vertx object. For more details about Vert.x and the verticle concept, you may refer to one of my previous articles mentioned in the preface.

public class OrderVerticle extends AbstractVerticle {
   public static void main(String[] args) {
      Vertx vertx = Vertx.vertx();
      vertx.deployVerticle(new OrderVerticle());
   }
}
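
The Kafka producer and the HTTP routes presented in the following fragments are assumed to be created inside the verticle, for example in its start() method. This is only a sketch of the structure, not a copy of the repository code:

@Override
public void start() {
   // 1. create the KafkaProducer (configuration shown in the next fragment)
   // 2. define the REST routes and start the HTTP server (shown afterwards)
}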

The next step is to define a producer using the KafkaProducer interface. We have to provide the connection settings and a serializer implementation class. You can choose between various built-in serializer implementations. The most suitable for me was JsonObjectSerializer, which requires a JsonObject as an input parameter.

Properties config = new Properties();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.99.100:9092");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonObjectSerializer.class);
config.put(ProducerConfig.ACKS_CONFIG, "1");
KafkaProducer<String, JsonObject> producer = KafkaProducer.create(vertx, config);

The producer is invoked inside the POST method route definition. It returns an asynchronous response with a status after sending a message to the topic. The message is created using the KafkaProducerRecord interface, which takes the topic name, the message key (null here), the order object, and the partition number as parameters. As you may see in the fragment of code below, the partition number is calculated on the basis of the order type (o.getType().ordinal()), so SINGLE orders go to partition 0 and MULTIPLE orders to partition 1.

Router router = Router.router(vertx);
router.route("/order/*").handler(ResponseContentTypeHandler.create());
router.route(HttpMethod.POST, "/order").handler(BodyHandler.create());
router.post("/order").produces("application/json").handler(rc -> {
   Order o = Json.decodeValue(rc.getBodyAsString(), Order.class);
   KafkaProducerRecord<String, JsonObject> record = KafkaProducerRecord.create("orders-out", null, rc.getBodyAsJson(), o.getType().ordinal());
   producer.write(record, done -> {
      if (done.succeeded()) {
         RecordMetadata recordMetadata = done.result();
         LOGGER.info("Record sent: msg={}, destination={}, partition={}, offset={}", record.value(), recordMetadata.getTopic(), recordMetadata.getPartition(), recordMetadata.getOffset());
         o.setId(recordMetadata.getOffset());
         o.setStatus(OrderStatus.PROCESSING);
      } else {
         Throwable t = done.cause();
         LOGGER.error("Error sent to topic: {}", t.getMessage());
         o.setStatus(OrderStatus.REJECTED);
      }
      rc.response().end(Json.encodePrettily(o));
   });
});
vertx.createHttpServer().requestHandler(router::accept).listen(8090);

Partitioning in consumer applications with Vert.x

The consumer configuration is very similar to the producer's. We also have to set the connection settings and the class used for deserialization. There is one interesting setting defined for the consumer in the fragment of code visible below: auto.offset.reset (ConsumerConfig.AUTO_OFFSET_RESET_CONFIG). It sets the initial offset for the consumer when there is no committed position in Kafka yet. If you would like to read all records from the beginning of the stream, use the value earliest. If you would like to process only the newest records (received after application startup), set that property to latest. Because in our case Kafka acts as a message broker, it is set to latest.

Properties config = new Properties();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.99.100:9092");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
KafkaConsumer<String, String> consumer = KafkaConsumer.create(vertx, config);
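
One setting not shown above is the consumer group id (ConsumerConfig.GROUP_ID_CONFIG). Consumers that call subscribe participate in a consumer group identified by that value, and it is what makes the two instances of all-order-processor split the topic's partitions between themselves in the test at the end of this article. A minimal example with a hypothetical group name (the repository may use a different one):

config.put(ConsumerConfig.GROUP_ID_CONFIG, "all-order-processor"); // consumers sharing this id form one group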

As you probably remember, we have three different applications that subscribe to the topic. The first of them, implemented in the all-order-processor module, consumes all the events incoming to the topic. This implementation is the simplest one. We only need to invoke the subscribe method and pass the name of the topic as a parameter. Then every incoming message is processed by the handler method.

consumer.subscribe("orders-out", ar -> {
   if (ar.succeeded()) {
      LOGGER.info("Subscribed");
   } else {
      LOGGER.error("Could not subscribe: err={}", ar.cause().getMessage());
   }
});

consumer.handler(record -> {
   LOGGER.info("Processing: key={}, value={}, partition={}, offset={}", record.key(), record.value(), 
record.partition(), record.offset());
   Order order = Json.decodeValue(record.value(), Order.class);
   order.setStatus(OrderStatus.DONE);
   LOGGER.info("Order processed: id={}, price={}", order.getId(), order.getPrice());
});
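
Note that enable.auto.commit was set to false in the consumer configuration, and nothing in the fragments above commits the consumed offsets back to Kafka. For this demo it does not matter, since every consumer starts from the latest offset anyway. If you wanted processed offsets to survive a restart, you could commit them manually, for example at the end of the handler. A sketch under that assumption:

consumer.handler(record -> {
   // ... process the record as shown above ...
   consumer.commit(done -> {
      if (done.failed())
         LOGGER.error("Commit failed: err={}", done.cause().getMessage());
   });
});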

The implementation of the consuming part in the other applications is a little more complicated. Besides defining the target topic, each consumer can ask for a specific partition. The multiple-order-processor application assigns itself partition 1, while single-order-processor takes partition 0.

TopicPartition tp = new TopicPartition().setPartition(1).setTopic("orders-out");
consumer.assign(tp, ar -> {
   if (ar.succeeded()) {
      LOGGER.info("Subscribed");
      consumer.assignment(done1 -> {
         if (done1.succeeded()) {
            for (TopicPartition topicPartition : done1.result()) {
               LOGGER.info("Partition: topic={}, number={}", topicPartition.getTopic(), topicPartition.getPartition());
            }
         } else {
            LOGGER.error("Could not assign partition: err={}", done1.cause().getMessage());
         }
      });
   } else {
      LOGGER.error("Could not subscribe: err={}", ar.cause().getMessage());
   }
});

The implementation of the handler method inside multiple-order-processor is pretty interesting. If it receives an order with a non-empty relatedOrderId field, it tries to find the related order among the historical records stored in the topic. This is achieved by calling the seek method on KafkaConsumer. The ordersWaiting map used below is assumed to be a field of the verticle keyed by the related order's id, which equals its offset in the partition (for example a Map<Long, Order>).

consumer.handler(record -> {
   LOGGER.info("Processing: key={}, value={}, partition={}, offset={}", record.key(), record.value(), record.partition(), record.offset());
   Order order = Json.decodeValue(record.value(), Order.class);
   if (ordersWaiting.containsKey(record.offset())) {
      LOGGER.info("Related order found: id={}, price={}", order.getId(), order.getPrice());
      LOGGER.info("Current price: price={}", order.getPrice() + ordersWaiting.get(record.offset()).getPrice());
      consumer.seekToEnd(tp);
   }

   if (order.getRelatedOrderId() != null && !ordersWaiting.containsKey(order.getRelatedOrderId())) {
      ordersWaiting.put(order.getRelatedOrderId(), order);
      consumer.seek(tp, order.getRelatedOrderId());
   }
});

Testing Apache Kafka partitioning

Now it is time to launch our applications. You may run the main classes from your IDE, or build the whole project with the mvn clean install command and then run the modules with java -jar. Also run two instances of all-order-processor in order to check how the consumer group mechanism works in practice.

Let’s send some test requests to the order-service in the following sequence.

$ curl -H "Content-Type: application/json" -X POST -d '{"type":"SINGLE","status":"NEW","price":200}' http://localhost:8090/order
{"id":0,"type":"SINGLE","status":"PROCESSING","price":200}
$ curl -H "Content-Type: application/json" -X POST -d '{"type":"SINGLE","status":"NEW","price":300}' http://localhost:8090/order
{"id":1,"type":"SINGLE","status":"PROCESSING","price":300}
$ curl -H "Content-Type: application/json" -X POST -d '{"type":"MULTIPLE","status":"NEW","price":400}' http://localhost:8090/order
{"id":0,"type":"MULTIPLE","status":"PROCESSING","price":400}
$ curl -H "Content-Type: application/json" -X POST -d '{"type":"MULTIPLE","status":"NEW","price":500,"relatedOrderId" :0}' http://localhost:8090/order
{"id":1,"type":"MULTIPLE","status":"PROCESSING","price":500}

Here's the log from the producer application.

2018-01-30 11:08:48 [INFO ]  Record sent: msg={"type":"SINGLE","status":"NEW","price":200}, destination=orders-out, partition=0, offset=0
2018-01-30 11:08:57 [INFO ]  Record sent: msg={"type":"SINGLE","status":"NEW","price":300}, destination=orders-out, partition=0, offset=1
2018-01-30 11:09:08 [INFO ]  Record sent: msg={"type":"MULTIPLE","status":"NEW","price":400}, destination=orders-out, partition=1, offset=0
2018-01-30 11:09:27 [INFO ]  Record sent: msg={"type":"MULTIPLE","status":"NEW","price":500,"relatedOrderId":0}, destination=orders-out, partition=1, offset=1

Here's the log from single-order-processor. It has processed only the messages from partition 0.

2018-01-30 11:08:48 [INFO ]  Processing: key=null, value={"type":"SINGLE","status":"NEW","price":200}, partition=0, offset=0
2018-01-30 11:08:57 [INFO ]  Processing: key=null, value={"type":"SINGLE","status":"NEW","price":300}, partition=0, offset=1

Here's the log from multiple-order-processor. It has processed only the messages from partition 1.

2018-01-30 11:09:08 [INFO ]  Processing: key=null, value={"type":"MULTIPLE","status":"NEW","price":400}, partition=1, offset=0
2018-01-30 11:09:27 [INFO ]  Processing: key=null, value={"type":"MULTIPLE","status":"NEW","price":500,"relatedOrderId":0}, partition=1, offset=1

Here's the log from the first instance of all-order-processor.

2018-01-30 11:08:48 [INFO ]  Processing: key=null, value={"type":"SINGLE","status":"NEW","price":200}, partition=0, offset=0
2018-01-30 11:08:57 [INFO ]  Processing: key=null, value={"type":"SINGLE","status":"NEW","price":300}, partition=0, offset=1

Here's the log from the second instance of all-order-processor. It may be a little surprising for you, but if you run two instances of a consumer that subscribes to the whole topic within the same consumer group, each instance will process messages from only a single partition.

2018-01-30 11:09:08 [INFO ]  Processing: key=null, value={"type":"MULTIPLE","status":"NEW","price":400}, partition=1, offset=0
2018-01-30 11:09:27 [INFO ]  Processing: key=null, value={"type":"MULTIPLE","status":"NEW","price":500,"relatedOrderId":0}, partition=1, offset=1

Summary

In this article I tried to give you a taste of messaging with Apache Kafka. Concepts like consumer groups and partitioning are what make Apache Kafka different from traditional messaging solutions. It is a widely adopted product, which can act as storage, a messaging system, or a stream processor. Together with the popular JVM-based toolkit Vert.x, it may be a really powerful, fast, and lightweight solution for applications that exchange messages. The key concepts introduced by Kafka have also been adopted by Spring Cloud Stream, which makes it a good choice for creating messaging microservices.
