Kafka Tracing with Spring Boot and OpenTelemetry

In this article, you will learn how to configure tracing for Kafka producers and consumers with Spring Boot and OpenTelemetry. We will use Micrometer Tracing to generate and send traces and Jaeger to store and visualize them. Spring Kafka comes with built-in Micrometer integration for the KafkaTemplate and the listener containers. You will also see how to configure Spring Kafka observability to add custom tags to traces.

If you are interested in Kafka and Spring Boot, you may find several articles about them on my blog. For example, there is a post about concurrency with Kafka and Spring Boot, and another interesting article about Kafka transactions.

Source Code

If you would like to try it by yourself, you may always take a look at my source code. In order to do that you need to clone my GitHub repository. Then you should go to the kafka directory. After that, you should just follow my instructions. Let’s begin.

Dependencies

Let’s take a look at the list of required Maven dependencies. It is the same for both of our sample Spring Boot apps. Of course, we need to add the Spring Boot starter and Spring Kafka for sending or receiving messages. In order to automatically generate traces related to each message, we are including the Spring Boot Actuator and the Micrometer Tracing OpenTelemetry bridge. Finally, we need to include the opentelemetry-exporter-otlp library to export traces outside the app.

<dependencies>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
  </dependency>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
  </dependency>
  <dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
  </dependency>
  <dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
  </dependency>
  <dependency>
    <groupId>io.micrometer</groupId>
    <artifactId>micrometer-tracing-bridge-otel</artifactId>
  </dependency>
  <dependency>
    <groupId>io.opentelemetry</groupId>
    <artifactId>opentelemetry-exporter-otlp</artifactId>
  </dependency>
</dependencies>

Spring Boot Kafka Tracing for Producer

Our apps don’t do anything complicated. They are just sending and receiving messages. Here’s the class representing the message exchanged between both apps.

public class Info {

    private Long id;
    private String source;
    private String space;
    private String cluster;
    private String message;

    public Info(Long id, String source, String space, String cluster, 
                String message) {
       this.id = id;
       this.source = source;
       this.space = space;
       this.cluster = cluster;
       this.message = message;
    }

   // GETTERS AND SETTERS
}

Let’s begin with the producer app. It generates and sends one message per second. Here’s the implementation of a @Service bean responsible for producing messages. It injects and uses the KafkaTemplate bean for that.

@Service
public class SenderService {

   private static final Logger LOG = LoggerFactory
      .getLogger(SenderService.class);

   AtomicLong id = new AtomicLong();
   @Autowired
   KafkaTemplate<Long, Info> template;

   @Value("${POD:kafka-producer}")
   private String pod;
   @Value("${NAMESPACE:empty}")
   private String namespace;
   @Value("${CLUSTER:localhost}")
   private String cluster;
   @Value("${TOPIC:info}")
   private String topic;

   @Scheduled(fixedRate = 1000)
   public void send() {
      Info info = new Info(id.incrementAndGet(), 
                           pod, 
                           namespace, 
                           cluster, 
                           "HELLO");
      CompletableFuture<SendResult<Long, Info>> result = template
         .send(topic, info.getId(), info);
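      result.whenComplete((sr, ex) -> {
         // sr is null when the send fails, so check the exception first
         if (ex != null) {
            LOG.error("Send failed: {}", info, ex);
         } else {
            LOG.info("Sent({}): {}", sr.getProducerRecord().key(),
                     sr.getProducerRecord().value());
         }
      });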
    }

}
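
The callback passed to whenComplete receives either a result or an exception, never both, so dereferencing the result without a check risks a NullPointerException when a send fails. Here is a minimal JDK-only sketch of that contract. The class SendCallbackDemo and the method describe are hypothetical names, and a plain String stands in for the SendResult object.

```java
import java.util.concurrent.CompletableFuture;

// Stand-alone illustration using only the JDK; no Kafka classes are involved.
class SendCallbackDemo {

    // Builds a log line for a completed send; 'result' is null when 'error' is set.
    static String describe(String result, Throwable error) {
        if (error != null) {
            return "Send failed: " + error.getMessage();
        }
        return "Sent: " + result;
    }

    public static void main(String[] args) {
        // A future that completed normally, as after a successful send
        CompletableFuture<String> ok = CompletableFuture.completedFuture("HELLO");

        // A future that completed exceptionally, as after a failed send
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("broker unavailable"));

        ok.whenComplete((r, ex) -> System.out.println(describe(r, ex)));
        failed.whenComplete((r, ex) -> System.out.println(describe(r, ex)));
    }
}
```

The same check-the-exception-first pattern applies to the callback registered on the future returned by KafkaTemplate.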

Spring Boot provides an auto-configured instance of KafkaTemplate. However, to enable Kafka tracing with Spring Boot we need to customize that instance. Here’s the implementation of the KafkaTemplate bean inside the producer app’s main class. In order to enable tracing, we need to invoke the setObservationEnabled method. By default, the Micrometer module generates some generic tags. We want to add at least the name of the target topic and the Kafka message key. Therefore we are creating our custom implementation of the KafkaTemplateObservationConvention interface. It uses the KafkaRecordSenderContext to retrieve the topic name and the message key from the ProducerRecord object.

@SpringBootApplication
@EnableScheduling
public class KafkaProducer {

   private static final Logger LOG = LoggerFactory
      .getLogger(KafkaProducer.class);

   public static void main(String[] args) {
      SpringApplication.run(KafkaProducer.class, args);
   }

   @Bean
   public NewTopic infoTopic() {
      return TopicBuilder.name("info")
             .partitions(1)
             .replicas(1)
             .build();
   }

   @Bean
   public KafkaTemplate<Long, Info> kafkaTemplate(ProducerFactory<Long, Info> producerFactory) {
      KafkaTemplate<Long, Info> t = new KafkaTemplate<>(producerFactory);
      t.setObservationEnabled(true);
      t.setObservationConvention(new KafkaTemplateObservationConvention() {
         @Override
         public KeyValues getLowCardinalityKeyValues(KafkaRecordSenderContext context) {
            return KeyValues.of("topic", context.getDestination(),
                    "id", String.valueOf(context.getRecord().key()));
         }
      });
      return t;
   }

}

We also need to set the address of the Jaeger instance and decide what fraction of traces will be sampled and exported. A sampling probability of 1.0 means that every trace is exported. Here’s the application.yml file with the required properties:

spring:
  application.name: kafka-producer
  kafka:
    bootstrap-servers: ${KAFKA_URL:localhost}:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.LongSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

management:
  tracing:
    enabled: true
    sampling:
      probability: 1.0
  otlp:
    tracing:
      endpoint: http://jaeger:4318/v1/traces
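
To give an intuition for the management.tracing.sampling.probability property, here is a simplified sketch of a ratio-based sampling decision. It is an illustration only, not the sampler used by Micrometer Tracing or OpenTelemetry. The class SamplingSketch and the method shouldSample are hypothetical names, and the decision is assumed to be derived from 64 random bits of the trace ID.

```java
import java.util.Random;

// Simplified illustration of probability-based trace sampling (JDK only).
class SamplingSketch {

    // Returns true when a trace whose ID contains the given random bits should be kept.
    static boolean shouldSample(long traceIdBits, double probability) {
        if (probability >= 1.0) {
            return true;   // e.g. probability: 1.0 keeps every trace
        }
        if (probability <= 0.0) {
            return false;  // nothing is kept
        }
        long threshold = (long) (probability * Long.MAX_VALUE);
        // Clear the sign bit so the value is uniformly spread over [0, Long.MAX_VALUE]
        return (traceIdBits & Long.MAX_VALUE) < threshold;
    }

    public static void main(String[] args) {
        Random random = new Random(42);
        int total = 100_000;
        int sampled = 0;
        for (int i = 0; i < total; i++) {
            if (shouldSample(random.nextLong(), 0.1)) {
                sampled++;
            }
        }
        // With probability 0.1 roughly one trace in ten is expected to be kept
        System.out.println("Sampled " + sampled + " of " + total + " traces");
    }
}
```

Deriving the decision from the trace ID rather than from a fresh random number means every service that sees the same trace ID can reach the same decision.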

Spring Boot Kafka Tracing for Consumer

Let’s switch to the consumer app. It just receives and prints messages coming to the Kafka topic. Here’s the implementation of the listener @Service. Besides the whole message content, it also prints the message key and a topic partition number.

@Service
public class ListenerService {

   private static final Logger LOG = LoggerFactory
      .getLogger(ListenerService.class);

   @KafkaListener(id = "info", topics = "${app.in.topic}")
   public void onMessage(@Payload Info info,
                         @Header(name = KafkaHeaders.RECEIVED_KEY, required = false) Long key,
                         @Header(KafkaHeaders.RECEIVED_PARTITION) int partition) {
      LOG.info("Received(key={}, partition={}): {}", key, partition, info);
   }

}

In order to generate and export traces on the consumer side, we need to override the ConcurrentKafkaListenerContainerFactory bean. On the listener container factory, we obtain the ContainerProperties instance and then invoke the setObservationEnabled method. As before, we can optionally provide a custom convention to include additional tags. For listeners, the corresponding interface is KafkaListenerObservationConvention.

@SpringBootApplication
@EnableKafka
public class KafkaConsumer {

   private static final Logger LOG = LoggerFactory
      .getLogger(KafkaConsumer.class);

    public static void main(String[] args) {
        SpringApplication.run(KafkaConsumer.class, args);
    }

    @Value("${app.in.topic}")
    private String topic;

    // The bean is named kafkaListenerContainerFactory so that it replaces the
    // auto-configured factory that @KafkaListener uses by default.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<Long, Info> kafkaListenerContainerFactory(ConsumerFactory<Long, Info> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<Long, Info> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.getContainerProperties().setObservationEnabled(true);
        factory.setConsumerFactory(consumerFactory);
        return factory;
    }

    @Bean
    public NewTopic infoTopic() {
        return TopicBuilder.name(topic)
                .partitions(10)
                .replicas(3)
                .build();
    }

}

Of course, we also need to set a Jaeger address in the application.yml file:

spring:
  application.name: kafka-consumer
  kafka:
    bootstrap-servers: ${KAFKA_URL:localhost}:9092
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.LongDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring.json.trusted.packages: "*"

app.in.topic: ${TOPIC:info}

management:
  tracing:
    enabled: true
    sampling:
      probability: 1.0
  otlp:
    tracing:
      endpoint: http://jaeger:4318/v1/traces
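
Both configuration files rely on placeholders of the form ${NAME:default}, for example ${KAFKA_URL:localhost} and ${TOPIC:info}. The following sketch shows how such a placeholder falls back to its default when the variable is not set. It is a simplified stand-in, not Spring’s resolver: the class PlaceholderSketch and the method resolve are hypothetical names, nested placeholders are not handled, and an unresolvable placeholder without a default is simply left unchanged here.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Simplified illustration of ${NAME:default} resolution (JDK only).
class PlaceholderSketch {

    // Group 1 is the variable name, group 2 the optional default after the colon
    private static final Pattern PLACEHOLDER =
            Pattern.compile("\\$\\{([^:}]+)(?::([^}]*))?\\}");

    static String resolve(String text, Map<String, String> env) {
        Matcher m = PLACEHOLDER.matcher(text);
        StringBuilder out = new StringBuilder();
        while (m.find()) {
            String value = env.get(m.group(1));
            if (value == null) {
                // Fall back to the default, or keep the placeholder if there is none
                value = m.group(2) != null ? m.group(2) : m.group(0);
            }
            m.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        // No variable set: the default after the colon is used
        System.out.println(resolve("${KAFKA_URL:localhost}:9092", Map.of()));
        // Variable set: its value replaces the placeholder
        System.out.println(resolve("${KAFKA_URL:localhost}:9092", Map.of("KAFKA_URL", "broker")));
    }
}
```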

Trying on Docker

Once we finish the implementation we can try out our solution. We will run both Kafka and Jaeger as Docker containers. Firstly, let’s build the project and container images for the producer and consumer apps. Spring Boot provides built-in tools for that. Therefore, we just need to execute the following command:

$ mvn clean package spring-boot:build-image

After that, we can define the docker-compose.yml file with a list of containers. It is possible to dynamically override Spring Boot properties using a style based on environment variables. Thanks to that, we can easily change the Kafka and Jaeger addresses for the containers. Here’s our docker-compose.yml:

version: "3.8"
services:
  broker:
    image: moeenz/docker-kafka-kraft:latest
    restart: always
    ports:
      - "9092:9092"
    environment:
      - KRAFT_CONTAINER_HOST_NAME=broker
  jaeger:
    image: jaegertracing/all-in-one:latest
    ports:
      - "16686:16686"
      - "4317:4317"
      - "4318:4318"
  producer:
    image: library/producer:1.0-SNAPSHOT
    links:
      - broker
      - jaeger
    environment:
      MANAGEMENT_OTLP_TRACING_ENDPOINT: http://jaeger:4318/v1/traces
      SPRING_KAFKA_BOOTSTRAP_SERVERS: broker:9092
  consumer:
    image: library/consumer:1.0-SNAPSHOT
    links:
      - broker
      - jaeger
    environment:
      MANAGEMENT_OTLP_TRACING_ENDPOINT: http://jaeger:4318/v1/traces
      SPRING_KAFKA_BOOTSTRAP_SERVERS: broker:9092
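
The environment variable names used above follow Spring Boot’s documented convention for the canonical form: replace dots with underscores, remove dashes, and convert to uppercase. Here is a small sketch of that rule, with EnvNameSketch and toEnvName as hypothetical names. Note that the compose file spells the Kafka variable with an underscore in place of the dash (SPRING_KAFKA_BOOTSTRAP_SERVERS). To my understanding, Spring Boot accepts that spelling as well, but treat this as an assumption.

```java
import java.util.Locale;

// Illustration of the property-name to environment-variable naming rule (JDK only).
class EnvNameSketch {

    // Canonical form: dots become underscores, dashes are removed, letters are uppercased
    static String toEnvName(String property) {
        return property.replace(".", "_")
                       .replace("-", "")
                       .toUpperCase(Locale.ROOT);
    }

    public static void main(String[] args) {
        System.out.println(toEnvName("management.otlp.tracing.endpoint"));
        System.out.println(toEnvName("spring.application.name"));
        System.out.println(toEnvName("spring.kafka.bootstrap-servers"));
    }
}
```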

Let’s run all the defined containers with the following command:

$ docker compose up

Our apps are running and exchanging messages.

The Jaeger dashboard is available on port 16686. As you can see, there are several traces with the kafka-producer and kafka-consumer spans.

[Screenshot: spring-boot-kafka-tracing-jaeger]

We can go into the details of each entry. The trace generated by the producer app is always correlated to the trace generated by the consumer app for every single message. There are also our two custom tags (id and topic) with values added by the KafkaTemplate bean.

[Screenshot: spring-boot-kafka-tracing-details]
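
The producer and consumer spans end up in the same trace because the trace context travels with each message in the record headers. Assuming the W3C Trace Context format (a traceparent header), which I believe is the default propagation type here, the value can be sketched as follows. The class and method names are hypothetical, the sample IDs are the example values from the W3C specification, and the actual header handling is done by the tracing library.

```java
// Illustration of the W3C traceparent value: version-traceId-spanId-flags (JDK only).
class TraceParentSketch {

    // Minimal holder for the fields carried in the header
    static final class TraceContext {
        final String traceId;   // 32 hex characters
        final String spanId;    // 16 hex characters
        final boolean sampled;  // lowest bit of the flags field

        TraceContext(String traceId, String spanId, boolean sampled) {
            this.traceId = traceId;
            this.spanId = spanId;
            this.sampled = sampled;
        }
    }

    // What the producer side would put into the header
    static String format(TraceContext ctx) {
        return "00-" + ctx.traceId + "-" + ctx.spanId + "-" + (ctx.sampled ? "01" : "00");
    }

    // What the consumer side would read back from the header
    static TraceContext parse(String header) {
        String[] parts = header.split("-");
        if (parts.length != 4 || parts[1].length() != 32 || parts[2].length() != 16) {
            throw new IllegalArgumentException("Not a traceparent value: " + header);
        }
        boolean sampled = (Integer.parseInt(parts[3], 16) & 1) == 1;
        return new TraceContext(parts[1], parts[2], sampled);
    }

    public static void main(String[] args) {
        TraceContext producerSpan = new TraceContext(
                "4bf92f3577b34da6a3ce929d0e0e4736", "00f067aa0ba902b7", true);
        String header = format(producerSpan);

        // The consumer starts its own span but reuses the trace ID found in the header
        TraceContext received = parse(header);
        System.out.println("traceparent = " + header);
        System.out.println("same trace  = " + received.traceId.equals(producerSpan.traceId));
    }
}
```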

Running on Kubernetes

Our sample apps are prepared for being deployed on Kubernetes. You can easily do it with the Skaffold CLI. Before that, we need to install Kafka and Jaeger on Kubernetes. I will not get into details about Kafka installation. You can find a detailed description of how to run Kafka on Kubernetes with the Strimzi operator in my article available here. After that, we can proceed to the Jaeger installation. In the first step, we need to add the following Helm repository:

$ helm repo add jaegertracing https://jaegertracing.github.io/helm-charts

By default, the Jaeger Helm chart doesn’t expose OTLP endpoints. In order to enable them, we need to override some default settings. Here’s our values YAML manifest:

collector:
  service:
    otlp:
      grpc:
        name: otlp-grpc
        port: 4317
      http:
        name: otlp-http
        port: 4318

Let’s install Jaeger in the jaeger namespace with the parameters from jaeger-values.yaml:

$ helm install jaeger jaegertracing/jaeger -n jaeger \
    --create-namespace \
    -f jaeger-values.yaml

Once we install Jaeger, we can verify the list of Kubernetes Services. We will use the jaeger-collector service to receive traces from the apps and the jaeger-query service to access the UI dashboard.

$ kubectl get svc -n jaeger
NAME               TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)                                           AGE
jaeger-agent       ClusterIP   10.96.147.104   <none>        5775/UDP,6831/UDP,6832/UDP,5778/TCP,14271/TCP     14m
jaeger-cassandra   ClusterIP   None            <none>        7000/TCP,7001/TCP,7199/TCP,9042/TCP,9160/TCP      14m
jaeger-collector   ClusterIP   10.96.111.236   <none>        14250/TCP,14268/TCP,4317/TCP,4318/TCP,14269/TCP   14m
jaeger-query       ClusterIP   10.96.88.64     <none>        80/TCP,16685/TCP,16687/TCP                        14m

Finally, we can run our sample Spring Boot apps that connect to Kafka and Jaeger. Here’s the Deployment object for the producer app. It overrides the default Kafka and Jaeger addresses by defining the KAFKA_URL and MANAGEMENT_OTLP_TRACING_ENDPOINT environment variables.

apiVersion: apps/v1
kind: Deployment
metadata:
  name: producer
spec:
  selector:
    matchLabels:
      app: producer
  template:
    metadata:
      labels:
        app: producer
    spec:
      containers:
      - name: producer
        image: piomin/producer
        resources:
          requests:
            memory: 200Mi
            cpu: 100m
        ports:
        - containerPort: 8080
        env:
          - name: MANAGEMENT_OTLP_TRACING_ENDPOINT
            value: http://jaeger-collector.jaeger:4318/v1/traces
          - name: KAFKA_URL
            value: my-cluster-kafka-bootstrap
          - name: CLUSTER
            value: c1
          - name: TOPIC
            value: test-1
          - name: POD
            valueFrom:
              fieldRef:
                fieldPath: metadata.name
          - name: NAMESPACE
            valueFrom:
              fieldRef:
                fieldPath: metadata.namespace

Here’s a similar Deployment object for the consumer app:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: consumer-1
spec:
  selector:
    matchLabels:
      app: consumer-1
  template:
    metadata:
      labels:
        app: consumer-1
    spec:
      containers:
      - name: consumer
        image: piomin/consumer
        resources:
          requests:
            memory: 200Mi
            cpu: 100m
        ports:
        - containerPort: 8080
        env:
          - name: SPRING_APPLICATION_NAME
            value: kafka-consumer-1
          - name: TOPIC
            value: test-1
          - name: KAFKA_URL
            value: my-cluster-kafka-bootstrap
          - name: MANAGEMENT_OTLP_TRACING_ENDPOINT
            value: http://jaeger-collector.jaeger:4318/v1/traces

Assuming that you are inside the kafka directory in the Git repository, you just need to run the following command to deploy both apps. By the way, I’ll create two deployments of the consumer app (consumer-1 and consumer-2) just for Jaeger visualization purposes.

$ skaffold run -n strimzi --tail

Once you run the apps, you can go to the Jaeger dashboard and verify the list of traces. In order to access the dashboard, we can enable port forwarding for the jaeger-query Service.

$ kubectl port-forward svc/jaeger-query 80:80 -n jaeger

Final Thoughts

Integration between Spring Kafka and Micrometer Tracing is a relatively new feature, available since Spring Kafka 3.0. It is possible that it will be improved soon with some new features. Even now, it gives a simple way to generate and send traces from Kafka producers and consumers.

10 COMMENTS

Balazs

Thanks for the article, it is really useful.
However, one remark: it is not a good practice to declare “id” as a low cardinality key for metrics/tracing purposes. It should be declared as a high cardinality key, so it will only be used for tracing, and not for metrics.

    piotr.minkowski

    Ok, thanks for that suggestion.

Jean

Hi, I think what you described here can be achieved in an easier, non-programmatic way with pure opentelemetry agents without micrometer. Have you explored this approach or is there something that I’m missing that micrometer is truly adding here?

    piotr.minkowski

    No. But I know that the Spring team is currently working on simplifying it inside Spring Kafka. With Spring Boot 3.2 you can use YAML/properties instead of the `setObservationEnabled` method.

Jagan Singh

But why are trace IDs not visible in the logs?

    piotr.minkowski

    In order to achieve it, we should override the default Spring Boot log format (e.g. the `logging.pattern.console` property). Maybe in the future, the Spring Boot Kafka autoconfiguration will do that.

Artem

Thanks. Please explain where the consumer gets the producer’s trace ID from, i.e. what makes the spans actually work together.

    piotr.minkowski

    The library adds trace ID and span ID headers to the messages.

Tommaso

Hi, thanks for the great post. Unfortunately the consumer part doesn’t work properly.
I found a workaround that I would like to share. These are the fixes:

@Bean("factory")
public ConcurrentKafkaListenerContainerFactory listenerFactory(ConsumerFactory consumerFactory) {
    ConcurrentKafkaListenerContainerFactory factory =
            new ConcurrentKafkaListenerContainerFactory();
    factory.getContainerProperties().setObservationEnabled(true);
    factory.getContainerProperties().setMicrometerEnabled(true);
    factory.setConsumerFactory(consumerFactory);
    return factory;
}

@Service
public class ListenerService {
    private static final Logger LOG = LoggerFactory.getLogger(ListenerService.class);

    @KafkaListener(id = "info", topics = "${app.in.topic}", containerFactory = "factory")
    public void onMessage(@Payload Info info,
                          @Header(name = KafkaHeaders.RECEIVED_KEY, required = false) Long key,
                          @Header(KafkaHeaders.RECEIVED_PARTITION) int partition) {
        LOG.info("Received(key={}, partition={}): {}", key, partition, info);
    }

}
Greetings

    piotr.minkowski

    Hi. But what exactly did you change? Just added that line: factory.getContainerProperties().setMicrometerEnabled(true); ?
