ActiveMQ Artemis with Spring Boot on Kubernetes
This article will teach you how to run ActiveMQ on Kubernetes and integrate it with your app through Spring Boot. We will deploy a clustered ActiveMQ broker using a dedicated operator. Then we are going to build and run two Spring Boot apps. The first of them runs in multiple instances and receives messages from a queue, while the second sends messages to that queue. In order to run the ActiveMQ cluster locally, we will use Kind. The consumer app can connect to the cluster in several different modes, which we will discuss in detail.
You can find a lot of articles about other message brokers like RabbitMQ or Kafka on my blog. If you would like to read about RabbitMQ on Kubernetes, please refer to that article. In order to find out more about Kafka and Spring Boot integration, you can read the article about Kafka Streams and Spring Cloud Stream available here. Previously I didn’t write much about ActiveMQ, but it is also a very popular message broker. For example, it supports the latest version of the AMQP protocol, while RabbitMQ is based on its own extension of AMQP 0.9.1.
Source Code
If you would like to try it by yourself, you may always take a look at my source code. In order to do that you need to clone my GitHub repository. Then go to the messaging directory. You will find there three Spring Boot apps: simple-producer, simple-consumer and simple-counter. After that, you should just follow my instructions. Let’s begin.
Integrate Spring Boot with ActiveMQ
Let’s begin with the integration between our Spring Boot apps and the ActiveMQ Artemis broker. In fact, ActiveMQ Artemis is the base of the commercial product provided by Red Hat called AMQ Broker. Red Hat actively develops a Spring Boot starter for ActiveMQ and an operator for running it on Kubernetes. In order to access the starter, you need to include the Red Hat Maven repository in your pom.xml file:
<repository>
  <id>red-hat-ga</id>
  <url>https://maven.repository.redhat.com/ga</url>
</repository>
After that, you can include the starter in your Maven pom.xml:
<dependency>
  <groupId>org.amqphub.spring</groupId>
  <artifactId>amqp-10-jms-spring-boot-starter</artifactId>
  <version>2.5.6</version>
  <exclusions>
    <exclusion>
      <groupId>org.slf4j</groupId>
      <artifactId>log4j-over-slf4j</artifactId>
    </exclusion>
  </exclusions>
</dependency>
Then, we just need to enable JMS for our app with the @EnableJms annotation:
@SpringBootApplication
@EnableJms
public class SimpleConsumer {

    public static void main(String[] args) {
        SpringApplication.run(SimpleConsumer.class, args);
    }

}
Our application is very simple. It just receives and prints an incoming message. The method for receiving messages should be annotated with @JmsListener. The destination field contains the name of a target queue.
@Service
public class Listener {

    private static final Logger LOG = LoggerFactory
            .getLogger(Listener.class);

    @JmsListener(destination = "test-1")
    public void processMsg(SimpleMessage message) {
        LOG.info("============= Received: " + message);
    }

}
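As a side note, if a single listener thread cannot keep up with incoming traffic, the @JmsListener annotation also accepts a concurrency attribute. Here’s a minimal sketch, assuming the default listener container factory; the "3-5" range is just an example value:

@JmsListener(destination = "test-1", concurrency = "3-5")
public void processMsg(SimpleMessage message) {
    LOG.info("============= Received: " + message);
}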
Here’s the class that represents our message:
public class SimpleMessage implements Serializable {

    private Long id;
    private String source;
    private String content;

    public SimpleMessage() {
    }

    public SimpleMessage(Long id, String source, String content) {
        this.id = id;
        this.source = source;
        this.content = content;
    }

    // ... GETTERS AND SETTERS

    @Override
    public String toString() {
        return "SimpleMessage{" +
                "id=" + id +
                ", source='" + source + '\'' +
                ", content='" + content + '\'' +
                '}';
    }
}
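Since SimpleMessage implements Serializable, Spring’s default converter sends it as a JMS ObjectMessage. As an optional alternative sketch (not used in the rest of this article), you could register Spring’s MappingJackson2MessageConverter bean to exchange JSON text messages instead:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.support.converter.MappingJackson2MessageConverter;
import org.springframework.jms.support.converter.MessageConverter;
import org.springframework.jms.support.converter.MessageType;

@Configuration
public class JmsConfig {

    @Bean
    public MessageConverter jacksonJmsMessageConverter() {
        MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
        // send payloads as TextMessages containing JSON
        converter.setTargetType(MessageType.TEXT);
        // message property used to carry the payload type for deserialization
        converter.setTypeIdPropertyName("_type");
        return converter;
    }
}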
Finally, we need to set the connection settings. With the AMQP Spring Boot starter it is very simple. We just need to set the property amqphub.amqp10jms.remoteUrl. For now, we are going to rely on an environment variable set at the level of the Kubernetes Deployment.
amqphub.amqp10jms.remoteUrl = ${ARTEMIS_URL}
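By the way, when running the app locally outside Kubernetes, you could add a fallback default to the placeholder. Here’s a minimal sketch; the localhost URL is just an assumption for local testing:

amqphub.amqp10jms.remoteUrl = ${ARTEMIS_URL:amqp://localhost:5672}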
The producer application is pretty similar. Instead of the annotation for receiving messages, we use the Spring JmsTemplate for producing and sending messages to the target queue. The method for sending messages is exposed as an HTTP POST /producer/send endpoint.
@RestController
@RequestMapping("/producer")
public class ProducerController {

    private static long id = 1;

    private final JmsTemplate jmsTemplate;

    @Value("${DESTINATION}")
    private String destination;

    public ProducerController(JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
    }

    @PostMapping("/send")
    public SimpleMessage send(@RequestBody SimpleMessage message) {
        if (message.getId() == null) {
            message.setId(id++);
        }
        jmsTemplate.convertAndSend(destination, message);
        return message;
    }

}
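One caveat: the static id field is not thread-safe, so concurrent POST requests could generate duplicate identifiers. Here’s a hedged sketch of a safer variant using AtomicLong (requires the java.util.concurrent.atomic.AtomicLong import):

private static final AtomicLong ID = new AtomicLong(1);

@PostMapping("/send")
public SimpleMessage send(@RequestBody SimpleMessage message) {
    if (message.getId() == null) {
        // getAndIncrement() is atomic, so ids stay unique under concurrent load
        message.setId(ID.getAndIncrement());
    }
    jmsTemplate.convertAndSend(destination, message);
    return message;
}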
Create a Kind cluster with Nginx Ingress
Our example apps are ready. Before deploying them, we need to prepare the local Kubernetes cluster. We will deploy there an ActiveMQ cluster consisting of three brokers. Therefore, our Kubernetes cluster will also consist of three worker nodes. We will also run three instances of the consumer app on Kubernetes. They connect to the ActiveMQ brokers over the AMQP protocol. There is also the producer app, which sends messages on demand. Here’s the diagram of our architecture.
In order to run a multi-node Kubernetes cluster locally, we will use Kind. We will test not only communication over the AMQP protocol but also expose the ActiveMQ management console over HTTP. Because ActiveMQ uses headless services for exposing the web console, we have to create and configure Ingress on Kind to access it. Let’s begin.
In the first step, we are going to create a Kind cluster. It consists of a control plane and three workers. The configuration has to be prepared correctly to run the Nginx Ingress Controller. We should add the ingress-ready label to a single worker node and expose ports 80 and 443. Here’s the final version of a Kind config file:
kind: Cluster
apiVersion: kind.x-k8s.io/v1alpha4
nodes:
- role: control-plane
- role: worker
  kubeadmConfigPatches:
  - |
    kind: JoinConfiguration
    nodeRegistration:
      kubeletExtraArgs:
        node-labels: "ingress-ready=true"
  extraPortMappings:
  - containerPort: 80
    hostPort: 80
    protocol: TCP
  - containerPort: 443
    hostPort: 443
    protocol: TCP
- role: worker
- role: worker
Now, let’s create a Kind cluster by executing the following command:
$ kind create cluster --config kind-config.yaml
If your cluster has been successfully created, kind prints a confirmation message.
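You can additionally verify that the control plane and all three workers have joined the cluster:

$ kubectl get nodes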
After that, let’s install the Nginx Ingress Controller. It is just a single command:
$ kubectl apply -f https://raw.githubusercontent.com/kubernetes/ingress-nginx/main/deploy/static/provider/kind/deploy.yaml
Let’s verify the installation:
$ kubectl get pod -n ingress-nginx
NAME READY STATUS RESTARTS AGE
ingress-nginx-admission-create-wbbzh 0/1 Completed 0 1m
ingress-nginx-admission-patch-ws2mv 0/1 Completed 0 1m
ingress-nginx-controller-86b6d5756c-rkbmz 1/1 Running 0 1m
Install ActiveMQ Artemis on Kubernetes
Finally, we may proceed to the ActiveMQ Artemis installation. Firstly, let’s install the required CRDs. You may find all the YAML manifests inside the operator repository on GitHub.
$ git clone https://github.com/artemiscloud/activemq-artemis-operator.git
$ cd activemq-artemis-operator
The manifests with CRDs are located in the deploy/crds directory:
$ kubectl create -f ./deploy/crds
After that, we can install the operator:
$ kubectl create -f ./deploy/service_account.yaml
$ kubectl create -f ./deploy/role.yaml
$ kubectl create -f ./deploy/role_binding.yaml
$ kubectl create -f ./deploy/election_role.yaml
$ kubectl create -f ./deploy/election_role_binding.yaml
$ kubectl create -f ./deploy/operator_config.yaml
$ kubectl create -f ./deploy/operator.yaml
In order to create a cluster, we have to create the ActiveMQArtemis object. It defines the number of brokers being a part of the cluster (1). We should also define the acceptor, to expose the AMQP port on every single broker pod (2). Of course, we will also expose the management console (3).
apiVersion: broker.amq.io/v1beta1
kind: ActiveMQArtemis
metadata:
  name: ex-aao
spec:
  deploymentPlan:
    size: 3 # (1)
    image: placeholder
    messageMigration: true
    resources:
      limits:
        cpu: "500m"
        memory: "1024Mi"
      requests:
        cpu: "250m"
        memory: "512Mi"
  acceptors: # (2)
  - name: amqp
    protocols: amqp
    port: 5672
    connectionsAllowed: 5
  console: # (3)
    expose: true
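Assuming you saved the manifest above as e.g. broker.yaml (the filename is arbitrary), apply it with:

$ kubectl apply -f broker.yaml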
Once the ActiveMQArtemis object is created, the operator starts the deployment process. It creates the StatefulSet object:
$ kubectl get statefulset
NAME READY AGE
ex-aao-ss 3/3 1m
It starts all three pods with brokers sequentially:
$ kubectl get pod -l application=ex-aao-app
NAME READY STATUS RESTARTS AGE
ex-aao-ss-0 1/1 Running 0 5m
ex-aao-ss-1 1/1 Running 0 3m
ex-aao-ss-2 1/1 Running 0 1m
Let’s display a list of Services created by the operator. There is a single Service per broker for exposing the AMQP port (ex-aao-amqp-*) and the web console (ex-aao-wconsj-*).
The operator automatically creates an Ingress object per each web console Service. We will modify them by adding different hosts: the one.activemq.com domain for the first broker, two.activemq.com for the second broker, etc.
$ kubectl get ing
NAME CLASS HOSTS ADDRESS PORTS AGE
ex-aao-wconsj-0-svc-ing <none> one.activemq.com localhost 80 1h
ex-aao-wconsj-1-svc-ing <none> two.activemq.com localhost 80 1h
ex-aao-wconsj-2-svc-ing <none> three.activemq.com localhost 80 1h
After creating the ingresses, we have to add the following line to /etc/hosts:
127.0.0.1 one.activemq.com two.activemq.com three.activemq.com
Now, we can access the management console of each broker, e.g. for the third broker under the following URL: http://three.activemq.com/console.
Once the broker is ready, we may define a test queue. The name of that queue is test-1.
apiVersion: broker.amq.io/v1beta1
kind: ActiveMQArtemisAddress
metadata:
  name: address-1
spec:
  addressName: address-1
  queueName: test-1
  routingType: anycast
Run the Spring Boot app on Kubernetes and connect to ActiveMQ
Now, let’s deploy the consumer app. In the Deployment manifest, we have to set the ActiveMQ cluster connection URL. But wait… how do we connect to it? There are three brokers exposed using three separate Kubernetes Services. Fortunately, the AMQP Spring Boot starter supports such a scenario. We may set the addresses of all three brokers inside the failover URI. Let’s try it and see what happens.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: simple-consumer
spec:
  replicas: 3
  selector:
    matchLabels:
      app: simple-consumer
  template:
    metadata:
      labels:
        app: simple-consumer
    spec:
      containers:
      - name: simple-consumer
        image: piomin/simple-consumer
        env:
        - name: ARTEMIS_URL
          value: failover:(amqp://ex-aao-amqp-0-svc:5672,amqp://ex-aao-amqp-1-svc:5672,amqp://ex-aao-amqp-2-svc:5672)
        resources:
          limits:
            memory: 256Mi
            cpu: 500m
          requests:
            memory: 128Mi
            cpu: 250m
The application is prepared to be deployed with Skaffold. If you run the skaffold dev command, you will deploy the app and see the logs of all three instances of the consumer. What’s the result? All the instances connect to the first URL from the list, as shown below.
Fortunately, there is a failover parameter that helps distribute client connections more evenly across multiple remote peers. With the failover.randomize option, the URIs are randomly shuffled before the client attempts to connect to one of them. Let’s replace the ARTEMIS_URL env in the Deployment manifest with the following line:
failover:(amqp://ex-aao-amqp-0-svc:5672,amqp://ex-aao-amqp-1-svc:5672,amqp://ex-aao-amqp-2-svc:5672)?failover.randomize=true
The distribution between broker instances looks slightly better. Of course, the result is random, so you may get different results.
Another way to distribute the connections is through a dedicated Kubernetes Service. We don’t have to leverage the services created automatically by the operator. We can create our own Service that load balances between all available broker pods.
kind: Service
apiVersion: v1
metadata:
  name: ex-aao-amqp-lb
spec:
  ports:
  - name: amqp
    protocol: TCP
    port: 5672
  type: ClusterIP
  selector:
    application: ex-aao-app
Now, we can drop the failover section on the client side and fully rely on Kubernetes mechanisms.
spec:
  containers:
  - name: simple-consumer
    image: piomin/simple-consumer
    env:
    - name: ARTEMIS_URL
      value: amqp://ex-aao-amqp-lb:5672
This time we won’t see anything interesting in the application logs, because all the instances connect to the same URL. We can verify the distribution between all the broker instances using e.g. the management web console. Here’s a list of consumers on the first instance of ActiveMQ:
Below, you will see exactly the same results for the second instance. All the consumer app instances have been distributed equally between all available brokers inside the cluster.
Now, we are going to deploy the producer app. We use the same Kubernetes Service for connecting to the ActiveMQ cluster.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: simple-producer
spec:
  replicas: 3
  selector:
    matchLabels:
      app: simple-producer
  template:
    metadata:
      labels:
        app: simple-producer
    spec:
      containers:
      - name: simple-producer
        image: piomin/simple-producer
        env:
        - name: ARTEMIS_URL
          value: amqp://ex-aao-amqp-lb:5672
        - name: DESTINATION
          value: test-1
        ports:
        - containerPort: 8080
Because we have to call the HTTP endpoint, let’s create the Service for the producer app:
apiVersion: v1
kind: Service
metadata:
  name: simple-producer
spec:
  type: ClusterIP
  selector:
    app: simple-producer
  ports:
  - port: 8080
Let’s deploy the producer app using Skaffold with port-forwarding enabled:
$ skaffold dev --port-forward
Here’s a list of our Deployments:
In order to send a test message just execute the following command:
$ curl http://localhost:8080/producer/send \
-d "{\"source\":\"test\",\"content\":\"Hello\"}" \
-H "Content-Type:application/json"
Advanced configuration
If you need more advanced traffic distribution between the brokers inside the cluster, you can achieve it in several ways. For example, we can dynamically override the configuration property at runtime. Here’s a very simple example. After starting, the application connects to an external service over HTTP, which returns the next instance number.
@Configuration
public class AmqpConfig {

    @PostConstruct
    public void init() {
        RestTemplate t = new RestTemplateBuilder().build();
        // ask the counter app which broker instance this pod should use
        int x = t.getForObject("http://simple-counter:8080/counter", Integer.class);
        // point the AMQP starter at the selected broker's Service
        System.setProperty("amqphub.amqp10jms.remoteUrl",
                "amqp://ex-aao-amqp-" + x + "-svc:5672");
    }
}
Here’s the implementation of the counter app. It just increments the counter and returns it modulo the number of broker instances. Of course, we may create a more advanced implementation and provide e.g. a connection to the instance of a broker running on the same Kubernetes node as the app pod.
@SpringBootApplication
@RestController
@RequestMapping("/counter")
public class CounterApp {

    private static int c = 0;

    public static void main(String[] args) {
        SpringApplication.run(CounterApp.class, args);
    }

    @Value("${DIVIDER:0}")
    int divider;

    @GetMapping
    public Integer count() {
        if (divider > 0)
            return c++ % divider;
        else
            return c++;
    }

}
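The DIVIDER property maps to an environment variable of the counter app. Here’s a hedged sketch of the relevant fragment of a simple-counter Deployment; the value assumes our three-broker cluster:

env:
- name: DIVIDER
  value: "3" # the number of brokers in the ActiveMQ cluster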
Final Thoughts
ActiveMQ is an interesting alternative to RabbitMQ as a message broker. In this article, you learned how to run, manage, and integrate ActiveMQ with Spring Boot on Kubernetes. It can be declaratively managed on Kubernetes thanks to the ActiveMQ Artemis Operator. You can also easily integrate it with Spring Boot using a dedicated starter. It provides various configuration options and is actively developed by Red Hat and the community.