Local Development with Redpanda, Quarkus and Testcontainers

In this article, you will learn how to speed up your local development with Redpanda and Quarkus. The main goal is to show that you can replace Apache Kafka with Redpanda without any changes in the source code. Instead, you get a fast way to run your existing Kafka applications without ZooKeeper or the JVM. You will also see how Quarkus uses Redpanda as a local broker instance for development. Finally, we are going to run all containers in Testcontainers Cloud.

For the current exercise, we use the same examples as described in one of my previous articles about Quarkus and Kafka Streams. Just to remind you: we are building a simplified version of a stock market platform. The stock-service application receives and handles incoming orders. There are two types of orders: purchase (BUY) and sale (SELL). While the stock-service consumes Kafka streams, the order-service generates and sends events to the orders.buy and orders.sell topics. Here's a diagram of our architecture. As you can see, the stock-service also uses PostgreSQL as its database.

quarkus-redpanda-arch

Source Code

If you would like to try this exercise yourself, you can take a look at my source code. To do that, clone my GitHub repository and switch to the dev branch. After that, just follow my instructions. Let's begin.
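
For example, assuming you have Git installed (the placeholders below stand for the repository address and directory from the link above):

$ git clone &lt;repository-url&gt;
$ cd &lt;repository-directory&gt;
$ git checkout dev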

Install Redpanda

This step is not required. However, it is worth installing Redpanda since it provides a useful CLI called Redpanda Keeper (rpk) for managing a cluster. To install Redpanda on macOS, just run the following command:

$ brew install redpanda-data/tap/redpanda

Now, we can easily create and run a new cluster. For development purposes, we only need a single-node Redpanda cluster. In order to run it, you need Docker installed on your laptop.

$ rpk container start

Before proceeding to the next steps, let's remove the current cluster. Quarkus will create everything for us automatically.

$ rpk container purge

Quarkus with Kafka and Postgres

Let’s begin with the stock-service. It consumes streams from Kafka topics and connects to the PostgreSQL database, as I mentioned before. So, the first step is to include the following dependencies:

<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-kafka-streams</artifactId>
</dependency>
<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-hibernate-orm-panache</artifactId>
</dependency>
<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-jdbc-postgresql</artifactId>
</dependency>

Now, we may proceed to the implementation. The topology for all the streams is provided inside the following method:

@Produces
public Topology buildTopology() {
   ...
}

Several streams are defined there, but let's take a look at the fragment of the topology responsible for creating transactions from incoming orders.

final String ORDERS_BUY_TOPIC = "orders.buy";
final String ORDERS_SELL_TOPIC = "orders.sell";
final String TRANSACTIONS_TOPIC = "transactions";

// ... other streams

// stream of incoming SELL orders
KStream<Long, Order> orders = builder.stream(
   ORDERS_SELL_TOPIC,
   Consumed.with(Serdes.Long(), orderSerde));

// merge BUY and SELL orders into a single stream and hand each order over to the application logic
builder.stream(ORDERS_BUY_TOPIC, Consumed.with(Serdes.Long(), orderSerde))
   .merge(orders)
   .peek((k, v) -> {
      log.infof("New: %s", v);
      logic.add(v);
   });

// re-key both streams by productId and join BUY orders with SELL orders
// within a 10-second window to create transactions
builder.stream(ORDERS_BUY_TOPIC, Consumed.with(Serdes.Long(), orderSerde))
   .selectKey((k, v) -> v.getProductId())
   .join(orders.selectKey((k, v) -> v.getProductId()),
      this::execute,
      JoinWindows.of(Duration.ofSeconds(10)),
      StreamJoined.with(Serdes.Integer(), orderSerde, orderSerde))
   .filterNot((k, v) -> v == null) // drop pairs that could not be matched
   .map((k, v) -> new KeyValue<>(v.getId(), v))
   .peek((k, v) -> log.infof("Done -> %s", v))
   .to(TRANSACTIONS_TOPIC, Produced.with(Serdes.Long(), transactionSerde));
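
For context, the orderSerde and transactionSerde instances used above are JSON Serdes, and the this::execute joiner is expected to return a Transaction for a matched pair of BUY/SELL orders, or null when they cannot be matched, which is why the filterNot step removes nulls. One simple way to create such Serdes (a sketch, not necessarily how the repository defines them) is Quarkus' ObjectMapperSerde:

// a sketch: JSON Serdes built with io.quarkus.kafka.client.serialization.ObjectMapperSerde
ObjectMapperSerde<Order> orderSerde = new ObjectMapperSerde<>(Order.class);
ObjectMapperSerde<Transaction> transactionSerde = new ObjectMapperSerde<>(Transaction.class);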

The whole implementation is more advanced. For the details, you may refer to the article I mentioned in the introduction. Now, let's imagine we are still developing our stock market app. Firstly, we would have to run PostgreSQL and a local Kafka cluster; we use Redpanda, which is easy to run locally. After that, we would typically provide the addresses of both the database and the broker in application.properties. But thanks to a feature called Quarkus Dev Services, the only things we need to configure now are the names of the topics consumed by Kafka Streams and the application id. Both of them are required by Kafka Streams.

Now, the most important thing: you just need to start the Quarkus app. Nothing more. DO NOT run any external tools by yourself and DO NOT provide any addresses for them in the configuration settings. Just add the two lines you see below:

quarkus.kafka-streams.application-id = stock
quarkus.kafka-streams.topics = orders.buy,orders.sell

Run Quarkus in dev mode with Redpanda

Before you run the Quarkus app, make sure you have Docker running on your laptop. Once you do, the only thing left is to start both test apps. Let's begin with the stock-service, since it receives orders generated by the order-service. Go to the stock-service directory and run the following command:

$ cd stock-service
$ mvn quarkus:dev

If you see the following logs, it means that everything went well. Our application started in 13 seconds. During this time, Quarkus also started Kafka and PostgreSQL on Docker and initialized Kafka Streams. Everything in 13 seconds, with a single command and without any additional configuration. Nice, right? Let's check out what happened in the background:

Firstly, let's find the line of logs beginning with the sentence “Dev Services for Kafka started”. It perfectly illustrates the Quarkus feature called Dev Services. Our Kafka instance has been started as a Docker container and is available under a dynamically generated port. The application connects to it automatically. All other Quarkus apps you run now will share the same broker instance. You can disable that behavior by setting the property quarkus.kafka.devservices.shared to false.
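
For example, if you prefer each Quarkus application to get its own broker instead of the shared one, add this line to application.properties (sharing is enabled by default):

quarkus.kafka.devservices.shared = false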

It may be a little surprising, but Quarkus Dev Services for Kafka uses Redpanda to run the broker. Of course, Redpanda is a Kafka-compatible solution. Since it starts in about one second and does not require ZooKeeper, it is a great choice for local development.

In order to run tools like brokers or databases on Docker, Quarkus uses Testcontainers. If you are interested in more details about Quarkus Dev Services for Kafka, read the following documentation. For now, let's display a list of running containers using the docker ps command. There are containers with Redpanda, PostgreSQL, and Testcontainers.
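
For example, to print only the image and ports of each running container (the exact names and ports will differ on your machine):

$ docker ps --format "table {{.Image}}\t{{.Ports}}"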

quarkus-redpanda-containers

Manage Kafka Streams with Redpanda and Quarkus

Let's verify how everything works on the application side. After running the application, we can take advantage of another useful Quarkus feature called Dev UI. The UI console is available at http://localhost:8080/q/dev/. After accessing it, you can display the topology of Kafka Streams by clicking the button inside the Apache Kafka Streams tile.

Here you will see a summary of the available streams. For me, it is 12 topics and 15 state stores. You may also see a visualization of the Kafka Streams topology. The following picture shows a fragment of it. You can download the full image by clicking the green download button visible on the right side of the screen.

quarkus-redpanda-dev

Now, let's use the Redpanda CLI to display the list of created topics. In my case, Redpanda is available locally on port 55001. All the topics are created automatically by Quarkus during application startup. We only need to define the names of the topics used in communication between our two test apps: orders.buy, orders.sell, and transactions. They are configured and created by the order-service. The stock-service creates all the other topics visible below, which are responsible for handling streams.

$ rpk topic list --brokers localhost:55001
NAME                                                    PARTITIONS  REPLICAS
orders.buy                                              1           1
orders.sell                                             1           1
stock-KSTREAM-JOINOTHER-0000000016-store-changelog      1           1
stock-KSTREAM-JOINOTHER-0000000043-store-changelog      1           1
stock-KSTREAM-JOINOTHER-0000000065-store-changelog      1           1
stock-KSTREAM-JOINTHIS-0000000015-store-changelog       1           1
stock-KSTREAM-JOINTHIS-0000000042-store-changelog       1           1
stock-KSTREAM-JOINTHIS-0000000064-store-changelog       1           1
stock-KSTREAM-KEY-SELECT-0000000005-repartition         1           1
stock-KSTREAM-KEY-SELECT-0000000006-repartition         1           1
stock-KSTREAM-KEY-SELECT-0000000032-repartition         1           1
stock-KSTREAM-KEY-SELECT-0000000033-repartition         1           1
stock-KSTREAM-KEY-SELECT-0000000054-repartition         1           1
stock-KSTREAM-KEY-SELECT-0000000055-repartition         1           1
stock-transactions-all-summary-changelog                1           1
stock-transactions-all-summary-repartition              1           1
stock-transactions-per-product-summary-30s-changelog    1           1
stock-transactions-per-product-summary-30s-repartition  1           1
stock-transactions-per-product-summary-changelog        1           1
stock-transactions-per-product-summary-repartition      1           1
transactions                                            1           1

In order to do a full test, we also need to run the order-service. It continuously generates orders and sends them to the orders.buy or orders.sell topics. Let's do that.

Send messages to Redpanda with Quarkus

Before we run the order-service, let's take a look at some implementation details. On the producer side, we need to include a single dependency responsible for integration with a Kafka broker:

<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-smallrye-reactive-messaging-kafka</artifactId>
</dependency>

Our application generates and sends random orders to the orders.buy or orders.sell topics. There are two methods for that, each of them dedicated to a single topic. Let's take a look at the method for generating BUY orders. We need to annotate it with @Outgoing and set the channel name (orders-buy). The method generates a single order every 500 milliseconds.

@Outgoing("orders-buy")
public Multi<Record<Long, Order>> buyOrdersGenerator() {
   return Multi.createFrom().ticks().every(Duration.ofMillis(500))
      .map(order -> {
         Integer productId = random.nextInt(10) + 1;
         int price = prices.get(productId) + random.nextInt(200);
         Order o = new Order(
            incrementOrderId(),
            random.nextInt(1000) + 1,
            productId,
            100 * (random.nextInt(5) + 1),
            LocalDateTime.now(),
            OrderType.BUY,
            price);
         log.infof("Sent: %s", o);
      return Record.of(o.getId(), o);
   });
}
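
For reference, here is a rough sketch of what the Order model could look like, inferred only from the constructor call above; the field names (for example customerId and amount) are my assumptions, and the real class in the repository may differ.

// hypothetical sketch of the Order model, inferred from the constructor call above
public class Order {

   private Long id;
   private Integer customerId;
   private Integer productId;
   private int amount;
   private LocalDateTime creationDate;
   private OrderType type;
   private int price;

   public Order() {
   }

   public Order(Long id, Integer customerId, Integer productId, int amount,
                LocalDateTime creationDate, OrderType type, int price) {
      this.id = id;
      this.customerId = customerId;
      this.productId = productId;
      this.amount = amount;
      this.creationDate = creationDate;
      this.type = type;
      this.price = price;
   }

   // getters and setters omitted
}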

After that, we need to map the channel name to the target topic name. Another required step is to set the serializers for the message key and value.

mp.messaging.outgoing.orders-buy.connector = smallrye-kafka
mp.messaging.outgoing.orders-buy.topic = orders.buy
mp.messaging.outgoing.orders-buy.key.serializer = org.apache.kafka.common.serialization.LongSerializer
mp.messaging.outgoing.orders-buy.value.serializer = io.quarkus.kafka.client.serialization.ObjectMapperSerializer

Finally, go to the order-service directory and run the application.

$ cd order-service
$ mvn quarkus:dev

Once you start the order-service, it will create the topics and start sending orders. It uses the same instance of Redpanda as the stock-service. You can run the docker ps command once again to verify it.

Now, just make a simple change in the stock-service to reload the application. It will also rebuild the Kafka Streams topology. After that, the stock-service starts receiving orders from the topics created by the order-service. Finally, it creates transactions from incoming orders and stores them in the transactions topic.
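
If you want to verify it from the command line, you can consume the transactions topic with rpk (adjust the port to the one exposed by your Redpanda container):

$ rpk topic consume transactions --brokers localhost:55001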

Use Testcontainers Cloud

In our development process, we need a locally installed Docker environment. But what if we don't have it? That's where Testcontainers Cloud comes in. Testcontainers Cloud is a developer-first SaaS platform for modern integration testing with real databases, message brokers, cloud services, or any other component of application infrastructure. To put it simply, we will do the same thing as before, but our instances of Redpanda and PostgreSQL will run not on the local Docker, but on the remote Testcontainers platform.

What do you need to do to enable Testcontainers Cloud? Firstly, download the agent from the following site. You also need to be a beta tester to obtain an authorization token. Finally, just run the agent and stop your local Docker daemon. You should see the Testcontainers icon among the running apps with information about the connection to the cloud.

quarkus-redpanda-testcontainers

Docker should not run locally.

As before, just run both applications with the quarkus:dev command. Your Redpanda broker now runs on Testcontainers Cloud but, thanks to the agent, you can still access it over localhost.

Once again, you can verify the list of topics for the new broker using the following command:

$ rpk topic list --brokers localhost:59779

Final Thoughts

In this article, I focused on showing how new and exciting technologies like Quarkus, Redpanda, and Testcontainers can work together. Local development is one of the use cases, but you may just as well use them to write integration tests.
