Introduction to event-driven microservices with Spring Cloud Stream


The Spring Cloud Stream framework lets us apply well-known Spring patterns and best practices while implementing an event-driven microservices architecture. It uses the Spring Integration project to provide connectivity to a message broker, and it has built-in support for features such as a persistent publish-subscribe model, consumer grouping, and partitioning. Integration with a specific message broker is provided by binder implementations that are hidden behind the middleware-neutral core.
The version of Spring Cloud Stream described here is 3.0.3.RELEASE, which is part of the Spring Cloud release train Hoxton.SR3.

For a more detailed introduction to building a microservices architecture with Spring Cloud Stream, you can refer to my video course: Microservices With Spring Boot And Spring Cloud: Part 5 – Event-driven microservices.

Example of Spring Cloud Stream microservices

Our sample system consists of three microservices: producer-service, consumer-a-service, and consumer-b-service. Each of them connects to the RabbitMQ broker. The producer-service application is responsible for sending events to the message broker, while the two other microservices listen for incoming events. Communication between the applications follows a publish-subscribe model, where data is broadcast through shared topics. The consumer applications use consumer grouping to guarantee that only a single instance of an application handles a given event, and partitioning to assign events to a selected instance of the application based on a routing key set on the producer side. The following picture visualizes the described architecture.

The source code of the sample applications is, as usual, available on GitHub. You can find it in the repository https://github.com/piomin/course-spring-microservices.git. That repository also contains the code of applications used in previous parts of my video course, so you should go to the event-driven directory to find the right samples.

Dependencies to Spring Cloud Stream

To enable Spring Cloud Stream for our application, we first need to include the right binder implementation library. Because we are integrating with RabbitMQ, we have to use the spring-cloud-stream-binder-rabbit artifact. It pulls in all other required libraries, so we don’t have to include any other dependencies.

<dependency>
   <groupId>org.springframework.cloud</groupId>
   <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>

Messaging between Spring Cloud Stream microservices

Each message that is sent to the message broker is automatically serialized to JSON. Here’s the class that represents the JSON payload exchanged by the applications.

data class CallmeEvent(val id: Int = 0, val message: String = "")

Producer application

Here’s the implementation of the Supplier bean responsible for continuously generating a stream of events. By default, it sends a CallmeEvent once a second. It increments the id field on every new event and sets the header to_process to true. Based on the value of this header, we route messages on the consumer side. The mechanism can be easily disabled by setting the property callme.supplier.enabled to false.

@SpringBootApplication
class ProductionApplication {

    var id: Int = 0

    @Value("\${callme.supplier.enabled}")
    val supplierEnabled: Boolean = false

    @Bean
    fun callmeEventSupplier(): Supplier<Message<CallmeEvent>?> = Supplier { createEvent() }

    private fun createEvent(): Message<CallmeEvent>? {
        return if (supplierEnabled)
            MessageBuilder.withPayload(CallmeEvent(++id, "I'm callme event!"))
                    .setHeader("to_process", true)
                    .build()
        else
            null
    }
}
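The once-a-second rate comes from the framework’s default poller for Supplier beans rather than from the code above. As a hedged sketch, assuming the spring.cloud.stream.poller.fixed-delay property documented for this release line (a value in milliseconds, 1000 by default), the interval could be changed in application.properties like this. The value 5000 is only an illustration.

```properties
# Assumption: poll the Supplier every 5 seconds instead of the default 1 second
spring.cloud.stream.poller.fixed-delay=5000
```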

Alternatively, we may send an event on demand, for example by calling a REST endpoint. To do that, we need to use the StreamBridge bean. It provides a send method that takes the name of a binding and a message object as parameters. We may pass just a payload object or use MessageBuilder to create a whole GenericMessage with headers. Our controller exposes two POST methods under the /producer path. The first of them, POST /{message}, just sets the message body. The second, POST /{message}/process/{process}, also allows us to set the header to_process.

@RestController
@RequestMapping("/producer")
class ProducerController(val streamBridge: StreamBridge) {

    var id: Int = 0

    @PostMapping("/{message}")
    fun sendEvent(@PathVariable message: String): Boolean {
        return streamBridge.send("callmeEventSupplier-out-0", CallmeEvent(++id, message))
    }

    @PostMapping("/{message}/process/{process}")
    fun sendEventWithHeader(@PathVariable message: String, @PathVariable process: Boolean): Boolean {
        return streamBridge.send("callmeEventSupplier-out-0",
                MessageBuilder.createMessage(CallmeEvent(++id, message),
                        MessageHeaders(mutableMapOf(Pair<String, Any>("to_process", process)))))
    }
}

Here is the configuration file of our application. We need to set the name of the destination for our binding (1). By default, it is the same as the binding name, but we are going to change it to callme-events. The same destination will be set on the consumer side. If we use StreamBridge for sending messages, we also need to set the property spring.cloud.stream.source with the name that is used as a prefix of the generated binding name (2). If you would like to use the same output as for the Supplier, you should set the same name as the method that registers the Supplier bean: callmeEventSupplier. We should also increase the logging level for the Spring AMQP library to see the structure of messages sent to the message broker (3).

spring.application.name=producer-service
# (1)
spring.cloud.stream.bindings.callmeEventSupplier-out-0.destination=callme-events
# (2)
spring.cloud.stream.source=callmeEventSupplier
# (3)
logging.level.org.springframework.amqp=DEBUG
callme.supplier.enabled=true

Consumer application

There are two different applications that listen for incoming events. Let’s start with the implementation of consumer-a-service. It is pretty simple, because it just logs each message consumed from the callme-events topic. To consume messages from the destination, we need to define a Consumer bean that takes CallmeEvent as an argument.

@SpringBootApplication
class ConsumerAApplication {

    val logger: Logger = LoggerFactory.getLogger(ConsumerAApplication::class.java)

    @Bean
    fun callmeEventConsumer(): Consumer<CallmeEvent> = Consumer { logger.info("Received: {}", it) }
}

In the application properties, we also need to override the default destination name for the callmeEventConsumer-in-0 binding. That name is the same as the output destination configured on the producer side: callme-events. We also set a consumer group for all instances of the consumer-a-service application. The name of the group is a. A consumer group guarantees that only a single instance of the application in the group handles a given incoming event.

spring.application.name=consumer-a-service
spring.cloud.stream.bindings.callmeEventConsumer-in-0.destination=callme-events
spring.cloud.stream.bindings.callmeEventConsumer-in-0.group=a
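To make the consumer group semantics more concrete, here is a minimal in-memory sketch in plain Kotlin. It does not use Spring Cloud Stream or RabbitMQ. The Topic class, the simulate function, and the round-robin choice of instance are all assumptions made for illustration. It only shows the rule described above: every group sees each event once, and inside a group only one instance handles it.

```kotlin
// Hypothetical in-memory model, not the Spring Cloud Stream API.
class Topic {
    // group name -> handlers of the instances that share this group
    private val groups = linkedMapOf<String, MutableList<(String) -> Unit>>()
    // group name -> number of events already delivered to that group
    private val delivered = mutableMapOf<String, Int>()

    fun subscribe(group: String, handler: (String) -> Unit) {
        groups.getOrPut(group) { mutableListOf() }.add(handler)
    }

    // Every group receives the event once; inside a group the instances take turns.
    fun publish(event: String) {
        for ((group, handlers) in groups) {
            val count = delivered.getOrDefault(group, 0)
            handlers[count % handlers.size](event)
            delivered[group] = count + 1
        }
    }
}

fun simulate(): List<String> {
    val log = mutableListOf<String>()
    val topic = Topic()
    topic.subscribe("a") { log += "a-0:$it" } // first instance in group a
    topic.subscribe("a") { log += "a-1:$it" } // second instance in group a
    topic.subscribe("b") { log += "b-0:$it" } // single instance in group b
    topic.publish("e1")
    topic.publish("e2")
    return log
}

fun main() {
    simulate().forEach(::println)
}
```

In this sketch each event is logged once for group a and once for group b, and the two instances in group a never both receive the same event.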

Message broker

We are running RabbitMQ in a Docker container. We need to expose two ports outside the container: 5672 for TCP connections from applications and 15672 for the web management console. The following command starts the container.

$ docker run -d --name rabbit -h rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3-management

Now we can run both our applications, producer-service and consumer-a-service. We are going to run a single instance of producer-service and two instances of consumer-a-service. Once started, producer-service sends events continuously to the destination on the message broker. We can verify it in the RabbitMQ management console. To log in, use the default guest / guest credentials.

Routing function

We can provide some more advanced routing on the consumer side. In Spring Cloud Stream nomenclature, event routing is the ability to either route events to a particular event subscriber or route an event produced by an event subscriber to a particular destination. Event routing may be enabled for an application by setting the property spring.cloud.stream.function.routing.enabled to true. After that, the generated binding names are automatically set to functionRouter-*.
I enabled the event routing feature for consumer-b-service. Here’s the list of configuration properties required to enable routing for a consumer listening on the callme-events destination. When enabling event routing, we also need to set a routing expression. We may use SpEL notation for it. In this case, I’m routing based on the value of the to_process header.

spring.application.name=consumer-b-service
spring.cloud.stream.bindings.functionRouter-in-0.destination=callme-events
spring.cloud.stream.bindings.functionRouter-in-0.group=b
spring.cloud.stream.function.routing.enabled=true
spring.cloud.function.routing-expression=(headers['to_process']!=null && headers['to_process']==true) ? 'process':'fireForget'

The names of the declared Consumer or Function beans should be the same as the values returned by spring.cloud.function.routing-expression. In this case, these are fireForget() and process().

@SpringBootApplication
class ConsumerBApplication {

    val logger: Logger = LoggerFactory.getLogger(ConsumerBApplication::class.java)

    @Bean
    fun fireForget(): Consumer<CallmeEvent> = Consumer { logger.info("Received(fireForget): {}", it) }

    @Bean
    fun process(): Function<CallmeEvent, CallmeResponse> = Function { logAndResponse(it) }

    private fun logAndResponse(callmeEvent: CallmeEvent): CallmeResponse {
        logger.info("Received(process): {}", callmeEvent)
        return CallmeResponse(callmeEvent.id, "I'm callme response")
    }
}
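The decision made by the routing expression can be restated in plain Kotlin. This is only an illustrative sketch: the route function is hypothetical and is not how the framework evaluates SpEL. It returns the bean name that the expression above would select for a given set of headers.

```kotlin
// Hypothetical stand-in for the SpEL routing expression: returns the name
// of the bean that should handle a message with the given headers.
fun route(headers: Map<String, Any?>): String =
    if (headers["to_process"] == true) "process" else "fireForget"

fun main() {
    println(route(mapOf("to_process" to true)))  // header set to true
    println(route(mapOf("to_process" to false))) // header set to false
    println(route(emptyMap()))                   // header missing
}
```

A message without the to_process header falls through to fireForget, which matches the null check in the expression.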

Here’s the list of exchanges used by our system after starting consumer-b-service. Only the destination for functionRouter-in-0 is overridden in the configuration properties, so the destination for functionRouter-out-0 keeps its default name, which is the same as the binding name.

Partitioning

Partitioning is a critical concept in stateful processing, where it is required to ensure that all related data is handled together. Thanks to partitioning, we can distribute messages across multiple instances of an application in a deterministic way.
To enable partitioning on the producer side, we need to set two properties for a given binding: the producer’s partitionKeyExpression, which in this case is the id field of CallmeEvent, and the producer’s partitionCount, which is the number of partitions. This number should be the same as the number of running instances of the consumer application, since each partition is assigned to a selected instance of that application. Because we are planning to run two instances, we set it to 2.

spring.cloud.stream.bindings.callmeEventSupplier-out-0.producer.partitionKeyExpression=payload.id
spring.cloud.stream.bindings.callmeEventSupplier-out-0.producer.partitionCount=2
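As a rough illustration of how a partition key maps to a partition, here is a sketch that assumes the default rule described in the Spring Cloud Stream documentation: the hash code of the key modulo the partition count. The selectPartition function is a hypothetical helper, not the framework’s own code.

```kotlin
// Hypothetical helper, not the framework's own code: maps a partition key
// to a partition index using "hash code modulo partition count".
fun selectPartition(key: Any, partitionCount: Int): Int =
    Math.floorMod(key.hashCode(), partitionCount)

fun main() {
    // With partitionKeyExpression=payload.id the key is the Int id of the event.
    for (id in 1..4) {
        println("event id=$id -> partition ${selectPartition(id, 2)}")
    }
}
```

Under that assumption and with two partitions, events with odd ids go to partition 1 and events with even ids go to partition 0, so each consumer instance always sees the same subset of ids.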

To enable partitioning on the consumer side, we need to enable a single property on a given binding. We also need to set the property spring.cloud.stream.instanceCount to a static number that represents the number of deployed instances of the application. While Kafka supports a more flexible way of configuring partitioning, RabbitMQ requires a static value, so we can’t scale the number of instances dynamically without changing these properties. We also need to set the property spring.cloud.stream.instanceIndex for each instance. It needs to be 0 or 1 if we have two running instances.
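A consumer-side configuration for the first of two instances of consumer-a-service might look like the sketch below. It assumes that the single property mentioned above is the consumer.partitioned flag on the input binding. The second instance would use the same settings with instanceIndex set to 1.

```properties
# Assumption: consumer.partitioned is the flag that enables partitioned consumption
spring.cloud.stream.bindings.callmeEventConsumer-in-0.consumer.partitioned=true
# Static number of deployed instances of this application
spring.cloud.stream.instanceCount=2
# Index of this particular instance: 0 for the first one, 1 for the second
spring.cloud.stream.instanceIndex=0
```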

Now let’s take a look at the structure of routing inside the callme-events exchange created on RabbitMQ. Two consumers listen for incoming events in each consumer group, so with two different consumer groups defined there are four consumers in total. Within each group, there are two different routing keys, one for each target queue.

We can also verify that four queues receive incoming events.

Testing

Spring Cloud Stream provides an easy way of testing your microservice applications without the need to connect to an external messaging system. To enable that support, we need to include the following dependency in the Maven pom.xml.

<dependency>
   <groupId>org.springframework.cloud</groupId>
   <artifactId>spring-cloud-stream</artifactId>
   <version>3.0.3.RELEASE</version>
   <type>test-jar</type>
   <scope>test</scope>
   <classifier>test-binder</classifier>
</dependency>

In the JUnit test class, we should import the TestChannelBinderConfiguration class. After that, we may use InputDestination for sending test messages and OutputDestination for receiving them during a test. The following testRouter test is created for consumer-b-service, and it verifies that the routing function works properly by receiving and validating the message sent to the output destination.

@SpringBootTest(classes = [ConsumerBApplication::class])
@Import(TestChannelBinderConfiguration::class)
class RouterFunctionTest {

    @Autowired
    lateinit var inputDestination: InputDestination
    @Autowired
    lateinit var outputDestination: OutputDestination

    @Test
    fun testRouter() {
        inputDestination.send(MessageBuilder.withPayload(CallmeEvent(1, "I'm callme event"))
                .setHeader("to_process", true)
                .build())
        val response = outputDestination.receive()
        Assertions.assertNotNull(response)
        Assertions.assertTrue(response.payload.isNotEmpty())
        val payload = String(response.payload)
        val payloadObject = ObjectMapper().readValue(payload, CallmeResponse::class.java)
        Assertions.assertEquals(1, payloadObject.id)
        Assertions.assertEquals("I'm callme response", payloadObject.message)
    }

}