Part 1: Testing Kafka Microservices With Micronaut

I have already described how to build a microservices architecture based entirely on message-driven communication through Apache Kafka in one of my previous articles, Kafka In Microservices With Micronaut. As the title suggests, the sample applications and their integration with Kafka were built on top of the Micronaut Framework. I described some interesting Micronaut features that can be used for building message-driven microservices, but I didn’t write anything specific about testing. In this article I’m going to show you examples of testing your Kafka microservices using Micronaut Test core features (Component Tests), Testcontainers (Integration Tests) and Pact (Contract Tests).

Generally, automated testing is one of the biggest challenges related to microservices architecture. Therefore the most popular microservice frameworks, like Micronaut or Spring Boot, provide some useful features for it. There are also dedicated tools that help you use Docker containers in your tests or provide mechanisms for verifying the contracts between different applications. For the demo applications in this article I’m using the same repository as in the previous article: https://github.com/piomin/sample-kafka-micronaut-microservices.git.

Sample Architecture

The architecture of the sample applications has been described in the previous article, but let me do a quick recap. We have 4 microservices: order-service, trip-service, driver-service and passenger-service. The implementation of these applications is very simple. All of them have in-memory storage and connect to the same Kafka instance.
The primary goal of our system is to arrange a trip for customers. The order-service application also acts as a gateway. It receives requests from customers, saves their history and sends events to the orders topic. All the other microservices listen on this topic and process the orders sent by order-service. Each microservice has its own dedicated topic, where it sends events with information about changes. Such events are received by some of the other microservices. The architecture is presented in the picture below.

micronaut-kafka-1
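
The event flow described above can be sketched without any broker at all. The snippet below is only an illustration: InMemoryTopics and TopicFlowDemo are hypothetical names that do not exist in the sample repository, events are plain strings, and delivery is synchronous, which real Kafka is not. It shows the shape of the communication: order-service publishes to the orders topic, driver-service reacts and publishes to its own drivers topic, and any other listener can observe that topic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical in-memory stand-in for a broker; not part of the sample repository.
class InMemoryTopics {

    private final Map<String, List<Consumer<String>>> subscribers = new ConcurrentHashMap<>();

    // Registers a listener for the given topic.
    void subscribe(String topic, Consumer<String> listener) {
        subscribers.computeIfAbsent(topic, t -> new CopyOnWriteArrayList<>()).add(listener);
    }

    // Delivers the event synchronously to every listener of the topic.
    void send(String topic, String event) {
        subscribers.getOrDefault(topic, List.of()).forEach(listener -> listener.accept(event));
    }
}

class TopicFlowDemo {

    // Wires the flow and returns everything observed on the "drivers" topic.
    static List<String> run() {
        InMemoryTopics topics = new InMemoryTopics();
        List<String> driversEvents = new ArrayList<>();

        // driver-service: listens on "orders" and reports changes on its own "drivers" topic
        topics.subscribe("orders", order -> {
            if (order.startsWith("NEW_TRIP")) {
                topics.send("drivers", "driver UNAVAILABLE for " + order);
            }
        });

        // any interested service (or a test listener) listens on "drivers"
        topics.subscribe("drivers", driversEvents::add);

        // order-service acts as the gateway and publishes the orders
        topics.send("orders", "NEW_TRIP#1");
        topics.send("orders", "CANCEL_TRIP#1"); // ignored by this simplified driver-service

        return driversEvents;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```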

Embedded Kafka – Component Testing with Micronaut

After a short description of the architecture we may proceed to the key point of this article: testing. Micronaut allows you to start an embedded Kafka instance for testing purposes. To do that, first add the following dependencies to your Maven pom.xml:

<dependency>
   <groupId>org.apache.kafka</groupId>
   <artifactId>kafka-clients</artifactId>
   <version>2.3.0</version>
   <classifier>test</classifier>
</dependency>
<dependency>
   <groupId>org.apache.kafka</groupId>
   <artifactId>kafka_2.12</artifactId>
   <version>2.3.0</version>
</dependency>
<dependency>
   <groupId>org.apache.kafka</groupId>
   <artifactId>kafka_2.12</artifactId>
   <version>2.3.0</version>
   <classifier>test</classifier>
</dependency>

To enable embedded Kafka for a test class we have to set the property kafka.embedded.enabled to true. Because I normally run Kafka in a Docker container, which is by default available at 192.168.99.100, I also need to override the value of the kafka.bootstrap.servers property with localhost:9092 for this test. The test class uses embedded Kafka to cover three basic scenarios for order-service: sending an order for a new trip, and receiving orders for trip cancellation and completion from other microservices. Here’s the full code of my OrderKafkaEmbeddedTest:

@MicronautTest
@Property(name = "kafka.embedded.enabled", value = "true")
@Property(name = "kafka.bootstrap.servers", value = "localhost:9092")
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class OrderKafkaEmbeddedTest {

    private static final Logger LOGGER = LoggerFactory.getLogger(OrderKafkaEmbeddedTest.class);

    @Inject
    OrderClient client;
    @Inject
    OrderInMemoryRepository repository;
    @Inject
    OrderHolder orderHolder;
    @Inject
    KafkaEmbedded kafkaEmbedded;

    @BeforeAll
    public void init() {
        LOGGER.info("Topics: {}", kafkaEmbedded.getKafkaServer().get().zkClient().getAllTopicsInCluster());
    }

    @Test
    @org.junit.jupiter.api.Order(1)
    public void testAddNewTripOrder() throws InterruptedException {
        Order order = new Order(OrderType.NEW_TRIP, 1L, 50, 30);
        order = repository.add(order);
        client.send(order);
        Order orderSent = waitForOrder();
        Assertions.assertNotNull(orderSent);
        Assertions.assertEquals(order.getId(), orderSent.getId());
    }

    @Test
    @org.junit.jupiter.api.Order(2)
    public void testCancelTripOrder() throws InterruptedException {
        Order order = new Order(OrderType.CANCEL_TRIP, 1L, 50, 30);
        client.send(order);
        Order orderReceived = waitForOrder();
        Optional<Order> oo = repository.findById(1L);
        Assertions.assertTrue(oo.isPresent());
        Assertions.assertEquals(OrderStatus.REJECTED, oo.get().getStatus());
    }

    @Test
    @org.junit.jupiter.api.Order(3)
    public void testPaymentTripOrder() throws InterruptedException {
        Order order = new Order(OrderType.PAYMENT_PROCESSED, 1L, 50, 30);
        order.setTripId(1L);
        order = repository.add(order);
        client.send(order);
        Order orderSent = waitForOrder();
        Optional<Order> oo = repository.findById(order.getId());
        Assertions.assertTrue(oo.isPresent());
        Assertions.assertEquals(OrderStatus.COMPLETED, oo.get().getStatus());
    }

    private Order waitForOrder() throws InterruptedException {
        Order orderSent = null;
        for (int i = 0; i < 10; i++) {
            orderSent = orderHolder.getCurrentOrder();
            if (orderSent != null)
                break;
            Thread.sleep(1000);
        }
        orderHolder.setCurrentOrder(null);
        return orderSent;
    }

}

At this stage some things require clarification, especially the mechanism for verifying that messages are sent and received. I’ll describe it using driver-service as an example. When a message arrives on the orders topic it is received by OrderListener, which is annotated with @KafkaListener as shown below. It checks the order type and forwards NEW_TRIP requests to the DriverService bean.

@KafkaListener(groupId = "driver")
public class OrderListener {

    private static final Logger LOGGER = LoggerFactory.getLogger(OrderListener.class);

    private DriverService service;

    public OrderListener(DriverService service) {
        this.service = service;
    }

    @Topic("orders")
    public void receive(@Body Order order) {
        LOGGER.info("Received: {}", order);
        switch (order.getType()) {
            case NEW_TRIP -> service.processNewTripOrder(order);
        }
    }
}

The DriverService processes the order. It tries to find the driver located closest to the customer, changes that driver’s status to unavailable and sends an event with the driver’s current state.

@Singleton
public class DriverService {

    private static final Logger LOGGER = LoggerFactory.getLogger(DriverService.class);

    private DriverClient client;
    private OrderClient orderClient;
    private DriverInMemoryRepository repository;

    public DriverService(DriverClient client, OrderClient orderClient, DriverInMemoryRepository repository) {
        this.client = client;
        this.orderClient = orderClient;
        this.repository = repository;
    }

    public void processNewTripOrder(Order order) {
        LOGGER.info("Processing: {}", order);
        Optional<Driver> driver = repository.findNearestDriver(order.getCurrentLocationX(), order.getCurrentLocationY());
        if (driver.isPresent()) {
            Driver driverLocal = driver.get();
            driverLocal.setStatus(DriverStatus.UNAVAILABLE);
            repository.updateDriver(driverLocal);
            client.send(driverLocal, String.valueOf(order.getId()));
            LOGGER.info("Message sent: {}", driverLocal);
        }
    }
   
   // OTHER METHODS ...
}
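
The article does not show how findNearestDriver is implemented, so here is a minimal sketch of one way such a lookup could work over in-memory data. Everything in it is an assumption made for illustration: the DriverPosition class, its fields, the use of Euclidean distance and the choice to skip unavailable drivers are mine, and the real repository class may differ.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Hypothetical, simplified model; the real Driver class in the repository may differ.
class DriverPosition {
    final long id;
    final int x;
    final int y;
    final boolean available;

    DriverPosition(long id, int x, int y, boolean available) {
        this.id = id;
        this.x = x;
        this.y = y;
        this.available = available;
    }
}

class NearestDriverFinder {

    // Returns the available driver with the smallest Euclidean distance to (x, y),
    // or an empty Optional when no driver is available.
    static Optional<DriverPosition> findNearest(List<DriverPosition> drivers, int x, int y) {
        return drivers.stream()
                .filter(d -> d.available) // assumption: unavailable drivers are skipped
                .min(Comparator.comparingLong(d -> squaredDistance(d, x, y)));
    }

    // The squared distance preserves the ordering, so the square root is not needed.
    private static long squaredDistance(DriverPosition d, int x, int y) {
        long dx = (long) d.x - x;
        long dy = (long) d.y - y;
        return dx * dx + dy * dy;
    }

    public static void main(String[] args) {
        List<DriverPosition> drivers = List.of(
                new DriverPosition(1L, 10, 10, true),
                new DriverPosition(2L, 40, 35, true),
                new DriverPosition(3L, 52, 30, false));
        // The customer location matches the order used in the tests: (50, 30)
        findNearest(drivers, 50, 30).ifPresent(d -> System.out.println("Nearest driver id: " + d.id));
    }
}
```

Returning an Optional matches the way processNewTripOrder checks driver.isPresent() before sending the event.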

To verify that the final message with the change notification has been sent to the drivers topic we have to create our own listener for test purposes. It receives the message and stores it in a @Singleton holder class, which is then read by the single-threaded test class. The described process is visualized in the picture below.
kafka-micronaut-testing-1.png
Here’s the implementation of the test listener, which is responsible only for receiving the message sent to the drivers topic and writing it to the DriverHolder bean.

@KafkaListener(groupId = "driverTest")
public class DriverConfirmListener {

   private static final Logger LOGGER = LoggerFactory.getLogger(DriverConfirmListener.class);

   @Inject
   DriverHolder driverHolder;

   @Topic("drivers")
   public void receive(@Body Driver driver) {
      LOGGER.info("Confirmed: {}", driver);
      driverHolder.setCurrentDriver(driver);
   }

}

Here’s the implementation of DriverHolder class.

@Singleton
public class DriverHolder {

   // volatile: written by the Kafka listener thread, read by the test thread
   private volatile Driver currentDriver;

   public Driver getCurrentDriver() {
      return currentDriver;
   }

   public void setCurrentDriver(Driver currentDriver) {
      this.currentDriver = currentDriver;
   }

}

Whether you are using embedded Kafka, Testcontainers or a manually started Docker container, you can use the verification mechanism described above.
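
As a possible variation on the holder, the sketch below swaps the single field for a blocking queue, so the test thread can wait for a message with a timeout instead of sleeping in a loop. This is not what the sample repository does: BlockingHolder and BlockingHolderDemo are hypothetical names, and the listener is simulated here by a plain thread rather than a @KafkaListener.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical generic alternative to DriverHolder/OrderHolder.
class BlockingHolder<T> {

    private final BlockingQueue<T> received = new LinkedBlockingQueue<>();

    // Called from the listener thread for every received message.
    void put(T value) {
        received.add(value);
    }

    // Called from the test thread; returns null if nothing arrives within the timeout.
    T await(long timeout, TimeUnit unit) throws InterruptedException {
        return received.poll(timeout, unit);
    }
}

class BlockingHolderDemo {

    // Simulates a listener thread delivering one message while the caller waits for it.
    static String deliverAndAwait() {
        BlockingHolder<String> holder = new BlockingHolder<>();
        new Thread(() -> holder.put("driver-1")).start();
        try {
            return holder.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    // Nothing is delivered, so the wait times out and null is returned.
    static String awaitNothing() {
        BlockingHolder<String> holder = new BlockingHolder<>();
        try {
            return holder.await(100, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(deliverAndAwait());
        System.out.println(awaitNothing());
    }
}
```

Because every message is consumed from the queue when it is read, a holder like this would not need to be reset to null between test methods.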

Kafka with Testcontainers

We will use the Testcontainers framework for running Docker containers of Zookeeper and Kafka during JUnit tests. Testcontainers is a Java library that provides lightweight, throwaway instances of common databases, Selenium web browsers, or anything else that can run in a Docker container. To use it in your project together with JUnit 5, which is already used for our sample Micronaut application, you have to add the following dependencies to your Maven pom.xml:

<dependency>
   <groupId>org.testcontainers</groupId>
   <artifactId>kafka</artifactId>
   <version>1.12.2</version>
   <scope>test</scope>
</dependency>
<dependency>
   <groupId>org.testcontainers</groupId>
   <artifactId>junit-jupiter</artifactId>
   <version>1.12.2</version>
   <scope>test</scope>
</dependency>

The declared library org.testcontainers:kafka:1.12.2 provides the KafkaContainer class, which lets you define and start a Kafka container with embedded Zookeeper in your tests. However, I decided to use the GenericContainer class and run two containers: wurstmeister/zookeeper and wurstmeister/kafka. Because Kafka needs to communicate with Zookeeper, both containers should run in the same network. We also have to override the Zookeeper container’s name and host name so that Kafka can reach it by hostname.
When running a Kafka container we need to set some important environment variables. KAFKA_ADVERTISED_HOST_NAME sets the hostname under which Kafka is visible to external clients, and KAFKA_ZOOKEEPER_CONNECT sets the Zookeeper lookup address. Although it is not recommended, we disable dynamic generation of the exposed port by binding the static host port 9092 to the container port 9092. That helps us avoid problems with setting the Kafka advertised port and injecting it into the Micronaut configuration.

@MicronautTest
@Testcontainers
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class OrderKafkaContainerTest {

    private static final Logger LOGGER = LoggerFactory.getLogger(OrderKafkaContainerTest.class);

    static Network network = Network.newNetwork();

   @Container
   public static final GenericContainer ZOOKEEPER = new GenericContainer("wurstmeister/zookeeper")
      .withCreateContainerCmdModifier(it -> ((CreateContainerCmd) it).withName("zookeeper").withHostName("zookeeper"))
      .withExposedPorts(2181)
      .withNetworkAliases("zookeeper")
      .withNetwork(network);

   @Container
   public static final GenericContainer KAFKA_CONTAINER = new GenericContainer("wurstmeister/kafka")
      .withCreateContainerCmdModifier(it -> ((CreateContainerCmd) it).withName("kafka").withHostName("kafka")
         .withPortBindings(new PortBinding(Ports.Binding.bindPort(9092), new ExposedPort(9092))))
      .withExposedPorts(9092)
      .withNetworkAliases("kafka")
      .withEnv("KAFKA_ADVERTISED_HOST_NAME", "192.168.99.100")
      .withEnv("KAFKA_ZOOKEEPER_CONNECT", "zookeeper:2181")
      .withNetwork(network);
      
   // TESTS ...
   
}

The test scenarios may be the same as for embedded Kafka, or we may define some more advanced integration tests. To do that we first build a Docker image of every microservice during the build. We can use io.fabric8:docker-maven-plugin for that. Here’s an example for driver-service.

<plugin>
   <groupId>io.fabric8</groupId>
   <artifactId>docker-maven-plugin</artifactId>
   <version>0.31.0</version>
   <configuration>
      <images>
         <image>
            <name>piomin/driver-service:${project.version}</name>
            <build>
               <dockerFile>${project.basedir}/Dockerfile</dockerFile>
               <tags>
                  <tag>latest</tag>
                  <tag>${project.version}</tag>
               </tags>
            </build>
         </image>
      </images>
   </configuration>
   <executions>
      <execution>
         <id>start</id>
         <phase>pre-integration-test</phase>
         <goals>
            <goal>build</goal>
            <goal>start</goal>
         </goals>
      </execution>
      <execution>
         <id>stop</id>
         <phase>post-integration-test</phase>
         <goals>
            <goal>stop</goal>
         </goals>
      </execution>
   </executions>
</plugin>

Once we have a Docker image of every microservice we can easily run it with Testcontainers during our integration tests. In the test class fragment below I’m running a container with driver-service in addition to the Kafka and Zookeeper containers. The test is implemented inside order-service. We are building the same scenario as in the test with embedded Kafka: sending the NEW_TRIP order. But this time we verify that the message has been received and processed by driver-service. This verification is performed by listening for the notification events that driver-service, running in a Docker container, sends to the drivers topic. Normally, order-service does not listen for messages arriving on the drivers topic, but we created such a listener just for the purpose of the integration test.

@Container
public static final GenericContainer DRIVER_CONTAINER = new GenericContainer("piomin/driver-service")
   .withNetwork(network);

@Inject
OrderClient client;
@Inject
OrderInMemoryRepository repository;
@Inject
DriverHolder driverHolder;

@Test
@org.junit.jupiter.api.Order(1)
public void testNewTrip() throws InterruptedException {
   Order order = new Order(OrderType.NEW_TRIP, 1L, 50, 30);
   order = repository.add(order);
   client.send(order);
   Driver driverReceived = null;
   for (int i = 0; i < 10; i++) {
      driverReceived = driverHolder.getCurrentDriver();
      if (driverReceived != null)
         break;
      Thread.sleep(1000);
   }
   driverHolder.setCurrentDriver(null);
   Assertions.assertNotNull(driverReceived);
}
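
The polling loop in testNewTrip repeats the one from waitForOrder in the embedded Kafka test. One way to avoid the duplication is a small generic helper like the sketch below. Poller and PollerDemo are hypothetical names that are not part of the sample repository, and the holder is simulated with an AtomicReference so that the example runs on its own.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical helper that extracts the "poll until non-null or give up" loop.
class Poller {

    // Polls the supplier until it returns a non-null value or the timeout elapses.
    // Returns null when the timeout is reached without a value.
    static <T> T awaitNonNull(Supplier<T> supplier, Duration timeout, Duration interval)
            throws InterruptedException {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            T value = supplier.get();
            if (value != null) {
                return value;
            }
            if (System.nanoTime() - deadline >= 0) {
                return null;
            }
            Thread.sleep(interval.toMillis());
        }
    }
}

class PollerDemo {

    // A background thread sets the value while the caller polls for it.
    static String run() {
        AtomicReference<String> holder = new AtomicReference<>();
        new Thread(() -> holder.set("driver-1")).start();
        try {
            return Poller.awaitNonNull(holder::get, Duration.ofSeconds(5), Duration.ofMillis(50));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    // The value never appears, so the helper gives up after the timeout.
    static String timeout() {
        try {
            return Poller.<String>awaitNonNull(() -> null, Duration.ofMillis(100), Duration.ofMillis(20));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
        System.out.println(timeout());
    }
}
```

In the test above, the loop could then shrink to a single call such as Poller.awaitNonNull(driverHolder::getCurrentDriver, Duration.ofSeconds(10), Duration.ofSeconds(1)), followed by the same reset of the holder and the assertion.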

Summary

In this article I have described an approach to component testing with embedded Kafka and Micronaut, as well as integration testing with Docker and Testcontainers. This is the first part of the article; in the second part I’m going to show you how to build contract tests for Micronaut applications with Pact.