Reactive Spring Boot with WebFlux, R2DBC and Postgres
In this article, you will learn how to implement and test reactive Spring Boot apps using Spring WebFlux, R2DBC, and a Postgres database. We will create two simple apps written in Kotlin using the latest version of Spring Boot 3. Both apps expose REST endpoints over HTTP. To test the communication between them and the integration with the Postgres database, we will use Testcontainers and MockServer.
If you are looking for more guides to Spring Boot 3, you can look at other posts on my blog. One article describes how to build a microservices architecture with Spring Boot 3 and Spring Cloud. Another covers the latest changes in observability with Spring Boot 3 and shows how to integrate your app with the Grafana Stack. These are just a few examples; you can find more content in this area on my blog.
Source Code
If you would like to try it yourself, you can always take a look at my source code. To do that, clone my GitHub repository. It contains two apps inside the employee-service and organization-service directories. After that, just follow my instructions.
Dependencies
In the first step, we will add several dependencies related to Kotlin. Besides the standard libraries, we can include Kotlin support for Jackson (JSON serialization/deserialization):
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-stdlib</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-kotlin</artifactId>
</dependency>
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-reflect</artifactId>
</dependency>
We also need to include two Spring Boot Starters. In order to create a reactive Spring @Controller, we need to use the Spring WebFlux module. With the Spring Boot Data R2DBC Starter we can use Spring Data Repositories in a reactive way. Finally, we have to include the Postgres driver provided by R2DBC.
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-r2dbc</artifactId>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>r2dbc-postgresql</artifactId>
<scope>runtime</scope>
</dependency>
There are several testing dependencies in our project. We need to include the standard Spring Boot Test Starter, Testcontainers with JUnit 5, Postgres, and R2DBC support, and finally the MockServer Netty module for mocking the API called by our reactive client. It is also worth adding the spring-boot-testcontainers module to take advantage of the built-in integration between Spring Boot and Testcontainers.
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>r2dbc</artifactId>
<version>1.18.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>postgresql</artifactId>
<version>1.18.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<version>1.18.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-testcontainers</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mock-server</groupId>
<artifactId>mockserver-netty</artifactId>
<version>5.15.0</version>
<scope>test</scope>
</dependency>
The last dependency is optional. We can include Spring Boot Actuator in our apps. It adds R2DBC connection status to health checks and several metrics with the pool status.
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
Implement the Spring Reactive App
Here’s our model class for the first app, employee-service:
class Employee(val name: String,
               val salary: Int,
               val organizationId: Int) {
    @Id var id: Int? = null
}
Here’s the repository interface. It needs to extend the R2dbcRepository interface. As with standard Spring Data repositories, we can define custom find methods. However, instead of returning entities directly, these methods return them wrapped in a Reactor Mono or Flux.
interface EmployeeRepository : R2dbcRepository<Employee, Int> {
    fun findByOrganizationId(organizationId: Int): Flux<Employee>
}
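To get a feel for these return types without a database, here is a minimal sketch that swaps the real repository for an in-memory stand-in. The InMemoryEmployees class and the flattened Employee data class are assumptions made for illustration and are not part of the project; only Reactor's Mono and Flux are real APIs here.

```kotlin
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono

// Illustrative copy of the entity with the id in the constructor (assumption).
data class Employee(val id: Int, val name: String, val salary: Int, val organizationId: Int)

// Hypothetical in-memory stand-in for EmployeeRepository.
class InMemoryEmployees(private val data: List<Employee>) {

    // Mono: emits at most one element, or completes empty when nothing matches.
    fun findById(id: Int): Mono<Employee> =
        Mono.justOrEmpty(data.firstOrNull { it.id == id })

    // Flux: emits zero or more elements.
    fun findByOrganizationId(organizationId: Int): Flux<Employee> =
        Flux.fromIterable(data.filter { it.organizationId == organizationId })
}

fun main() {
    val repository = InMemoryEmployees(
        listOf(
            Employee(1, "Test1", 10000, 1),
            Employee(2, "Test2", 20000, 1),
            Employee(3, "Test3", 30000, 2)
        )
    )
    // block() is used only to print results in this demo;
    // in a WebFlux app the framework subscribes for us.
    println(repository.findById(1).block())
    println(repository.findById(99).block())
    println(repository.findByOrganizationId(1).collectList().block())
}
```

In the real app nothing calls block(): the controller simply returns the Mono or Flux and WebFlux subscribes to it when the request is handled.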
Here’s the implementation of our @RestController. We need to inject the EmployeeRepository bean. Then we use the repository bean to interact with the database in a reactive way. Our endpoints also return objects wrapped in the Reactor Mono and Flux. There are three find endpoints and a single POST endpoint:
- Searching all the employees (1)
- Searching by the employee id (2)
- Searching all employees by the organization id (3)
- Adding new employees (4)
@RestController
@RequestMapping("/employees")
class EmployeeController {

    @Autowired
    lateinit var repository: EmployeeRepository

    @GetMapping // (1)
    fun findAll(): Flux<Employee> = repository.findAll()

    @GetMapping("/{id}") // (2)
    fun findById(@PathVariable id: Int): Mono<Employee> =
        repository.findById(id)

    @GetMapping("/organization/{organizationId}") // (3)
    fun findByOrganizationId(@PathVariable organizationId: Int): Flux<Employee> =
        repository.findByOrganizationId(organizationId)

    @PostMapping // (4)
    fun add(@RequestBody employee: Employee): Mono<Employee> =
        repository.save(employee)
}
We also need to configure the database connection settings in the Spring Boot application.yml:
spring:
  application:
    name: employee-service
  r2dbc:
    url: r2dbc:postgresql://localhost:5432/spring
    username: spring
    password: spring123
Here’s our main class. We want our app to create a table in the database on startup. With R2DBC we need to prepare a fragment of code that populates the schema from the schema.sql file.
@SpringBootApplication
class EmployeeApplication {

    // Runs schema.sql against the database when the app starts
    @Bean
    fun initializer(connectionFactory: ConnectionFactory): ConnectionFactoryInitializer {
        val initializer = ConnectionFactoryInitializer()
        initializer.setConnectionFactory(connectionFactory)
        initializer.setDatabasePopulator(
            ResourceDatabasePopulator(ClassPathResource("schema.sql")))
        return initializer
    }
}

fun main(args: Array<String>) {
    runApplication<EmployeeApplication>(*args)
}
Then just place the schema.sql file in the src/main/resources directory.
CREATE TABLE employee (
id SERIAL PRIMARY KEY,
name VARCHAR(255),
salary INT,
organization_id INT
);
Let’s switch to the organization-service. The implementation is pretty similar. Here’s our domain model class:
class Organization(var name: String) {
    @Id var id: Int? = null
}
Our app communicates with the employee-service. Therefore, we need to define the WebClient bean. It gets the address of the target service from the application properties.
@SpringBootApplication
class OrganizationApplication {

    // Runs schema.sql against the database when the app starts
    @Bean
    fun initializer(connectionFactory: ConnectionFactory): ConnectionFactoryInitializer {
        val initializer = ConnectionFactoryInitializer()
        initializer.setConnectionFactory(connectionFactory)
        initializer.setDatabasePopulator(
            ResourceDatabasePopulator(ClassPathResource("schema.sql")))
        return initializer
    }

    // Base address of the employee-service, read from application properties
    @Value("\${employee.client.url}")
    private lateinit var employeeUrl: String

    @Bean
    fun webClient(builder: WebClient.Builder): WebClient {
        return builder.baseUrl(employeeUrl).build()
    }
}

fun main(args: Array<String>) {
    runApplication<OrganizationApplication>(*args)
}
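The employee.client.url property read by @Value has to be defined somewhere, for example in the organization-service application.yml. The fragment below is only a sketch: the address is an assumption, chosen to match port 8090 that the mock server uses in the tests later in this article, so adjust it to wherever the employee-service actually runs.

```yaml
# organization-service application.yml (fragment)
employee:
  client:
    url: http://localhost:8090 # assumed address of the employee-service
```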
There is also the repository interface OrganizationRepository. Our @RestController uses a repository bean to interact with the database and the WebClient bean to call the endpoint exposed by the employee-service. As the response from the GET /organizations/{id}/with-employees endpoint, it returns the OrganizationDTO.
@RestController
@RequestMapping("/organizations")
class OrganizationController {

    @Autowired
    lateinit var repository: OrganizationRepository

    @Autowired
    lateinit var client: WebClient

    @GetMapping
    fun findAll(): Flux<Organization> = repository.findAll()

    @GetMapping("/{id}")
    fun findById(@PathVariable id: Int): Mono<Organization> =
        repository.findById(id)

    @GetMapping("/{id}/with-employees")
    fun findByIdWithEmployees(@PathVariable id: Int): Mono<OrganizationDTO> {
        // Call the employee-service for all employees of this organization
        val employees: Flux<Employee> = client.get().uri("/employees/organization/$id")
            .retrieve().bodyToFlux(Employee::class.java)
        val org: Mono<Organization> = repository.findById(id)
        // Combine the organization with the collected list of employees
        return org.zipWith(employees.collectList()).log()
            .map { tuple -> OrganizationDTO(tuple.t1.id as Int, tuple.t1.name, tuple.t2) }
    }

    @PostMapping
    fun add(@RequestBody organization: Organization): Mono<Organization> =
        repository.save(organization)
}
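The zipWith call in findByIdWithEmployees is easier to follow in isolation. The sketch below reproduces the same combination step with plain Reactor types and no database or HTTP call. The Org, Emp and OrgWithEmployees classes and the combine function are hypothetical names introduced only for this illustration.

```kotlin
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono

// Hypothetical stand-ins for Organization, Employee and OrganizationDTO.
data class Org(val id: Int, val name: String)
data class Emp(val id: Int, val name: String)
data class OrgWithEmployees(val id: Int, val name: String, val employees: List<Emp>)

// Same shape as the controller method: collect the Flux into a single list,
// pair it with the organization, then map the pair to one result object.
fun combine(org: Mono<Org>, employees: Flux<Emp>): Mono<OrgWithEmployees> =
    org.zipWith(employees.collectList())
        .map { tuple -> OrgWithEmployees(tuple.t1.id, tuple.t1.name, tuple.t2) }

fun main() {
    val found = combine(
        Mono.just(Org(1, "Test")),
        Flux.just(Emp(1, "Test1"), Emp(2, "Test2"))
    ).block() // block() only for the demo
    println(found)

    // When the organization Mono is empty, the zipped Mono is empty as well.
    val missing = combine(Mono.empty(), Flux.just(Emp(1, "Test1"))).block()
    println(missing)
}
```

One consequence worth keeping in mind: if no organization exists for the given id, zipWith completes without emitting anything, so no DTO is produced for that request.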
Here’s the implementation of our DTO:
data class OrganizationDTO(var id: Int?, var name: String) {
    var employees: MutableList<Employee> = ArrayList()

    constructor(employees: MutableList<Employee>) : this(null, "") {
        this.employees = employees
    }

    constructor(id: Int, name: String, employees: MutableList<Employee>) : this(id, name) {
        this.employees = employees
    }
}
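The test later in this article builds employees with Employee(1, "Test1", 10000, 1), which suggests that the organization-service has its own Employee class with the id as the first constructor parameter. That class is not shown here, so the version below is an assumption. The sketch also shows one possible, more compact shape for the DTO that uses a default argument instead of secondary constructors. It is an alternative for comparison, not the code from the repository.

```kotlin
// Assumed organization-service copy of Employee (not shown in the article).
data class Employee(val id: Int, val name: String, val salary: Int, val organizationId: Int)

// Alternative DTO shape: employees is part of the primary constructor,
// so it is included in the generated equals(), hashCode() and toString().
data class OrganizationDTO(
    val id: Int?,
    val name: String,
    val employees: List<Employee> = emptyList()
)

fun main() {
    val employees = listOf(Employee(1, "Test1", 10000, 1), Employee(2, "Test2", 20000, 1))
    val dto = OrganizationDTO(1, "Test", employees)
    println(dto.employees.size)
    println(OrganizationDTO(2, "Empty").employees.isEmpty())
}
```

In the DTO shown above, employees is declared in the class body, so Kotlin leaves it out of the generated data class methods. Whether that matters depends on how the DTO is compared or logged.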
Testing with Integrations
Once we have finished the implementation, we can prepare several integration tests. As I mentioned at the beginning, we will use Testcontainers to run the Postgres container during the tests. Our test runs the app and leverages the auto-configured instance of WebTestClient to call the API endpoints (1). We need to start the Postgres container before the tests, so we define the container inside the companion object section (2). With the @ServiceConnection annotation we don’t have to set the connection properties manually; Spring Boot will do it for us (3).
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@Testcontainers
@TestMethodOrder(OrderAnnotation::class)
class EmployeeControllerTests {

    @Autowired
    private lateinit var webTestClient: WebTestClient // (1)

    companion object { // (2)
        @Container
        @ServiceConnection // (3)
        val container = PostgreSQLContainer<Nothing>("postgres:14").apply {
            withDatabaseName("spring")
            withUsername("spring")
            withPassword("spring123")
        }
    }

    @Test
    @Order(1)
    fun shouldStart() {
    }

    @Test
    @Order(2)
    fun shouldAddEmployee() {
        webTestClient.post().uri("/employees")
            .contentType(MediaType.APPLICATION_JSON)
            .bodyValue(Employee("Test", 1000, 1))
            .exchange()
            .expectStatus().is2xxSuccessful
            .expectBody()
            .jsonPath("$.id").isNotEmpty
    }

    @Test
    @Order(3)
    fun shouldFindEmployee() {
        webTestClient.get().uri("/employees/1")
            .accept(MediaType.APPLICATION_JSON)
            .exchange()
            .expectStatus().is2xxSuccessful
            .expectBody()
            .jsonPath("$.id").isNotEmpty
    }

    @Test
    @Order(3)
    fun shouldFindEmployees() {
        webTestClient.get().uri("/employees")
            .accept(MediaType.APPLICATION_JSON)
            .exchange()
            .expectStatus().is2xxSuccessful
            .expectBody().jsonPath("$.length()").isEqualTo(1)
            .jsonPath("$[0].id").isNotEmpty
    }
}
The test class for the organization-service is a little bit more complicated. That’s because we need to mock the communication with the employee-service. In order to do that we use the ClientAndServer object (1). It is started once before all the tests (2) and stopped after the tests (3). We are mocking the GET /employees/organization/{id} endpoint, which is invoked by the organization-service (4). Then we are calling the organization-service GET /organizations/{id}/with-employees endpoint (5). Finally, we verify that it returns the list of employees inside the JSON response.
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@Testcontainers
@TestMethodOrder(OrderAnnotation::class)
class OrganizationControllerTests {

    @Autowired
    private lateinit var webTestClient: WebTestClient

    companion object {
        @Container
        @ServiceConnection
        val container = PostgreSQLContainer<Nothing>("postgres:14").apply {
            withDatabaseName("spring")
            withUsername("spring")
            withPassword("spring123")
        }

        private var mockServer: ClientAndServer? = null // (1)

        @BeforeAll
        @JvmStatic
        internal fun beforeAll() { // (2)
            mockServer = ClientAndServer.startClientAndServer(8090)
        }

        @AfterAll
        @JvmStatic
        internal fun afterAll() { // (3)
            mockServer!!.stop()
        }
    }

    @Test
    @Order(1)
    fun shouldStart() {
    }

    @Test
    @Order(2)
    fun shouldAddOrganization() {
        webTestClient.post().uri("/organizations")
            .contentType(MediaType.APPLICATION_JSON)
            .bodyValue(Organization("Test"))
            .exchange()
            .expectStatus().is2xxSuccessful
            .expectBody()
            .jsonPath("$.id").isNotEmpty
    }

    @Test
    @Order(3)
    fun shouldFindOrganization() {
        webTestClient.get().uri("/organizations/1")
            .accept(MediaType.APPLICATION_JSON)
            .exchange()
            .expectStatus().is2xxSuccessful
            .expectBody()
            .jsonPath("$.id").isNotEmpty
    }

    @Test
    @Order(3)
    fun shouldFindOrganizations() {
        webTestClient.get().uri("/organizations")
            .accept(MediaType.APPLICATION_JSON)
            .exchange()
            .expectStatus().is2xxSuccessful
            .expectBody().jsonPath("$.length()").isEqualTo(1)
            .jsonPath("$[0].id").isNotEmpty
    }

    @Test
    @Order(3)
    fun shouldFindOrganizationWithEmployees() { // (4)
        mockServer!!.`when`(request()
            .withMethod("GET")
            .withPath("/employees/organization/1"))
            .respond(response()
                .withStatusCode(200)
                .withContentType(MediaType.APPLICATION_JSON)
                .withBody(createEmployees()))

        webTestClient.get().uri("/organizations/1/with-employees")
            .accept(MediaType.APPLICATION_JSON) // (5)
            .exchange()
            .expectStatus().is2xxSuccessful
            .expectBody()
            .jsonPath("$.id").isNotEmpty
            .jsonPath("$.employees.length()").isEqualTo(2)
            .jsonPath("$.employees[0].id").isEqualTo(1)
            .jsonPath("$.employees[1].id").isEqualTo(2)
    }

    // Builds the JSON body returned by the mocked employee-service
    private fun createEmployees(): String {
        val employees: List<Employee> = listOf(
            Employee(1, "Test1", 10000, 1),
            Employee(2, "Test2", 20000, 1)
        )
        return jacksonObjectMapper().writeValueAsString(employees)
    }
}
You can easily verify that all the tests are finished successfully by running them on your laptop. After cloning the repository you need to run Docker and build the apps with the following Maven command:
$ mvn clean package
We can also prepare the build definition for our apps on CircleCI. Since we need to run Testcontainers, we need a machine with a Docker daemon. Here’s the configuration of a build pipeline for CircleCI inside the .circleci/config.yml file:
version: 2.1

jobs:
  build:
    docker:
      - image: 'cimg/openjdk:20.0'
    steps:
      - checkout
      - run:
          name: Analyze on SonarCloud
          command: mvn verify sonar:sonar -DskipTests

executors:
  machine_executor_amd64:
    machine:
      image: ubuntu-2204:2022.04.2
    environment:
      architecture: "amd64"
      platform: "linux/amd64"

orbs:
  maven: circleci/maven@1.4.1

workflows:
  maven_test:
    jobs:
      - maven/test:
          executor: machine_executor_amd64
      - build:
          context: SonarCloud
Here’s the result of the build on CircleCI:
If you have Docker running, you can also start our Spring Boot reactive apps with the Postgres container. It is possible thanks to the spring-boot-testcontainers module. There is a dedicated @TestConfiguration class that may be used to run Postgres in dev mode:
@TestConfiguration
class PostgresContainerDevMode {

    @Bean
    @ServiceConnection
    fun postgresql(): PostgreSQLContainer<*>? {
        return PostgreSQLContainer("postgres:14.0")
            .withUsername("spring")
            .withPassword("spring123")
    }
}
Now, we need to define the “test” main class that uses the configuration provided within the PostgresContainerDevMode class.
class EmployeeApplicationTest

fun main(args: Array<String>) {
    fromApplication<EmployeeApplication>()
        .with(PostgresContainerDevMode::class)
        .run(*args)
}
To run the app in dev mode with Postgres on Docker, just execute the following Maven command:
$ mvn spring-boot:test-run