Kafka Streams with Quarkus
In this article, you will learn how to use Kafka Streams with Quarkus. As in my previous article, we will create a simple application that simulates the stock market, but this time we will use Quarkus instead of Spring Cloud. If you would like to understand what a streaming platform is and how it differs from a traditional message broker, this article is for you. We will also look at the useful improvements for Apache Kafka that Quarkus provides.
Source Code
If you would like to try it yourself, you can always take a look at my source code. To do that, clone my GitHub repository and then follow my instructions. Let’s begin.
Architecture
In our case, there are two incoming streams of events. Both of them represent incoming orders. These orders are generated by the order-service application. It sends buy orders to the orders.buy topic and sell orders to the orders.sell topic. Then, the stock-service application receives and handles incoming events. In the first step, it needs to change the key of each message from the orderId to the productId. That’s because it has to join orders from different topics related to the same product in order to execute transactions. Finally, the transaction price is the average of the sell and buy prices.
We are building a simplified version of the stock market platform. Each buy order contains the maximum price at which a customer is willing to buy a product. On the other hand, each sell order contains the minimum price at which a customer is ready to sell a product. If the sell order price is not greater than the buy order price for a particular product, we perform a transaction.
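To make the matching rule concrete, here is a minimal plain-Java sketch. The PriceMatching class, its method names, and the prices are my own illustration and are not taken from the repository. It assumes integer prices, so the average uses integer division.

```java
// Hypothetical helper for illustration only; not part of the sample applications.
final class PriceMatching {

    // A buy order and a sell order match when the seller's minimum price
    // does not exceed the buyer's maximum price.
    static boolean matches(int buyMaxPrice, int sellMinPrice) {
        return sellMinPrice <= buyMaxPrice;
    }

    // The transaction price is the average of both prices (integer division).
    static int transactionPrice(int buyMaxPrice, int sellMinPrice) {
        return (buyMaxPrice + sellMinPrice) / 2;
    }

    public static void main(String[] args) {
        System.out.println(matches(1100, 1000));          // true
        System.out.println(matches(900, 1000));           // false
        System.out.println(transactionPrice(1100, 1000)); // 1050
    }
}
```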
Each order is valid for 10 seconds. After that time, the stock-service application will not handle the order, since it is considered expired. Each order contains a number of products for a transaction. For example, we may sell 100 for 10 or buy 200 for 11. Therefore, an order may be fully or partially realized. The stock-service application tries to join partially realized orders to other new or partially realized orders. You can see the visualization of that process in the picture below.
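Using the numbers from the example above (sell 100, buy 200), the following sketch shows why the buy order ends up only partially realized. It is a simplified illustration of the idea, not the logic used by stock-service; the class and method names are hypothetical.

```java
// Hypothetical illustration of partial order realization.
final class PartialRealization {

    // Products exchanged in one transaction: limited by the smaller side.
    static int tradedCount(int buyAvailable, int sellAvailable) {
        return Math.min(buyAvailable, sellAvailable);
    }

    public static void main(String[] args) {
        int buyAvailable = 200;  // buy 200 for at most 11
        int sellAvailable = 100; // sell 100 for at least 10

        int traded = tradedCount(buyAvailable, sellAvailable);
        buyAvailable -= traded;
        sellAvailable -= traded;

        System.out.println("traded=" + traded);                // traded=100
        System.out.println("buy remaining=" + buyAvailable);   // buy remaining=100
        System.out.println("sell remaining=" + sellAvailable); // sell remaining=0
    }
}
```

The buy order still has 100 products left, so it can be joined with another sell order that arrives before it expires.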
Run Apache Kafka locally
Before we jump to the implementation, we need to run a local instance of Apache Kafka. If you don’t want to install it on your laptop, the best way to run it is with Redpanda, a Kafka API-compatible streaming platform. In comparison to Kafka, it is relatively easy to run locally. Normally, you would have to install Redpanda on your laptop and then create a cluster using its CLI. But with Quarkus you don’t need to do that! The only requirement is to have Docker installed. Thanks to the Quarkus Kafka extension and its Dev Services feature, a Kafka broker starts automatically in dev mode and when running tests. Moreover, the application is configured automatically.
The only thing you need to do to enable that feature is NOT to provide any Kafka address in the configuration properties. Dev Services uses Testcontainers to run Kafka, so if you have Docker or any other environment supporting Testcontainers running, you get a containerized instance of Kafka out of the box. One more important thing: start the order-service application first. It automatically creates all the required topics in Kafka. Then run the stock-service application. It uses the Quarkus Kafka Streams extension and verifies that the required topics exist. Let’s visualize it.
Send events to Kafka with Quarkus
There are several ways to send events to Kafka with Quarkus. Because we need to send key/value pairs, we will use the io.smallrye.reactive.messaging.kafka.Record object. Quarkus is able to generate and send data continuously. In the code fragment below, we send a single Order event every 500 ms. Each Order contains a random productId, price and productCount.
@Outgoing("orders-buy")
public Multi<Record<Long, Order>> buyOrdersGenerator() {
    // Emit one buy order every 500 ms
    return Multi.createFrom().ticks().every(Duration.ofMillis(500))
        .map(order -> {
            Integer productId = random.nextInt(10) + 1;
            int price = prices.get(productId) + random.nextInt(200);
            Order o = new Order(
                incrementOrderId(),
                random.nextInt(1000) + 1,
                productId,
                100 * (random.nextInt(5) + 1),
                LocalDateTime.now(),
                OrderType.BUY,
                price);
            log.infof("Sent: %s", o);
            // The orderId is used as the message key
            return Record.of(o.getId(), o);
        });
}

@Outgoing("orders-sell")
public Multi<Record<Long, Order>> sellOrdersGenerator() {
    // Emit one sell order every 500 ms
    return Multi.createFrom().ticks().every(Duration.ofMillis(500))
        .map(order -> {
            Integer productId = random.nextInt(10) + 1;
            int price = prices.get(productId) + random.nextInt(200);
            Order o = new Order(
                incrementOrderId(),
                random.nextInt(1000) + 1,
                productId,
                100 * (random.nextInt(5) + 1),
                LocalDateTime.now(),
                OrderType.SELL,
                price);
            log.infof("Sent: %s", o);
            // The orderId is used as the message key
            return Record.of(o.getId(), o);
        });
}
We will also define a single @Incoming channel in order to receive transactions produced by the stock-service. Thanks to that, Quarkus will automatically create the transactions topic used by Quarkus Kafka Streams in stock-service. To be honest, I was not able to force the Quarkus Kafka Streams extension to create the topic automatically. It seems we need to use the SmallRye Reactive Messaging extension for that.
@Incoming("transactions")
public void transactions(Transaction transaction) {
    log.infof("New: %s", transaction);
}
Of course, we need to include the SmallRye Reactive Messaging dependency in the Maven pom.xml.
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-smallrye-reactive-messaging-kafka</artifactId>
</dependency>
Finally, let’s provide configuration settings. We have two outgoing topics and a single incoming topic. We can set their names explicitly; otherwise, Quarkus uses the channel name as the topic name. The names of our topics are orders.buy, orders.sell and transactions.
mp.messaging.outgoing.orders-buy.connector = smallrye-kafka
mp.messaging.outgoing.orders-buy.topic = orders.buy
mp.messaging.outgoing.orders-buy.key.serializer = org.apache.kafka.common.serialization.LongSerializer
mp.messaging.outgoing.orders-buy.value.serializer = io.quarkus.kafka.client.serialization.ObjectMapperSerializer
mp.messaging.outgoing.orders-sell.connector = smallrye-kafka
mp.messaging.outgoing.orders-sell.topic = orders.sell
mp.messaging.outgoing.orders-sell.key.serializer = org.apache.kafka.common.serialization.LongSerializer
mp.messaging.outgoing.orders-sell.value.serializer = io.quarkus.kafka.client.serialization.ObjectMapperSerializer
mp.messaging.incoming.transactions.connector = smallrye-kafka
mp.messaging.incoming.transactions.topic = transactions
mp.messaging.incoming.transactions.value.deserializer = pl.piomin.samples.streams.order.model.deserializer.TransactionDeserializer
That’s all. Our order generator is ready. If you run the order-service application, Quarkus will also start a Kafka (Redpanda) instance. But first, let’s switch to the second sample application: stock-service.
Consume Kafka Streams with Quarkus
In the previous section, we were sending messages to the Kafka broker. For that, we used the standard Quarkus library for integration with Kafka, based on the SmallRye Reactive Messaging framework. The stock-service application consumes messages as streams, so now we will use the module for Kafka Streams integration.
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-kafka-streams</artifactId>
</dependency>
Our application also uses a database, an ORM layer and includes some other useful modules.
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-hibernate-orm-panache</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-jdbc-h2</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-smallrye-openapi</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-resteasy-jackson</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-smallrye-health</artifactId>
</dependency>
In the first step, we are going to merge both streams of orders (buy and sell), insert each Order into the database, and print the event message. You could ask why I use a database and an ORM layer here, since I have the Kafka KTable. Well, I need transactions with lock support in order to coordinate the status of order realization (refer to the description in the introduction about fully and partially realized orders). I will give you more details about it in the next sections.
In order to process streams with Quarkus, we need to declare the org.apache.kafka.streams.Topology bean. It contains all the KStream and KTable definitions. Let’s start with the part responsible for creating and emitting transactions from incoming orders. There are two KStream definitions. The first one merges the two order streams into a single one and then inserts each new Order into the database. The second one creates and executes transactions by joining the two streams using the productId key. But more about it in the next section.
@Produces
public Topology buildTopology() {
    ObjectMapperSerde<Order> orderSerde =
        new ObjectMapperSerde<>(Order.class);
    ObjectMapperSerde<Transaction> transactionSerde =
        new ObjectMapperSerde<>(Transaction.class);

    StreamsBuilder builder = new StreamsBuilder();

    // Sell orders, keyed by orderId
    KStream<Long, Order> orders = builder.stream(
        ORDERS_SELL_TOPIC,
        Consumed.with(Serdes.Long(), orderSerde));

    // Merge buy and sell orders, then log and persist each of them
    builder.stream(ORDERS_BUY_TOPIC,
            Consumed.with(Serdes.Long(), orderSerde))
        .merge(orders)
        .peek((k, v) -> {
            log.infof("New: %s", v);
            logic.add(v);
        });

    // Re-key both streams by productId and join them within a 10-second window
    builder.stream(ORDERS_BUY_TOPIC,
            Consumed.with(Serdes.Long(), orderSerde))
        .selectKey((k, v) -> v.getProductId())
        .join(orders.selectKey((k, v) -> v.getProductId()),
            this::execute,
            JoinWindows.of(Duration.ofSeconds(10)),
            StreamJoined.with(Serdes.Integer(), orderSerde, orderSerde))
        // execute() returns null when no transaction can be created
        .filterNot((k, v) -> v == null)
        .map((k, v) -> new KeyValue<>(v.getId(), v))
        .peek((k, v) -> log.infof("Done -> %s", v))
        .to(TRANSACTIONS_TOPIC, Produced.with(Serdes.Long(), transactionSerde));

    return builder.build();
}
To process the streams, we need to add configuration properties. A list of input topics is required. We can also override the default application id and enable the Kafka health check.
quarkus.kafka-streams.application-id = stock
quarkus.kafka-streams.topics = orders.buy,orders.sell
quarkus.kafka.health.enabled = true
Operations on Kafka Streams
Now, we may use some more advanced operations on Kafka Streams than just merging two different streams. In fact, this is the key logic in our application. We need to join two different order streams into a single one using the productId as the joining key. Since the producer sets the orderId as the message key, we first need to invoke the selectKey method for both the orders.sell and orders.buy streams. In our case, joining buy and sell orders related to the same product is just the first step. Then we need to verify that the maximum price in the buy order is not lower than the minimum price in the sell order.
The next step is to verify that neither order has already been fully realized, as they may also be paired with other orders in the stream. If all the conditions are met, we may create a new transaction. Finally, we may change the stream key from the productId to the transactionId and send the transaction to the dedicated transactions topic.
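To illustrate what joining by productId within a time window means, here is a plain-Java sketch that does not use Kafka at all. The Event class, its fields, and the sample timestamps are hypothetical. It pairs a buy with a sell only when both share the same productId and their timestamps differ by no more than the window, which mirrors the idea behind the 10-second join window.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Kafka-free illustration of a windowed join on productId.
final class WindowedJoinSketch {

    // Minimal stand-in for an order event; field names are illustrative.
    static final class Event {
        final long orderId;
        final int productId;
        final long timestampMs;

        Event(long orderId, int productId, long timestampMs) {
            this.orderId = orderId;
            this.productId = productId;
            this.timestampMs = timestampMs;
        }
    }

    // Returns {buyOrderId, sellOrderId} pairs that share a productId and whose
    // timestamps are at most windowMs apart (in either direction).
    static List<long[]> join(List<Event> buys, List<Event> sells, long windowMs) {
        List<long[]> pairs = new ArrayList<>();
        for (Event b : buys) {
            for (Event s : sells) {
                boolean sameKey = b.productId == s.productId;
                boolean inWindow = Math.abs(b.timestampMs - s.timestampMs) <= windowMs;
                if (sameKey && inWindow) {
                    pairs.add(new long[] {b.orderId, s.orderId});
                }
            }
        }
        return pairs;
    }

    public static void main(String[] args) {
        List<Event> buys = List.of(new Event(1, 5, 0), new Event(2, 7, 1_000));
        List<Event> sells = List.of(
            new Event(10, 5, 4_000),   // same product as order 1, 4 s apart
            new Event(11, 5, 15_000),  // same product as order 1, but 15 s apart
            new Event(12, 7, 2_000));  // same product as order 2, 1 s apart

        for (long[] p : join(buys, sells, 10_000)) {
            System.out.println(p[0] + " <-> " + p[1]);
        }
        // prints:
        // 1 <-> 10
        // 2 <-> 12
    }
}
```

Order 11 is not paired with order 1 because it arrives outside the window, which is how expired orders drop out of the matching process.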
Each time we successfully join two orders, we try to create a transaction. The execute(...) method is called within the KStream join method. First, we compare the prices of both orders. Then we verify the realization status of both orders by accessing the H2 database. If the orders are still not fully realized, we may create a transaction and update the order records in the database.
private Transaction execute(Order orderBuy, Order orderSell) {
    // The buy price must be at least the sell price
    if (orderBuy.getAmount() >= orderSell.getAmount()) {
        int count = Math.min(orderBuy.getProductCount(),
            orderSell.getProductCount());
        // Check and update the realization status of both orders in the database
        boolean allowed = logic
            .performUpdate(orderBuy.getId(), orderSell.getId(), count);
        if (!allowed)
            return null;
        else
            return new Transaction(
                ++transactionId,
                orderBuy.getId(),
                orderSell.getId(),
                count,
                (orderBuy.getAmount() + orderSell.getAmount()) / 2,
                LocalDateTime.now(),
                "NEW"
            );
    } else {
        return null;
    }
}
Let’s take a closer look at the performUpdate() method called inside the execute() method. It initiates a transaction and locks both Order entities. Then it verifies the realization status of each order and updates it with the current values if possible. Only if the performUpdate() method finishes successfully does the stock-service application create a new transaction.
@ApplicationScoped
public class OrderLogic {

    @Inject
    Logger log;
    @Inject
    OrderRepository repository;

    @Transactional
    public Order add(Order order) {
        repository.persist(order);
        return order;
    }

    @Transactional
    public boolean performUpdate(Long buyOrderId, Long sellOrderId, int amount) {
        // Lock both rows for the duration of the transaction
        Order buyOrder = repository.findById(buyOrderId,
            LockModeType.PESSIMISTIC_WRITE);
        Order sellOrder = repository.findById(sellOrderId,
            LockModeType.PESSIMISTIC_WRITE);
        if (buyOrder == null || sellOrder == null)
            return false;
        int buyAvailableCount =
            buyOrder.getProductCount() - buyOrder.getRealizedCount();
        int sellAvailableCount =
            sellOrder.getProductCount() - sellOrder.getRealizedCount();
        // Both orders must still have enough unrealized products
        if (buyAvailableCount >= amount && sellAvailableCount >= amount) {
            buyOrder.setRealizedCount(buyOrder.getRealizedCount() + amount);
            sellOrder.setRealizedCount(sellOrder.getRealizedCount() + amount);
            repository.persist(buyOrder);
            repository.persist(sellOrder);
            return true;
        } else {
            return false;
        }
    }
}
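To see the effect of the check in performUpdate() without a database, here is an in-memory sketch. The InMemoryOrderBook class is hypothetical, and the synchronized keyword only stands in for the pessimistic database lock; it is not how stock-service is implemented.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for the Order table and its lock.
final class InMemoryOrderBook {

    // orderId -> {productCount, realizedCount}
    private final Map<Long, int[]> orders = new HashMap<>();

    synchronized void add(long orderId, int productCount) {
        orders.put(orderId, new int[] {productCount, 0});
    }

    // Mirrors the check described above: both orders must still have
    // at least 'amount' unrealized products, otherwise nothing changes.
    synchronized boolean performUpdate(long buyOrderId, long sellOrderId, int amount) {
        int[] buy = orders.get(buyOrderId);
        int[] sell = orders.get(sellOrderId);
        if (buy == null || sell == null) {
            return false;
        }
        int buyAvailable = buy[0] - buy[1];
        int sellAvailable = sell[0] - sell[1];
        if (buyAvailable < amount || sellAvailable < amount) {
            return false;
        }
        buy[1] += amount;
        sell[1] += amount;
        return true;
    }

    public static void main(String[] args) {
        InMemoryOrderBook book = new InMemoryOrderBook();
        book.add(1L, 200); // buy order
        book.add(2L, 100); // sell order
        book.add(3L, 100); // sell order
        book.add(4L, 100); // sell order

        System.out.println(book.performUpdate(1L, 2L, 100)); // true
        System.out.println(book.performUpdate(1L, 3L, 100)); // true
        System.out.println(book.performUpdate(1L, 4L, 100)); // false
    }
}
```

The third call fails because the buy order has already been fully realized by the first two transactions.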
Nice 🙂 That’s all that we need to do in the first part of our exercise. Now we can run both our sample applications.
Run and manage Kafka Streams application with Quarkus
As I mentioned before, we first need to start the order-service. It runs a new Kafka instance and creates all required topics. Immediately after startup, it is ready to send new orders. To run the Quarkus app locally, just go to the order-service directory and execute the following command:
$ mvn quarkus:dev
Just to verify, you can display a list of running Docker containers with the docker ps command. Here’s my result:
As you can see, the Redpanda instance is running and is available on a random port, 49724. Quarkus did it for us. If you have Redpanda installed on your laptop, you can also check the list of created topics with its CLI, rpk:
$ rpk topic list --brokers=127.0.0.1:49724
Then let’s run the stock-service. Go to the stock-service directory and run mvn quarkus:dev once again. After startup, it just works. Both applications share the same Kafka instance thanks to Quarkus Dev Services. Now let’s access the Quarkus Dev UI console available at http://localhost:8080/q/dev/. Find the tile with the “Apache Kafka Streams” title.
You can check a visualization of our Kafka Streams topology. I will divide the image into two parts for better visibility.
Use Kafka KTable with Quarkus
We have already finished the implementation of the logic responsible for creating transactions from incoming orders. In the next step, we are going to perform analytical operations on the transactions stream. Our main goal is to calculate the total number of transactions, the total number of products sold/bought, and the total value of transactions (price * productsCount) for each product. Here’s the class used in the calculations.
@RegisterForReflection
public class TransactionTotal {
    private int count;
    private int amount;
    private int productCount;

    // GETTERS AND SETTERS
}
Because the Transaction object does not contain information about the product, we first need to join it with the order to access the productId. Then we group the stream by productId and invoke the aggregate method, which produces a KTable and lets us perform more complex calculations. In this case, we calculate the number of executed transactions, their volume of products, and their total value. The resulting KTable can be materialized as a state store. Thanks to that, we will be able to query it by the name defined by the TRANSACTIONS_PER_PRODUCT_SUMMARY variable.
KeyValueBytesStoreSupplier storePerProductSupplier = Stores.persistentKeyValueStore(
    TRANSACTIONS_PER_PRODUCT_SUMMARY);

builder.stream(TRANSACTIONS_TOPIC, Consumed.with(Serdes.Long(), transactionSerde))
    // Join each transaction with its sell order to obtain the productId
    .selectKey((k, v) -> v.getSellOrderId())
    .join(orders.selectKey((k, v) -> v.getId()),
        (t, o) -> new TransactionWithProduct(t, o.getProductId()),
        JoinWindows.of(Duration.ofSeconds(10)),
        StreamJoined.with(Serdes.Long(), transactionSerde, orderSerde))
    .groupBy((k, v) -> v.getProductId(), Grouped.with(Serdes.Integer(), transactionWithProductSerde))
    .aggregate(
        TransactionTotal::new,
        (k, v, a) -> {
            // Number of transactions
            a.setCount(a.getCount() + 1);
            // Number of products: add to the previous product count
            a.setProductCount(a.getProductCount() + v.getTransaction().getAmount());
            // Total value: add products * price to the previous amount
            a.setAmount(a.getAmount() +
                (v.getTransaction().getAmount() * v.getTransaction().getPrice()));
            return a;
        },
        Materialized.<Integer, TransactionTotal> as(storePerProductSupplier)
            .withKeySerde(Serdes.Integer())
            .withValueSerde(transactionTotalSerde))
    .toStream()
    .peek((k, v) -> log.infof("Total per product(%d): %s", k, v))
    .to(TRANSACTIONS_PER_PRODUCT_AGGREGATED_TOPIC,
        Produced.with(Serdes.Integer(), transactionTotalSerde));
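The three totals described earlier (number of transactions, volume of products, total value) can be checked by hand with a small plain-Java sketch. It is only an illustration of the per-product fold, not the Kafka Streams code itself; the TotalsSketch class, the int[] layout, and the sample numbers are hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, Kafka-free illustration of the per-product aggregation.
final class TotalsSketch {

    static final class Total {
        int count;        // number of transactions
        int productCount; // number of products traded
        int amount;       // total value: sum of productCount * price
    }

    // Each int[] is {productId, productCount, price}, standing in for a
    // transaction that has already been enriched with its productId.
    static Map<Integer, Total> aggregate(List<int[]> transactions) {
        Map<Integer, Total> totals = new TreeMap<>();
        for (int[] t : transactions) {
            Total total = totals.computeIfAbsent(t[0], k -> new Total());
            total.count += 1;
            total.productCount += t[1];
            total.amount += t[1] * t[2];
        }
        return totals;
    }

    public static void main(String[] args) {
        List<int[]> transactions = List.of(
            new int[] {3, 100, 1050},
            new int[] {3, 200, 1000},
            new int[] {5, 100, 900});

        aggregate(transactions).forEach((productId, t) ->
            System.out.println(productId + ": count=" + t.count
                + ", productCount=" + t.productCount + ", amount=" + t.amount));
        // prints:
        // 3: count=2, productCount=300, amount=305000
        // 5: count=1, productCount=100, amount=90000
    }
}
```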
Here’s the class responsible for the interactive queries implementation. It injects the KafkaStreams bean. Then it obtains the persistent store using the name held in the StockService.TRANSACTIONS_PER_PRODUCT_SUMMARY variable. The result is a ReadOnlyKeyValueStore with Integer as the key and TransactionTotal as the value. We may return a single value related to a particular productId (getTransactionsPerProductData) or return a map with results for all available products (getAllTransactionsPerProductData).
@ApplicationScoped
public class InteractiveQueries {

    @Inject
    KafkaStreams streams;

    public TransactionTotal getTransactionsPerProductData(Integer productId) {
        return getTransactionsPerProductStore().get(productId);
    }

    public Map<Integer, TransactionTotal> getAllTransactionsPerProductData() {
        Map<Integer, TransactionTotal> m = new HashMap<>();
        // The iterator holds store resources, so close it after use
        try (KeyValueIterator<Integer, TransactionTotal> it =
                 getTransactionsPerProductStore().all()) {
            while (it.hasNext()) {
                KeyValue<Integer, TransactionTotal> kv = it.next();
                m.put(kv.key, kv.value);
            }
        }
        return m;
    }

    private ReadOnlyKeyValueStore<Integer, TransactionTotal> getTransactionsPerProductStore() {
        return streams.store(
            StoreQueryParameters
                .fromNameAndType(StockService.TRANSACTIONS_PER_PRODUCT_SUMMARY, QueryableStoreTypes.keyValueStore()));
    }
}
Finally, we can create a REST controller responsible for exposing data retrieved by the interactive queries.
@ApplicationScoped
@Path("/transactions")
public class TransactionResource {

    @Inject
    InteractiveQueries interactiveQueries;

    @GET
    @Path("/products/{id}")
    public TransactionTotal getByProductId(@PathParam("id") Integer productId) {
        return interactiveQueries.getTransactionsPerProductData(productId);
    }

    @GET
    @Path("/products")
    public Map<Integer, TransactionTotal> getAllPerProductId() {
        return interactiveQueries.getAllTransactionsPerProductData();
    }
}
Now you can easily check the statistics related to the transactions created by the stock-service. You just need to call the following REST endpoints, for example:
$ curl http://localhost:8080/transactions/products
$ curl http://localhost:8080/transactions/products/3
$ curl http://localhost:8080/transactions/products/5
Final Thoughts
Quarkus simplifies working with Kafka Streams and interactive queries. It provides useful improvements for developers, such as automatically starting Kafka in dev and test modes and visualizing the Kafka Streams topology in the Dev UI console. You can easily compare the Quarkus approach with the Spring Cloud Stream Kafka support, since I implemented the same logic for both frameworks. Here’s the GitHub repository with the Spring Cloud Stream Kafka Streams example.