Deep Dive into Saga Transactions with Kafka Streams and Spring Boot
In this article, you will learn how to use Kafka Streams and Spring Boot to perform transactions according to the Saga pattern. To be honest, I was quite surprised by how much attention my last article about Kafka received. I got some questions about streams, transactions, and support for Kafka in Spring Boot. In this article, I’ll try to answer a few of them. I will also show how you can easily set up a cloud-managed Kafka cluster on Upstash.
Introduction
First of all, let’s recap the approach described in the previous article. We used Kafka Streams to process order transactions on the order-service side. To handle orders coming to the stock-service and payment-service we used a standard Spring @KafkaListener. There are also two databases, one per service. The stock-service stores the number of available products and updates it after receiving an order. The same goes for the payment-service: it updates the customer’s account on every single order. Both applications receive orders from a Kafka topic and send responses to other topics. To keep things simple, the figure below skips the response topics. We treat the Kafka orders topic as a stream of events and also as a table with the latest status of each order.
What may go wrong with that approach? In fact, we have two data sources here. We use Kafka as the order store. On the other hand, there are SQL databases (in my case H2, but you can use any other) that store stock and payment data. Once we send an order with a reservation to the Kafka topic, we also need to update a database. Since Kafka does not support XA transactions, the two writes cannot be made atomic, which may result in data inconsistency. Kafka is not unusual in this respect: many other systems, RabbitMQ for example, don’t support XA transactions either.
The question is what we can do about that. One possible option is an approach called Change Data Capture (CDC) with the outbox pattern. CDC identifies and tracks changes to data in a database. Then it may emit those changes as events and send them, for example, to a Kafka topic. I won’t go into the details of that process. If you are interested, you may read this article written by Gunnar Morling.
Architecture with Kafka Streams
The approach I will describe today is fully based on Kafka Streams. We won’t use any SQL databases. When the order-service sends a new order, its id is the message key. With Kafka Streams, we may change a message key in the stream. Changing the key causes repartitioning, for which Kafka Streams creates additional internal topics. With new message keys, we may perform calculations just for a specific customerId or productId. The result of such a calculation may be saved in a persistent store. For example, Kafka Streams automatically creates and manages such state stores when you call stateful operations like count() or aggregate(). We will aggregate the orders related to a particular customer or product. Here’s the illustration of our architecture. Here’s the visualization of our process.
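To make the idea of re-keying and aggregating more tangible, here is a minimal sketch in plain Java. It does not use Kafka at all: a HashMap plays the role of the state store, and the trimmed-down Order class and the totalPerCustomer method are hypothetical names introduced only for this illustration. The real services aggregate into a Reservation object rather than a simple sum.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of selectKey() + groupByKey() + aggregate(); no Kafka involved.
final class RekeySketch {

    // Hypothetical, trimmed-down order: only the fields needed here.
    static final class Order {
        final long id;
        final long customerId;
        final int price;

        Order(long id, long customerId, int price) {
            this.id = id;
            this.customerId = customerId;
            this.price = price;
        }
    }

    // Stand-in for the state store: one aggregate (total price) per customerId.
    static Map<Long, Integer> totalPerCustomer(List<Order> orders) {
        Map<Long, Integer> store = new HashMap<>();
        for (Order order : orders) {
            long newKey = order.customerId;                 // selectKey(): id -> customerId
            store.merge(newKey, order.price, Integer::sum); // aggregate() per key
        }
        return store;
    }

    public static void main(String[] args) {
        List<Order> orders = List.of(
                new Order(1L, 20L, 10),
                new Order(2L, 21L, 5),
                new Order(3L, 20L, 7));
        System.out.println(totalPerCustomer(orders));
    }
}
```

Orders 1 and 3 arrive with different ids but end up in the same aggregate because they share a customerId. In Kafka Streams the same effect requires repartitioning, so that all records with the same new key are handled by the same task.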
Now, let’s consider a scenario for the payment-service in detail. In the incoming stream of orders the payment-service calls the selectKey() operation. It changes the key from the order’s id into the order’s customerId. Then it groups all the orders by the new key and invokes the aggregate() operation. In the aggregate() method it calculates the available amount and reserved amount based on the order’s price and status (whether it is a new order or a confirmation order). If there are sufficient funds on the customer account, it sends the ACCEPT order to the payment-orders topic. Otherwise, it sends the REJECT order. Then the order-service processes the responses by joining streams from payment-orders and stock-orders by the order’s id. As a result, it sends a confirmation or a rollback order.
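The decision taken by the order-service after the join can be sketched as a small pure function. This is an assumption about how such logic might look, not the code from the repository: the Decision class, the confirm method, and the REJECTED status are hypothetical. The source value is inferred from the getSource() check that the aggregators later in this article perform on rollback orders.

```java
// Plain-Java sketch of the decision taken after joining both responses by order id.
final class JoinDecisionSketch {

    // Hypothetical result holder; the real Order class carries more fields.
    static final class Decision {
        final String status;
        final String source; // the service that rejected the order, or null

        Decision(String status, String source) {
            this.status = status;
            this.source = source;
        }
    }

    static Decision confirm(String paymentStatus, String stockStatus) {
        boolean paymentOk = "ACCEPT".equals(paymentStatus);
        boolean stockOk = "ACCEPT".equals(stockStatus);
        if (paymentOk && stockOk) {
            return new Decision("CONFIRMED", null);
        }
        if (!paymentOk && !stockOk) {
            // Neither side reserved anything, so there is nothing to undo.
            return new Decision("REJECTED", null);
        }
        // Exactly one side rejected: the other one must release its reservation.
        return new Decision("ROLLBACK", paymentOk ? "STOCK" : "PAYMENT");
    }

    public static void main(String[] args) {
        Decision d = confirm("ACCEPT", "REJECT");
        System.out.println(d.status + " from " + d.source); // ROLLBACK from STOCK
    }
}
```

Carrying the source along with a rollback lets each service tell whether it has a reservation to release or whether it was the one that rejected the order in the first place.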
Finally, let’s proceed to the implementation!
Source Code
If you would like to try it by yourself, you may always take a look at my source code. In order to do that you need to clone my GitHub repository. Then switch to the streams-full branch. After that, you should just follow my instructions.
Aggregation with Kafka Streams
Let’s begin with the payment-service. The implementation of KStream is not complicated here. In the first step (1), we invoke the selectKey() method and use the customerId value of the Order object as the new key. Then we call the groupByKey() method (2) to receive a KGroupedStream as a result. Once we have a KGroupedStream, we may invoke one of the calculation methods. In this case, we need to use aggregate(), since our calculation is a bit more advanced than a simple count (3). The last two steps are just for printing the value after the calculation.
@Bean
public KStream<Long, Order> stream(StreamsBuilder builder) {
    JsonSerde<Order> orderSerde = new JsonSerde<>(Order.class);
    JsonSerde<Reservation> rsvSerde = new JsonSerde<>(Reservation.class);
    KStream<Long, Order> stream = builder
            .stream("orders", Consumed.with(Serdes.Long(), orderSerde))
            .peek((k, order) -> LOG.info("New: {}", order));

    KeyValueBytesStoreSupplier customerOrderStoreSupplier =
            Stores.persistentKeyValueStore("customer-orders");

    stream.selectKey((k, v) -> v.getCustomerId()) // (1)
            .groupByKey(Grouped.with(Serdes.Long(), orderSerde)) // (2)
            .aggregate(
                    () -> new Reservation(random.nextInt(1000)),
                    aggregatorService,
                    Materialized.<Long, Reservation>as(customerOrderStoreSupplier)
                            .withKeySerde(Serdes.Long())
                            .withValueSerde(rsvSerde)) // (3)
            .toStream()
            .peek((k, trx) -> LOG.info("Commit: {}", trx));

    return stream;
}
However, the most important part of the code fragment above is the aggregator passed to the aggregate() method. The aggregate() method takes three input arguments. The first of them is an initializer that provides the starting value of our compute object. That object represents the current state of the customer’s account. It has two fields: amountAvailable and amountReserved. To clarify, we use that object instead of a database entity that would store the available and reserved amounts on the customer account. Each customer is represented by the customerId (key) and the Reservation object (value) in a Kafka Streams KTable. Just for test purposes, we generate the starting value of amountAvailable as a random number between 0 and 999.
public class Reservation {
    private int amountAvailable;
    private int amountReserved;

    public Reservation() {
    }

    public Reservation(int amountAvailable) {
        this.amountAvailable = amountAvailable;
    }

    // GETTERS AND SETTERS ...
}
Ok, let’s take a look at our aggregation method. It needs to implement the Kafka Streams Aggregator interface and its apply() method. It may handle three types of orders. One of them is a confirmation of the order (1). It confirms the distributed transaction, so we just need to release the reservation by subtracting the order’s price from the amountReserved field. On the other hand, in the case of a rollback, we need to increase the value of amountAvailable by the order’s price and decrease the value of amountReserved accordingly (2). We skip that step when the rollback was caused by the payment-service itself, because in that case nothing was reserved. Finally, if we receive a new order (3), we need to perform a reservation if there are sufficient funds on the customer account, or otherwise reject the order.
Aggregator<Long, Order, Reservation> aggregatorService = (id, order, rsv) -> {
    switch (order.getStatus()) {
        case "CONFIRMED" -> // (1)
            rsv.setAmountReserved(rsv.getAmountReserved()
                    - order.getPrice());
        case "ROLLBACK" -> { // (2)
            if (!order.getSource().equals("PAYMENT")) {
                rsv.setAmountAvailable(rsv.getAmountAvailable()
                        + order.getPrice());
                rsv.setAmountReserved(rsv.getAmountReserved()
                        - order.getPrice());
            }
        }
        case "NEW" -> { // (3)
            if (order.getPrice() <= rsv.getAmountAvailable()) {
                rsv.setAmountAvailable(rsv.getAmountAvailable()
                        - order.getPrice());
                rsv.setAmountReserved(rsv.getAmountReserved()
                        + order.getPrice());
                order.setStatus("ACCEPT");
            } else {
                order.setStatus("REJECT");
            }
            template.send("payment-orders", order.getId(), order);
        }
    }
    LOG.info("{}", rsv);
    return rsv;
};
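The arithmetic of the three branches can be checked without a broker. The following sketch mirrors the aggregator’s state transitions in plain Java. The ReservationSketch class and its apply signature are hypothetical, and the KafkaTemplate call is replaced by returning the status that would be sent back.

```java
// Plain-Java walk-through of the reservation arithmetic; no Kafka, no side effects.
final class ReservationSketch {

    int amountAvailable;
    int amountReserved;

    ReservationSketch(int amountAvailable) {
        this.amountAvailable = amountAvailable;
    }

    // Applies one order to the account and returns the resulting order status.
    String apply(String status, String source, int price) {
        switch (status) {
            case "NEW":
                if (price > amountAvailable) {
                    return "REJECT";          // insufficient funds, state unchanged
                }
                amountAvailable -= price;     // move funds into the reservation
                amountReserved += price;
                return "ACCEPT";
            case "CONFIRMED":
                amountReserved -= price;      // reservation becomes a real charge
                return status;
            case "ROLLBACK":
                if (!"PAYMENT".equals(source)) {
                    amountAvailable += price; // give the reserved funds back
                    amountReserved -= price;
                }
                return status;
            default:
                return status;
        }
    }

    public static void main(String[] args) {
        ReservationSketch account = new ReservationSketch(100);
        System.out.println(account.apply("NEW", null, 30));  // ACCEPT
        System.out.println(account.apply("NEW", null, 80));  // REJECT
        account.apply("ROLLBACK", "STOCK", 30);
        System.out.println(account.amountAvailable + " / " + account.amountReserved); // 100 / 0
    }
}
```

Note that the sum of available and reserved funds stays constant for NEW and ROLLBACK orders and only drops on CONFIRMED, which is the point at which the money actually leaves the account.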
State Store with the Kafka Streams Table
The implementation of the stock-service is pretty similar to the payment-service. The difference is that we count the number of available products in stock instead of the available funds on the customer account. Here’s our Reservation object:
public class Reservation {
    private int itemsAvailable;
    private int itemsReserved;

    public Reservation() {
    }

    public Reservation(int itemsAvailable) {
        this.itemsAvailable = itemsAvailable;
    }

    // GETTERS AND SETTERS ...
}
The implementation of the aggregation method is also very similar to the payment-service. However, this time, let’s focus on another thing. Once we process a new order we need to send a response to the stock-orders topic. We use KafkaTemplate for that. The payment-service also sends a response, but to the payment-orders topic. The send method of KafkaTemplate does not block the thread. It returns a ListenableFuture object, to which we can add a callback that logs the result once the message has been sent (1). Finally, let’s log the current state of the Reservation object (2).
Aggregator<Long, Order, Reservation> aggrSrv = (id, order, rsv) -> {
    switch (order.getStatus()) {
        case "CONFIRMED" -> rsv.setItemsReserved(rsv.getItemsReserved()
                - order.getProductCount());
        case "ROLLBACK" -> {
            if (!order.getSource().equals("STOCK")) {
                rsv.setItemsAvailable(rsv.getItemsAvailable()
                        + order.getProductCount());
                rsv.setItemsReserved(rsv.getItemsReserved()
                        - order.getProductCount());
            }
        }
        case "NEW" -> {
            if (order.getProductCount() <= rsv.getItemsAvailable()) {
                rsv.setItemsAvailable(rsv.getItemsAvailable()
                        - order.getProductCount());
                rsv.setItemsReserved(rsv.getItemsReserved()
                        + order.getProductCount());
                order.setStatus("ACCEPT");
            } else {
                order.setStatus("REJECT");
            }
            // (1) the lambda parameter must be named `result`, as used in its body
            template.send("stock-orders", order.getId(), order)
                    .addCallback(result -> LOG.info("Sent: {}",
                            result != null ? result.getProducerRecord().value() : null),
                            ex -> {});
        }
    }
    LOG.info("{}", rsv); // (2)
    return rsv;
};
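The non-blocking callback style used at (1) can be tried out without a broker. The sketch below uses the JDK’s CompletableFuture, which, as far as I know, is also what KafkaTemplate.send() returns in Spring for Apache Kafka 3.x in place of ListenableFuture. The send and sendAndLog methods here are hypothetical stand-ins and not Spring APIs.

```java
import java.util.concurrent.CompletableFuture;

// Stand-alone illustration of attaching a callback to a non-blocking send.
final class CallbackSketch {

    // Hypothetical stand-in for a producer: completes the future on another thread.
    static CompletableFuture<String> send(String value, boolean fail) {
        CompletableFuture<String> future = new CompletableFuture<>();
        CompletableFuture.runAsync(() -> {
            if (fail) {
                future.completeExceptionally(new IllegalStateException("broker unavailable"));
            } else {
                future.complete(value);
            }
        });
        return future; // returned immediately, before the send finishes
    }

    // The callback turns either outcome into a log line instead of ignoring failures.
    static CompletableFuture<String> sendAndLog(String value, boolean fail) {
        return send(value, fail).handle((result, ex) ->
                ex == null ? "Sent: " + result : "Failed: " + ex.getMessage());
    }

    public static void main(String[] args) {
        // join() is used only so the demo waits for the asynchronous result.
        System.out.println(sendAndLog("order-1", false).join());
        System.out.println(sendAndLog("order-2", true).join());
    }
}
```

Unlike the empty failure callback in the snippet above, this version reports a failed send, which is usually worth doing when the response message is part of a distributed transaction.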
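We also log the value of the Reservation object after the aggregation (1). In order to do that, we need to convert the KTable into a KStream and then call the peek method. This log is not printed right after the record is processed. It appears only when Kafka Streams commits the offset in the source topic, because with record caching enabled the aggregation result is forwarded downstream when the cache is flushed on commit.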
@Bean
public KStream<Long, Order> stream(StreamsBuilder builder) {
    JsonSerde<Order> orderSerde = new JsonSerde<>(Order.class);
    JsonSerde<Reservation> rsvSerde = new JsonSerde<>(Reservation.class);
    KStream<Long, Order> stream = builder
            .stream("orders", Consumed.with(Serdes.Long(), orderSerde))
            .peek((k, order) -> LOG.info("New: {}", order));

    KeyValueBytesStoreSupplier stockOrderStoreSupplier =
            Stores.persistentKeyValueStore("stock-orders");

    stream.selectKey((k, v) -> v.getProductId())
            .groupByKey(Grouped.with(Serdes.Long(), orderSerde))
            .aggregate(() -> new Reservation(random.nextInt(100)), aggrSrv,
                    Materialized.<Long, Reservation>as(stockOrderStoreSupplier)
                            .withKeySerde(Serdes.Long())
                            .withValueSerde(rsvSerde))
            .toStream()
            .peek((k, trx) -> LOG.info("Commit: {}", trx)); // (1)

    return stream;
}
What will happen if you send the test order? Let’s see the logs. You can see the difference in time between processing the message and the offset commit. You won’t have any problems with that as long as your application keeps running or is stopped gracefully. But what if you kill the process, for example with the kill -9 command? After a restart, our application will receive the same messages once again. Since we use KafkaTemplate to send the response to the stock-orders topic, every reprocessed order produces a duplicated response, so we need to commit the offset as soon as possible.
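The effect of a delayed commit can be reproduced with a toy model. The sketch below is plain Java and does not use Kafka: the RedeliverySketch class, its run method, and the commitEvery parameter are hypothetical, and committing after a fixed number of records is only a stand-in for a time-based commit interval.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of at-least-once processing; it does not use Kafka at all.
final class RedeliverySketch {

    final List<String> sentResponses = new ArrayList<>();
    int committedOffset = 0; // where processing resumes after a restart

    // Processes records [committedOffset, until) and commits after every
    // `commitEvery` records. Leaving the method with uncommitted records
    // models an abrupt stop such as `kill -9`.
    void run(List<String> log, int until, int commitEvery) {
        int sinceCommit = 0;
        for (int offset = committedOffset; offset < until; offset++) {
            sentResponses.add("response-for-" + log.get(offset)); // side effect
            sinceCommit++;
            if (sinceCommit == commitEvery) {
                committedOffset = offset + 1;
                sinceCommit = 0;
            }
        }
    }

    // Crash after three of five orders, restart, and count the responses sent.
    static int responsesAfterCrash(int commitEvery) {
        List<String> log = List.of("order-1", "order-2", "order-3", "order-4", "order-5");
        RedeliverySketch app = new RedeliverySketch();
        app.run(log, 3, commitEvery);          // killed before the next commit
        app.run(log, log.size(), commitEvery); // restarted from committedOffset
        return app.sentResponses.size();
    }

    public static void main(String[] args) {
        System.out.println(responsesAfterCrash(5)); // 8: three responses are duplicated
        System.out.println(responsesAfterCrash(1)); // 5: no duplicates in this model
    }
}
```

Even with a commit after every record, a real application can still crash between sending the response and committing the offset, so frequent commits narrow the window for duplicates rather than closing it.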
What can we do to avoid such problems? We may override the default value (30000 ms) of the commit.interval.ms Kafka Streams property. If you set it to 0, Kafka Streams commits immediately after processing finishes.
spring.kafka:
  streams:
    properties:
      commit.interval.ms: 0
On the other hand, we can also set the processing.guarantee property to exactly_once. It also changes the default value of commit.interval.ms to 100 ms and enables idempotence for the producer. Note that since Kafka 3.0 the exactly_once value is deprecated in favor of exactly_once_v2. You can read more about it here in the Kafka documentation.
spring.kafka:
  streams:
    properties:
      processing.guarantee: exactly_once
Running Kafka on Upstash
For the purpose of today’s exercise, we will use a serverless Kafka cluster on Upstash. You can create it with a single click. If you would like to test JAAS authentication for your application I’ve got good news 🙂 The authentication on that cluster is enabled by default. You can find and copy username and password from the cluster’s main panel.
Now, let’s configure Kafka connection settings and credentials for the Spring Boot application. Upstash offers a free developer tier limited to 10k messages per day. It will be enough for our tests.
spring.kafka:
  bootstrap-servers: topical-gar-11460-us1-kafka.upstash.io:9092
  properties:
    security.protocol: SASL_SSL
    sasl.mechanism: SCRAM-SHA-256
    sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule required username="${USERNAME}" password="${PASSWORD}";
With Upstash you can easily display a list of topics. In total, there are 10 topics used in our sample system. Three of them are used directly by the Spring Boot applications, while the rest of them by the Kafka Streams in order to process stateful operations.
After starting the order-service application we can call its REST endpoint to create and send an order to the Kafka topic.
private static final Logger LOG =
        LoggerFactory.getLogger(OrderController.class);
private AtomicLong id = new AtomicLong();
private KafkaTemplate<Long, Order> template;

@PostMapping
public Order create(@RequestBody Order order) {
    order.setId(id.incrementAndGet());
    template.send("orders", order.getId(), order);
    LOG.info("Sent: {}", order);
    return order;
}
Let’s call the endpoint using the following curl command. You can use any customerId or productId you want.
$ curl -X 'POST' \
  'http://localhost:8080/orders' \
  -H 'Content-Type: application/json' \
  -d '{
    "customerId": 20,
    "productId": 20,
    "productCount": 2,
    "price": 10,
    "status": "NEW"
  }'
All three sample applications use Kafka Streams to process distributed transactions. Once the order is accepted by both stock-service and payment-service you should see the following entry in the order-service logs.
You can easily simulate the rejection of a transaction with Kafka Streams, for example by setting productCount higher than the number of available items generated by the stock-service.
With the Upstash UI you can also easily verify the number of messages incoming to the topics. Let’s see the current statistics for the orders topic.
Final Thoughts
In order to fully understand what happens in this example, you should also be familiar with the Kafka Streams threading model. It is worth reading the following article, which explains it clearly. First of all, each stream partition is a totally ordered sequence of data records and maps to a Kafka topic partition. It means that even if we have multiple orders at the same time related to, for example, the same product, they are all processed sequentially since they have the same message key (productId in that case).
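Why equal keys imply sequential processing can be shown with a simplified model of key-based partitioning. The sketch below is not Kafka’s partitioner: the default partitioner hashes the serialized key bytes with murmur2, while the hypothetical partitionFor method uses Long.hashCode as a stand-in that shares the one property that matters here, namely that equal keys always map to the same partition.

```java
// Simplified model of key-based partitioning; not Kafka's actual partitioner.
final class PartitionSketch {

    // Stand-in hash: equal keys always yield the same partition number.
    static int partitionFor(long key, int numPartitions) {
        return Math.floorMod(Long.hashCode(key), numPartitions);
    }

    public static void main(String[] args) {
        int partitions = 3; // hypothetical partition count
        long[] productIds = {20L, 21L, 20L, 22L, 20L};
        for (long productId : productIds) {
            System.out.println("productId=" + productId
                    + " -> partition " + partitionFor(productId, partitions));
        }
    }
}
```

All three orders for productId 20 land in the same partition, so a single stream task consumes them one after another, while orders for other products may be handled by other tasks in parallel.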
Moreover, by default, there is only a single stream thread that handles all the partitions. You can see this in the logs below. However, there are stream tasks that act as the lowest-level units of parallelism. As a result, stream tasks can be processed independently and in parallel without manual intervention.
I hope this article helps you to better understand Kafka Streams. I just wanted to give you a simple example of how you can use Kafka Streams with Saga transactions in order to simplify your current architecture.