What is Schema Compatibility?
Schema compatibility ensures that data serialized by a producer using one schema can be deserialized by a consumer using another. The Confluent Schema Registry provides several compatibility modes to enforce these rules when updating schemas.
There are two key directions of compatibility:

- Backward Compatibility (Downward Compatibility): Consumers using a newer schema can read data produced with an older schema.
- Forward Compatibility (Upward Compatibility): Consumers using an older schema can read data produced with a newer schema.

Types of Compatibility
Backward (Downward) Compatibility
Backward compatibility means that consumers using the new schema can still read data written by producers using the old schema. This is the Schema Registry's default mode and fits environments where consumers are upgraded before producers.
For backward compatibility to be maintained, schema changes should follow these rules:
- New fields: Can be added, but must have default values, so that a new reader can fill them in when they are missing from old data.
- Field removal: Fields may be removed; a new reader simply ignores the extra field in old data.
- Field type changes: Only safe promotions (e.g., from int to long) are allowed.
Example:
Old schema:
{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "int"}
  ]
}
New schema:
{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "int"},
    {"name": "email", "type": "string", "default": ""}
  ]
}
In this case, consumers using the new schema can still read data written with the old schema, because the missing email field is filled in with its default value.
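To make this concrete, here is a minimal, self-contained sketch using plain Apache Avro (no registry involved); the class name is mine and the schema strings are inlined for brevity. It writes a record with the old schema and decodes it with the new one:

import org.apache.avro.Schema;
import org.apache.avro.generic.*;
import org.apache.avro.io.*;
import java.io.ByteArrayOutputStream;

public class BackwardReadDemo {
  public static void main(String[] args) throws Exception {
    Schema oldSchema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"}]}");
    Schema newSchema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
            + "{\"name\":\"id\",\"type\":\"int\"},"
            + "{\"name\":\"email\",\"type\":\"string\",\"default\":\"\"}]}");

    // Serialize a record using the old (writer) schema
    GenericRecord user = new GenericData.Record(oldSchema);
    user.put("id", 1);
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
    new GenericDatumWriter<GenericRecord>(oldSchema).write(user, encoder);
    encoder.flush();

    // Deserialize with the new (reader) schema: email falls back to its default
    Decoder decoder = DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
    GenericRecord result =
        new GenericDatumReader<GenericRecord>(oldSchema, newSchema).read(null, decoder);
    System.out.println(result); // prints {"id": 1, "email": ""}
  }
}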
Forward (Upward) Compatibility
Forward compatibility ensures that consumers using an older schema can read data produced with a newer schema. This is important when producers are updated more frequently than consumers.
For forward compatibility to be maintained, schema changes should follow these rules:
- New fields: Can be added freely; old readers simply skip fields they do not know.
- Field removal: Fields can be removed, but only if they carry a default value in the old schema, so that old readers can fall back to it.
- Field type changes: Must not break older readers (e.g., widening an int to a long breaks forward compatibility, because an old reader expecting an int cannot read a long).
Example:
Old schema:
{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "int"},
    {"name": "email", "type": "string", "default": ""}
  ]
}
New schema:
{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "int"}
  ]
}
In this case, consumers still using the old schema can read data written with the new schema, even though the email field has been removed, because their schema supplies a default value for it.
Full Compatibility
Full compatibility ensures that both backward and forward compatibility are preserved. This is the most stringent mode and guarantees that old and new producers and consumers can communicate seamlessly.
To achieve full compatibility, schema changes must adhere to the rules for both backward and forward compatibility; adding a new field with a default value is the classic example of a fully compatible change. The sketch below verifies both directions programmatically.
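Apache Avro ships a SchemaCompatibility helper that performs exactly this reader/writer check, so both directions can be verified without a running registry. A minimal sketch (the class name is mine) using the old and new schemas from the backward example above:

import org.apache.avro.Schema;
import org.apache.avro.SchemaCompatibility;

public class FullCompatibilityCheck {
  public static void main(String[] args) {
    Schema oldSchema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"}]}");
    Schema newSchema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
            + "{\"name\":\"id\",\"type\":\"int\"},"
            + "{\"name\":\"email\",\"type\":\"string\",\"default\":\"\"}]}");

    // Backward: the new schema (as reader) must understand data written with the old schema
    SchemaCompatibility.SchemaPairCompatibility backward =
        SchemaCompatibility.checkReaderWriterCompatibility(newSchema, oldSchema);
    // Forward: the old schema (as reader) must understand data written with the new schema
    SchemaCompatibility.SchemaPairCompatibility forward =
        SchemaCompatibility.checkReaderWriterCompatibility(oldSchema, newSchema);

    System.out.println("backward: " + backward.getType());
    System.out.println("forward:  " + forward.getType());
    // Both directions report COMPATIBLE, so adding email with a default is a fully compatible change.
  }
}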
Best Practices for Achieving Compatibility
To ensure compatibility in both directions, adhere to the following best practices:
- Use default values: Always provide default values for new fields, so that readers can fill them in whenever the data was written without them.
- Avoid removing fields: Try not to remove fields from the schema, especially if consumers may expect them. Instead, mark them as deprecated and stop using them in new code.
- Limit field type changes: Be cautious when changing field types. If you must change one, make sure the change is safe for the compatibility mode you run (e.g., promoting int to long is backward compatible but not forward compatible).
- Schema evolution strategy: Adopt a robust schema evolution strategy that accounts for both backward and forward compatibility requirements based on your system's needs.
- Validate compatibility in CI/CD: Integrate schema compatibility checks into your CI/CD pipeline to prevent incompatible changes from being deployed (a sketch follows below).
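The registry's REST API (covered in detail below) makes such a pipeline gate easy to script. The following sketch uses Java's built-in HttpClient (Java 11+) against the compatibility endpoint shown later in this article; the registry URL and the subject name User-value are assumptions matching those examples:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class CompatibilityGate {
  public static void main(String[] args) throws Exception {
    // The candidate schema, escaped into the registry's JSON envelope
    String body = "{\"schema\": \"{\\\"type\\\":\\\"record\\\",\\\"name\\\":\\\"User\\\","
        + "\\\"fields\\\":[{\\\"name\\\":\\\"id\\\",\\\"type\\\":\\\"int\\\"}]}\"}";

    HttpRequest request = HttpRequest.newBuilder()
        .uri(URI.create("http://localhost:8081/compatibility/subjects/User-value/versions/latest"))
        .header("Content-Type", "application/vnd.schemaregistry.v1+json")
        .POST(HttpRequest.BodyPublishers.ofString(body))
        .build();

    HttpResponse<String> response =
        HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());

    // Fail the build unless the registry answers {"is_compatible":true}
    if (!response.body().contains("\"is_compatible\":true")) {
      System.err.println("Incompatible schema change: " + response.body());
      System.exit(1);
    }
    System.out.println("Schema change is compatible.");
  }
}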
Schema Evolution Java Example
Now we want to take a look at how a schema evolves in a Java application that writes data with one version of our schema and reads it with another.
This is our schema in three different versions:
Version 1 (schemas/User_v1.avsc):

{
  "type": "record",
  "name": "User",
  "namespace": "example.avro",
  "fields": [
    {"name": "name", "type": "string"}
  ]
}

Version 2 (schemas/User_v2.avsc):

{
  "type": "record",
  "name": "User",
  "namespace": "example.avro",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "age", "type": "int", "default": 0}
  ]
}

Version 3 (schemas/User_v3.avsc):

{
  "type": "record",
  "name": "User",
  "namespace": "example.avro",
  "fields": [
    {"name": "fullName", "type": "string"}
  ]
}
We’re writing a little test where …
- Data written with schema 1 is read with schema 2 → OK
- Data written with schema 2 is read with schema 1 → OK
- Data written with schema 1 is read with schema 3 → fails
package com.hascode;

import org.apache.avro.Schema;
import org.apache.avro.generic.*;
import org.apache.avro.io.*;
import java.io.*;

public class AvroCompatibilityTest {

  public static void main(String[] args) throws Exception {
    test("schemas/User_v1.avsc", "schemas/User_v2.avsc");
    test("schemas/User_v2.avsc", "schemas/User_v1.avsc");
    test("schemas/User_v1.avsc", "schemas/User_v3.avsc");
  }

  private static void test(String writerPath, String readerPath) throws Exception {
    Schema writerSchema = new Schema.Parser().parse(new File(writerPath));
    Schema readerSchema = new Schema.Parser().parse(new File(readerPath));

    // Build a record that satisfies the writer schema
    GenericRecord record = new GenericData.Record(writerSchema);
    record.put("name", "Alice");
    if (writerSchema.getField("age") != null) {
      record.put("age", 42);
    }

    // Serialize the record with the writer schema
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    DatumWriter<GenericRecord> writer = new GenericDatumWriter<>(writerSchema);
    Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
    writer.write(record, encoder);
    encoder.flush();
    byte[] data = out.toByteArray();

    // Deserialize with the reader schema; Avro's schema resolution happens here
    try {
      DatumReader<GenericRecord> reader = new GenericDatumReader<>(writerSchema, readerSchema);
      Decoder decoder = DecoderFactory.get().binaryDecoder(data, null);
      GenericRecord result = reader.read(null, decoder);
      System.out.printf("✅ Read successful (Writer: %s → Reader: %s)%n", writerPath, readerPath);
      System.out.println("   ➤ Result: " + result);
    } catch (Exception e) {
      System.out.printf("❌ Read failed (Writer: %s → Reader: %s)%n", writerPath, readerPath);
      e.printStackTrace(System.out);
    }
  }
}
Running the tests produces the following output:
✅ Read successful (Writer: schemas/User_v1.avsc → Reader: schemas/User_v2.avsc)
➤ Result: {"name": "Alice", "age": 0}
✅ Read successful (Writer: schemas/User_v2.avsc → Reader: schemas/User_v1.avsc)
➤ Result: {"name": "Alice"}
❌ Read failed (Writer: schemas/User_v1.avsc → Reader: schemas/User_v3.avsc)
org.apache.avro.AvroTypeException: Found example.avro.User, expecting example.avro.User, missing required field fullName
The full test project can be found on GitHub and may be checked out like this:
git clone https://github.com/hascode/avro-schema-compatibility-demo.git
Confluent Schema Registry
A quick setup using Docker Compose: Kafka (required as the registry's storage backend) running in KRaft mode to avoid ZooKeeper, plus the Schema Registry itself.
version: "3.8"
services:
  kafka:
    image: bitnami/kafka:3.4
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      - KAFKA_ENABLE_KRAFT=yes
      - KAFKA_BROKER_ID=1
      - KAFKA_CFG_NODE_ID=1
      - KAFKA_CFG_PROCESS_ROLES=broker,controller
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka:9093
      - KAFKA_AUTO_CREATE_TOPICS_ENABLE=true
      - ALLOW_PLAINTEXT_LISTENER=yes
  schema-registry:
    image: confluentinc/cp-schema-registry:latest
    container_name: schema-registry
    depends_on:
      - kafka
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka:9092
We may now start our schema registry like this:
docker compose up -d
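Once both containers are up, the registry answers on port 8081. A quick smoke test from Java (a plain GET against the subjects endpoint; curl works just as well, as shown in the operations section below):

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class RegistrySmokeTest {
  public static void main(String[] args) throws Exception {
    // Lists all registered subjects; a fresh registry returns []
    HttpRequest request = HttpRequest.newBuilder()
        .uri(URI.create("http://localhost:8081/subjects"))
        .GET()
        .build();
    HttpResponse<String> response =
        HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
    System.out.println(response.statusCode() + " " + response.body());
  }
}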
Registering a Schema and Breaking Schema Compatibility
Similar to the Java example above, we want to register schemas in our Schema Registry, and then we'll try to register an incompatible schema change.
First, we’re going to register the following schema:
{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "int"}
  ]
}
Registration of a new schema can be done with a single curl call:
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"schema": "{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"}]}"}' \
http://localhost:8081/subjects/User-value/versions
{"id":1} (1)
1 | This is the version of our registered schema |
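The same registration can also be done from Java. A hedged sketch using Confluent's schema registry client, assuming the io.confluent:kafka-schema-registry-client artifact (7.x) and its Avro support are on the classpath:

import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;

public class RegisterSchema {
  public static void main(String[] args) throws Exception {
    // The second argument is the client-side cache size for schema identities
    SchemaRegistryClient client =
        new CachedSchemaRegistryClient("http://localhost:8081", 100);

    String schemaJson =
        "{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"}]}";

    // Registers the schema under the subject and returns its global ID
    int id = client.register("User-value", new AvroSchema(schemaJson));
    System.out.println("Registered with id " + id);
  }
}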
Now we may try to update our schema with an incompatible change:
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"schema": "{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"email\",\"type\":\"string\"}]}"}' \
http://localhost:8081/subjects/User-value/versions
{"error_code":409,"message":"Schema being registered is incompatible with an earlier schema for subject \"User-value\", details: [{errorType:'READER_FIELD_MISSING_DEFAULT_VALUE', description:'The field 'email' at path '/fields/0' in the new schema has no default value and is missing in the old schema', additionalInfo:'email'}, {oldSchemaVersion: 1}, {oldSchema: '{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"}]}'}, {validateFields: 'false', compatibility: 'BACKWARD'}]"}
The error returned gives us a good hint about the incompatibility detected: under the default BACKWARD mode, the new email field would need a default value to be accepted.
Another nice feature of the Schema Registry is that the compatibility level may be configured at either the global or the subject-specific level (see Set compatibility mode for the Schema of a Subject below).
Schema Registry Operations
Using curl or any similar tool, we may run the following operations on our schema registry:
Get Subjects
curl -X GET http://localhost:8081/subjects
["User-value"]
Get versions of a Subject
curl -X GET http://localhost:8081/subjects/User-value/versions
[1]
Get latest Schema Version for a Subject
curl -X GET http://localhost:8081/subjects/User-value/versions/latest
{"subject":"User-value","version":1,"id":1,"schema":"{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"}]}"}
Get specific Schema Version for a Subject
curl -X GET http://localhost:8081/subjects/User-value/versions/1
{"subject":"User-value","version":1,"id":1,"schema":"{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"}]}"}
Get Schema by ID
curl -X GET http://localhost:8081/schemas/ids/1
{"schema":"{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"}]}"}
Test Schema compatibility
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{
"schema": "{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"email\",\"type\":\"string\"}]}"
}' \
http://localhost:8081/compatibility/subjects/User-value/versions/latest
{"is_compatible":true}%
Get global compatibility mode
curl -X GET http://localhost:8081/config
{"compatibilityLevel":"BACKWARD"}
Set compatibility mode for the Schema of a Subject
curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"compatibility": "FULL"}' \
http://localhost:8081/config/User-value
{"compatibility":"FULL"}
Delete a specific schema version of a subject
curl -X DELETE http://localhost:8081/subjects/User-value/versions/1
1
Check schema existence
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{
"schema": "{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"}]}"
}' \
http://localhost:8081/subjects/User-value
{"subject":"User-value","version":1,"id":1,"schema":"{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"}]}"}