Event-driven microservices using Spring Cloud Stream and RabbitMQ
Before we start, let's take a look at the Spring Cloud Quick Start site. It lists the available Spring Cloud releases, grouped into release trains. We use the newest release train, Camden.SR5, together with Spring Boot 1.4.4.RELEASE and the Brooklyn.SR2 release of Spring Cloud Stream.
[code language=”xml”]
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.4.4.RELEASE</version>
</parent>
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>Camden.SR5</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
[/code]
Here's a visualization of our architecture. The order service sends a message to a RabbitMQ topic exchange. The product and shipment services listen on that topic for incoming order messages and process them. After processing, each of them sends a reply message to the topic on which the payment service listens. The payment service stores the incoming messages, aggregates the replies from the product and shipment services, calculates the total price and sends the final response.
Each service has the following dependencies. There is a sample-common module which contains the objects used for the messages sent to the topics; it is shared between all services (a rough sketch of those classes is shown below the dependency list). We also use Spring Cloud Sleuth for distributed tracing, so a single request id is propagated across all microservices.
[code language=”xml”]
<dependencies>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-sleuth</artifactId>
    </dependency>
    <dependency>
        <groupId>pl.piomin.services</groupId>
        <artifactId>sample-common</artifactId>
        <version>${project.version}</version>
    </dependency>
</dependencies>
[/code]
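The classes from sample-common are not listed in this post. Based on how they are used in the examples below, a minimal sketch of the shared message objects could look like the following (field names, enum types and the price type are my assumptions, not necessarily the actual code):
[code language=”java”]
import java.time.LocalDateTime;

// Hypothetical sketch of the shared message classes from sample-common.
// Only the members used in the examples are shown; each class lives in its
// own file in the real module, which may differ from this sketch.
public class Order {

    private int id;
    private OrderType type;       // assumed enum, e.g. PURCHASE
    private LocalDateTime date;
    private OrderStatus status;   // assumed enum, e.g. NEW, PROCESSED
    private Product product;
    private Shipment shipment;

    public Order() {
        // default constructor needed for JSON deserialization
    }

    public Order(int id, OrderType type, LocalDateTime date, OrderStatus status,
            Product product, Shipment shipment) {
        this.id = id;
        this.type = type;
        this.date = date;
        this.status = status;
        this.product = product;
        this.shipment = shipment;
    }

    public int getId() { return id; }
    public Product getProduct() { return product; }
    public void setProduct(Product product) { this.product = product; }
    public Shipment getShipment() { return shipment; }
    public void setShipment(Shipment shipment) { this.shipment = shipment; }
    public void setStatus(OrderStatus status) { this.status = status; }
    // remaining getters and setters omitted
}

public class Product {
    private int price;
    public int getPrice() { return price; }
    public void setPrice(int price) { this.price = price; }
}

public class Shipment {
    private int price;
    public int getPrice() { return price; }
    public void setPrice(int price) { this.price = price; }
}
[/code]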
Let me start with a few words on the theoretical aspects of Spring Cloud Stream. A short reference for the framework is available in the Spring Cloud Stream Reference Guide. It is based on Spring Integration and provides three predefined interfaces out of the box (a simplified sketch of them follows the list):
- Source – can be used for an application which has a single outbound channel
- Sink – can be used for an application which has a single inbound channel
- Processor – can be used for an application which has both an inbound channel and an outbound channel
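For reference, here is roughly what those predefined interfaces look like. This is a simplified view of the definitions from the org.springframework.cloud.stream.messaging package, not a listing of the actual source:
[code language=”java”]
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;

// Source declares a single outbound channel named "output".
public interface Source {
    String OUTPUT = "output";

    @Output(Source.OUTPUT)
    MessageChannel output();
}

// Sink declares a single inbound channel named "input".
public interface Sink {
    String INPUT = "input";

    @Input(Sink.INPUT)
    SubscribableChannel input();
}

// Processor simply combines a Source and a Sink.
public interface Processor extends Source, Sink {
}
[/code]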
I'm going to show you sample usage of all of these interfaces. In the order service we use the Source interface. Using the @InboundChannelAdapter and @Poller annotations we send an order message to the output channel every 10 seconds.
[code language=”java”]
@SpringBootApplication
@EnableBinding(Source.class)
public class Application {

    protected Logger logger = Logger.getLogger(Application.class.getName());

    private int index = 0;

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Bean
    @InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedDelay = "10000", maxMessagesPerPoll = "1"))
    public MessageSource<Order> orderSource() {
        return () -> {
            Order o = new Order(index++, OrderType.PURCHASE, LocalDateTime.now(), OrderStatus.NEW, new Product(), new Shipment());
            logger.info("Sending order: " + o);
            return new GenericMessage<>(o);
        };
    }

    @Bean
    public AlwaysSampler defaultSampler() {
        return new AlwaysSampler();
    }

}
[/code]
Here's the binding configuration in the order service's application.yml file.
[code]
spring:
  cloud:
    stream:
      bindings:
        input:
          destination: ex.stream.in
          binder: rabbit1
        output:
          destination: ex.stream.out
          binder: rabbit1
      binders:
        rabbit1:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: 192.168.99.100
                port: 30000
                username: guest
                password: guest
[/code]
The product and shipment services use the Processor interface. They listen on the stream input and, after processing, send a message to their output.
[code language=”java”]
@SpringBootApplication
@EnableBinding(Processor.class)
public class Application {

    @Autowired
    private ProductService productService;

    protected Logger logger = Logger.getLogger(Application.class.getName());

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @StreamListener(Processor.INPUT)
    @SendTo(Processor.OUTPUT)
    public Order processOrder(Order order) {
        logger.info("Processing order: " + order);
        return productService.processOrder(order);
    }

    @Bean
    public AlwaysSampler defaultSampler() {
        return new AlwaysSampler();
    }

}
[/code]
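The ProductService implementation is not listed here. Assuming it simply fills in the product price and updates the order status (my assumption; the real service may look prices up elsewhere), a minimal sketch could be:
[code language=”java”]
import org.springframework.stereotype.Service;

// Hypothetical ProductService sketch - the actual implementation in the
// sample repository may differ.
@Service
public class ProductService {

    public Order processOrder(Order order) {
        // assumed behaviour: set a fixed product price and mark the order as processed
        order.getProduct().setPrice(300);
        order.setStatus(OrderStatus.PROCESSED);
        return order;
    }

}
[/code]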
Here's the service configuration. It listens on the order service's output exchange and also defines its consumer group named product. That group name is used for automatic queue creation and exchange binding on RabbitMQ, in this case a durable queue named ex.stream.out.product bound to the ex.stream.out exchange. There is also an output exchange defined.
[code]
spring:
  cloud:
    stream:
      bindings:
        input:
          destination: ex.stream.out
          group: product
          binder: rabbit1
        output:
          destination: ex.stream.out2
          binder: rabbit1
      binders:
        rabbit1:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: 192.168.99.100
                port: 30000
                username: guest
                password: guest
[/code]
We use a Docker container for running the RabbitMQ instance.
docker run -d --name rabbit1 -p 30001:15672 -p 30000:5672 rabbitmq:management
Let's take a look at the management console, which is available at http://192.168.99.100:30001. Here's the ex.stream.out topic exchange configuration. Below we can see the list of declared queues.
Here's the main application class from the payment service. We use the Sink interface for listening to incoming messages. The input order is processed and we print the final price of the order sent by the order service. The sample application source code is available on GitHub.
[code language=”java”]
@SpringBootApplication
@EnableBinding(Sink.class)
public class Application {

    @Autowired
    private PaymentService paymentService;

    protected Logger logger = Logger.getLogger(Application.class.getName());

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @StreamListener(Sink.INPUT)
    public void processOrder(Order order) {
        logger.info("Processing order: " + order);
        Order o = paymentService.processOrder(order);
        if (o != null)
            logger.info("Final response: " + (o.getProduct().getPrice() + o.getShipment().getPrice()));
    }

    @Bean
    public AlwaysSampler defaultSampler() {
        return new AlwaysSampler();
    }

}
[/code]
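The PaymentService has to correlate the two replies arriving for the same order id (one from the product service, one from the shipment service) and only return a complete order once both are present, which is why processOrder may return null for the first reply. Its implementation is not listed in this post, but a rough sketch of such aggregation, under those assumptions, might look like this:
[code language=”java”]
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.springframework.stereotype.Service;

// Hypothetical PaymentService sketch - stores the first reply for an order id
// and merges it with the second one. Field and method names are assumptions.
@Service
public class PaymentService {

    private final Map<Integer, Order> pending = new ConcurrentHashMap<>();

    public Order processOrder(Order order) {
        Order previous = pending.remove(order.getId());
        if (previous == null) {
            // first reply for this order - store it and wait for the second one
            pending.put(order.getId(), order);
            return null;
        }
        // second reply - merge the prices set by both services into one order
        if (order.getProduct().getPrice() == 0) {
            order.setProduct(previous.getProduct());
        }
        if (order.getShipment().getPrice() == 0) {
            order.setShipment(previous.getShipment());
        }
        return order;
    }

}
[/code]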
By declaring an AlwaysSampler @Bean in the main class of every microservice we propagate a single trace id across all calls belonging to one order. Here's a fragment from our microservices' logging console. I also get the following warning message, which I don't fully understand: 'Deprecated trace headers detected. Please upgrade Sleuth to 1.1 or start sending headers present in the TraceMessageHeaders class'. Is version 1.1.2.RELEASE of Spring Cloud Sleuth not applicable to the Camden.SR5 release?