Spring Cloud Stream with Schema Registry and Kafka

Spring Cloud Stream with Schema Registry and Kafka

In this article, you will learn how to use Confluent Schema Registry with Spring Cloud Stream and Kafka in a microservices architecture. We will use Apache Avro to serialize and deserialize events exchanged between our applications. Spring Cloud Stream provides a handy mechanism for integration with Kafka and schema registry.

Ok, but before we start, let’s say some words about schema registry. What is this? And why we may use it in our event-driven architecture? Let’s imagine we change the message on the producer side, by adding or removing some fields. We sent that message to a Kafka topic, but we don’t have many subscribers is receiving such events. In a typical microservices architecture, we may have many producers and many subscribers. It is often necessary for all those microservices to agree on a contract that is based on a schema. If a schema is evolving, the existing microservices are still required to work. Here comes a schema registry server. It provides a RESTful interface for storing and retrieving schemas in different formats like JSON, Protobuf, or Avro. It also stores a versioned history of all schemas and provides schema compatibility checks.

We may choose between several available products. Spring Cloud has its own implementation of a schema registry server. Although it can be easily integrated with Spring Cloud Stream, we won’t use it. Currently, it doesn’t allow verifying compatibility between different versions. There is also an Apicurio registry. On the other hand, it is not possible to easily integrate it with Spring Cloud Stream. Therefore our choice fell on the Confluent schema registry.

Event-driven architecture with Spring Cloud and schema registry

We are going to run three applications. One of them is sending events to the Kafka topic, while two others are receiving them. The integration with Kafka is built on top of Spring Cloud Stream. The consumer Consumer-A is expecting events compatible with the v1 of schema, while the second subscriber is expecting events compatible with the v2 of schema. Before sending a message to Kafka the producer application tries to load schema definition from a remote server. If there is no result, it submits the data to the server, which replies with versioning information. The following diagram illustrates our architecture.

spring-cloud-schema-registry-arch

If a new schema is not compatible with the previous version, a schema registry rejects it. As a result, Spring Cloud Stream doesn’t allow to send a message to the Kafka topic. Otherwise, it serializes a message using Apache Avro. When a subscriber receives a message it first fetches schema from a remote registry. It gets a version of the schema from the header of a message. Finally, it deserializes it using the Avro format.

Source Code

If you would like to try it by yourself, you may always take a look at my source code. In order to do that you need to clone my GitHub repository and switch to the schema-registry branch. Then go to the event-driven directory. After that, you should just follow my instructions. Let’s begin.

Running Confluent Schema Registry on Kubernetes

It seems that the simplest way to run Confluent Schema Registry locally is on Kubernetes. Since we need to run at least Zookeeper and Kafka to be able to run schema registry we will use Helm for it. First, let’s add Confluent Helm repository.

$ helm repo add confluentinc https://packages.confluent.io/helm
$ helm repo update

Then we just need to install Confluent Platform using operator.

$ kubectl create ns confluent
$ helm upgrade --install confluent-operator confluentinc/confluent-for-kubernetes
$ kubectl apply -f https://raw.githubusercontent.com/confluentinc/confluent-kubernetes-examples/master/quickstart-deploy/confluent-platform.yaml

Finally, let’s display a list of running pods in the confluent namespace.

$ kubectl get pod -n confluent
NAME                                                  READY   STATUS    RESTARTS   AGE
kafka-confluent-cp-control-center-5ccb7479fd-hmpg6    1/1     Running   10         2d17h
kafka-confluent-cp-kafka-0                            2/2     Running   5          2d17h
kafka-confluent-cp-kafka-1                            2/2     Running   5          2d17h
kafka-confluent-cp-kafka-2                            2/2     Running   5          2d17h
kafka-confluent-cp-kafka-connect-797bd95655-kxnzm     2/2     Running   6          2d17h
kafka-confluent-cp-kafka-rest-69f49987bf-6nds7        2/2     Running   13         2d17h
kafka-confluent-cp-ksql-server-54675f9777-rbcb7       2/2     Running   9          2d17h
kafka-confluent-cp-schema-registry-7f6f6f9f8d-sh4b9   2/2     Running   11         2d17h
kafka-confluent-cp-zookeeper-0                        2/2     Running   4          2d17h
kafka-confluent-cp-zookeeper-1                        2/2     Running   4          2d17h
kafka-confluent-cp-zookeeper-2                        2/2     Running   4          2d17h

After that, we may display a list of Kubernetes services. Our application we will connect to the Kafka cluster through the kafka-confluent-cp-kafka service.

$ kubectl get svc -n confluent
NAME                                    TYPE        CLUSTER-IP       EXTERNAL-IP   PORT(S)             AGE
kafka-confluent-cp-control-center       ClusterIP   10.100.47.14     <none>        9021/TCP            2d17h
kafka-confluent-cp-kafka                ClusterIP   10.102.129.194   <none>        9092/TCP,5556/TCP   2d17h
kafka-confluent-cp-kafka-connect        ClusterIP   10.103.223.169   <none>        8083/TCP,5556/TCP   2d17h
kafka-confluent-cp-kafka-headless       ClusterIP   None             <none>        9092/TCP            2d17h
kafka-confluent-cp-kafka-rest           ClusterIP   10.102.7.98      <none>        8082/TCP,5556/TCP   2d17h
kafka-confluent-cp-ksql-server          ClusterIP   10.108.116.196   <none>        8088/TCP,5556/TCP   2d17h
kafka-confluent-cp-schema-registry      ClusterIP   10.102.169.4     <none>        8081/TCP,5556/TCP   2d17h
kafka-confluent-cp-zookeeper            ClusterIP   10.99.33.73      <none>        2181/TCP,5556/TCP   2d17h
kafka-confluent-cp-zookeeper-headless   ClusterIP   None             <none>        2888/TCP,3888/TCP   2d17h

Integrate Spring Cloud Stream with Confluent Schema Registry

In order to enable integration with Confluent Schema Registry we first need to include the spring-cloud-schema-registry-client dependency to the Maven pom.xml.

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-schema-registry-client</artifactId>
</dependency>

After that, we should enable RegistryClient through annotation. By default, the client uses a schema registry server provided by Spring Cloud. Therefore, we have registered the ConfluentSchemaRegistryClient bean as a default client implementation.

@SpringBootApplication
@EnableSchemaRegistryClient
class ProductionApplication {

   @Primary
   @Bean
   fun schemaRegistryClient(@Value("\${spring.cloud.schemaRegistryClient.endpoint}") endpoint: String?): SchemaRegistryClient {
      val client = ConfluentSchemaRegistryClient()
      client.setEndpoint(endpoint)
      return client
   }
}

Since we run our schema registry on Kubernetes, its address is different the default one. Let’s override it in application.properties.

spring.cloud.schemaRegistryClient.endpoint=http://kafka-confluent-cp-schema-registry:8081/

Because we are going to serialize messages using Apache Avro format, we need to change a default content type for all topics to application/*-avro. The message is sent with a contentType header by using the following scheme: application/[prefix].[subject].v[version]+avro, where prefix is configurable and subject is deduced from the payload type. The default prefix is vnd, and since the name of a message class is CallmeEvent the value of the header would be application/vnd.callmeevent.v1+avro for the v1 version of schema or application/vnd.callmeevent.v2+avro for the v2 version.

spring.cloud.stream.default.contentType=application/*+avro

Alternatively, we may set a content type just for a single destination. But more about it in the next sections.

Event class and Apache Avro serialization

We may choose between two types of approaches to the event class creation when working with Apache Avro. It is possible to generate Avro schema from a model class, or generate class from Avro schema using avro-maven-plugin. Assuming we use a second approach we first need to create Avro schema and place it in the source code as the .avsc file. Let’s say it is our Avro schema. It contains three fields id, message and eventType. The name of a generated class will be CallmeEvent and a package name will be the same as the namespace.

{
  "type":"record",
  "name":"CallmeEvent",
  "namespace":"pl.piomin.samples.eventdriven.producer.message.avro",
  "fields": [
    {
      "name":"id",
      "type":"int"
    },{
      "name":"message",
      "type":"string"
    },{
      "name":"eventType",
      "type": "string"
    }
  ]
}

After that, we need to the following plugin to the Maven pom.xml. We just need to configure the input directory with Avro schema files, and the output directory for the generated classes. Once you run a build, using for example mvn clean package command it will generate a required class.

<plugin>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro-maven-plugin</artifactId>
  <version>1.10.2</version>
  <executions>
    <execution>
      <phase>generate-sources</phase>
      <goals>
        <goal>schema</goal>
      </goals>
      <configuration>
        <sourceDirectory>${project.basedir}/src/main/resources/schema/</sourceDirectory>
        <outputDirectory>${project.basedir}/target/generated-sources/avro/</outputDirectory>
      </configuration>
    </execution>
  </executions>
</plugin>

Just to simplify working with generated classes, let’s include the target/generated-sources/avro as a source directory.

<plugin>
  <groupId>org.codehaus.mojo</groupId>
  <artifactId>build-helper-maven-plugin</artifactId>
  <version>3.2.0</version>
  <executions>
    <execution>
      <phase>generate-sources</phase>
      <goals>
        <goal>add-source</goal>
      </goals>
      <configuration>
        <sources>
          <source>${project.build.directory}/generated-sources/avro</source>
        </sources>
      </configuration>
    </execution>
  </executions>
</plugin>

However, the simplest approach, especially in development, is to generate Avro schema automatically from the source code. With this approach, we first need to create CallmeEvent class.

class CallmeEvent(val id: Int,
                  val message: String,
                  val eventType: String)

Then, we just need to enable dynamic Avro schema generation. Once you do it, Spring Cloud Stream automatically generates and sends schema to the schema registry before sending a message to a Kafka topic.

spring.cloud.schema.avro.dynamicSchemaGenerationEnabled=true

Integrate Spring Cloud Stream with Kafka

Spring Cloud Stream offers a broker agnostic programming model for sending and receiving messages. If you are looking for a quick introduction to that model and event-driven microservices read my article Introduction to event-driven microservices with Spring Cloud Stream. We use the same scenario as described in this article. However, we will add schema registry support and replace RabbitMQ with Kafka. In order to change the broker, we just need to replace a binder implementation as shown below.

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>

Here’s the main class of the producer-service application. It uses the Supplier bean to generate events continuously after startup.

@SpringBootApplication
@EnableSchemaRegistryClient
class ProductionApplication {

   var id: Int = 0

   @Value("\${callme.supplier.enabled}")
   val supplierEnabled: Boolean = false

   @Bean
   fun callmeEventSupplier(): Supplier<Message<CallmeEvent>?> = Supplier { createEvent() }

   @Primary
   @Bean
   fun schemaRegistryClient(@Value("\${spring.cloud.schemaRegistryClient.endpoint}") endpoint: String?): SchemaRegistryClient {
      val client = ConfluentSchemaRegistryClient()
      client.setEndpoint(endpoint)
      return client
   }

   private fun createEvent(): Message<CallmeEvent>? {
      return if (supplierEnabled)
         MessageBuilder.withPayload(CallmeEvent(++id, "I'm callme event!", "ping"))
                     .setHeader("to_process", true)
                     .build()
      else
         null
   }
}

Here’s a Spring Cloud Stream configuration for producer-service and Supplier bean. It configures partitioning based on the value of the id field.

spring.cloud.stream.bindings.callmeEventSupplier-out-0.contentType=application/*+avro
spring.cloud.stream.bindings.callmeEventSupplier-out-0.destination=callme-events
spring.cloud.stream.bindings.callmeEventSupplier-out-0.producer.partitionKeyExpression=payload.id
spring.cloud.stream.bindings.callmeEventSupplier-out-0.producer.partitionCount=2

Both consumers are receiving messages from the callme-events topic. The same as for producer-service we need to enable RegistryClient support.

@SpringBootApplication
@EnableSchemaRegistryClient
class ConsumerAApplication {

   val logger: Logger = LoggerFactory.getLogger(ConsumerAApplication::class.java)

   @Bean
   fun callmeEventConsumer(): Consumer<CallmeEvent> = Consumer { 
      logger.info("Received: {}", it) 
   }
}

We also need to configure deserialization with Avro and partitioning on the consumer side.

spring.cloud.stream.default.contentType=application/*+avro
spring.cloud.stream.bindings.callmeEventSupplier-in-0.contentType=application/*+avro
spring.cloud.stream.bindings.callmeEventConsumer-in-0.destination=callme-events
spring.cloud.stream.bindings.callmeEventConsumer-in-0.group=a
spring.cloud.stream.bindings.callmeEventConsumer-in-0.consumer.partitioned=true
spring.cloud.stream.instanceCount=2
spring.cloud.stream.instanceIndex=${INSTANCE_INDEX}

Deploy applications on Kubernetes

Firstly, let’s deploy our Spring Cloud Stream applications on Kubernetes. Here’s a Deployment manifest for producer-service.

apiVersion: apps/v1
kind: Deployment
metadata:
  name: producer
spec:
  selector:
    matchLabels:
      app: producer
  template:
    metadata:
      labels:
        app: producer
    spec:
      containers:
      - name: producer
        image: piomin/producer-service
        ports:
        - containerPort: 8080

We also have similar manifests for consumer applications. We need to set the INSTANCE_INDEX environment variable, which is then responsible for partitioning configuration.

apiVersion: apps/v1
kind: Deployment
metadata:
  name: consumer-a
spec:
  selector:
    matchLabels:
      app: consumer-a
  template:
    metadata:
      labels:
        app: consumer-a
    spec:
      containers:
      - name: consumer-a
        image: piomin/consumer-a-service
        env:
          - name: INSTANCE_INDEX
            value: "0"
        ports:
        - containerPort: 8080

The Deployment manifest for the consumer-b application is visible below.

apiVersion: apps/v1
kind: Deployment
metadata:
  name: consumer-b
spec:
  selector:
    matchLabels:
      app: consumer-b
  template:
    metadata:
      labels:
        app: consumer-b
    spec:
      containers:
      - name: consumer-b
        image: piomin/consumer-b-service
        env:
          - name: INSTANCE_INDEX
            value: "1"
        ports:
        - containerPort: 8080

All those applications may be deployed on Kubernetes with Skaffold. Each application directory contains a Skaffold configuration file skaffold.yaml, so you just need to execute the following command to run them on Kubernetes.

$ skaffold run

Testing integration between Spring Cloud Stream and schema registry

In order to register the v1 version of the schema, we should run the producer-service application with the following event class.

class CallmeEvent(val id: Int,
                  val message: String)

Then, we should restart it with the new version of the CallmeEvent class as shown below.

class CallmeEvent(val id: Int,
                  val message: String,
                  val eventType: String)

Now, we can verify a list of schemas registered on the server. First, let’s enable port forwarding for the Confluent Schema Registry service.

$ kubectl port-forward svc/kafka-confluent-cp-schema-registry 8081:8081 -n confluent

Thanks to that, we may access schema registry REST API on the local port. Let’s display a list of registered subjects. As you see there is a single subject called callmeevent.

$ curl http://localhost:8081/subjects
["callmeevent"]

In the next step, we may get a list of versions registered under the callmeevent subject. As we expect, there are two versions available in the schema registry.

$ curl http://localhost:8081/subjects/callmeevent/versions
[1,2]

We can display a full schema definition by calling the following endpoint using schema id.

$ curl http://localhost:8081/schemas/ids/1
{"schema":"{\"type\":\"record\",\"name\":\"CallmeEvent\",\"namespace\":\"pl.piomin.samples.eventdriven.producer.message\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"message\",\"type\":\"string\"}]}"}

Finally, we are going to change our schema once again. Until then, a new version of a schema was compatible with the previous one. Now, we create a schema, which is incompatible with the previous version. In particular, we change the eventType field into eventTp. That change is provided on the producer side.

class CallmeEvent(val id: Int,
                  val message: String,
                  val eventTp: String)

After restarting producer-service Spring Cloud Stream tries to register a new version of the schema. Let’s just take a look at application logs. As you see, a new schema has been rejected by the Confluent Schema Registry. Here’s a fragment of producer-service logs after a schema change.

Leave a Reply