Reactive programming with Project Reactor
If you are building reactive microservices you would probably have to merge data streams from different source APIs into a single result stream. It inspired me to create this article containing some most common scenarios of using reactive streams in microservice-based architecture during inter-service communication.
I have already described some aspects related to reactive programming with Spring based on Spring WebFlux and Spring Data JDBC projects in the following articles:
- Reactive Microservices with Spring WebFlux and Spring Cloud
- Introduction to Reactive APIs with Postgres, R2DBC, Spring Data JDBC and Spring WebFlux
Spring Framework supports reactive programming since version 5. That support is built on top of Project Reactor – https://projectreactor.io. Reactor is a fourth-generation Reactive programming library for building non-blocking applications on the JVM based on the Reactive Streams Specification. Working with this library can be difficult at first, especially if you don’t have any experience with reactive streams. Reactive Core gives us two data types that enable us to produce a stream of data: Mono
and Flux
. With Flux
we can emit 0..nelements, while with Mono
we can create a stream of 0..1elements. Both those types implement Publisher
interface. Both these types are lazy, which means they won’t be executed until you consume it. Therefore, when building reactive APIs it is important not to block the stream. Spring WebFlux doesn’t allow that.
Introduction
The sample project is available on GitHub in repository reactive-playground https://github.com/piomin/reactive-playground.git. It is written in Kotlin. In addition to some Kotlin libraries, only a single dependency that needs to be added in order to use Project Reactor is reactor-core
.
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.2.1.RELEASE</version>
</dependency>
I would not like to show you the features of Project Reactor based on simple String objects like in many other articles. Therefore, I have created the following class hierarchy for our tests, that allows us to simulate APIs built for three different domain objects.
Class Organization
contains a list of Employee
and Department
. Each department contains a list of Employee
assigned only to the given department inside the organization. Class Employee
has properties: organizationId
that assigns it to the organization and departmentId
that assigns it to the department.
data class Employee(var id: Int, var name: String, var salary: Int) {
var organizationId: Int? = null
var departmentId: Int? = null
constructor(id: Int, name: String, salary: Int, organizationId: Int, departmentId: Int) : this(id, name, salary) {
this.organizationId = organizationId
this.departmentId = departmentId
}
constructor(id: Int, name: String, salary: Int, organizationId: Int) : this(id, name, salary) {
this.organizationId = organizationId
}
}
Here’s the implementation of Department
class.
class Department(var id: Int, var name: String, var organizationId: Int) {
var employees: MutableList<Employee> = ArrayList()
constructor(id: Int, name: String, organizationId: Int, employees: MutableList<Employee>) : this(id, name, organizationId) {
this.employees.addAll(employees)
}
fun addEmployees(employees: MutableList<Employee>) : Department {
this.employees.addAll(employees)
return this
}
fun addEmployee(employee: Employee) : Department {
this.employees.add(employee)
return this
}
}
Here’s the implementation of Organization
class.
class Organization(var id: Int, var name: String) {
var employees: MutableList<Employee> = ArrayList()
var departments: MutableList<Department> = ArrayList()
constructor(id: Int, name: String, employees: MutableList<Employee>, departments: MutableList<Department>) : this(id, name){
this.employees.addAll(employees)
this.departments.addAll(departments)
}
constructor(id: Int, name: String, employees: MutableList<Employee>) : this(id, name){
this.employees.addAll(employees)
}
}
Scenario 1
We have API methods that return data streams. First of them return Flux
emitting employees assigned to the given organization. Second of them just returns Mono
with the current organization.
private fun getOrganizationByName(name: String) : Mono<Organization> {
return Mono.just(Organization(1, name))
}
private fun getEmployeesByOrganization(id: Int) : Flux<Employee> {
return Flux.just(Employee(1, "Employee1", 1000, id),
Employee(2, "Employee2", 2000, id))
}
We would like to return the single stream emitting organization that contains a list of employees as shown below.
Here’s the solution. We use the zipWhen
method that waits for the result from source Mono
, and then calls the second Mono
. Because we can zip only the same stream types (in that case these are Mono
) we need to convert Flux<Employee>
returned by getEmployeesByOrganization
method into Mono<MutableList<Employee>>
using collectList
function. Thanks to zipWhen
we can then combine two Mono
streams and create new objects inside map
function.
@Test
fun testScenario1() {
val organization : Mono<Organization> = getOrganizationByName("test")
.zipWhen { organization ->
getEmployeesByOrganization(organization.id!!).collectList()
}
.map { tuple ->
Organization(tuple.t1.id, tuple.t1.name, tuple.t2)
}
}
Scenario 2
Let’s consider another scenario. Now, we have Flux streams that emit employees and departments. Every employee has property departmentId
responsible for assignment to the department.
private fun getDepartments() : Flux<Department> {
return Flux.just(Department(1, "X", 1),
Department(2, "Y", 1))
}
private fun getEmployees() : Flux<Employee> {
return Flux.just(Employee(1, "Employee1", 1000, 1, 1),
Employee(2, "Employee2", 2000, 1, 1),
Employee(3, "Employee3", 1000, 1, 2),
Employee(4, "Employee4", 2000, 1, 2))
}
The goal is to merge those two streams and return the single Flux
stream emitting departments that contains all employees assigned to the given department. Here’s the picture that illustrates the transformation described above.
We can do that in two ways as shown below. First calls flatMap
function on stream with departments. Inside flatMap
we zip every single Department
with a stream of employees. That stream is then filtered by departmentId
and converted into Mono
type. Finally, we are creating a Mono
type using map
function that emits a department containing a list of employees.
The second way groups Flux
with employees by departmentId
. Then it invokes zipping and mapping functions similar to the previous way.
@Test
fun testScenario2() {
val departments: Flux<Department> = getDepartments()
.flatMap { department ->
Mono.just(department)
.zipWith(getEmployees().filter { it.departmentId == department.id }.collectList())
.map { t -> t.t1.addEmployees(t.t2) }
}
val departments2: Flux<Department> = getEmployees()
.groupBy { it.departmentId }
.flatMap { t -> getDepartments().filter { it.id == t.key() }.elementAt(0)
.zipWith(t.collectList())
.map { it.t1.addEmployees(it.t2) }
}
}
Scenario 3
This scenario is simpler than two previous scenarios. We have two API methods that emit Flux
with the same object types. First of them contains list of employees having id
, name
, salary
properties, while the second id
, organizationId
, departmentId
properties.
private fun getEmployeesBasic() : Flux<Employee> {
return Flux.just(Employee(1, "AA", 1000),
Employee(2, "BB", 2000))
}
private fun getEmployeesRelationships() : Flux<Employee> {
return Flux.just(Employee(1, 1, 1),
Employee(2, 1, 2))
}
We want to convert it into a single stream emitting employees with a full set of properties. The following picture illustrates the described transformation.
In that case the solution is pretty simple. We are zipping two Flux
streams using zipWith
function, and then map two zipped objects into a single containing the full set of properties.
@Test
fun testScenario3() {
val employees : Flux<Employee> = getEmployeesBasic()
.zipWith(getEmployeesRelationships())
.map { t -> Employee(t.t1.id, t.t1.name, t.t1.salary, t.t2.organizationId!!, t.t2.departmentId!!) }
}
Scenario 4
In this scenario we have two independent Flux
streams that emit the same type of objects – Employee
.
private fun getEmployeesFirstPart() : Flux<Employee> {
return Flux.just(Employee(1, "AA", 1000), Employee(3, "BB", 3000))
}
private fun getEmployeesSecondPart() : Flux<Employee> {
return Flux.just(Employee(2, "CC", 2000), Employee(4, "DD", 4000))
}
We would like to merge those two streams into a single stream ordered by id
. The following picture shows that transformation.
Here’s the solution. We use mergeOrderedWith
function with a comparator that compares id
. Then we can perform some transformations on every object, but it is only an option that shows the usage on map
function.
@Test
fun testScenario4() {
val persons: Flux<Employee> = getEmployeesFirstPart()
.mergeOrderedWith(getEmployeesSecondPart(), Comparator { o1, o2 -> o1.id.compareTo(o2.id) })
.map {
Employee(it.id, it.name, it.salary, 1, 1)
}
}
Scenario 5
And the last scenario in this article. We have a single input stream Mono
with Organization
that contains a list of departments. Each of department inside that list also contains the list of all employees assigned to the given department. Here’s our API method implementation.
private fun getDepartmentsByOrganization(id: Int) : Flux<Department> {
val dep1 = Department(1, "A", id, mutableListOf(
Employee(1, "Employee1", 1000, id, 1),
Employee(2, "Employee2", 2000, id, 1)
)
)
val dep2 = Department(2, "B", id, mutableListOf(
Employee(3, "Employee3", 1000, id, 2),
Employee(4, "Employee4", 2000, id, 2)
)
)
return Flux.just(dep1, dep2)
}
The goal is to convert the stream to the same stream Flux
with Department
, but containing a list of all employees in the department. The following picture visualizes the described transformation.
Here’s the solution. We invoke flatMapIterable
function that converts Flux
with Department> into
Flux
with Employees
by returning List
of Employee
. Then we convert it to Mono
and add to the newly created Organization
object inside map
function.
@Test
fun testScenario5() {
var organization: Mono<Organization> = getDepartmentsByOrganization(1)
.flatMapIterable { department -> department.employees }
.collectList()
.map { t -> Organization(1, "X", t) }
}
Leave a ReplyCancel reply