Micronaut Tutorial: Reactive
This is the fourth part of my tutorial on the Micronaut Framework, written after a longer break. In this article I’m going to show you some examples of reactive programming on the server and client side. By default, Micronaut’s support for reactive APIs and streams is built on top of RxJava. If you are interested in the previous parts of my tutorial and would like to read them before starting with this one, you can learn about the basics, security and server-side applications here:
- Part 1: Micronaut Tutorial: Beans and Scopes
- Part 2: Micronaut Tutorial: Server Application
- Part 3: Micronaut Tutorial: Security
Reactive programming has become increasingly popular. As a result, most newly created web frameworks support it by default, and Micronaut is no different. In this part of the tutorial you will learn how to:
- Use the RxJava framework with Micronaut on the server and client side
- Stream JSON over HTTP
- Use the low-level HTTP client and the declarative HTTP client to retrieve a reactive stream
- Regulate back pressure on the server and client side
- Test a reactive API with JUnit
1. Dependencies
Support for reactive programming with RxJava is enabled by default in Micronaut. The dependency io.reactivex.rxjava2:rxjava is included together with some core Micronaut libraries such as micronaut-runtime or micronaut-http-server-netty on the server side, and with micronaut-http-client on the client side. So the set of dependencies is the same as in Part 2 of my tutorial, which described an approach to building a classic web application with the Micronaut Framework. Just to recap, here’s the list of the most important dependencies in this tutorial:
<dependency>
    <groupId>io.micronaut</groupId>
    <artifactId>micronaut-inject</artifactId>
</dependency>
<dependency>
    <groupId>io.micronaut</groupId>
    <artifactId>micronaut-runtime</artifactId>
</dependency>
<dependency>
    <groupId>io.micronaut</groupId>
    <artifactId>micronaut-http-server-netty</artifactId>
</dependency>
<dependency>
    <groupId>io.micronaut</groupId>
    <artifactId>micronaut-management</artifactId>
</dependency>
<dependency>
    <groupId>io.micronaut</groupId>
    <artifactId>micronaut-http-client</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>io.micronaut.test</groupId>
    <artifactId>micronaut-test-junit5</artifactId>
    <scope>test</scope>
</dependency>
2. Controller
Let’s begin with the server-side application and a controller implementation. There are some optional annotations for enabling validation (@Validated) and ignoring security constraints (@Secured(SecurityRule.IS_ANONYMOUS)), described in the previous parts of this tutorial. However, the most important thing in the controller implementation below is the RxJava types used as return types: Single, Maybe and Flowable. The rest of the implementation is pretty similar to a standard REST controller.
@Controller("/persons/reactive")
@Secured(SecurityRule.IS_ANONYMOUS)
@Validated
public class PersonReactiveController {

    private static final Logger LOGGER = LoggerFactory.getLogger(PersonReactiveController.class);

    List<Person> persons = new ArrayList<>();

    @Post
    public Single<Person> add(@Body @Valid Person person) {
        person.setId(persons.size() + 1);
        persons.add(person);
        return Single.just(person);
    }

    @Get("/{id}")
    public Maybe<Person> findById(@PathVariable Integer id) {
        // Complete empty instead of throwing when no person matches the id
        return persons.stream()
                .filter(person -> person.getId().equals(id))
                .findAny()
                .map(Maybe::just)
                .orElseGet(Maybe::empty);
    }

    @Get(value = "/stream", produces = MediaType.APPLICATION_JSON_STREAM)
    public Flowable<Person> findAllStream() {
        return Flowable.fromIterable(persons).doOnNext(person -> LOGGER.info("Server: {}", person));
    }

}
In the controller implementation above I used three of the five observable types available in RxJava 2:
- Single – an observable that emits exactly one item and then completes. It guarantees that one item will be sent, so it is meant for non-empty outputs.
- Maybe – works very similarly to Single, but with one difference: it can complete without emitting a value. This is useful for optional emissions.
- Flowable – emits a stream of elements and supports the back pressure mechanism.
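The difference between "exactly one item", "zero or one item" and "many items" can be pictured with plain JDK types. The sketch below is only an analogy and does not use RxJava: a bare return value stands in for Single, Optional for Maybe, and a List for the items a Flowable would emit. The class PersonStoreSketch and its nested Person are hypothetical names made up for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Plain-JDK analogy, not RxJava: return value ~ Single, Optional ~ Maybe, List ~ Flowable items.
class PersonStoreSketch {

    // Hypothetical minimal stand-in for the tutorial's Person class.
    static final class Person {
        final int id;
        final String name;

        Person(int id, String name) {
            this.id = id;
            this.name = name;
        }
    }

    private final List<Person> persons = new ArrayList<>();

    // "Single"-like: always yields exactly one stored person.
    Person add(String name) {
        Person p = new Person(persons.size() + 1, name);
        persons.add(p);
        return p;
    }

    // "Maybe"-like: an empty Optional when no id matches, never an exception.
    Optional<Person> findById(int id) {
        return persons.stream().filter(p -> p.id == id).findAny();
    }

    // "Flowable"-like: every stored person, in insertion order.
    List<Person> findAll() {
        return new ArrayList<>(persons);
    }
}
```

The point of the middle method is that "nothing found" is an ordinary outcome rather than an error, which is the role Maybe plays in the controller.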
Now that you understand the basic observable types in RxJava, the example controller should be pretty easy to follow. We have three methods: add for adding a new element to the list, findById for searching by id, which may not return any element, and findAllStream, which emits all elements as a stream. The last method has to produce application/x-json-stream in order to take advantage of reactive streams on the client side as well. With that content type, events are retrieved continuously by the HTTP client thanks to the Flowable type.
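To get a feel for what reading such a stream involves, here is a minimal plain-Java sketch of a splitter that turns incoming text chunks into complete JSON objects as soon as each one is finished. It assumes the body is a sequence of top-level JSON objects written one after another, optionally separated by whitespace; the exact framing Micronaut puts on the wire is not shown in this article, so treat that as an assumption. JsonStreamSplitter is a hypothetical name, and the real client does this work for you.

```java
import java.util.ArrayList;
import java.util.List;

// Splits a stream of concatenated top-level JSON objects into individual objects.
// Chunks may end anywhere, even in the middle of an object.
class JsonStreamSplitter {

    private final StringBuilder current = new StringBuilder();
    private int depth = 0;            // nesting level of curly braces
    private boolean inString = false; // inside a JSON string literal
    private boolean escaped = false;  // previous character was a backslash inside a string

    // Feeds one chunk and returns the objects completed by it (possibly none).
    List<String> feed(String chunk) {
        List<String> complete = new ArrayList<>();
        for (int i = 0; i < chunk.length(); i++) {
            char c = chunk.charAt(i);
            if (depth == 0 && c != '{') {
                continue; // skip whitespace or separators between top-level objects
            }
            current.append(c);
            if (inString) {
                if (escaped) {
                    escaped = false;
                } else if (c == '\\') {
                    escaped = true;
                } else if (c == '"') {
                    inString = false;
                }
            } else if (c == '"') {
                inString = true;
            } else if (c == '{') {
                depth++;
            } else if (c == '}') {
                depth--;
                if (depth == 0) {
                    complete.add(current.toString());
                    current.setLength(0);
                }
            }
        }
        return complete;
    }
}
```

Because an object is handed over the moment its closing brace arrives, a consumer can start processing the first person before the server has written the last one, which is what distinguishes a JSON stream from a single JSON array.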
3. Using Low-level Reactive HTTP Client
Micronaut Reactive offers two types of clients for accessing HTTP APIs: a low-level client and a declarative client. For the low-level one we can choose between HttpClient, RxHttpClient and RxStreamingHttpClient, the last of which supports streaming data over HTTP. The recommended way of obtaining a reference to a client is to inject it with the @Client annotation. However, inside a JUnit test the most suitable way is the static create method of RxHttpClient, since we can set the port number dynamically.
Here’s the implementation of the JUnit tests with the low-level HTTP client. To read a Single or Maybe we use the retrieve method of RxHttpClient. After subscribing to an observable I’m using the ConcurrentUnit library to handle asynchronous results in the test. To access a Flowable returned as application/x-json-stream by the server we need to use RxStreamingHttpClient. It provides the jsonStream method, which is dedicated to reading a non-blocking stream of JSON objects.
@MicronautTest
public class PersonReactiveControllerTests {

    private static final Logger LOGGER = LoggerFactory.getLogger(PersonReactiveControllerTests.class);

    @Inject
    EmbeddedServer server;

    @Test
    public void testAdd() throws MalformedURLException, TimeoutException, InterruptedException {
        final Waiter waiter = new Waiter();
        final Person person = new Person(null, "Name100", "Surname100", 22, Gender.MALE);
        // The embedded server speaks plain HTTP unless SSL is configured
        RxHttpClient client = RxHttpClient.create(new URL("http://" + server.getHost() + ":" + server.getPort()));
        Single<Person> s = client.retrieve(HttpRequest.POST("/persons/reactive", person), Person.class).firstOrError();
        s.subscribe(person1 -> {
            LOGGER.info("Retrieved: {}", person1);
            waiter.assertNotNull(person1);
            waiter.assertNotNull(person1.getId());
            waiter.resume();
        });
        waiter.await(3000, TimeUnit.MILLISECONDS);
    }

    @Test
    public void testFindById() throws MalformedURLException, TimeoutException, InterruptedException {
        final Waiter waiter = new Waiter();
        RxHttpClient client = RxHttpClient.create(new URL("http://" + server.getHost() + ":" + server.getPort()));
        Maybe<Person> s = client.retrieve(HttpRequest.GET("/persons/reactive/1"), Person.class).firstElement();
        s.subscribe(person1 -> {
            LOGGER.info("Retrieved: {}", person1);
            waiter.assertNotNull(person1);
            waiter.assertEquals(1, person1.getId());
            waiter.resume();
        });
        waiter.await(3000, TimeUnit.MILLISECONDS);
    }

    @Test
    public void testFindAllStream() throws MalformedURLException, TimeoutException, InterruptedException {
        final Waiter waiter = new Waiter();
        RxStreamingHttpClient client = RxStreamingHttpClient.create(new URL("http://" + server.getHost() + ":" + server.getPort()));
        client.jsonStream(HttpRequest.GET("/persons/reactive/stream"), Person.class)
                .subscribe(s -> {
                    LOGGER.info("Client: {}", s);
                    waiter.assertNotNull(s);
                    waiter.resume();
                });
        // Wait until resume() has been called 9 times, once per streamed element
        waiter.await(3000, TimeUnit.MILLISECONDS, 9);
    }

}
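The tests above block the test thread until the asynchronous callback has fired the expected number of times. The same waiting pattern can be sketched with the JDK’s CountDownLatch. This is a stand-in for illustration only, not what the tutorial’s tests use; LatchWaitSketch and collect are made-up names, and ConcurrentUnit’s Waiter additionally reports assertion failures from the callback thread, which this sketch leaves out.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class LatchWaitSketch {

    // Emits `count` items on a background thread, counts the latch down once per item,
    // and returns the received items, or fails if they do not all arrive in time.
    static List<Integer> collect(int count, long timeoutMillis) {
        List<Integer> received = new CopyOnWriteArrayList<>();
        CountDownLatch latch = new CountDownLatch(count);
        ExecutorService producer = Executors.newSingleThreadExecutor();
        try {
            producer.submit(() -> {
                for (int i = 1; i <= count; i++) {
                    received.add(i);   // the "onNext" callback
                    latch.countDown(); // plays the role of waiter.resume()
                }
            });
            // Plays the role of waiter.await(timeout, unit, count)
            if (!latch.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
                throw new IllegalStateException("timed out after " + received.size() + " of " + count);
            }
            return received;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            producer.shutdownNow();
        }
    }
}
```

Calling LatchWaitSketch.collect(9, 3000) mirrors the streaming test: the caller proceeds only after nine callbacks, and a stream that stalls turns into a timeout instead of a test that passes without checking anything.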
4. Back Pressure
One of the main terms related to reactive programming is back pressure. Back pressure is resistance or force opposing the desired flow of data through software. In simple words, if a producer sends more events than a consumer is able to handle in a given period of time, the consumer should be able to regulate the rate at which the producer sends events. Of course, Micronaut Reactive supports back pressure. Let’s analyze how we can control it in a Micronaut application using some simple examples.
We can “somehow” control back pressure on the server side and also on the client side. However, the client is not able to regulate emission parameters on the server side due to the TCP transport layer. OK, now let’s implement additional methods in our sample controller. I’m using fromCallable to emit elements within a Flowable stream. We produce 9 elements by calling the repeat method. The second newly created method is findAllStreamWithCallableDelayed, which additionally delays each element in the stream by 100 milliseconds. In this way, we can control back pressure on the server side.
@Controller("/persons/reactive")
@Secured(SecurityRule.IS_ANONYMOUS)
@Validated
public class PersonReactiveController {

    private static final Logger LOGGER = LoggerFactory.getLogger(PersonReactiveController.class);

    List<Person> persons = new ArrayList<>();

    @Get(value = "/stream/callable", produces = MediaType.APPLICATION_JSON_STREAM)
    public Flowable<Person> findAllStreamWithCallable() {
        return Flowable.fromCallable(() -> {
            int r = new Random().nextInt(100);
            Person p = new Person(r, "Name" + r, "Surname" + r, r, Gender.MALE);
            return p;
        }).doOnNext(person -> LOGGER.info("Server: {}", person))
                .repeat(9);
    }

    @Get(value = "/stream/callable/delayed", produces = MediaType.APPLICATION_JSON_STREAM)
    public Flowable<Person> findAllStreamWithCallableDelayed() {
        return Flowable.fromCallable(() -> {
            int r = new Random().nextInt(100);
            Person p = new Person(r, "Name" + r, "Surname" + r, r, Gender.MALE);
            return p;
        }).doOnNext(person -> LOGGER.info("Server: {}", person))
                .repeat(9).delay(100, TimeUnit.MILLISECONDS);
    }

}
We can also control back pressure on the client side. However, as I mentioned before, it does not have any effect on emission on the server side. Now, let’s consider the following test, which verifies the delayed stream on the server side.
@Test
public void testFindAllStreamDelayed() throws MalformedURLException, TimeoutException, InterruptedException {
    final Waiter waiter = new Waiter();
    // The embedded server speaks plain HTTP unless SSL is configured
    RxStreamingHttpClient client = RxStreamingHttpClient.create(new URL("http://" + server.getHost() + ":" + server.getPort()));
    client.jsonStream(HttpRequest.GET("/persons/reactive/stream/callable/delayed"), Person.class)
            .subscribe(s -> {
                LOGGER.info("Client: {}", s);
                waiter.assertNotNull(s);
                waiter.resume();
            });
    waiter.await(3000, TimeUnit.MILLISECONDS, 9);
}
Here’s the result of the test. Since the server delays every element by 100 milliseconds, the client receives and prints each element just after its emission.
For comparison, let’s consider the following test. This time we implement a custom Subscriber that requests a single element just after processing the previous one. The following test verifies the stream without a delay.
@Test
public void testFindAllStreamWithCallable() throws MalformedURLException, TimeoutException, InterruptedException {
    final Waiter waiter = new Waiter();
    // The embedded server speaks plain HTTP unless SSL is configured
    RxStreamingHttpClient client = RxStreamingHttpClient.create(new URL("http://" + server.getHost() + ":" + server.getPort()));
    client.jsonStream(HttpRequest.GET("/persons/reactive/stream/callable"), Person.class)
            .subscribe(new Subscriber<Person>() {

                Subscription s;

                @Override
                public void onSubscribe(Subscription subscription) {
                    // Store the subscription first, so onNext can use it even if an item arrives immediately
                    s = subscription;
                    subscription.request(1);
                }

                @Override
                public void onNext(Person person) {
                    LOGGER.info("Client: {}", person);
                    waiter.assertNotNull(person);
                    waiter.resume();
                    // Ask for the next element only after this one has been processed
                    s.request(1);
                }

                @Override
                public void onError(Throwable throwable) {
                }

                @Override
                public void onComplete() {
                }
            });
    waiter.await(3000, TimeUnit.MILLISECONDS, 9);
}
Here’s the result of our test. It finishes successfully, so the subscriber works properly. However, as you can see, back pressure is controlled only on the client side, since the server emits all the elements first and then the client retrieves them.
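Without a network in between, the request(n) protocol does hold the producer back. The following minimal sketch shows this with the JDK’s java.util.concurrent.Flow interfaces (Java 9+) rather than RxJava or Micronaut’s client. DemandSketch, RangePublisher and takeOneByOne are hypothetical names, and the publisher is a simplified, single-threaded, single-subscriber illustration, not a fully specification-compliant implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

class DemandSketch {

    // Publishes 1..max, but never more items than the subscriber has requested.
    // Simplified: single-threaded and meant for one subscriber only.
    static final class RangePublisher implements Flow.Publisher<Integer> {
        private final int max;
        int emitted = 0; // exposed so the caller can see how far the producer got

        RangePublisher(int max) {
            this.max = max;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private long demand = 0;
                private boolean draining = false;
                private boolean done = false;

                @Override
                public void request(long n) {
                    if (n <= 0 || done) {
                        return;
                    }
                    demand += n;
                    if (draining) {
                        return; // re-entrant call from onNext: the loop below picks it up
                    }
                    draining = true;
                    while (demand > 0 && emitted < max && !done) {
                        demand--;
                        subscriber.onNext(++emitted);
                    }
                    draining = false;
                    if (emitted == max && !done) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        }
    }

    // Requests one item at a time and stops asking after `limit` items.
    static List<Integer> takeOneByOne(RangePublisher publisher, int limit) {
        List<Integer> received = new ArrayList<>();
        publisher.subscribe(new Flow.Subscriber<Integer>() {
            private Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                s.request(1);
            }

            @Override
            public void onNext(Integer item) {
                received.add(item);
                if (received.size() < limit) {
                    subscription.request(1);
                }
            }

            @Override
            public void onError(Throwable t) {
            }

            @Override
            public void onComplete() {
            }
        });
        return received;
    }
}
```

With a publisher of 9 elements and a subscriber that stops asking after 3, the producer’s emitted counter stays at 3. Over HTTP the situation differs, as described above: the server’s Flowable has no such direct link to the client’s subscriber.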
5. Declarative HTTP Client
Besides the low-level HTTP client, Micronaut Reactive allows us to create declarative clients via the @Client annotation. If you are familiar with, for example, the OpenFeign project, or you have read the second part of my Micronaut tutorial, you probably understand the concept of a declarative client. In fact, we just need to create an interface with methods similar to those defined in the controller and annotate them properly. The client interface should be annotated with @Client as shown below. The interface is placed inside src/test/java since it is used only in JUnit tests.
@Client("/persons/reactive")
public interface PersonReactiveClient {

    @Post
    Single<Person> add(@Body Person person);

    @Get("/{id}")
    Maybe<Person> findById(@PathVariable Integer id);

    @Get(value = "/stream", produces = MediaType.APPLICATION_JSON_STREAM)
    Flowable<Person> findAllStream();

}
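If the idea of an interface without a hand-written implementation is new to you, the following plain-JDK sketch shows the concept with a runtime proxy: each call on the interface is translated into a request description based on the method’s annotation. Everything here (DeclarativeClientSketch, the Route annotation, PersonApi) is hypothetical and exists only for illustration. Micronaut does this work at compile time rather than with runtime proxies, so the sketch mirrors the idea, not the mechanism.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Proxy;
import java.util.regex.Matcher;

class DeclarativeClientSketch {

    // Hypothetical annotation, standing in for something like @Get("/{id}").
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface Route {
        String value();
    }

    // Hypothetical client interface: nobody writes an implementation by hand.
    interface PersonApi {
        @Route("/{id}")
        String findById(Integer id);
    }

    // Builds an implementation that turns each call into a request line.
    // A real client would send the request; this one only describes it.
    static <T> T create(Class<T> api, String basePath) {
        Object proxy = Proxy.newProxyInstance(
                api.getClassLoader(),
                new Class<?>[] { api },
                (instance, method, args) -> {
                    Route route = method.getAnnotation(Route.class);
                    if (route == null) {
                        throw new UnsupportedOperationException(method.getName());
                    }
                    String path = route.value();
                    if (args != null && args.length > 0) {
                        // Substitute the first argument for the single {placeholder}
                        String value = Matcher.quoteReplacement(String.valueOf(args[0]));
                        path = path.replaceFirst("\\{[^}]+\\}", value);
                    }
                    return "GET " + basePath + path;
                });
        return api.cast(proxy);
    }
}
```

A caller obtains the client with DeclarativeClientSketch.create(PersonApi.class, "/persons/reactive") and then simply invokes findById(1), much like the injected PersonReactiveClient is used in the tests.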
The declarative client needs to be injected into the test. Here are the same test methods as implemented for the low-level client, but now with the declarative client.
@MicronautTest
public class PersonReactiveControllerTests {

    private static final Logger LOGGER = LoggerFactory.getLogger(PersonReactiveControllerTests.class);

    @Inject
    EmbeddedServer server;
    @Inject
    PersonReactiveClient client;

    @Test
    public void testAddDeclarative() throws TimeoutException, InterruptedException {
        final Waiter waiter = new Waiter();
        final Person person = new Person(null, "Name100", "Surname100", 22, Gender.MALE);
        Single<Person> s = client.add(person);
        s.subscribe(person1 -> {
            LOGGER.info("Retrieved: {}", person1);
            waiter.assertNotNull(person1);
            waiter.assertNotNull(person1.getId());
            waiter.resume();
        });
        waiter.await(3000, TimeUnit.MILLISECONDS);
    }

    @Test
    public void testFindByIdDeclarative() throws TimeoutException, InterruptedException {
        final Waiter waiter = new Waiter();
        Maybe<Person> s = client.findById(1);
        s.subscribe(person1 -> {
            LOGGER.info("Retrieved: {}", person1);
            waiter.assertNotNull(person1);
            waiter.assertEquals(1, person1.getId());
            waiter.resume();
        });
        waiter.await(3000, TimeUnit.MILLISECONDS);
    }

    @Test
    public void testFindAllStreamDeclarative() throws MalformedURLException, TimeoutException, InterruptedException {
        final Waiter waiter = new Waiter();
        Flowable<Person> persons = client.findAllStream();
        persons.subscribe(s -> {
            LOGGER.info("Client: {}", s);
            waiter.assertNotNull(s);
            waiter.resume();
        });
        waiter.await(3000, TimeUnit.MILLISECONDS, 9);
    }

}
Source Code
We used the same repository as for the two previous parts of my Micronaut tutorial: https://github.com/piomin/sample-micronaut-applications.git.