Distributed Transactions in Microservices with Kafka Streams and Spring Boot

In this article, you will learn how to use Kafka Streams with Spring Boot, relying on the Spring Kafka project. To explain well how it works, we are going to implement the saga pattern. The saga pattern is a way to manage distributed transactions across microservices: the key step is publishing events that trigger local transactions, and the microservices exchange such events through a message broker. It turns out that Kafka Streams may help us here. Let's see how!

Source Code

If you would like to try it yourself, you may always take a look at my source code. In order to do that, you need to clone my GitHub repository. After that, you should just follow my instructions.

Instead of Spring Kafka, you could just as well use Spring Cloud Stream for Kafka. You can read more about it in this article. Spring Cloud Stream provides several useful features, like DLQ support, serialization to JSON by default, or interactive queries.

Architecture

We will create a simple system that consists of three microservices. The order-service sends orders to the Kafka topic called orders. The two other microservices, stock-service and payment-service, listen for the incoming events. After receiving them, they verify whether it is possible to execute the order. For example, if there are insufficient funds in the customer's account, the order is rejected. Otherwise, the payment-service accepts the order and sends a response to the payment-orders topic. The stock-service works the same way, except that it verifies the number of products in stock and sends its response to the stock-orders topic.

Then, the order-service joins the two streams from the stock-orders and payment-orders topics by the order's id. If both orders were accepted, it confirms the distributed transaction. On the other hand, if one order has been accepted and the second rejected, it performs a rollback. In that case, it just generates a new order event and sends it to the orders topic. We may treat the orders topic as a stream of the order's status changes, or just like a table with the latest status. Here's a picture that visualizes our scenario.

(Figure: kafka-streams-spring-boot-arch – the saga flow between order-service, payment-service, and stock-service)
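
All the code snippets shown below operate on a shared Order object exchanged by the services. Here's a minimal sketch of that class, assuming plain fields with standard accessors (the real model in the repository may differ in detail):

public class Order {

   private Long id;
   private Long customerId;
   private Long productId;
   private int productCount;
   private int price;
   private String status; // NEW, ACCEPT, REJECT, CONFIRMED, ROLLBACK, REJECTED
   private String source; // the service that triggered a rollback

   // constructors, getters and setters omitted for brevity

}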

Kafka Streams with Spring Boot

Let's begin our implementation with the order-service. Surprisingly, there is no Spring Boot starter for Kafka (unless we use Spring Cloud Stream). Therefore, we need to include the spring-kafka dependency. In order to process streams, we also need to include the kafka-streams module directly. Since the order-service exposes some REST endpoints, it is required to add the Spring Boot Web starter as well.

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-streams</artifactId>
</dependency>

The order-service is the most important microservice in our scenario. It acts as an order gateway and as the saga pattern orchestrator. It requires all three topics used in our architecture. In order to create the topics automatically on application startup, we need to define the following beans:

@Bean
public NewTopic orders() {
   return TopicBuilder.name("orders")
         .partitions(3)
         .compact()
         .build();
}

@Bean
public NewTopic paymentTopic() {
   return TopicBuilder.name("payment-orders")
         .partitions(3)
         .compact()
         .build();
}

@Bean
public NewTopic stockTopic() {
   return TopicBuilder.name("stock-orders")
         .partitions(3)
         .compact()
         .build();
}

Then, let's define our first Kafka stream. To do that, we need to use the StreamsBuilder bean. The order-service receives events from the payment-service (on the payment-orders topic) and from the stock-service (on the stock-orders topic). Every single event contains the id previously set by the order-service. If we join both these streams into a single stream by the order's id, we will be able to determine the status of our transaction. The algorithm is pretty simple: if both the payment-service and the stock-service accepted the order, the final status of the transaction is CONFIRMED. If both services rejected the order, the final status is REJECTED. The last option is ROLLBACK – when one service accepted the order and the other rejected it. Here's the described method inside the OrderManageService bean.

@Service
public class OrderManageService {

   public Order confirm(Order orderPayment, Order orderStock) {
      Order o = new Order(orderPayment.getId(),
             orderPayment.getCustomerId(),
             orderPayment.getProductId(),
             orderPayment.getProductCount(),
             orderPayment.getPrice());
      if (orderPayment.getStatus().equals("ACCEPT") &&
                orderStock.getStatus().equals("ACCEPT")) {
         o.setStatus("CONFIRMED");
      } else if (orderPayment.getStatus().equals("REJECT") &&
                orderStock.getStatus().equals("REJECT")) {
         o.setStatus("REJECTED");
      } else if (orderPayment.getStatus().equals("REJECT") ||
                orderStock.getStatus().equals("REJECT")) {
         String source = orderPayment.getStatus().equals("REJECT")
                    ? "payment" : "stock"; // must match the SOURCE constants checked by the services
         o.setStatus("ROLLBACK");
         o.setSource(source);
      }
      return o;
   }

}

Finally, the implementation of our stream. We need to define a KStream bean. We join the two streams using the join method of KStream, with a joining window of 10 seconds. As the result, we set the status of the order and send a new order to the orders topic – the same topic we use for sending new orders.

@Autowired
OrderManageService orderManageService;

@Bean
public KStream<Long, Order> stream(StreamsBuilder builder) {
   JsonSerde<Order> orderSerde = new JsonSerde<>(Order.class);
   KStream<Long, Order> stream = builder
         .stream("payment-orders", Consumed.with(Serdes.Long(), orderSerde));

   stream.join(
         builder.stream("stock-orders"),
         orderManageService::confirm,
         JoinWindows.of(Duration.ofSeconds(10)),
         StreamJoined.with(Serdes.Long(), orderSerde, orderSerde))
      .peek((k, o) -> LOG.info("Output: {}", o))
      .to("orders");

   return stream;
}
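
For the KStream bean above to be picked up and started, Kafka Streams support must be enabled so that Spring Boot auto-configures the StreamsBuilder used in the method signature. Here's a minimal sketch of the order-service main class (the class name OrderApp is illustrative):

@SpringBootApplication
@EnableKafkaStreams
public class OrderApp {

   public static void main(String[] args) {
      SpringApplication.run(OrderApp.class, args);
   }

}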

Let's also see it in a picture.

(Figure: kafka-streams-spring-boot-join – joining the payment-orders and stock-orders streams and emitting the result to the orders topic)

Configuration for Spring Boot

In Spring Boot, the name of the application is, by default, also the name of the consumer group for Kafka Streams. Therefore, we should set it in application.yml. Of course, we also need to set the address of the Kafka bootstrap server. Finally, we have to configure the default key and value serialization for events. It applies to both standard and streams processing.

spring.application.name: orders
spring.kafka:
  bootstrap-servers: 127.0.0.1:56820
  producer:
    key-serializer: org.apache.kafka.common.serialization.LongSerializer
    value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
  streams:
    properties:
      default.key.serde: org.apache.kafka.common.serialization.Serdes$LongSerde
      default.value.serde: org.springframework.kafka.support.serializer.JsonSerde
      spring.json.trusted.packages: "*"

Sending and receiving events from Kafka topics

In the previous section, we discussed how to create a new Kafka stream as a result of joining two other streams. Now, let's see how to process incoming messages, using the payment-service as an example. It listens for incoming orders. If it receives a new order, it performs a reservation on the customer's account and sends a response with the reservation status to the payment-orders topic. If it receives a confirmation of the transaction from the order-service, it either commits or rolls back the transaction. In order to enable the Kafka listener, we should annotate the main class with @EnableKafka. Additionally, the listening method must be annotated with @KafkaListener. The following method listens for events on the orders topic and runs in the payment consumer group.

@SpringBootApplication
@EnableKafka
public class PaymentApp {

    private static final Logger LOG = LoggerFactory.getLogger(PaymentApp.class);

    public static void main(String[] args) {
        SpringApplication.run(PaymentApp.class, args);
    }

    @Autowired
    OrderManageService orderManageService;

    @KafkaListener(id = "orders", topics = "orders", groupId = "payment")
    public void onEvent(Order o) {
        LOG.info("Received: {}" , o);
        if (o.getStatus().equals("NEW"))
            orderManageService.reserve(o);
        else
            orderManageService.confirm(o);
    }
}
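
For the listener to deserialize the incoming JSON payload into an Order, the payment-service also needs consumer configuration in its application.yml. Here's a sketch, assuming the shared Order class lives in the pl.piomin.base.domain package:

spring.application.name: payment
spring.kafka:
  bootstrap-servers: 127.0.0.1:56820
  consumer:
    key-deserializer: org.apache.kafka.common.serialization.LongDeserializer
    value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
    properties:
      spring.json.trusted.packages: "*"
      spring.json.value.default.type: pl.piomin.base.domain.Order
  producer:
    key-serializer: org.apache.kafka.common.serialization.LongSerializer
    value-serializer: org.springframework.kafka.support.serializer.JsonSerializer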

Here's the implementation of the OrderManageService used in the previous code snippet. If it receives an order in the NEW status, it performs a reservation: it subtracts the order's price from the amountAvailable field and adds the same value to the amountReserved field. Then it sets the status of the order and sends a response to the payment-orders topic using KafkaTemplate. During the confirmation phase, it doesn't send any response event. It may perform a rollback, which means subtracting the order's price from the amountReserved field and adding it back to the amountAvailable field. Otherwise, it just "commits" the transaction by subtracting the price from the amountReserved field.

@Service
public class OrderManageService {

   private static final String SOURCE = "payment";
   private static final Logger LOG = LoggerFactory.getLogger(OrderManageService.class);
   private CustomerRepository repository;
   private KafkaTemplate<Long, Order> template;

   public OrderManageService(CustomerRepository repository, KafkaTemplate<Long, Order> template) {
      this.repository = repository;
      this.template = template;
   }

   public void reserve(Order order) {
      Customer customer = repository.findById(order.getCustomerId()).orElseThrow();
      LOG.info("Found: {}", customer);
      if (order.getPrice() < customer.getAmountAvailable()) {
         order.setStatus("ACCEPT");
         customer.setAmountReserved(customer.getAmountReserved() + order.getPrice());
         customer.setAmountAvailable(customer.getAmountAvailable() - order.getPrice());
      } else {
         order.setStatus("REJECT");
      }
      order.setSource(SOURCE);
      repository.save(customer);
      template.send("payment-orders", order.getId(), order);
      LOG.info("Sent: {}", order);
   }

   public void confirm(Order order) {
      Customer customer = repository.findById(order.getCustomerId()).orElseThrow();
      LOG.info("Found: {}", customer);
      if (order.getStatus().equals("CONFIRMED")) {
         customer.setAmountReserved(customer.getAmountReserved() - order.getPrice());
         repository.save(customer);
      } else if (order.getStatus().equals("ROLLBACK") && !order.getSource().equals(SOURCE)) {
         customer.setAmountReserved(customer.getAmountReserved() - order.getPrice());
         customer.setAmountAvailable(customer.getAmountAvailable() + order.getPrice());
         repository.save(customer);
      }

   }
}

Similar logic is implemented on the stock-service side. However, instead of the order's price, it uses the productCount field and performs a reservation for the desired number of ordered products. Here's the implementation of the OrderManageService class in the stock-service.

@Service
public class OrderManageService {

   private static final String SOURCE = "stock";
   private static final Logger LOG = LoggerFactory.getLogger(OrderManageService.class);
   private ProductRepository repository;
   private KafkaTemplate<Long, Order> template;

   public OrderManageService(ProductRepository repository, KafkaTemplate<Long, Order> template) {
      this.repository = repository;
      this.template = template;
   }

   public void reserve(Order order) {
      Product product = repository.findById(order.getProductId()).orElseThrow();
      LOG.info("Found: {}", product);
      if (order.getStatus().equals("NEW")) {
         if (order.getProductCount() < product.getAvailableItems()) {
            product.setReservedItems(product.getReservedItems() + order.getProductCount());
            product.setAvailableItems(product.getAvailableItems() - order.getProductCount());
            order.setStatus("ACCEPT");
            repository.save(product);
         } else {
            order.setStatus("REJECT");
         }
         template.send("stock-orders", order.getId(), order);
         LOG.info("Sent: {}", order);
      }
   }

   public void confirm(Order order) {
      Product product = repository.findById(order.getProductId()).orElseThrow();
      LOG.info("Found: {}", product);
      if (order.getStatus().equals("CONFIRMED")) {
         product.setReservedItems(product.getReservedItems() - order.getProductCount());
         repository.save(product);
      } else if (order.getStatus().equals("ROLLBACK") && !order.getSource().equals(SOURCE)) {
         product.setReservedItems(product.getReservedItems() - order.getProductCount());
         product.setAvailableItems(product.getAvailableItems() + order.getProductCount());
         repository.save(product);
      }
   }

}

Query Kafka Stream with Spring Boot

Now, let's consider the following scenario. Firstly, the order-service receives a new order (via the REST API) and sends it to the Kafka topic. This order is then received by both of the other microservices. Once they send back a positive (or negative) response, the order-service processes them as streams and changes the status of the order. The order with the new status is emitted to the same topic as before. So, where are we storing the data with the current status of an order? In a Kafka topic. Once again, we will use Kafka Streams in our Spring Boot application. But this time, we take advantage of KTable. Let's look at the visualization of our scenario.

(Figure: kafka-streams-spring-boot-ktable – materializing the orders topic as a queryable KTable)

Ok, so let's define another Kafka Streams bean in the order-service. We consume the same orders topic as a stream, convert it to a Kafka table, and materialize it in a persistent store. Thanks to that, we will be able to easily query the store from our REST controller.

@Bean
public KTable<Long, Order> table(StreamsBuilder builder) {
   KeyValueBytesStoreSupplier store =
         Stores.persistentKeyValueStore("orders");
   JsonSerde<Order> orderSerde = new JsonSerde<>(Order.class);
   KStream<Long, Order> stream = builder
         .stream("orders", Consumed.with(Serdes.Long(), orderSerde));
   return stream.toTable(Materialized.<Long, Order>as(store)
         .withKeySerde(Serdes.Long())
         .withValueSerde(orderSerde));
}

If we run more than one instance of the order-service on the same machine, it is also important to override the default location of the state store. To do that, we should define the following property with a unique value per instance:

spring.kafka.streams.state-dir: /tmp/kafka-streams/1
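
For example, a second local instance could be started with an overridden HTTP port and state directory (an illustrative command, assuming an executable jar build):

$ java -jar order-service.jar --server.port=8081 \
    --spring.kafka.streams.state-dir=/tmp/kafka-streams/2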

Unfortunately, there is no built-in support in Spring Boot for interactive queries over Kafka Streams. However, we can use the auto-configured StreamsBuilderFactoryBean to obtain the KafkaStreams instance in the controller, and then query the state store under the name it was materialized with. That's of course a very trivial sample – we are just getting all orders from the KTable.

@Autowired
StreamsBuilderFactoryBean kafkaStreamsFactory;

@GetMapping
public List<Order> all() {
   List<Order> orders = new ArrayList<>();
   ReadOnlyKeyValueStore<Long, Order> store = kafkaStreamsFactory
         .getKafkaStreams()
         .store(StoreQueryParameters.fromNameAndType(
                "orders",
                QueryableStoreTypes.keyValueStore()));
   KeyValueIterator<Long, Order> it = store.all();
   it.forEachRemaining(kv -> orders.add(kv.value));
   return orders;
}

In the same OrderController there is also a method for sending a new order to the Kafka topic.

@PostMapping
public Order create(@RequestBody Order order) {
   order.setId(id.incrementAndGet());
   template.send("orders", order.getId(), order);
   LOG.info("Sent: {}", order);
   return order;
}

Testing Scenario

Before we run our sample microservices, we need to start a local instance of Kafka. Usually, I use Redpanda for that. It is a Kafka API-compatible streaming platform. In comparison to Kafka, it is relatively easy to run locally. All you need to do is install the rpk CLI (here are the instructions for macOS). After that, you can create a single-node instance using the following command:

$ rpk container start

Once it starts, it will print the address of your node. For me, it is 127.0.0.1:56820. You should put that address as the value of the spring.kafka.bootstrap-servers property. You can also display the list of created topics using the following command:

$ rpk topic list --brokers 127.0.0.1:56820

Then, let's run our microservices. Begin with the order-service, since it creates all the required topics and builds the Kafka Streams instance. You can send a single order using the REST endpoint:

$ curl -X 'POST' \
  'http://localhost:8080/orders' \
  -H 'Content-Type: application/json' \
  -d '{
  "customerId": 10,
  "productId": 10,
  "productCount": 5,
  "price": 100,
  "status": "NEW"
}'
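
The response should echo the order with a generated id, for example (illustrative values):

{"id":1,"customerId":10,"productId":10,"productCount":5,"price":100,"status":"NEW","source":null}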

Some test data is inserted while the payment-service and stock-service start. So, you can set the value of customerId or productId to anything between 1 and 100 and it will work for you. However, you can just as well use a method for generating a random stream of data. The following bean is responsible for generating 10000 random orders:

@Service
public class OrderGeneratorService {

   private static Random RAND = new Random();
   private AtomicLong id = new AtomicLong();
   private Executor executor;
   private KafkaTemplate<Long, Order> template;

   public OrderGeneratorService(Executor executor, KafkaTemplate<Long, Order> template) {
      this.executor = executor;
      this.template = template;
   }

   @Async
   public void generate() {
      for (int i = 0; i < 10000; i++) {
         int x = RAND.nextInt(5) + 1;
         Order o = new Order(id.incrementAndGet(), RAND.nextLong(100) + 1, RAND.nextLong(100) + 1, "NEW");
         o.setPrice(100 * x);
         o.setProductCount(x);
         template.send("orders", o.getId(), o);
      }
   }
}

You can start that process by calling the endpoint POST /orders/generate.

@PostMapping("/generate")
public boolean create() {
   orderGeneratorService.generate();
   return true;
}
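
For example, with curl:

$ curl -X POST http://localhost:8080/orders/generate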

No matter if you decide to send a single order or generate multiple random orders, you can easily query the current status of orders using the following endpoint:

$ curl http://localhost:8080/orders

Finally, let's have a look at the structure of topics created by the application, and by Kafka Streams internally to perform the join operation and to save the orders KTable as a state store.
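
Here's an illustrative listing – the exact internal names and numeric suffixes depend on the topology, so treat it as a sketch rather than verbatim output:

$ rpk topic list --brokers 127.0.0.1:56820
orders
payment-orders
stock-orders
orders-KSTREAM-JOINTHIS-0000000004-store-changelog
orders-KSTREAM-JOINOTHER-0000000005-store-changelog
orders-orders-changelog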
