Spring Cloud Stream with Schema Registry and Kafka
In this article, you will learn how to use Confluent Schema Registry with Spring Cloud Stream and Kafka in a microservices architecture. We will use Apache Avro to serialize and deserialize events exchanged between our applications. Spring Cloud Stream provides a handy mechanism for integration with Kafka and schema registry.
Ok, but before we start, let's say a few words about the schema registry. What is it? And why might we use it in an event-driven architecture? Let's imagine we change a message on the producer side by adding or removing some fields. We send that message to a Kafka topic, which may have many subscribers receiving such events. In a typical microservices architecture, we may have many producers and many subscribers. It is often necessary for all those microservices to agree on a contract that is based on a schema. If a schema is evolving, the existing microservices are still required to work. Here comes a schema registry server. It provides a RESTful interface for storing and retrieving schemas in different formats like JSON, Protobuf, or Avro. It also stores a versioned history of all schemas and provides schema compatibility checks.
We may choose between several available products. Spring Cloud has its own implementation of a schema registry server. Although it can be easily integrated with Spring Cloud Stream, we won't use it, since it currently doesn't allow verifying compatibility between different schema versions. There is also the Apicurio registry. However, it cannot be easily integrated with Spring Cloud Stream. Therefore, our choice fell on the Confluent Schema Registry.
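To give a feel for that RESTful interface, here is a minimal sketch of registering a schema manually against a Confluent Schema Registry. It assumes a registry reachable at http://localhost:8081 (the default port) and uses a hypothetical sample-event subject; in our architecture Spring Cloud Stream performs this registration automatically, so you don't need to call the API yourself.
$ curl -X POST http://localhost:8081/subjects/sample-event/versions \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{"schema":"{\"type\":\"record\",\"name\":\"SampleEvent\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"message\",\"type\":\"string\"}]}"}'
The registry responds with the id assigned to the schema, and every subsequent registration under the same subject is checked against the configured compatibility level.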
Event-driven architecture with Spring Cloud and schema registry
We are going to run three applications. One of them is sending events to the Kafka topic, while two others are receiving them. The integration with Kafka is built on top of Spring Cloud Stream. The consumer Consumer-A
is expecting events compatible with the v1
of schema, while the second subscriber is expecting events compatible with the v2
of schema. Before sending a message to Kafka the producer application tries to load schema definition from a remote server. If there is no result, it submits the data to the server, which replies with versioning information. The following diagram illustrates our architecture.
If a new schema is not compatible with the previous version, the schema registry rejects it. As a result, Spring Cloud Stream does not allow sending the message to the Kafka topic. Otherwise, it serializes the message using Apache Avro. When a subscriber receives a message, it first fetches the schema from the remote registry. It reads the schema version from the message header. Finally, it deserializes the payload using the Avro format.
Source Code
If you would like to try it by yourself, you may always take a look at my source code. In order to do that, you need to clone my GitHub repository and switch to the schema-registry branch. Then go to the event-driven directory. After that, you should just follow my instructions. Let's begin.
Running Confluent Schema Registry on Kubernetes
It seems that the simplest way to run the Confluent Schema Registry locally is on Kubernetes. Since we need to run at least Zookeeper and Kafka to be able to run the schema registry, we will use Helm for it. First, let's add the Confluent Helm repository.
$ helm repo add confluentinc https://packages.confluent.io/helm
$ helm repo update
Then we just need to install the Confluent Platform using its Kubernetes operator.
$ kubectl create ns confluent
$ helm upgrade --install confluent-operator confluentinc/confluent-for-kubernetes
$ kubectl apply -f https://raw.githubusercontent.com/confluentinc/confluent-kubernetes-examples/master/quickstart-deploy/confluent-platform.yaml
Finally, let's display a list of running pods in the confluent namespace.
$ kubectl get pod -n confluent
NAME READY STATUS RESTARTS AGE
kafka-confluent-cp-control-center-5ccb7479fd-hmpg6 1/1 Running 10 2d17h
kafka-confluent-cp-kafka-0 2/2 Running 5 2d17h
kafka-confluent-cp-kafka-1 2/2 Running 5 2d17h
kafka-confluent-cp-kafka-2 2/2 Running 5 2d17h
kafka-confluent-cp-kafka-connect-797bd95655-kxnzm 2/2 Running 6 2d17h
kafka-confluent-cp-kafka-rest-69f49987bf-6nds7 2/2 Running 13 2d17h
kafka-confluent-cp-ksql-server-54675f9777-rbcb7 2/2 Running 9 2d17h
kafka-confluent-cp-schema-registry-7f6f6f9f8d-sh4b9 2/2 Running 11 2d17h
kafka-confluent-cp-zookeeper-0 2/2 Running 4 2d17h
kafka-confluent-cp-zookeeper-1 2/2 Running 4 2d17h
kafka-confluent-cp-zookeeper-2 2/2 Running 4 2d17h
After that, we may display a list of Kubernetes services. Our applications will connect to the Kafka cluster through the kafka-confluent-cp-kafka service.
$ kubectl get svc -n confluent
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
kafka-confluent-cp-control-center ClusterIP 10.100.47.14 <none> 9021/TCP 2d17h
kafka-confluent-cp-kafka ClusterIP 10.102.129.194 <none> 9092/TCP,5556/TCP 2d17h
kafka-confluent-cp-kafka-connect ClusterIP 10.103.223.169 <none> 8083/TCP,5556/TCP 2d17h
kafka-confluent-cp-kafka-headless ClusterIP None <none> 9092/TCP 2d17h
kafka-confluent-cp-kafka-rest ClusterIP 10.102.7.98 <none> 8082/TCP,5556/TCP 2d17h
kafka-confluent-cp-ksql-server ClusterIP 10.108.116.196 <none> 8088/TCP,5556/TCP 2d17h
kafka-confluent-cp-schema-registry ClusterIP 10.102.169.4 <none> 8081/TCP,5556/TCP 2d17h
kafka-confluent-cp-zookeeper ClusterIP 10.99.33.73 <none> 2181/TCP,5556/TCP 2d17h
kafka-confluent-cp-zookeeper-headless ClusterIP None <none> 2888/TCP,3888/TCP 2d17h
Integrate Spring Cloud Stream with Confluent Schema Registry
In order to enable integration with the Confluent Schema Registry, we first need to add the spring-cloud-schema-registry-client dependency to the Maven pom.xml.
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-schema-registry-client</artifactId>
</dependency>
After that, we should enable the schema registry client with the @EnableSchemaRegistryClient annotation. By default, the client uses the schema registry server provided by Spring Cloud. Therefore, we register the ConfluentSchemaRegistryClient bean as the default client implementation.
@SpringBootApplication
@EnableSchemaRegistryClient
class ProductionApplication {

    // Override the default Spring Cloud registry client with the Confluent implementation
    @Primary
    @Bean
    fun schemaRegistryClient(@Value("\${spring.cloud.schemaRegistryClient.endpoint}") endpoint: String?): SchemaRegistryClient {
        val client = ConfluentSchemaRegistryClient()
        client.setEndpoint(endpoint)
        return client
    }
}
Since we run our schema registry on Kubernetes, its address is different from the default one. Let's override it in application.properties.
spring.cloud.schemaRegistryClient.endpoint=http://kafka-confluent-cp-schema-registry:8081/
Because we are going to serialize messages using the Apache Avro format, we need to change the default content type for all topics to application/*+avro. The message is sent with a contentType header that follows the scheme application/[prefix].[subject].v[version]+avro, where the prefix is configurable and the subject is deduced from the payload type. The default prefix is vnd, and since the name of our message class is CallmeEvent, the value of the header will be application/vnd.callmeevent.v1+avro for the v1 version of the schema or application/vnd.callmeevent.v2+avro for the v2 version.
spring.cloud.stream.default.contentType=application/*+avro
Alternatively, we may set a content type just for a single destination. But more about it in the next sections.
Event class and Apache Avro serialization
We may choose between two approaches to creating the event class when working with Apache Avro. It is possible to generate an Avro schema from a model class, or to generate a class from an Avro schema using the avro-maven-plugin. Assuming we use the second approach, we first need to create an Avro schema and place it in the source code as an .avsc file. Here is our Avro schema. It contains three fields: id, message, and eventType. The name of the generated class will be CallmeEvent, and the package name will be the same as the namespace.
{
  "type": "record",
  "name": "CallmeEvent",
  "namespace": "pl.piomin.samples.eventdriven.producer.message.avro",
  "fields": [
    {
      "name": "id",
      "type": "int"
    },
    {
      "name": "message",
      "type": "string"
    },
    {
      "name": "eventType",
      "type": "string"
    }
  ]
}
After that, we need to add the following plugin to the Maven pom.xml. We just need to configure the input directory with Avro schema files and the output directory for the generated classes. Once you run a build, for example with the mvn clean package command, it will generate the required class.
<plugin>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro-maven-plugin</artifactId>
    <version>1.10.2</version>
    <executions>
        <execution>
            <phase>generate-sources</phase>
            <goals>
                <goal>schema</goal>
            </goals>
            <configuration>
                <sourceDirectory>${project.basedir}/src/main/resources/schema/</sourceDirectory>
                <outputDirectory>${project.basedir}/target/generated-sources/avro/</outputDirectory>
            </configuration>
        </execution>
    </executions>
</plugin>
Just to simplify working with the generated classes, let's include target/generated-sources/avro as a source directory.
<plugin>
    <groupId>org.codehaus.mojo</groupId>
    <artifactId>build-helper-maven-plugin</artifactId>
    <version>3.2.0</version>
    <executions>
        <execution>
            <phase>generate-sources</phase>
            <goals>
                <goal>add-source</goal>
            </goals>
            <configuration>
                <sources>
                    <source>${project.build.directory}/generated-sources/avro</source>
                </sources>
            </configuration>
        </execution>
    </executions>
</plugin>
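Just for illustration, here is a minimal sketch of how the generated class could be used, assuming it was produced by avro-maven-plugin from the schema above (such classes get an all-arguments constructor and a newBuilder() factory); the field values are made up.
import pl.piomin.samples.eventdriven.producer.message.avro.CallmeEvent

fun main() {
    // The generated class can be created with its all-arguments constructor...
    val event = CallmeEvent(1, "I'm callme event!", "ping")
    println(event)

    // ...or with the generated builder
    val built = CallmeEvent.newBuilder()
        .setId(1)
        .setMessage("I'm callme event!")
        .setEventType("ping")
        .build()

    // Every generated record carries its Avro schema
    println(built.schema)
}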
However, the simplest approach, especially in development, is to generate the Avro schema automatically from the source code. With this approach, we first need to create the CallmeEvent class.
class CallmeEvent(val id: Int,
val message: String,
val eventType: String)
Then, we just need to enable dynamic Avro schema generation. Once you do that, Spring Cloud Stream automatically generates the schema and sends it to the schema registry before sending a message to a Kafka topic.
spring.cloud.schema.avro.dynamicSchemaGenerationEnabled=true
Integrate Spring Cloud Stream with Kafka
Spring Cloud Stream offers a broker agnostic programming model for sending and receiving messages. If you are looking for a quick introduction to that model and event-driven microservices, read my article Introduction to event-driven microservices with Spring Cloud Stream. We will use the same scenario as described in that article. However, we will add schema registry support and replace RabbitMQ with Kafka. In order to change the broker, we just need to replace the binder implementation as shown below.
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
Here's the main class of the producer-service application. It uses a Supplier bean to generate events continuously after startup.
@SpringBootApplication
@EnableSchemaRegistryClient
class ProductionApplication {

    var id: Int = 0

    @Value("\${callme.supplier.enabled}")
    val supplierEnabled: Boolean = false

    // Polled by Spring Cloud Stream to continuously emit events to the output binding
    @Bean
    fun callmeEventSupplier(): Supplier<Message<CallmeEvent>?> = Supplier { createEvent() }

    // Use the Confluent implementation instead of the default Spring Cloud registry client
    @Primary
    @Bean
    fun schemaRegistryClient(@Value("\${spring.cloud.schemaRegistryClient.endpoint}") endpoint: String?): SchemaRegistryClient {
        val client = ConfluentSchemaRegistryClient()
        client.setEndpoint(endpoint)
        return client
    }

    private fun createEvent(): Message<CallmeEvent>? {
        return if (supplierEnabled)
            MessageBuilder.withPayload(CallmeEvent(++id, "I'm callme event!", "ping"))
                .setHeader("to_process", true)
                .build()
        else
            null
    }
}
Here's the Spring Cloud Stream configuration for producer-service and its Supplier bean. It configures partitioning based on the value of the id field.
spring.cloud.stream.bindings.callmeEventSupplier-out-0.contentType=application/*+avro
spring.cloud.stream.bindings.callmeEventSupplier-out-0.destination=callme-events
spring.cloud.stream.bindings.callmeEventSupplier-out-0.producer.partitionKeyExpression=payload.id
spring.cloud.stream.bindings.callmeEventSupplier-out-0.producer.partitionCount=2
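One thing the snippet above does not show is the Kafka broker address. Assuming our applications are deployed outside the confluent namespace and connect through the kafka-confluent-cp-kafka service listed earlier, the binder could be pointed at it roughly like this (a sketch, not taken from the repository; drop the namespace suffix if the applications run in the same namespace):
spring.cloud.stream.kafka.binder.brokers=kafka-confluent-cp-kafka.confluent:9092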
Both consumers receive messages from the callme-events topic. The same as for producer-service, we need to enable schema registry client support.
@SpringBootApplication
@EnableSchemaRegistryClient
class ConsumerAApplication {

    val logger: Logger = LoggerFactory.getLogger(ConsumerAApplication::class.java)

    // Functional consumer bound to the callme-events topic
    @Bean
    fun callmeEventConsumer(): Consumer<CallmeEvent> = Consumer {
        logger.info("Received: {}", it)
    }
}
We also need to configure deserialization with Avro and partitioning on the consumer side.
spring.cloud.stream.default.contentType=application/*+avro
spring.cloud.stream.bindings.callmeEventConsumer-in-0.contentType=application/*+avro
spring.cloud.stream.bindings.callmeEventConsumer-in-0.destination=callme-events
spring.cloud.stream.bindings.callmeEventConsumer-in-0.group=a
spring.cloud.stream.bindings.callmeEventConsumer-in-0.consumer.partitioned=true
spring.cloud.stream.instanceCount=2
spring.cloud.stream.instanceIndex=${INSTANCE_INDEX}
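As described at the beginning, consumer-a expects events compatible with the v1 schema, while consumer-b works with v2. In practice that may simply mean that consumer-a declares an older variant of the event class without the eventType field, as sketched below (it matches the v1 schema used later in the testing section):
// consumer-a: event class matching the v1 schema (no eventType field)
class CallmeEvent(val id: Int,
                  val message: String)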
Deploy applications on Kubernetes
Firstly, let's deploy our Spring Cloud Stream applications on Kubernetes. Here's the Deployment manifest for producer-service.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: producer
spec:
  selector:
    matchLabels:
      app: producer
  template:
    metadata:
      labels:
        app: producer
    spec:
      containers:
        - name: producer
          image: piomin/producer-service
          ports:
            - containerPort: 8080
We also have similar manifests for the consumer applications. We need to set the INSTANCE_INDEX environment variable, which is then used in the partitioning configuration.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: consumer-a
spec:
  selector:
    matchLabels:
      app: consumer-a
  template:
    metadata:
      labels:
        app: consumer-a
    spec:
      containers:
        - name: consumer-a
          image: piomin/consumer-a-service
          env:
            - name: INSTANCE_INDEX
              value: "0"
          ports:
            - containerPort: 8080
The Deployment manifest for the consumer-b application is shown below.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: consumer-b
spec:
  selector:
    matchLabels:
      app: consumer-b
  template:
    metadata:
      labels:
        app: consumer-b
    spec:
      containers:
        - name: consumer-b
          image: piomin/consumer-b-service
          env:
            - name: INSTANCE_INDEX
              value: "1"
          ports:
            - containerPort: 8080
All those applications may be deployed on Kubernetes with Skaffold. Each application directory contains a Skaffold configuration file skaffold.yaml, so you just need to execute the following command to run them on Kubernetes.
$ skaffold run
Testing integration between Spring Cloud Stream and schema registry
In order to register the v1 version of the schema, we should run the producer-service application with the following event class.
class CallmeEvent(val id: Int,
val message: String)
Then, we should restart it with the new version of the CallmeEvent class, as shown below.
class CallmeEvent(val id: Int,
val message: String,
val eventType: String)
Now, we can verify a list of schemas registered on the server. First, let’s enable port forwarding for the Confluent Schema Registry service.
$ kubectl port-forward svc/kafka-confluent-cp-schema-registry 8081:8081 -n confluent
Thanks to that, we may access the schema registry REST API on the local port. Let's display a list of registered subjects. As you can see, there is a single subject called callmeevent.
$ curl http://localhost:8081/subjects
["callmeevent"]
In the next step, we may get the list of versions registered under the callmeevent subject. As expected, there are two versions available in the schema registry.
$ curl http://localhost:8081/subjects/callmeevent/versions
[1,2]
We can display a full schema definition by calling the following endpoint with the schema id.
$ curl http://localhost:8081/schemas/ids/1
{"schema":"{\"type\":\"record\",\"name\":\"CallmeEvent\",\"namespace\":\"pl.piomin.samples.eventdriven.producer.message\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"message\",\"type\":\"string\"}]}"}
Finally, we are going to change our schema once again. Until now, each new version of the schema has been compatible with the previous one. This time, we create a schema that is incompatible with the previous version. In particular, we rename the eventType field to eventTp. That change is made on the producer side.
class CallmeEvent(val id: Int,
val message: String,
val eventTp: String)
After restarting producer-service, Spring Cloud Stream tries to register a new version of the schema. Let's take a look at the application logs. As you can see, the new schema has been rejected by the Confluent Schema Registry. Here's a fragment of the producer-service logs after the schema change.