Reactive Elasticsearch With Spring Boot


One of more notable feature introduced in the latest release of Spring Data is reactive support for Elasticsearch. Since Spring Data Moore we can take advantage of reactive template and repository. It is built on top of fully reactive Elasticsearch REST client, that is based on Spring WebClient. It is also worth to mention about support for reactive Querydsl, which can be included to your application through ReactiveQueryPredicateExecutor.
I have already show you how to use Spring Data Repositories for synchronous integration with Elasticsearch API in one of my previous articles Elasticsearch with Spring Boot. There is no a big difference between using standard and reactive Spring Data Repositories. I’ll focus on showing you those differences on simple sample application used also in the previous article. Therefore it is worth to read my previous article before reading this.

1. Dependencies

I’m using the latest stable version of Spring Boot with JDK 11.

<parent>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-parent</artifactId>
	<version>2.2.0.RELEASE</version>
	<relativePath/>
</parent>
<properties>
	<java.version>11</java.version>
</properties>

We need to include Spring WebFlux and Spring Data Elasticsearch starters. We will also use actuator for exposing healthchecks, and some libraries for automated testing like Spring Test and Testcontainers.

<dependencies>
	<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-webflux</artifactId>
	</dependency>
	<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-actuator</artifactId>
	</dependency>
	<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
	</dependency>
	<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-test</artifactId>
		<scope>test</scope>
	</dependency>
	<dependency>
		<groupId>org.testcontainers</groupId>
		<artifactId>elasticsearch</artifactId>
		<version>1.12.2</version>
		<scope>test</scope>
	</dependency>
</dependencies>

2. Enabling Reactive Repositories

Before start working with reactive Spring Data repositories we should enable it by annotating the main or configuration class with @EnableReactiveElasticsearchRepositories.

@SpringBootApplication
@EnableReactiveElasticsearchRepositories
public class SampleApplication {

    public static void main(String[] args) {
        SpringApplication.run(SampleApplication.class, args);
    }

    @Bean
    @ConditionalOnProperty("initial-import.enabled")
    public SampleDataSet dataSet() {
        return new SampleDataSet();
    }

}

3. Building Repositories

Spring Data Elasticsearch comes with three interfaces that supports reactive operations: ReactiveRepository, ReactiveCrudRepository that adds save/update operations, and ReactiveSortingRepository offering some methods with sorting. The usage is the same as earlier – we just need to create our own repository that extends one of interfaces listed above. We can also add some custom find method following Spring Data query naming convention. Similarly to all other Spring reactive projects Spring Data Elasticsearch Repositories support is built on top of Project Reactor.

@Repository
public interface EmployeeRepository extends ReactiveCrudRepository<Employee, Long> {

    Flux<Employee> findByOrganizationName(String name);
    Flux<Employee> findByName(String name);

}

Here’s our model class:

@Document(indexName = "sample", type = "employee")
public class Employee {

    @Id
    private Long id;
    @Field(type = FieldType.Object)
    private Organization organization;
    @Field(type = FieldType.Object)
    private Department department;
    private String name;
    private int age;
    private String position;
	
}

3. Building Controller

We will expose some reactive CRUD methods outside application using Spring WebFlux. Here’s our controller class implementation:

@RestController
@RequestMapping("/employees")
public class EmployeeController {

	@Autowired
	EmployeeRepository repository;

	@PostMapping
	public Mono<Employee> add(@RequestBody Employee employee) {
		return repository.save(employee);
	}

	@GetMapping("/{name}")
	public Flux<Employee> findByName(@PathVariable("name") String name) {
		return repository.findByName(name);
	}

	@GetMapping
	public Flux<Employee> findAll() {
		return repository.findAll();
	}

	@GetMapping("/organization/{organizationName}")
	public Flux<Employee> findByOrganizationName(@PathVariable("organizationName") String organizationName) {
		return repository.findByOrganizationName(organizationName);
	}

}

4. Running Application

For the tests purpose we need single node Elasticsearch instance running in development mode. As usual we will use Docker container. Here’s the command that starts Docker container and exposes it on port 9200.

$ docker run -d --name elasticsearch -p 9200:9200 -e "discovery.type=single-node" elasticsearch:6.6.2

My Docker Machine is available on virtual address 192.168.99.100, so I had to override Elasticsearch address in Spring Boot configuration file. Because Elasticsearch reactive repositories use ReactiveElasticsearchClient we have to set property spring.data.elasticsearch.client.reactive.endpoints to 192.168.99.100:9200. Actuator still uses synchronous REST client for detecting Elasticsearch status in healthcheck, so we also need to override default address in spring.elasticsearch.rest.uris property.

spring:
  application:
    name: sample-spring-elasticsearch
  data:
    elasticsearch:
      client:
        reactive:
          endpoints: 192.168.99.100:9200
  elasticsearch:
    rest:
      uris: http://192.168.99.100:9200

5. Testing

The same as with synchronous repositories we use Testcontainers for JUnit tests. The only difference is that we need to block repository method when verifying the result of the test.

@RunWith(SpringRunner.class)
@SpringBootTest
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public class EmployeeRepositoryTest {

    @ClassRule
    public static ElasticsearchContainer container = new ElasticsearchContainer();
    @Autowired
    EmployeeRepository repository;

    @BeforeClass
    public static void before() {
        System.setProperty("spring.data.elasticsearch.client.reactive.endpoints", container.getContainerIpAddress() + ":" + container.getMappedPort(9200));
    }

    @Test
    public void testAdd() {
        Employee employee = new Employee();
        employee.setId(1L);
        employee.setName("John Smith");
        employee.setAge(33);
        employee.setPosition("Developer");
        employee.setDepartment(new Department(1L, "TestD"));
        employee.setOrganization(new Organization(1L, "TestO", "Test Street No. 1"));
        Mono<Employee> employeeSaved = repository.save(employee);
        Assert.assertNotNull(employeeSaved.block());
    }

    @Test
    public void testFindAll() {
        Flux<Employee> employees = repository.findAll();
        Assert.assertTrue(employees.count().block() > 0);
    }

    @Test
    public void testFindByOrganization() {
        Flux<Employee> employees = repository.findByOrganizationName("TestO");
        Assert.assertTrue(employees.count().block() > 0);
    }

    @Test
    public void testFindByName() {
        Flux<Employee> employees = repository.findByName("John Smith");
        Assert.assertTrue(employees.count().block() > 0);
    }

}

Source Code

For the current sample I’m using the same repository as for the sample with synchronous repositories. I created a new branch reactive for that. Here’s GitHub repository address https://github.com/piomin/sample-spring-elasticsearch/tree/reactive.

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Google photo

You are commenting using your Google account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s

This site uses Akismet to reduce spam. Learn how your comment data is processed.