Micronaut Tutorial: Reactive

Micronaut Tutorial: Reactive

This is the fourth part of my tutorial to Micronaut Framework – created after a longer period of time. In this article I’m going to show you some examples of reactive programming on the server and client side. By default, Micronaut support to reactive APIs and streams is built on top of RxJava. If you are interested in some previous parts of my tutorial and would like to read it before starting with this part you can learn about basics, security and server-side applications here:

Reactive programming is becoming increasingly popular recently. Therefore, all the newly created web frameworks supports it by default. There is no difference for Micronaut. In this part of tutorial you will learn how to:

  • Use RxJava framework with Micronaut on the server and client side
  • Streaming JSON over HTTP
  • Use low-level HTTP client and declarative HTTP for retrieving reactive stream
  • Regulate back pressure on the server and client side
  • Test reactive API with JUnit

1. Dependencies

Support for reactive programming with RxJava is enabled by default on Micronaut. The dependency io.reactivex.rxjava2:rxjava is included together with some core micronaut libraries like micronaut-runtime or micronaut-http-server-netty on the server side and with micronaut-http-client on the client side. So, the set of dependencies is the same as for example from Part 2 of my tutorial, which has been describing an approach to building classic web application using Micronaut Framework. Just to recap, here’s the list of the most important dependencies in this tutorial:

<dependency>
   <groupId>io.micronaut</groupId>
   <artifactId>micronaut-inject</artifactId>
</dependency>
<dependency>
   <groupId>io.micronaut</groupId>
   <artifactId>micronaut-runtime</artifactId>
</dependency>
<dependency>
   <groupId>io.micronaut</groupId>
   <artifactId>micronaut-http-server-netty</artifactId>
</dependency>
<dependency>
   <groupId>io.micronaut</groupId>
   <artifactId>micronaut-management</artifactId>
</dependency>
<dependency>
   <groupId>io.micronaut</groupId>
   <artifactId>micronaut-http-client</artifactId>
   <scope>test</scope>
</dependency>
<dependency>
   <groupId>io.micronaut.test</groupId>
   <artifactId>micronaut-test-junit5</artifactId>
   <scope>test</scope>
</dependency>

2. Controller

Let’s begin from server-side application and a controller implementation. There are some optional annotation for enabling validation (@Validated) and ignoring security constraints (@Secured(SecurityRule.IS_ANONYMOUS)) described in the previous parts of this tutorial. However, the most important thing in the implementation of controller visible below are the RxJava objects used in the return statements: Single, Maybe and Flowable. The rest of implementation is pretty similar to a standard REST controller.

@Controller("/persons/reactive")
@Secured(SecurityRule.IS_ANONYMOUS)
@Validated
public class PersonReactiveController {

    private static final Logger LOGGER = LoggerFactory.getLogger(PersonReactiveController.class);

    List<Person> persons = new ArrayList<>();

    @Post
    public Single<Person> add(@Body @Valid Person person) {
        person.setId(persons.size() + 1);
        persons.add(person);
        return Single.just(person);
    }

    @Get("/{id}")
    public Maybe<Person> findById(@PathVariable Integer id) {
       return Maybe.just(persons.stream().filter(person -> person.getId().equals(id)).findAny().get());
   }

    @Get(value = "/stream", produces = MediaType.APPLICATION_JSON_STREAM)
    public Flowable<Person> findAllStream() {
        return Flowable.fromIterable(persons).doOnNext(person -> LOGGER.info("Server: {}", person));
    }

}

In the implementation of controller visible above I used 3 of 5 available RxJava2 types that can be observed:

  • Single – an observable that emits only one item and then completes. It ensures that one item will be sent, so it’s for non empty outputs.
  • Maybe – works very similar to a Single, but with a particular difference: it can complete without emitting a value. This is useful when we have optional emissions.
  • Flowable – it emits a stream of elements and supports back pressure mechanism

Now, if you understand the meaning of basic observable types in RxJava the example controller should become pretty easy for you. We have three methods: add for adding new element into the list, findById for searching by id that may not return any element and findAllStream that emits all elements as a stream. The last method has to produce application/x-json-stream in order to take an advantage of reactive streams also on the client side. When using that type of content type, events are retrieved continuously by the HTTP client thanks to the Flowable type.

3. Using Low-level Reactive HTTP Client

Micronaut Reactive offers two type of clients for accessing HTTP APIs: low-level client and declarative client. We can choose between HttpClient, RxHttpClient and RxStreamingHttpClient with support for streaming data over HTTP. The recommended way for accessing reference to a client is through injecting it with @Client annotation. However, the most suitable way inside JUnit test is with the create static method of the RxHttpClient, since we may dynamically set port number.
Here’s the implementation of JUnit tests with low-level HTTP client. To read a Single or Maybe we are using method retrieve of RxHttpClient. After subscribing to an observable I’m using using ConcurrentUnit library for handling asynchronous results in the test. For accessing Flowable returned as application/x-json-stream on the server side we need to use RxStreamingHttpClient. It provides method jsonStream, which is dedicated for reading a non-blocking stream of JSON objects.

@MicronautTest
public class PersonReactiveControllerTests {

    private static final Logger LOGGER = LoggerFactory.getLogger(PersonReactiveControllerTests.class);

    @Inject
    EmbeddedServer server;

    @Test
    public void testAdd() throws MalformedURLException, TimeoutException, InterruptedException {
        final Waiter waiter = new Waiter();
        final Person person = new Person(null, "Name100", "Surname100", 22, Gender.MALE);
        RxHttpClient client = RxHttpClient.create(new URL("https://" + server.getHost() + ":" + server.getPort()));
        Single<Person> s = client.retrieve(HttpRequest.POST("/persons/reactive", person), Person.class).firstOrError();
        s.subscribe(person1 -> {
            LOGGER.info("Retrieved: {}", person1);
            waiter.assertNotNull(person1);
            waiter.assertNotNull(person1.getId());
            waiter.resume();
        });
        waiter.await(3000, TimeUnit.MILLISECONDS);
    }

    @Test
    public void testFindById() throws MalformedURLException, TimeoutException, InterruptedException {
        final Waiter waiter = new Waiter();
        RxHttpClient client = RxHttpClient.create(new URL("https://" + server.getHost() + ":" + server.getPort()));
        Maybe<Person> s = client.retrieve(HttpRequest.GET("/persons/reactive/1"), Person.class).firstElement();
        s.subscribe(person1 -> {
            LOGGER.info("Retrieved: {}", person1);
            waiter.assertNotNull(person1);
            waiter.assertEquals(1, person1.getId());
            waiter.resume();
        });
        waiter.await(3000, TimeUnit.MILLISECONDS);
    }

    @Test
    public void testFindAllStream() throws MalformedURLException, TimeoutException, InterruptedException {
        final Waiter waiter = new Waiter();
        RxStreamingHttpClient client = RxStreamingHttpClient.create(new URL("https://" + server.getHost() + ":" + server.getPort()));
        client.jsonStream(HttpRequest.GET("/persons/reactive/stream"), Person.class)
                .subscribe(s -> {
                    LOGGER.info("Client: {}", s);
                    waiter.assertNotNull(s);
                    waiter.resume();
                });
        waiter.await(3000, TimeUnit.MILLISECONDS, 9);
    }
   
}

4. Back Pressure

One of the main term related to reactive programming is back pressure. Backpressure is resistance or force opposing the desired flow of data through software. In simple words, if a producer sends more events than a consumer is able to handle in a specific period of time, the consumer should be able to regulate the frequency of sending events on the producer side. Of course, Micronaut Reactive supports backpressure. Let’s analyze how we may control it for Micronaut application on some simple examples.
We may “somehow” control back pressure on the server-side and also on the client-side. However, the client is not able to regulate emission parameters on the server side due to the TCP transport layer. Ok, now let’s implement additional methods in our sample controller. I’m using fromCallable for emitting elements within Flowable stream. We are producing 9 elements by calling method repeat method. The second newly created method is findAllStreamWithCallableDelayed, which additionally delay each element on the stream 100 milliseconds. In this way, we can control back pressure on the server-side.

@Controller("/persons/reactive")
@Secured(SecurityRule.IS_ANONYMOUS)
@Validated
public class PersonReactiveController {

    private static final Logger LOGGER = LoggerFactory.getLogger(PersonReactiveController.class);

    List<Person> persons = new ArrayList<>();
   
    @Get(value = "/stream/callable", produces = MediaType.APPLICATION_JSON_STREAM)
    public Flowable<Person> findAllStreamWithCallable() {
        return Flowable.fromCallable(() -> {
            int r = new Random().nextInt(100);
            Person p = new Person(r, "Name"+r, "Surname"+r, r, Gender.MALE);
            return p;
        }).doOnNext(person -> LOGGER.info("Server: {}", person))
                .repeat(9);
    }

    @Get(value = "/stream/callable/delayed", produces = MediaType.APPLICATION_JSON_STREAM)
    public Flowable<Person> findAllStreamWithCallableDelayed() {
        return Flowable.fromCallable(() -> {
            int r = new Random().nextInt(100);
            Person p = new Person(r, "Name"+r, "Surname"+r, r, Gender.MALE);
            return p;
        }).doOnNext(person -> LOGGER.info("Server: {}", person))
                .repeat(9).delay(100, TimeUnit.MILLISECONDS);
    }
   
}

We can also control back pressure on the client-side. However, as I have mentioned before it does not have any effect on the emission on the server-side. Now, let’s consider the following test that verifies the delayed stream on the server-side.

@Test
public void testFindAllStreamDelayed() throws MalformedURLException, TimeoutException, InterruptedException {
   final Waiter waiter = new Waiter();
   RxStreamingHttpClient client = RxStreamingHttpClient.create(new URL("https://" + server.getHost() + ":" + server.getPort()));
   client.jsonStream(HttpRequest.GET("/persons/reactive/stream/callable/delayed"), Person.class)
      .subscribe(s -> {
         LOGGER.info("Client: {}", s);
         waiter.assertNotNull(s);
         waiter.resume();
      });
   waiter.await(3000, TimeUnit.MILLISECONDS, 9);
}

Here’s the result of the test. Since the server delay every element 100 milliseconds, the client receives and prints the element just after emission.

micronaut-tut-4-1

For a comparison let’s consider the following test. This time we are implementing our custom Subscriber that requests a single element just after processing the previous one. The following test verifies not delayed stream.

@Test
public void testFindAllStreamWithCallable() throws MalformedURLException, TimeoutException, InterruptedException {
   final Waiter waiter = new Waiter();
   RxStreamingHttpClient client = RxStreamingHttpClient.create(new URL("https://" + server.getHost() + ":" + server.getPort()));
   client.jsonStream(HttpRequest.GET("/persons/reactive/stream/callable"), Person.class)
      .subscribe(new Subscriber<Person>() {

         Subscription s;

         @Override
         public void onSubscribe(Subscription subscription) {
            subscription.request(1);
            s = subscription;
         }

         @Override
         public void onNext(Person person) {
            LOGGER.info("Client: {}", person);
            waiter.assertNotNull(person);
            waiter.resume();
            s.request(1);
         }

         @Override
         public void onError(Throwable throwable) {

         }

         @Override
         public void onComplete() {

         }
      });
   waiter.await(3000, TimeUnit.MILLISECONDS, 9);
}

Here’s the result of our test. It finishes succesfully, so the subscriber works properly. However, as you can see the back pressure is controlled just on the client side, since server emits all the elements, and then client retrieves them.

micronaut-tut-4-2

5. Declarative HTTP Client

Besides low-level HTTP client Micronaut Reactive allows to create declarative clients via the @Client annotation. If you are familiar with for example OpenFeign project or you have read the second part of my tutorial to Micronaut Framework you probably understand the concept of declarative client. In fact, we just need to create an interface with similar methods to the methods defined inside the controller and annotate them properly. The client interface should be annotated with @Client as shown below. The interface is placed inside src/test/java since it is used just in JUnit tests.

@Client("/persons/reactive")
public interface PersonReactiveClient {

    @Post
    Single<Person> add(@Body Person person);

    @Get("/{id}")
    Maybe<Person> findById(@PathVariable Integer id);

    @Get(value = "/stream", produces = MediaType.APPLICATION_JSON_STREAM)
    Flowable<Person> findAllStream();

}

The declarative client need to be injected into the test. Here are the same test methods as implemented for low-level client, but now with declarative client.

@MicronautTest
public class PersonReactiveControllerTests {

    private static final Logger LOGGER = LoggerFactory.getLogger(PersonReactiveControllerTests.class);

    @Inject
    EmbeddedServer server;
    @Inject
    PersonReactiveClient client;

    @Test
    public void testAddDeclarative() throws TimeoutException, InterruptedException {
        final Waiter waiter = new Waiter();
        final Person person = new Person(null, "Name100", "Surname100", 22, Gender.MALE);
        Single<Person> s = client.add(person);
        s.subscribe(person1 -> {
            LOGGER.info("Retrieved: {}", person1);
            waiter.assertNotNull(person1);
            waiter.assertNotNull(person1.getId());
            waiter.resume();
        });
        waiter.await(3000, TimeUnit.MILLISECONDS);
    }

    @Test
    public void testFindByIdDeclarative() throws TimeoutException, InterruptedException {
        final Waiter waiter = new Waiter();
        Maybe<Person> s = client.findById(1);
        s.subscribe(person1 -> {
            LOGGER.info("Retrieved: {}", person1);
            waiter.assertNotNull(person1);
            waiter.assertEquals(1, person1.getId());
            waiter.resume();
        });
        waiter.await(3000, TimeUnit.MILLISECONDS);
    }

    @Test
    public void testFindAllStreamDeclarative() throws MalformedURLException, TimeoutException, InterruptedException {
        final Waiter waiter = new Waiter();
        Flowable<Person> persons = client.findAllStream();
        persons.subscribe(s -> {
            LOGGER.info("Client: {}", s);
            waiter.assertNotNull(s);
            waiter.resume();
        });
        waiter.await(3000, TimeUnit.MILLISECONDS, 9);
    }

}

Source Code

We were using the same repository as for two previous parts of my Micronaut tutorial: https://github.com/piomin/sample-micronaut-applications.git.

Leave a Reply