Using Reactive WebClient with Spring WebFlux


Reactive APIs, and reactive programming in general, have become increasingly popular lately. You have a chance to observe more and more new frameworks and toolkits that support reactive programming or are dedicated to it. Today, in the era of microservices architecture, where network communication through APIs is critical for applications, reactive APIs seem to be an attractive alternative to the traditional, synchronous approach. They should definitely be considered as a primary approach if you are working with large streams of data exchanged over the network.
Spring supports reactive programming and reactive APIs too. You may have had a chance to read about it in some of my previous articles, where I focused on introducing that support. In the article Reactive Microservices with Spring WebFlux and Spring Cloud you can read more about building a microservices architecture using Spring WebFlux together with Spring Cloud projects. In turn, in the articles Introduction to Reactive APIs with Postgres, R2DBC, Spring Data JDBC and Spring WebFlux and Reactive Elasticsearch with Spring Boot I introduced reactive Spring Data repositories using PostgreSQL and Elasticsearch as examples. Those articles should be treated as an introduction to reactive programming with Spring. Today, I would like to go deeper into that topic and discuss some aspects of the network communication between a service that exposes a reactive stream via an API and a service that consumes this API using Spring WebClient.

1. Access reactive stream using WebClient

First, let’s consider the typical scenario of reading a reactive API on the consumer side. We have the following implementation of a reactive stream containing Person objects on the producer side:

@RestController
@RequestMapping("/persons")
public class PersonController {

    private static final Logger LOGGER = LoggerFactory.getLogger(PersonController.class);

    @GetMapping("/json")
    public Flux<Person> findPersonsJson() {
        return Flux.fromStream(this::prepareStream)
                .doOnNext(person -> LOGGER.info("Server produces: {}", person));
    }
}

We can easily access it with the non-blocking WebClient. The following test starts the sample Spring WebFlux application, defines a WebClient instance and subscribes to the response stream.

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT)
public class SampleSpringWebFluxTest {

    private static final Logger LOGGER = LoggerFactory.getLogger(SampleSpringWebFluxTest.class);
    final WebClient client = WebClient.builder().baseUrl("http://localhost:8080").build();

    @Test
    public void testFindPersonsJson() throws TimeoutException, InterruptedException {
        final Waiter waiter = new Waiter();
        Flux<Person> persons = client.get().uri("/persons/json").retrieve().bodyToFlux(Person.class);
        persons.subscribe(person -> {
            waiter.assertNotNull(person);
            LOGGER.info("Client subscribes: {}", person);
            waiter.resume();
        });
        waiter.await(3000, 9);
    }
}

In reactive programming with Reactor and Spring WebFlux you first need to subscribe to the stream in order to access the emitted objects. Assuming that our test stream has 9 Person elements, you will receive the following log output:

[Image: webclient-1, client log output]

Let’s think about what happened here. Our reactive stream on the server side has been returned to the consumer as a JSON array response. It means that our reactive client is able to start reading the stream only after the server has finished emitting all the elements. That’s not exactly what we wanted to achieve, because we would like to take advantage of reactive streams on the client side as well, and be able to read every element of the stream just after it has been emitted by the producer. With the application/json content type this is just not possible, which is clearly visible in the fragment of the response below.

[Image: webclient-3, fragment of the JSON response]

2. Expose event stream using Spring WebFlux

The problem defined in the previous section obviously lies on the server side. WebClient is able to read a reactive stream continuously, but the producer needs to emit it “properly”. To achieve this we should set the content type to application/stream+json or text/event-stream.

@RestController
@RequestMapping("/persons")
public class PersonController {

    private static final Logger LOGGER = LoggerFactory.getLogger(PersonController.class);

    @GetMapping(value = "/stream", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
    public Flux<Person> findPersonsStream() {
        return Flux.fromStream(this::prepareStream).delaySequence(Duration.ofMillis(100))
                .doOnNext(person -> LOGGER.info("Server produces: {}", person));
    }
}

Now our response looks slightly different than before, as shown below.

[Image: webclient-4, fragment of the streamed response]
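To make the difference between the two response bodies more concrete, here is a minimal sketch that uses only the JDK. It assumes that application/stream+json writes one JSON document per line, while application/json wraps all elements in a single array. The class names NdjsonFramer and WireFormatSketch and the Person field names are hypothetical and chosen only for illustration. This is not how WebClient is implemented internally.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: buffers incoming text chunks and releases each complete line.
class NdjsonFramer {
    private final StringBuilder buffer = new StringBuilder();

    // Feed one network chunk; returns the JSON documents completed by it.
    List<String> feed(String chunk) {
        buffer.append(chunk);
        List<String> complete = new ArrayList<>();
        int newline;
        while ((newline = buffer.indexOf("\n")) >= 0) {
            String line = buffer.substring(0, newline).trim();
            buffer.delete(0, newline + 1);
            if (!line.isEmpty()) {
                complete.add(line);
            }
        }
        return complete;
    }
}

class WireFormatSketch {
    // application/json: a single document that is complete only at the closing bracket.
    static String asJsonArray(List<String> docs) {
        return "[" + String.join(",", docs) + "]";
    }

    // application/stream+json: one self-contained document per line.
    static String asStreamJson(List<String> docs) {
        StringBuilder body = new StringBuilder();
        for (String doc : docs) {
            body.append(doc).append('\n');
        }
        return body.toString();
    }

    public static void main(String[] args) {
        // Field names are assumed; the article does not list the fields of Person.
        List<String> persons = List.of("{\"id\":1,\"name\":\"A\"}", "{\"id\":2,\"name\":\"B\"}");
        System.out.println(asJsonArray(persons));
        System.out.print(asStreamJson(persons));

        NdjsonFramer framer = new NdjsonFramer();
        // The second document is split across two chunks on purpose.
        System.out.println(framer.feed("{\"id\":1,\"name\":\"A\"}\n{\"id\":2,"));
        System.out.println(framer.feed("\"name\":\"B\"}\n"));
    }
}
```

With the line-delimited format, the first document can be handed to the consumer as soon as its newline arrives, even though the second one is still incomplete. That is the property the streaming content type gives us.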

To see the effect during the test I delayed the whole stream a little (around 100 milliseconds), because it takes some time before the subscriber starts to receive elements from the stream after subscribing to it. The current test is the same as the previous one: we subscribe to the response stream and print all the elements.

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT)
public class SampleSpringWebFluxTest {

    private static final Logger LOGGER = LoggerFactory.getLogger(SampleSpringWebFluxTest.class);
    final WebClient client = WebClient.builder().baseUrl("http://localhost:8080").build();

    @Test
    public void testFindPersonsStream() throws TimeoutException, InterruptedException {
        final Waiter waiter = new Waiter();
        Flux<Person> persons = client.get().uri("/persons/stream").retrieve().bodyToFlux(Person.class);
        persons.subscribe(person -> {
            LOGGER.info("Client subscribes: {}", person);
            waiter.assertNotNull(person);
            waiter.resume();
        });
        waiter.await(3000, 9);
    }
}

Here are the results. As you can see, the elements are processed on the client side just after being emitted by the producer. It is worth noting that all the elements are sent within the same thread.

[Image: webclient-2, test log output]

3. Implementing backpressure

Backpressure is one of the most important reasons to choose reactive programming. According to the Spring WebFlux documentation, it supports backpressure, since Project Reactor is a Reactive Streams library and, therefore, all of its operators support non-blocking back pressure. That statement is of course true, but only on the server side. Maybe the next sentence of the documentation sheds some light on things: “Reactor has a strong focus on server-side Java.” We should remember that Spring WebClient and WebFlux use TCP transport for communication between a client and the server. Therefore, a client is not able to regulate the frequency of element emission on the server side. I don’t want to go into the details right now; for an explanation of that situation you may refer to the following post: https://stackoverflow.com/questions/52244808/backpressure-mechanism-in-spring-web-flux.
OK, before proceeding let’s recap the definition of the term backpressure. Backpressure (or back pressure) is a resistance or force opposing the desired flow of data through software. In simple words, if a producer sends more events than a consumer is able to handle in a given period of time, the consumer should be able to regulate the frequency at which the producer sends events. Let’s consider the following test example.

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT)
public class SampleSpringWebFluxTest {

    private static final Logger LOGGER = LoggerFactory.getLogger(SampleSpringWebFluxTest.class);
    final WebClient client = WebClient.builder().baseUrl("http://localhost:8080").build();

    @Test
    public void testFindPersonsStreamBackPressure() throws TimeoutException, InterruptedException {
        final Waiter waiter = new Waiter();
        Flux<Person> persons = client.get().uri("/persons/stream/back-pressure").retrieve().bodyToFlux(Person.class);
        persons.map(this::doSomeSlowWork).subscribe(person -> {
            waiter.assertNotNull(person);
            LOGGER.info("Client subscribes: {}", person);
            waiter.resume();
        });
        waiter.await(3000, 9);
    }

    private Person doSomeSlowWork(Person person) {
        try {
            Thread.sleep(90);
        }
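        catch (InterruptedException e) {
            // Restore the interrupt flag instead of silently swallowing it
            Thread.currentThread().interrupt();
        }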
        return person;
    }
}

After receiving a stream of elements, our test calls a time-expensive mapping method on each element, so it is not able to handle as many elements as the producer sends. In this case the only way to somehow “regulate” backpressure is through the delayElements method on the server side. I also tried to use the limitRate method on the server side and to implement my own custom Subscriber on the client side, but I wasn’t successful. Here’s the current implementation of our API method returning the stream of Person objects.

@RestController
@RequestMapping("/persons")
public class PersonController {

    private static final Logger LOGGER = LoggerFactory.getLogger(PersonController.class);

    @GetMapping(value = "/stream/back-pressure", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
    public Flux<Person> findPersonsStreamBackPressure() {
        return Flux.fromStream(this::prepareStream).delayElements(Duration.ofMillis(100))
                .doOnNext(person -> LOGGER.info("Server produces: {}", person));
    }
}

After running the test we can see that every element is delayed by 100 milliseconds, and that the producer uses the whole pool of threads for emitting the stream of objects.

[Image: webclient-3, test log output]
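For comparison, here is a rough sketch of what consumer-driven backpressure looks like inside a single JVM, where the Reactive Streams contract applies directly. It uses java.util.concurrent.Flow, which mirrors the Reactive Streams interfaces in JDK 9 and later. RangePublisher and PullingSubscriber are hypothetical names invented for this illustration. The sketch is synchronous and simplified, and it says nothing about what happens between WebClient and the server over TCP.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical publisher: has `count` integers to offer, but emits only what was requested.
class RangePublisher implements Flow.Publisher<Integer> {
    private final int count;

    RangePublisher(int count) {
        this.count = count;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
        subscriber.onSubscribe(new Flow.Subscription() {
            private int next = 1;
            private long demand = 0;
            private boolean draining = false;
            private boolean done = false;

            @Override
            public void request(long n) {
                // Simplification: a compliant publisher would signal onError for n <= 0.
                if (n <= 0 || done) {
                    return;
                }
                demand += n;
                if (draining) {
                    return; // re-entrant call from onNext; the running loop sees the new demand
                }
                draining = true;
                while (demand > 0 && next <= count && !done) {
                    demand--;
                    subscriber.onNext(next++);
                }
                draining = false;
                if (next > count && !done) {
                    done = true;
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }
}

// Hypothetical consumer: receives nothing until it explicitly asks for more.
class PullingSubscriber implements Flow.Subscriber<Integer> {
    final List<Integer> received = new ArrayList<>();
    boolean completed = false;
    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription; // deliberately no initial request
    }

    @Override
    public void onNext(Integer item) {
        received.add(item);
    }

    @Override
    public void onError(Throwable throwable) {
        throw new IllegalStateException(throwable);
    }

    @Override
    public void onComplete() {
        completed = true;
    }

    void requestMore(long n) {
        subscription.request(n);
    }

    public static void main(String[] args) {
        PullingSubscriber subscriber = new PullingSubscriber();
        new RangePublisher(9).subscribe(subscriber);
        System.out.println(subscriber.received); // nothing requested yet
        subscriber.requestMore(2);
        System.out.println(subscriber.received); // only the two requested elements
        subscriber.requestMore(100);
        System.out.println(subscriber.received + " completed=" + subscriber.completed);
    }
}
```

Here the producer holds 9 elements but hands over only as many as the consumer has asked for. This is the kind of regulation the test above could not obtain from the remote producer, which is why the delay had to be configured on the server side instead.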

Source Code

The source code of the sample application and JUnit tests is, as always, available on GitHub. The repository address is https://github.com/piomin/sample-spring-webflux.git.
