Kubernetes Messaging with Java and KubeMQ

Kubernetes Messaging with Java and KubeMQ

Have you ever tried to run any message broker on Kubernetes? KubeMQ is a relatively new solution and is not as popular as competitive tools like RabbitMQ, Kafka, or ActiveMQ. However, it has one big advantage over them – it is Kubernetes native message broker, which may be deployed there using a single command without preparing any additional templates or manifests. This convinced me to take a closer look at KubeMQ.
KubeMQ is an enterprise-grade, scalable, highly available, and secure message broker and message queue, designed as Kubernetes native solution in a lightweight container. It is written in Go. Therefore it is being advertised as a very fast solution running inside a small Docker container, which has about 30MB. It may be easily integrated with some popular third-party tools for observability like Zipkin, Prometheus, or Datadog.
When I’m reading comparison with competitive tools like RabbitMQ or Redis available on KubeMQ site (https://kubemq.io/product-overview/) it looks pretty amazing (for KubeMQ of course). It seems the authors wanted to merge some useful features of RabbitMQ and Kafka in a single product. In fact, KubeMQ provides many interesting mechanisms like delayed delivery, message peeking, message batch sending and receiving for queues, and consumer groups, load balancing and offsetting support for pub/sub.
Ok, when I’m looking at their SDK Java I see that it’s a new product, and there are still some things to do. However, all the features listed above seem to be very useful. Of course, I won’t be able to demonstrate all of them in this article, but I’m going to show you a simple Java application that uses message queue with transactions, and a pub/sub event store. Let’s begin our KubeMQ Java tutorial.

Example

The example application is written in Java 11, and uses Spring Boot. The source code is available as usual on GitHub. The repository address is https://github.com/piomin/sample-java-kubemq.git.

Before start

Before starting with KubeMQ you need to have a running instance of Minikube. I have tested it on version 1.6.1.

$ minikube start --vm-driver=virtualbox

Running KubeMQ on Kubernetes

First, you need to install KubeMQ. For Windows, you just need to download the latest version of CLI available on address https://github.com/kubemq-io/kubemqctl/releases/download/latest/kubemqctl.exe and copy it to the directory under PATH. Before installing KubeMQ on your Minikube instance we need to register on the web site https://account.kubemq.io/login/register. You will receive a token required for the installation. Installation is very easy with CLI. You just need to execute command kubemqctl cluster create with the registration token as shown below.

kubemq-java-tutorial-create

By default, KubeMQ creates a cluster consisting of three instances (pods). It is deployed as Kubernetes StatefulSet. The deployment is available inside the newly created namespace – kubemq. We can easily check the list of running pods with kubectl get pod command.

kubemq-java-tutorial-pods

The list of pods is not very important for us. We can easily scale up and scale down the number of instances in the cluster using command kubemqctl cluster scale. KubeMQ is exposed in the cluster under different interfaces. KubeMQ Java SDK is using GRPC protocol for communication, so we use service kubemq-cluster-grpc available under port 50000.

kubernetes-messaging-java-kubemq-svc

Since KubeMQ is a native Kubernetes message broker starting with it on Minikube is very simple. After executing a single command, we may now focus on development.

Example Architecture

We have an example application deployed on Kubernetes, which integrates with KubeMQ queue and event store. The diagram visible below illustrates an architecture of the application. It exposes REST endpoint POST /orders for creating new orders. Each order signifies a transfer between two in-memory accounts. The incoming order is sent to the queue orders (1). Then it is received by the listener (2), which is responsible for updating account balances using AccountRepository bean (3). If the transaction is finished, the event is sent to the pub/sub topic transactions. Incoming events may be listened to by many subscribers (4). In the example application we have two listeners: TransactionAmountListener and TransactionCountListener (5). They are responsible for adding extra money to the target order’s account based on the different criteria. The first criteria is an amount of a given transaction, while the second is the number of processed transactions per account.

kubernetes-messaging-java-kubemq-arch

On the described example application I’m going to show you the following features of KubeMQ and its SDK for Java:

  • Sending messages to a queue
  • Listening for incoming queue messages and handling transactions
  • Sending messages to pub/sub via Channel
  • Subscribing to pub/sub events and reading older events from a store
  • Using Spring Boot for integration with KubeMQ for standalone Java application

Let’s proceed to the implementation.

Implementation with Spring Boot and KubeMQ SDK

We are beginning with configuration. The URL to KubeMQ GRPC has been externalized in the application.yml.

spring:
  application:
    name: sampleapp-kubemq
kubemq:
  address: kubemq-cluster-grpc:50000

In the @Configuration class we are defining all required KubeMQ resources as Spring beans. Each of them requires a KubeMQ cluster address. We need to declare a queue, a channel for sending events and a subscriber for subscribing to the pub/sub events and events store.

@Configuration
@ConfigurationProperties("kubemq")
public class KubeMQConfiguration {

    private String address;

    @Bean
    public Queue queue() throws ServerAddressNotSuppliedException, SSLException {
        return new Queue("transactions", "orders", address);
    }

    @Bean
    public Subscriber subscriber() {
        return new Subscriber(address);
    }

    @Bean
    public Channel channel() {
        return new Channel("transactions", "orders", true, address);
    }

    String getAddress() {
        return address;
    }

    void setAddress(String address) {
        this.address = address;
    }

}

The first component in our architecture is a controller. It exposes HTTP endpoint for placing an order. OrderController injects Queue bean and uses it for sending messages to the KubeMQ queue. After receiving a response that message has been delivered it returns an order with id and status=ACCEPTED.

@RestController
@RequestMapping("/orders")
public class OrderController {

    private static final Logger LOGGER = LoggerFactory.getLogger(OrderController.class);

    private Queue queue;

    public OrderController(Queue queue) {
        this.queue = queue;
    }

    @PostMapping
    public Order sendOrder(@RequestBody Order order) {
        try {
            LOGGER.info("Sending: {}", order);
            final SendMessageResult result = queue.SendQueueMessage(new Message()
                    .setBody(Converter.ToByteArray(order)));
            order.setId(result.getMessageID());
            order.setStatus(OrderStatus.ACCEPTED);
            LOGGER.info("Sent: {}", order);
        } catch (ServerAddressNotSuppliedException | IOException e) {
            LOGGER.error("Error sending", e);
            order.setStatus(OrderStatus.ERROR);
        }
        return order;
    }

}

The message is processed asynchronously. Since the current KubeMQ Java SDK does not provide any message listener for asynchronous processing, we use synchronous methods inside the infinitive loop. The loop is started inside a new thread handled using Spring TaskExecutor. When a new message is received, we are starting a KubeMQ transaction. It may be acknowledged or rejected. A transaction is confirmed if the source account has sufficient funds to perform a transfer to a target account. If a transaction is confirmed it sends an event to KubeMQ transactions pub/sub with information about it using Channel bean.

@Component
public class OrderListener {

	private static final Logger LOGGER = LoggerFactory.getLogger(OrderListener.class);

	private Queue queue;
	private Channel channel;
	private OrderProcessor orderProcessor;
	private TaskExecutor taskExecutor;

	public OrderListener(Queue queue, Channel channel, OrderProcessor orderProcessor, TaskExecutor taskExecutor) {
		this.queue = queue;
		this.channel = channel;
		this.orderProcessor = orderProcessor;
		this.taskExecutor = taskExecutor;
	}

	@PostConstruct
	public void listen() {
		taskExecutor.execute(() -> {
			while (true) {
			    try {
                    Transaction transaction = queue.CreateTransaction();
                    TransactionMessagesResponse response = transaction.Receive(10, 10);
                    if (response.getMessage().getBody().length > 0) {
                        Order order = orderProcessor
                                .process((Order) Converter.FromByteArray(response.getMessage().getBody()));
                        LOGGER.info("Processed: {}", order);
                        if (order.getStatus().equals(OrderStatus.CONFIRMED)) {
                            transaction.AckMessage();
                            Event event = new Event();
                            event.setEventId(response.getMessage().getMessageID());
                            event.setBody(Converter.ToByteArray(order));
							LOGGER.info("Sending event: id={}", event.getEventId());
                            channel.SendEvent(event);
                        } else {
                            transaction.RejectMessage();
                        }
                    } else {
                        LOGGER.info("No messages");
                    }
                    Thread.sleep(10000);
                } catch (Exception e) {
					LOGGER.error("Error", e);
                }
			}
		});

	}

}

OrderListener class is using AccountRepository bean for account balance management. It is a simple in-memory store just for a demo purpose.

@Repository
public class AccountRepository {

    private List<Account> accounts = new ArrayList<>();

    public Account updateBalance(Integer id, int amount) throws InsufficientFundsException {
        Optional<Account> accOptional = accounts.stream().filter(a -> a.getId().equals(id)).findFirst();
        if (accOptional.isPresent()) {
            Account account = accOptional.get();
            account.setBalance(account.getBalance() + amount);
            if (account.getBalance() < 0)
                throw new InsufficientFundsException();
            int index = accounts.indexOf(account);
            accounts.set(index, account);
            return account;
        }
        return null;
    }

    public Account add(Account account) {
        account.setId(accounts.size() + 1);
        accounts.add(account);
        return account;
    }

    public List<Account> getAccounts() {
        return accounts;
    }

    @PostConstruct
    public void init() {
        add(new Account(null, "123456", 2000));
        add(new Account(null, "123457", 2000));
        add(new Account(null, "123458", 2000));
    }
}

And the last components in our architecture – event listeners. Both of them are subscribing to the same EventsStore transactions. The TransactionAmountListener is the simpler one. It is processing only a single event in order transfer percentage bonus counter from transaction amount to a target account. That’s why we have defined it as a listener just for new events (EventsStoreType.StartNewOnly).

@Component
public class TransactionAmountListener implements StreamObserver<EventReceive> {

    private static final Logger LOGGER = LoggerFactory.getLogger(TransactionAmountListener.class);

    private Subscriber subscriber;
    private AccountRepository accountRepository;

    public TransactionAmountListener(Subscriber subscriber, AccountRepository accountRepository) {
        this.subscriber = subscriber;
        this.accountRepository = accountRepository;
    }

    @Override
    public void onNext(EventReceive eventReceive) {
        try {
            Order order = (Order) Converter.FromByteArray(eventReceive.getBody());
            LOGGER.info("Amount event: {}", order);
            accountRepository.updateBalance(order.getAccountIdTo(), (int) (order.getAmount() * 0.1));
        } catch (IOException | ClassNotFoundException | InsufficientFundsException e) {
            LOGGER.error("Error", e);
        }
    }

    @Override
    public void onError(Throwable throwable) {

    }

    @Override
    public void onCompleted() {

    }

    @PostConstruct
    public void init() {
        SubscribeRequest subscribeRequest = new SubscribeRequest();
        subscribeRequest.setChannel("transactions");
        subscribeRequest.setClientID("amount-listener");
        subscribeRequest.setSubscribeType(SubscribeType.EventsStore);
        subscribeRequest.setEventsStoreType(EventsStoreType.StartNewOnly);
        try {
            subscriber.SubscribeToEvents(subscribeRequest, this);
        } catch (ServerAddressNotSuppliedException | SSLException e) {
            e.printStackTrace();
        }
    }
}

The other situation is with TransactionCountListener. It should be able to retrieve a list of all events published on pub/sub after every startup of our application. That’s why we are defining StartFromFirst as EventStoreType for Subscriber. Also a clientId needs to be dynamically generated on apply startup in order to retrieve all stored events. The listener send bonus to a target account after the fifth transaction addressed to that account succesfully processed by the application.

@Component
public class TransactionCountListener implements StreamObserver<EventReceive> {

    private static final Logger LOGGER = LoggerFactory.getLogger(TransactionCountListener.class);
    private Map<Integer, Integer> transactionsCount = new HashMap<>();

    private Subscriber subscriber;
    private AccountRepository accountRepository;

    public TransactionCountListener(Subscriber subscriber, AccountRepository accountRepository) {
        this.subscriber = subscriber;
        this.accountRepository = accountRepository;
    }

    @Override
    public void onNext(EventReceive eventReceive) {
        try {
            Order order = (Order) Converter.FromByteArray(eventReceive.getBody());
            LOGGER.info("Count event: {}", order);
            Integer accountIdTo = order.getAccountIdTo();
            Integer noOfTransactions = transactionsCount.get(accountIdTo);
            if (noOfTransactions == null)
                transactionsCount.put(accountIdTo, 1);
            else {
                transactionsCount.put(accountIdTo, ++noOfTransactions);
                if (noOfTransactions > 5) {
                    accountRepository.updateBalance(order.getAccountIdTo(), (int) (order.getAmount() * 0.1));
                    LOGGER.info("Adding extra to: id={}", order.getAccountIdTo());
                }
            }
        } catch (IOException | ClassNotFoundException | InsufficientFundsException e) {
            LOGGER.error("Error", e);
        }
    }

    @Override
    public void onError(Throwable throwable) {

    }

    @Override
    public void onCompleted() {

    }

    @PostConstruct
    public void init() {
        final SubscribeRequest subscribeRequest = new SubscribeRequest();
        subscribeRequest.setChannel("transactions");
        subscribeRequest.setClientID("count-listener-" + System.currentTimeMillis());
        subscribeRequest.setSubscribeType(SubscribeType.EventsStore);
        subscribeRequest.setEventsStoreType(EventsStoreType.StartFromFirst);
        try {
            subscriber.SubscribeToEvents(subscribeRequest, this);
        } catch (ServerAddressNotSuppliedException | SSLException e) {
            e.printStackTrace();
        }
    }

}

Running on Minikube

The easiest way to run our sample application on Minikube is with Skaffold and Jib. We don’t have to prepare any Dockerfile, only a single deployment manifest in k8s directory. Here’s our deployment.yaml file.

apiVersion: apps/v1
kind: Deployment
metadata:
  name: sampleapp-kubemq
  namespace: kubemq
  labels:
    app: sampleapp-kubemq
spec:
  replicas: 1
  selector:
    matchLabels:
      app: sampleapp-kubemq
  template:
    metadata:
      labels:
        app: sampleapp-kubemq
    spec:
      containers:
        - name: sampleapp-kubemq
          image: piomin/sampleapp-kubemq
          ports:
            - containerPort: 8080
---
apiVersion: v1
kind: Service
metadata:
  name: sampleapp-kubemq
  namespace: kubemq
  labels:
    app: sampleapp-kubemq
spec:
  ports:
    - port: 8080
      protocol: TCP
  selector:
    app: sampleapp-kubemq
  type: NodePort

The source code is prepared to use Skaffold and Jib. It contains the skaffold.yaml file in the project root directory.

apiVersion: skaffold/v2alpha1
kind: Config
build:
  artifacts:
    - image: piomin/sampleapp-kubemq
      jib: {}
  tagPolicy:
    gitCommit: {}

We also need to have a jib-maven-plugin Maven plugin in our pom.xml.

<plugin>
	<groupId>com.google.cloud.tools</groupId>
	<artifactId>jib-maven-plugin</artifactId>
	<version>1.8.0</version>
</plugin>

Now, we only have to execute the following command.


$ skaffold dev

Since our application is deployed on Minikube, we may perform some test calls. Assuming that Minikube node is available under address 192.168.99.100, here’s the example of test request and response from application.

$ curl -s http://192.168.99.100:30833/orders -d '{"type":"TRANSFER","accountIdFrom":1,"accountIdTo":2,"amount":300,"status":"NEW"}' -H 'Content-Type: application/json'
{"type":"TRANSFER","accountIdFrom":1,"accountIdTo":2,"date":null,"amount":300,"id":"10","status":"ACCEPTED"}

We may check a list of queues created on KubeMQ using command kubemqctl queues list as shown below.

kubemq-java-tutorial-queues

After sending some other test requests and performing some restarts of the application pod we may take a look at the event_store list using command kubemqctl events_store list as shown below. We may see that there are multiple clients with id count-listener* registered, but only the current is active.

kubemq-java-tutorial-events

Let’s take a look on application logs. They are automatically displayed on the screen after running the skaffold dev command. As you see each message sent to the queue is received by the listener, which performs transfer between accounts and then sends events to pub/sub. Finally both event_store listeners receive the event.

logs-1

If you restart the pod with the application TransactionCountListener receives all events available inside event_store and counts them for each target account id. If a total number of transactions for a single account extends 5 it sends extra funds to that account.

logs-2

If a transaction is rejected by OrderListener due to lack of funds on source account the message is re-delivered to the queue.

logs-3

Conclusion

In this article I show you a sample application that integrates with KubeMQ to realize standard use cases based on queues and topics (pub/sub). Starting with KubeMQ on Kubernetes and management is extremely easy with KubeMQ CLI. It has many interesting features described in quite well prepared documentation available on site https://docs.kubemq.io/. As a modern, cloud-native message broker KubeMQ is able to transfer billions of messages daily. However, we should bear in mind, it is a relatively new product, and features are not completely refined as in competition. For example, you can compare KubeMQ dashboard (available after executing command kubemqctl cluster dashboard) with RabbitMQ Web Admin. Of course, everything takes a little time, and I will follow the progress in KubeMQ development.

2 COMMENTS

comments user
Suhas Bs

I see there is community edition which is an open source version. any idea what features are not included in community edition? Does the persistence support and HA is available in free version?

    comments user
    piotr.minkowski

    Currently, it seems there is an open-source version available, but limited to a single pod (https://kubemq.io/product-pricing/). When I was writing the article they have only a commercial version.

Leave a Reply