Introduction
When Kafka was created at LinkedIn in 2010, its primary goal was to clean up the tangle of LinkedIn's enterprise architecture. It started as a pub/sub system and then evolved into a complete event streaming platform. During this evolution, Confluent (the company that provides many useful tools around Apache Kafka) recognized the need for a Schema Registry, which enables organizations to publish clean, consistent data into a Kafka cluster. Let's start our deep dive into Confluent Schema Registry.
Learning Objectives
- Introduction to Confluent Schema Registry
- Confluent Schema Registry Architecture
- Overview of AVRO
- Schema Evolution
- Hands-On: Working with Confluent Schema Registry
Introduction to Confluent Schema Registry
Confluent Schema Registry acts as a serving layer for schemas in the Kafka ecosystem. It provides a RESTful interface for storing and retrieving schemas in several formats, namely Avro, JSON Schema, and Protobuf. It stores a versioned history of all schemas based on a specified subject name strategy, provides multiple compatibility settings, and allows schemas to evolve according to the configured compatibility setting. It also provides serializers that plug into Apache Kafka® clients and handle schema storage and retrieval for Kafka messages sent in any of the supported formats.
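For instance, a schema can be registered directly through that REST API. The sketch below is illustrative only: it assumes Schema Registry is running on its default port 8081 and uses a hypothetical subject named test-value. The response contains the globally unique ID assigned to the schema.

# Register a trivial Avro schema under the subject "test-value" (illustrative subject name)
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"schema": "{\"type\": \"string\"}"}' \
http://127.0.0.1:8081/subjects/test-value/versions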
Confluent Schema Registry Architecture
In the Confluent Platform, Schema Registry sits entirely outside the Kafka brokers. Producers and consumers still talk to Kafka to publish and read messages from topics; at the same time, they talk to Schema Registry to send and retrieve the schemas that describe the data models of those messages.
(Source: Confluent)
Schema Registry assigns a globally unique ID to every registered schema. Allocated IDs are guaranteed to be monotonically increasing, but not necessarily consecutive. Kafka provides the durable backend and functions as a write-ahead changelog for the state of Schema Registry and the schemas it contains. Schema Registry is designed to be distributed, with a single-primary architecture, and ZooKeeper or Kafka coordinates primary election (depending on the configuration). Since Avro is one of the key formats for working with Confluent Schema Registry, let's get a quick overview of it.
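If you are curious, you can peek at that changelog yourself. A minimal sketch, assuming the default kafkastore.topic of _schemas: every schema registration appears as a record in this internal Kafka topic.

# Inspect Schema Registry's write-ahead changelog topic (default name: _schemas)
kafka-console-consumer --bootstrap-server localhost:9092 --topic _schemas --from-beginning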
Overview of AVRO
Apache Avro is a language-neutral data serialization system developed by Doug Cutting, the father of Hadoop. A language-independent schema is associated with its read and write operations. Avro serializes data into a compact binary format that any application can deserialize, and the data structure is declared using JSON. The schema is also stored alongside the Avro data in a file for any further processing.
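As an illustration, the product schema used in the hands-on section below can be declared in JSON and saved to a file (the product.avsc file name here is just for this example):

# Write the Avro schema definition to a file named product.avsc (hypothetical file name)
cat > product.avsc <<'EOF'
{
  "type": "record",
  "name": "product",
  "fields": [
    {"name": "prodid",   "type": "string"},
    {"name": "prodName", "type": "string"},
    {"name": "price",    "type": "string"}
  ]
}
EOF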
Schema Evolution
Schemas act as a contract that defines what a message should look like. Most schema technologies provide a mechanism to validate whether a message written with a new schema is compatible with previous versions.
- If data written with an old version of the schema can be read with the new version, the change is backward compatible.
- If data written with the new version of the schema can be read with an old version, the change is forward compatible.
- If both hold, the change is fully compatible.
This ability to evolve a schema without breaking old programs is how most shared messaging models are managed over time.
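Schema Registry exposes these compatibility settings over its REST API. The sketch below is illustrative: it assumes Schema Registry on its default port 8081 and uses the datacouch-product-value subject created in the hands-on section that follows; BACKWARD is the default compatibility level.

# Set the compatibility level for a subject
curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"compatibility": "BACKWARD"}' \
http://127.0.0.1:8081/config/datacouch-product-value

# Test whether a candidate schema is compatible with the latest registered version
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"schema": "{\"type\": \"string\"}"}' \
http://127.0.0.1:8081/compatibility/subjects/datacouch-product-value/versions/latest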
Create a New Schema Using the Kafka Avro Console Producer and Consumer
About the environment
Two options are available for setting up the environment: a Docker-based approach and a Linux-based installation. You may choose either approach to install the Confluent Platform.
Follow one of the blogs below to set up Confluent Kafka:
- QuickStart Guide for Installing Confluent Platform having Apache Kafka using Docker
- QuickStart Guide for Installing Confluent Platform 6.x
Start Confluent Kafka Servers
For this lab, start the Confluent services using the command below:
confluent local services start
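Optionally, confirm that all services, including Schema Registry, are up before continuing:

confluent local services status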
Create a topic datacouch-product
kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic datacouch-product
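Note: on newer Kafka releases where the --zookeeper flag has been removed from kafka-topics, the equivalent command targets the broker directly:

kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic datacouch-product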
Produce records with three fields:
kafka-avro-console-producer \
--broker-list 127.0.0.1:9092 --topic datacouch-product \
--property schema.registry.url=http://127.0.0.1:8081 \
--property value.schema='{"type":"record","name":"product","fields":[{"name":"prodid","type":"string"},{"name":"prodName","type":"string"},{"name":"price","type":"string"}]}'
Copy the records below and paste them one at a time at the producer prompt:
{"prodid": "Sominex-Herbal-30-tablets","prodName":"Sominex_Herbal","price":"3.39"}
{"prodid": "Fitbit-Surge-Fitness-Super-Watch-Black-Small","prodName":"Fitbit_Surge_Fitness","price":"157.99"}
Go to Control Center at http://localhost:9021, click on Topics, and search for the datacouch-product topic.
You will see the topic datacouch-product along with the schema you declared for it.
In another terminal, consume the records from the beginning of the topic:
kafka-avro-console-consumer --topic datacouch-product \
--bootstrap-server 127.0.0.1:9092 \
--property schema.registry.url=http://127.0.0.1:8081 \
--from-beginning
Let's trigger an error. If the data does not match the declared schema, the producer will throw a serialization error. Run the producer again and paste a record that uses a field named prod, which is not defined in the schema:
kafka-avro-console-producer \
--broker-list 127.0.0.1:9092 --topic datacouch-product \
--property schema.registry.url=http://127.0.0.1:8081 \
--property value.schema='{"type":"record","name":"product","fields":[{"name":"prodid","type":"string"},{"name":"prodName","type":"string"},{"name":"price","type":"string"}]}'
{"prod": "Express-Nails-Polished-Mushroom","prodName":"Express Nails Polished Mushroom","price":"7.99"}
Let's Evolve the Schema
kafka-avro-console-producer \
--broker-list localhost:9092 --topic datacouch-product \
--property schema.registry.url=http://127.0.0.1:8081 \
--property value.schema='{"type":"record","name":"product","fields":[{"name":"prod","type":"string","aliases":["prodid"]},{"name":"prodName","type":"string"},{"name":"price","type":"string"}]}'
{"prod": "Express-Nails-Polished-Mushroom","prodName":"Express Nails Polished Mushroom","price":"7.99"}
Go to Control Center → Topics, open the datacouch-product topic, and you will see the schema changes.
Consume the records again from the beginning of the topic:
kafka-avro-console-consumer --topic datacouch-product \
--bootstrap-server localhost:9092 \
--from-beginning \
--property schema.registry.url=http://127.0.0.1:8081
If you need to check the version history, see the screenshot below.
Check the compatibility setting.
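You can also query the compatibility configuration from the command line. Assuming the default port 8081, the first request returns the global setting and the second returns the subject-level override (it returns an error if no override has been set):

curl http://127.0.0.1:8081/config
curl http://127.0.0.1:8081/config/datacouch-product-value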
Inspecting the Schema Registry REST API
Next you will inspect the contents and settings of Schema Registry via the REST API.
1. Find all the subjects in your Schema Registry:
curl http://127.0.0.1:8081/subjects
2. List the schema versions registered under your subject:
curl http://127.0.0.1:8081/subjects/datacouch-product-value/versions
3. View the contents of version 1 of the schema:
curl http://127.0.0.1:8081/subjects/datacouch-product-value/versions/1
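4. Fetch the latest version of the subject, or look up a schema by its globally unique ID (the ID 1 below is just an example; use the id value returned in the previous responses):
curl http://127.0.0.1:8081/subjects/datacouch-product-value/versions/latest
curl http://127.0.0.1:8081/schemas/ids/1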
Congratulations, you just experienced how Schema Registry can prevent issues at the producer side itself!
Conclusion
So, in a nutshell, Schema Registry is a framework that acts as a central repository for your schemas in the Kafka ecosystem. It resolves the problem of multiple, inconsistent schemas for data published from various sources and makes a developer's life much easier.