Event driven microservices using Spring Cloud Stream and RabbitMQ

Before we start, let’s look at the Spring Cloud Quick Start site. It lists the available spring-cloud releases, grouped into release trains. We use the newest release, Camden.SR5, with Spring Boot 1.4.4.RELEASE and Spring Cloud Stream Brooklyn.SR2.

[code language=”xml”]
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.4.4.RELEASE</version>
</parent>
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>Camden.SR5</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
[/code]

Here’s a visualization of our architecture. The order service sends a message to a RabbitMQ topic exchange. The product and shipment services listen on that topic for incoming order messages and process them. After processing, each sends a reply message to a second topic, on which the payment service listens. The payment service stores the incoming messages, aggregates the replies from the product and shipment services, then calculates the total price and sends the final response.

[Image: architecture diagram]

Each service has the following dependencies. The sample-common module holds the objects for the messages sent to the topics; it is shared between all services. We also use Spring Cloud Sleuth for distributed tracing, so that one request id is carried across all the microservices.

[code language=”xml”]
<dependencies>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-sleuth</artifactId>
    </dependency>
    <dependency>
        <groupId>pl.piomin.services</groupId>
        <artifactId>sample-common</artifactId>
        <version>${project.version}</version>
    </dependency>
</dependencies>
[/code]

Let me start with a few words on the theory behind Spring Cloud Stream. A short reference for the framework is the Spring Cloud Stream Reference Guide. It is built on Spring Integration and provides three predefined interfaces out of the box:

  • Source  – can be used for an application which has a single outbound channel
  • Sink – can be used for an application which has a single inbound channel
  • Processor – can be used for an application which has both an inbound channel and an outbound channel
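Before turning to the real interfaces, the three roles can be pictured with plain Java functional types. This is only an analogy, not Spring Cloud Stream API: the `StreamRoles` class and its `pump` method are hypothetical names invented for this sketch, with a `Supplier` standing in for a source, a `Function` for a processor, and a `Consumer` for a sink.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

// Plain-Java stand-ins for the three roles; none of this is Spring Cloud Stream API.
final class StreamRoles {

    // Moves one message through the pipeline: source -> processor -> sink.
    static <I, O> void pump(Supplier<I> source, Function<I, O> processor, Consumer<O> sink) {
        sink.accept(processor.apply(source.get()));
    }

    public static void main(String[] args) {
        List<String> delivered = new ArrayList<>();
        int[] index = {0};

        Supplier<String> source = () -> "order-" + index[0]++;       // outbound only
        Function<String, String> processor = o -> o + ":processed";  // inbound and outbound
        Consumer<String> sink = delivered::add;                       // inbound only

        pump(source, processor, sink);
        pump(source, processor, sink);
        System.out.println(delivered);
    }
}
```

In the real services the three pieces live in separate applications and the broker sits between them; here they are wired together in one process only to show who produces and who consumes.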

I’m going to show sample usage of all of these interfaces. The order service uses the Source interface. With the @InboundChannelAdapter and @Poller annotations, it sends an order message to the output channel every 10 seconds.

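[code language=”java”]
import java.time.LocalDateTime;
import java.util.logging.Logger;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.sleuth.sampler.AlwaysSampler;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.core.MessageSource;
import org.springframework.messaging.support.GenericMessage;
// Order, OrderType, OrderStatus, Product and Shipment come from the sample-common module.

@SpringBootApplication
@EnableBinding(Source.class)
public class Application {

    protected Logger logger = Logger.getLogger(Application.class.getName());

    // Used as the id of the next generated order.
    private int index = 0;

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    // Polled every 10 seconds; each poll publishes one new order to the output channel.
    @Bean
    @InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedDelay = "10000", maxMessagesPerPoll = "1"))
    public MessageSource<Order> orderSource() {
        return () -> {
            Order o = new Order(index++, OrderType.PURCHASE, LocalDateTime.now(), OrderStatus.NEW, new Product(), new Shipment());
            logger.info("Sending order: " + o);
            return new GenericMessage<>(o);
        };
    }

    @Bean
    public AlwaysSampler defaultSampler() {
        return new AlwaysSampler();
    }

}
[/code]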
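To make the polling behaviour concrete, here is a minimal plain-Java sketch of what a fixed-delay poller with one message per poll amounts to. It is a model under our own assumptions, not how Spring Integration implements @Poller: `FixedDelayPoller` and `start` are hypothetical names, the `Supplier` plays the role of the message source, and the `Consumer` plays the role of the output channel.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical model of a poller: one message per poll, fixed delay between polls.
final class FixedDelayPoller {

    // Asks the source for one message per poll and hands it to the channel.
    // The delay is measured from the end of one poll to the start of the next.
    // Note: if the task throws, the executor cancels all further polls.
    static ScheduledExecutorService start(Supplier<String> source,
                                          Consumer<String> channel,
                                          long delayMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleWithFixedDelay(
                () -> channel.accept(source.get()),
                0, delayMillis, TimeUnit.MILLISECONDS);
        return scheduler;
    }
}
```

Calling `FixedDelayPoller.start(source, channel, 10_000)` would mirror the 10-second setting above; the caller is responsible for shutting the returned scheduler down. Publishing on demand instead of on a schedule would simply mean calling the channel directly; in Spring Cloud Stream that should correspond to injecting the bound `Source` and calling `output().send(...)` with a message, though that variant is not part of the sample application.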

Here’s the output configuration in the application.yml file.

[code]
spring:
  cloud:
    stream:
      bindings:
        input:
          destination: ex.stream.in
          binder: rabbit1
        output:
          destination: ex.stream.out
          binder: rabbit1
      binders:
        rabbit1:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: 192.168.99.100
                port: 30000
                username: guest
                password: guest
[/code]

The product and shipment services use the Processor interface. They listen on the stream input and, after processing, send a message to their output.

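[code language=”java”]
import java.util.logging.Logger;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.sleuth.sampler.AlwaysSampler;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.handler.annotation.SendTo;
// Order comes from the sample-common module; ProductService belongs to this service.

@SpringBootApplication
@EnableBinding(Processor.class)
public class Application {

    @Autowired
    private ProductService productService;

    protected Logger logger = Logger.getLogger(Application.class.getName());

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    // Receives an order from the input channel and publishes the processed order to the output channel.
    @StreamListener(Processor.INPUT)
    @SendTo(Processor.OUTPUT)
    public Order processOrder(Order order) {
        logger.info("Processing order: " + order);
        return productService.processOrder(order);
    }

    @Bean
    public AlwaysSampler defaultSampler() {
        return new AlwaysSampler();
    }

}
[/code]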

Here’s the service configuration. It listens on the order service’s output exchange and defines its consumer group, named product. The group name is used for automatic queue creation and exchange binding on RabbitMQ. An output exchange is defined as well.

[code language=”yaml”]
spring:
  cloud:
    stream:
      bindings:
        input:
          destination: ex.stream.out
          group: product
          binder: rabbit1
        output:
          destination: ex.stream.out2
          binder: rabbit1
      binders:
        rabbit1:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: 192.168.99.100
                port: 30000
                username: guest
                password: guest
[/code]
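The group setting matters most once a service runs in more than one instance. The following is a simplified in-memory model of the idea, not RabbitMQ or Spring Cloud Stream API: every group gets its own copy of each message, and instances inside one group take turns. `GroupedExchange`, `subscribe` and `publish` are hypothetical names, and the strict round-robin is an assumption made to keep the sketch deterministic.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model: one queue per group, competing consumers inside a group.
final class GroupedExchange {

    // group name -> inboxes of the instances registered in that group
    private final Map<String, List<List<String>>> groups = new LinkedHashMap<>();
    // group name -> number of messages delivered to that group so far
    private final Map<String, Integer> delivered = new LinkedHashMap<>();

    // Registers a new instance in the given group and returns its inbox.
    List<String> subscribe(String group) {
        List<String> inbox = new ArrayList<>();
        groups.computeIfAbsent(group, g -> new ArrayList<>()).add(inbox);
        delivered.putIfAbsent(group, 0);
        return inbox;
    }

    // Each group receives one copy; within a group only one instance gets it.
    void publish(String message) {
        for (Map.Entry<String, List<List<String>>> entry : groups.entrySet()) {
            List<List<String>> instances = entry.getValue();
            int count = delivered.get(entry.getKey());
            instances.get(count % instances.size()).add(message);
            delivered.put(entry.getKey(), count + 1);
        }
    }

    public static void main(String[] args) {
        GroupedExchange exchange = new GroupedExchange();
        List<String> product1 = exchange.subscribe("product");
        List<String> product2 = exchange.subscribe("product");
        List<String> shipment1 = exchange.subscribe("shipment");

        exchange.publish("order-0");
        exchange.publish("order-1");

        System.out.println("product1=" + product1 + " product2=" + product2 + " shipment1=" + shipment1);
    }
}
```

In this model the two product instances split the orders between them, while the single shipment instance sees every order, which is the behaviour we want from the product and shipment services.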

We run the RabbitMQ instance in a Docker container.

docker run -d --name rabbit1 -p 30001:15672 -p 30000:5672 rabbitmq:management

Let’s look at the management console, available at http://192.168.99.100:30001. Here’s the configuration of the ex.stream.out topic exchange, followed by the list of declared queues.

[Image: ex.stream.out topic exchange configuration]

[Image: list of declared queues]

Here’s the main application class of the payment service. It uses the Sink interface to listen for incoming messages. Each incoming order is processed, and we print the final price of the order sent by the order service. The sample application source code is available on GitHub.

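[code language=”java”]
import java.util.logging.Logger;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.sleuth.sampler.AlwaysSampler;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.context.annotation.Bean;
// Order comes from the sample-common module; PaymentService belongs to this service.

@SpringBootApplication
@EnableBinding(Sink.class)
public class Application {

    @Autowired
    private PaymentService paymentService;

    protected Logger logger = Logger.getLogger(Application.class.getName());

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    // Receives replies from the product and shipment services.
    // paymentService returns null until it has a complete order to report.
    @StreamListener(Sink.INPUT)
    public void processOrder(Order order) {
        logger.info("Processing order: " + order);
        Order o = paymentService.processOrder(order);
        if (o != null) {
            logger.info("Final response: " + (o.getProduct().getPrice() + o.getShipment().getPrice()));
        }
    }

    @Bean
    public AlwaysSampler defaultSampler() {
        return new AlwaysSampler();
    }

}
[/code]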
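The PaymentService implementation is not shown in this post. As a rough illustration of the aggregation step, here is a minimal in-memory sketch which assumes that each reply carries the order id, its origin, and a single price. `Reply`, `PaymentAggregator` and `accept` are hypothetical names; the real service works on the shared Order object and may be organised quite differently.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalInt;

// Hypothetical reply message: one per upstream service, identified by order id.
final class Reply {
    enum Origin { PRODUCT, SHIPMENT }

    final int orderId;
    final Origin origin;
    final int price;

    Reply(int orderId, Origin origin, int price) {
        this.orderId = orderId;
        this.origin = origin;
        this.price = price;
    }
}

// Holds the first reply for an order and reports the total when the other one arrives.
final class PaymentAggregator {

    private final Map<Integer, Reply> pending = new HashMap<>();

    // Returns the total price once both replies are known, otherwise an empty value.
    synchronized OptionalInt accept(Reply reply) {
        Reply first = pending.get(reply.orderId);
        if (first == null || first.origin == reply.origin) {
            // Nothing to pair with yet (or a repeat from the same service): keep waiting.
            pending.put(reply.orderId, reply);
            return OptionalInt.empty();
        }
        pending.remove(reply.orderId);
        return OptionalInt.of(first.price + reply.price);
    }
}
```

The empty result plays the same role as the null check in the listener above: nothing is logged until both the product and the shipment reply for an order have been seen.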

With an AlwaysSampler @Bean declared in every main class of our microservices, a single trace id is propagated across all the calls made for one order. Here’s a fragment from our microservices’ logging console. I also get the following warning message, which I don’t understand: ‘Deprecated trace headers detected. Please upgrade Sleuth to 1.1 or start sending headers present in the TraceMessageHeaders class’. Is version 1.1.2.RELEASE of Spring Cloud Sleuth not the right one for the Camden.SR5 release?

[Image: fragment of the microservices’ log output]
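What to look for in such logs is one trace id shared by every service, with a different span id per hop. The sketch below models that relationship in plain Java. It is not Sleuth code: `TraceContext`, `newTrace` and `nextSpan` are hypothetical names, and the random hexadecimal ids are an assumption made for illustration.

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical model of the ids that travel with a message from service to service.
final class TraceContext {

    final String traceId;      // the same for every hop of one order
    final String spanId;       // unique per hop
    final String parentSpanId; // null for the first span of a trace

    private TraceContext(String traceId, String spanId, String parentSpanId) {
        this.traceId = traceId;
        this.spanId = spanId;
        this.parentSpanId = parentSpanId;
    }

    private static String newId() {
        return Long.toHexString(ThreadLocalRandom.current().nextLong());
    }

    // Created by the service that originates the request (here: the order service).
    static TraceContext newTrace() {
        return new TraceContext(newId(), newId(), null);
    }

    // Created by each service that receives the message: same trace, new span.
    TraceContext nextSpan() {
        return new TraceContext(traceId, newId(), spanId);
    }
}
```

Following one order through the order, product and payment services would then be `TraceContext.newTrace().nextSpan().nextSpan()`, with the trace id unchanged at every step.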

COMMENTS

Marcin Grzejszczak

Hi / Siemano Piotr 😉 !

Very good catch with the warning

‘Deprecated trace headers detected. Please upgrade Sleuth to 1.1 or start sending headers present in the TraceMessageHeaders class’.

It should say “please upgrade Sleuth to 1.2”. When I first wrote this, we weren’t planning on releasing a 1.2, but then the plans changed and I forgot to bump the warning message. I’ll file an issue and update the warning message.

    Piotr Mińkowski

    Hi Marcin,
    Ok. Which headers are deprecated?

      Marcin Grzejszczak

      The ones with the `-`. That’s because they are not compliant with JMS standards.

onesinfiniteit

How do I call the orderSource explicitly instead of polling it every 10000ms?

Soleil

Hi, thanks for your article. I’m looking for a way to communicate between instances of a service in microservices. For example, in your post, if there are 2 instances of the order service running at the same time, how could they communicate via RabbitMQ? Could you give an example? Thanks

    Piotr Mińkowski

    Hi. Are you asking about scaling up your event-driven microservices? Take a look here: https://piotrminkowski.wordpress.com/2018/06/15/building-and-testing-message-driven-microservices-using-spring-cloud-stream/.

      Soleil

      Hi, my question is about communication between many instances of the same microservice. For example, if the Order microservice has 3 instances (A1, A2, A3), how could I design it so that instance A1 sends a message to the other instances, and A2 and A3 receive it but A1 doesn’t? Could you give any ideas, or a link for reference? Thank you!

        Piotr Mińkowski

        OK, why would you like to send a message to other instances of the same microservice? Anyway, in the article I sent you before, I describe the consumer group mechanism, which guarantees that only a single instance of a microservice receives a message. However, if you send a message to the sender’s own microservice, that won’t prevent the sending instance from receiving it.
