Kafka Tracing with Spring Boot and OpenTelemetry
In this article, you will learn how to configure tracing for a Kafka producer and consumer with Spring Boot and OpenTelemetry. We will use the Micrometer library for sending traces and Jaeger for storing and visualizing them. Spring Kafka comes with built-in Micrometer integration for the KafkaTemplate and listener containers. You will also see how to configure Spring Kafka observability to add custom tags to traces.
If you are interested in Kafka and Spring Boot, you can find several articles about them on my blog. For example, to read about concurrency with Kafka and Spring Boot, see the following post. There is also an interesting article about Kafka transactions here.
Source Code
If you would like to try it yourself, you can always take a look at my source code. To do that, you need to clone my GitHub repository and go to the kafka directory. After that, just follow my instructions. Let's begin.
Dependencies
Let's take a look at the list of required Maven dependencies. It is the same for both of our sample Spring Boot apps. Of course, we need to add the Spring Boot starter and Spring Kafka for sending and receiving messages. In order to automatically generate traces related to each message, we include the Spring Boot Actuator and the Micrometer Tracing OpenTelemetry bridge. Finally, we need the opentelemetry-exporter-otlp library to export traces outside the app.
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
    </dependency>
    <dependency>
        <groupId>io.micrometer</groupId>
        <artifactId>micrometer-tracing-bridge-otel</artifactId>
    </dependency>
    <dependency>
        <groupId>io.opentelemetry</groupId>
        <artifactId>opentelemetry-exporter-otlp</artifactId>
    </dependency>
</dependencies>
Spring Boot Kafka Tracing for Producer
Our apps don’t do anything complicated. They are just sending and receiving messages. Here’s the class representing the message exchanged between both apps.
public class Info {

    private Long id;
    private String source;
    private String space;
    private String cluster;
    private String message;

    public Info(Long id, String source, String space, String cluster,
                String message) {
        this.id = id;
        this.source = source;
        this.space = space;
        this.cluster = cluster;
        this.message = message;
    }

    // GETTERS AND SETTERS
}
Let's begin with the producer app. It generates and sends one message per second. Here's the implementation of a @Service bean responsible for producing messages. It injects and uses the KafkaTemplate bean for that.
@Service
public class SenderService {

    private static final Logger LOG = LoggerFactory
            .getLogger(SenderService.class);

    AtomicLong id = new AtomicLong();

    @Autowired
    KafkaTemplate<Long, Info> template;

    @Value("${POD:kafka-producer}")
    private String pod;
    @Value("${NAMESPACE:empty}")
    private String namespace;
    @Value("${CLUSTER:localhost}")
    private String cluster;
    @Value("${TOPIC:info}")
    private String topic;

    @Scheduled(fixedRate = 1000)
    public void send() {
        Info info = new Info(id.incrementAndGet(),
                pod,
                namespace,
                cluster,
                "HELLO");
        CompletableFuture<SendResult<Long, Info>> result = template
                .send(topic, info.getId(), info);
        result.whenComplete((sr, ex) -> {
            // On failure the SendResult is null, so log the exception instead
            if (ex != null) {
                LOG.error("Failed to send message", ex);
            } else {
                LOG.info("Sent({}): {}", sr.getProducerRecord().key(),
                        sr.getProducerRecord().value());
            }
        });
    }
}
Spring Boot provides an auto-configured instance of KafkaTemplate. However, to enable Kafka tracing with Spring Boot, we need to customize that instance. Here's the implementation of the KafkaTemplate bean inside the producer app's main class. In order to enable tracing, we need to invoke the setObservationEnabled method. By default, the Micrometer module generates some generic tags. We want to add at least the name of the target topic and the Kafka message key. Therefore, we create a custom implementation of the KafkaTemplateObservationConvention interface. It uses the KafkaRecordSenderContext to retrieve the topic name and the message key from the ProducerRecord object.
@SpringBootApplication
@EnableScheduling
public class KafkaProducer {

    private static final Logger LOG = LoggerFactory
            .getLogger(KafkaProducer.class);

    public static void main(String[] args) {
        SpringApplication.run(KafkaProducer.class, args);
    }

    @Bean
    public NewTopic infoTopic() {
        return TopicBuilder.name("info")
                .partitions(1)
                .replicas(1)
                .build();
    }

    @Bean
    public KafkaTemplate<Long, Info> kafkaTemplate(ProducerFactory<Long, Info> producerFactory) {
        KafkaTemplate<Long, Info> t = new KafkaTemplate<>(producerFactory);
        t.setObservationEnabled(true);
        t.setObservationConvention(new KafkaTemplateObservationConvention() {
            @Override
            public KeyValues getLowCardinalityKeyValues(KafkaRecordSenderContext context) {
                return KeyValues.of("topic", context.getDestination(),
                        "id", String.valueOf(context.getRecord().key()));
            }
        });
        return t;
    }
}
We also need to set the address of the Jaeger instance and decide what percentage of spans will be exported. Here's the application.yml file with the required properties:
spring:
  application.name: kafka-producer
  kafka:
    bootstrap-servers: ${KAFKA_URL:localhost}:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.LongSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
management:
  tracing:
    enabled: true
    sampling:
      probability: 1.0
  otlp:
    tracing:
      endpoint: http://jaeger:4318/v1/traces
Spring Boot Kafka Tracing for Consumer
Let's switch to the consumer app. It just receives and prints messages coming to the Kafka topic. Here's the implementation of the listener @Service. Besides the whole message content, it also prints the message key and the topic partition number.
@Service
public class ListenerService {

    private static final Logger LOG = LoggerFactory
            .getLogger(ListenerService.class);

    @KafkaListener(id = "info", topics = "${app.in.topic}")
    public void onMessage(@Payload Info info,
            @Header(name = KafkaHeaders.RECEIVED_KEY, required = false) Long key,
            @Header(KafkaHeaders.RECEIVED_PARTITION) int partition) {
        LOG.info("Received(key={}, partition={}): {}", key, partition, info);
    }
}
In order to generate and export traces on the consumer side, we need to override the ConcurrentKafkaListenerContainerFactory bean. For the listener container factory, we should obtain the ContainerProperties instance and then invoke the setObservationEnabled method. As before, we can optionally create a custom implementation of the KafkaListenerObservationConvention interface to include additional tags.
@SpringBootApplication
@EnableKafka
public class KafkaConsumer {

    private static final Logger LOG = LoggerFactory
            .getLogger(KafkaConsumer.class);

    public static void main(String[] args) {
        SpringApplication.run(KafkaConsumer.class, args);
    }

    @Value("${app.in.topic}")
    private String topic;

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> listenerFactory(ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.getContainerProperties().setObservationEnabled(true);
        factory.setConsumerFactory(consumerFactory);
        return factory;
    }

    @Bean
    public NewTopic infoTopic() {
        return TopicBuilder.name(topic)
                .partitions(10)
                .replicas(3)
                .build();
    }
}
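The optional consumer-side convention is not shown in the class above. Here's a sketch of how it could be registered on the container properties, assuming the KafkaListenerObservationConvention interface and its KafkaRecordReceiverContext from Spring Kafka 3.x; the tag names mirror the producer example and are purely illustrative:

```java
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> listenerFactory(
        ConsumerFactory<String, String> consumerFactory) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    factory.getContainerProperties().setObservationEnabled(true);
    // Consumer-side counterpart of KafkaTemplateObservationConvention:
    // tag each receive span with the source topic and the record key.
    factory.getContainerProperties().setObservationConvention(
            new KafkaListenerObservationConvention() {
                @Override
                public KeyValues getLowCardinalityKeyValues(KafkaRecordReceiverContext context) {
                    return KeyValues.of("topic", context.getRecord().topic(),
                            "id", String.valueOf(context.getRecord().key()));
                }
            });
    return factory;
}
```

With that in place, the receive spans in Jaeger carry the same custom tags as the send spans, which makes it easier to filter both sides of an exchange by topic or key.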
Of course, we also need to set the Jaeger address in the application.yml file:
spring:
  application.name: kafka-consumer
  kafka:
    bootstrap-servers: ${KAFKA_URL:localhost}:9092
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.LongDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring.json.trusted.packages: "*"
app.in.topic: ${TOPIC:info}
management:
  tracing:
    enabled: true
    sampling:
      probability: 1.0
  otlp:
    tracing:
      endpoint: http://jaeger:4318/v1/traces
Trying on Docker
Once we finish the implementation, we can try out our solution. We will run both Kafka and Jaeger as Docker containers. First, let's build the project and the container images for the producer and consumer apps. Spring Boot provides built-in tools for that, so we just need to execute the following command:
$ mvn clean package spring-boot:build-image
After that, we can define the docker-compose.yml file with a list of containers. It is possible to dynamically override Spring Boot properties with environment variables. Thanks to that, we can easily change the Kafka and Jaeger addresses for the containers. Here's our docker-compose.yml:
version: "3.8"
services:
  broker:
    image: moeenz/docker-kafka-kraft:latest
    restart: always
    ports:
      - "9092:9092"
    environment:
      - KRAFT_CONTAINER_HOST_NAME=broker
  jaeger:
    image: jaegertracing/all-in-one:latest
    ports:
      - "16686:16686"
      - "4317:4317"
      - "4318:4318"
  producer:
    image: library/producer:1.0-SNAPSHOT
    links:
      - broker
      - jaeger
    environment:
      MANAGEMENT_OTLP_TRACING_ENDPOINT: http://jaeger:4318/v1/traces
      SPRING_KAFKA_BOOTSTRAP_SERVERS: broker:9092
  consumer:
    image: library/consumer:1.0-SNAPSHOT
    links:
      - broker
      - jaeger
    environment:
      MANAGEMENT_OTLP_TRACING_ENDPOINT: http://jaeger:4318/v1/traces
      SPRING_KAFKA_BOOTSTRAP_SERVERS: broker:9092
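The environment variable names above come from Spring Boot's relaxed binding rules: a property name like management.otlp.tracing.endpoint is mapped to its environment form by uppercasing it and replacing dots and hyphens with underscores. A minimal sketch of that mapping (my own helper for illustration, not a Spring API):

```java
import java.util.Locale;

public class RelaxedBinding {

    // Maps a Spring Boot property name to its environment-variable form:
    // uppercase, with '.' and '-' replaced by '_'.
    public static String toEnvVar(String property) {
        return property.replace('.', '_')
                .replace('-', '_')
                .toUpperCase(Locale.ROOT);
    }

    public static void main(String[] args) {
        System.out.println(toEnvVar("management.otlp.tracing.endpoint"));
        // MANAGEMENT_OTLP_TRACING_ENDPOINT
        System.out.println(toEnvVar("spring.kafka.bootstrap-servers"));
        // SPRING_KAFKA_BOOTSTRAP_SERVERS
    }
}
```

This is why the two environment entries in docker-compose.yml override the endpoint and bootstrap-servers properties from application.yml without rebuilding the images.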
Let’s run all the defined containers with the following command:
$ docker compose up
Our apps are running and exchanging messages.
The Jaeger dashboard is available on port 16686. As you can see, there are several traces with the kafka-producer and kafka-consumer spans.
We can go into the details of each entry. For every single message, the trace generated by the producer app is correlated with the trace generated by the consumer app. There are also our two custom tags (id and topic) with the values added by the KafkaTemplate bean.
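This correlation works because, with the default W3C propagation of the Micrometer OpenTelemetry bridge, the producer injects a traceparent header into each Kafka record and the consumer extracts it to continue the same trace. A self-contained sketch of what such a header carries (the sample values below are made up for illustration):

```java
public class TraceParent {

    public final String traceId;
    public final String spanId;
    public final boolean sampled;

    public TraceParent(String traceId, String spanId, boolean sampled) {
        this.traceId = traceId;
        this.spanId = spanId;
        this.sampled = sampled;
    }

    // Parses a W3C "traceparent" value: version-traceId-spanId-flags
    public static TraceParent parse(String header) {
        String[] parts = header.split("-");
        if (parts.length != 4 || parts[1].length() != 32 || parts[2].length() != 16) {
            throw new IllegalArgumentException("Malformed traceparent: " + header);
        }
        // The lowest bit of the flags byte marks the span as sampled
        boolean sampled = (Integer.parseInt(parts[3], 16) & 1) == 1;
        return new TraceParent(parts[1], parts[2], sampled);
    }

    public static void main(String[] args) {
        TraceParent tp = TraceParent.parse(
                "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01");
        System.out.println(tp.traceId + " sampled=" + tp.sampled);
    }
}
```

Both the producer and consumer spans share the same trace id from that header, which is exactly why Jaeger can show them as a single trace.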
Running on Kubernetes
Our sample apps are prepared to be deployed on Kubernetes. You can easily do it with the Skaffold CLI. Before that, we need to install Kafka and Jaeger on Kubernetes. I will not get into the details of the Kafka installation. You can find a detailed description of how to run Kafka on Kubernetes with the Strimzi operator in my article available here. After that, we can proceed to the Jaeger installation. In the first step, we need to add the following Helm repository:
$ helm repo add jaegertracing https://jaegertracing.github.io/helm-charts
By default, the Jaeger Helm chart doesn't expose OTLP endpoints. In order to enable them, we need to override some default settings. Here's our values YAML manifest:
collector:
  service:
    otlp:
      grpc:
        name: otlp-grpc
        port: 4317
      http:
        name: otlp-http
        port: 4318
Let's install Jaeger in the jaeger namespace with the parameters from jaeger-values.yaml:
$ helm install jaeger jaegertracing/jaeger -n jaeger \
--create-namespace \
-f jaeger-values.yaml
Once we install Jaeger, we can verify the list of Kubernetes Services. We will use the jaeger-collector service to send traces from the apps and the jaeger-query service to access the UI dashboard.
$ kubectl get svc -n jaeger
NAME               TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)                                           AGE
jaeger-agent       ClusterIP   10.96.147.104   <none>        5775/UDP,6831/UDP,6832/UDP,5778/TCP,14271/TCP     14m
jaeger-cassandra   ClusterIP   None            <none>        7000/TCP,7001/TCP,7199/TCP,9042/TCP,9160/TCP      14m
jaeger-collector   ClusterIP   10.96.111.236   <none>        14250/TCP,14268/TCP,4317/TCP,4318/TCP,14269/TCP   14m
jaeger-query       ClusterIP   10.96.88.64     <none>        80/TCP,16685/TCP,16687/TCP                        14m
Finally, we can run our sample Spring Boot apps that connect to Kafka and Jaeger. Here's the Deployment object for the producer app. It overrides the default Kafka and Jaeger addresses by defining the KAFKA_URL and MANAGEMENT_OTLP_TRACING_ENDPOINT environment variables.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: producer
spec:
  selector:
    matchLabels:
      app: producer
  template:
    metadata:
      labels:
        app: producer
    spec:
      containers:
        - name: producer
          image: piomin/producer
          resources:
            requests:
              memory: 200Mi
              cpu: 100m
          ports:
            - containerPort: 8080
          env:
            - name: MANAGEMENT_OTLP_TRACING_ENDPOINT
              value: http://jaeger-collector.jaeger:4318/v1/traces
            - name: KAFKA_URL
              value: my-cluster-kafka-bootstrap
            - name: CLUSTER
              value: c1
            - name: TOPIC
              value: test-1
            - name: POD
              valueFrom:
                fieldRef:
                  fieldPath: metadata.name
            - name: NAMESPACE
              valueFrom:
                fieldRef:
                  fieldPath: metadata.namespace
Here's a similar Deployment object for the consumer app:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: consumer-1
spec:
  selector:
    matchLabels:
      app: consumer-1
  template:
    metadata:
      labels:
        app: consumer-1
    spec:
      containers:
        - name: consumer
          image: piomin/consumer
          resources:
            requests:
              memory: 200Mi
              cpu: 100m
          ports:
            - containerPort: 8080
          env:
            - name: SPRING_APPLICATION_NAME
              value: kafka-consumer-1
            - name: TOPIC
              value: test-1
            - name: KAFKA_URL
              value: my-cluster-kafka-bootstrap
            - name: MANAGEMENT_OTLP_TRACING_ENDPOINT
              value: http://jaeger-collector.jaeger:4318/v1/traces
Assuming that you are inside the kafka directory in the Git repository, you just need to run the following command to deploy both apps. By the way, I'll create two deployments of the consumer app (consumer-1 and consumer-2) just for Jaeger visualization purposes.
$ skaffold run -n strimzi --tail
Once you run the apps, you can go to the Jaeger dashboard and verify the list of traces. In order to access the dashboard, we can enable port forwarding for the jaeger-query Service.
$ kubectl port-forward svc/jaeger-query -n jaeger 80:80
Final Thoughts
Integration between Spring Kafka and Micrometer Tracing is a relatively new feature, available since version 3.0. It is possible that it will soon be improved with some new features. Anyway, it currently gives us a simple way to generate and send traces from Kafka producers and consumers.