What is Schema Compatibility?

Schema compatibility ensures that data serialized by a producer using one schema can be deserialized by a consumer using another. The Confluent Schema Registry provides several compatibility modes to enforce these rules when updating schemas.

There are two key directions of compatibility:

  • Backward Compatibility: Consumers using a newer schema can read data produced with an older schema.

  • Forward Compatibility: Consumers using an older schema can read data produced with a newer schema.

In Avro terms, backward compatibility means resolving data with the new schema as the reader and the old schema as the writer; forward compatibility is the same resolution with the roles swapped.


Types of Compatibility

Backward Compatibility

Backward compatibility means that consumers using the new schema can still read data written by producers using the old schema. This is the Schema Registry's default mode and fits environments where consumers are upgraded first and must keep processing records that were produced with the previous schema.

For backward compatibility to be maintained, schema changes should follow these rules:

  • New fields: Can be added, but must have default values, so that consumers on the new schema can fill in the default when reading old records.

  • Field removal: Fields can be removed; consumers on the new schema simply ignore the removed field in old records. (Note that removal breaks forward compatibility unless the field has a default value.)

  • Field type changes: Only safe promotions (e.g., from int to long) are allowed.

Example:

Old schema:

{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "int"}
  ]
}

New schema:

{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "int"},
    {"name": "email", "type": "string", "default": ""}
  ]
}

In this case, consumers using the new schema can still read records written with the old schema: the email field is missing in the old data, so its default value "" is used.
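To see this resolution in action, here is a minimal, self-contained sketch using Avro's GenericDatumWriter and GenericDatumReader with the two schemas above pasted in as strings (the class name is made up for illustration):

BackwardCompatDemo
package com.hascode;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

import java.io.ByteArrayOutputStream;

public class BackwardCompatDemo {

    public static void main(String[] args) throws Exception {
        Schema oldSchema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"}]}");
        Schema newSchema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
                + "{\"name\":\"id\",\"type\":\"int\"},"
                + "{\"name\":\"email\",\"type\":\"string\",\"default\":\"\"}]}");

        // Write a record using the old schema (no email field yet).
        GenericRecord user = new GenericData.Record(oldSchema);
        user.put("id", 1);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(oldSchema).write(user, encoder);
        encoder.flush();

        // Read it back with the new schema: Avro fills in the default for email.
        GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(oldSchema, newSchema);
        GenericRecord result = reader.read(null, DecoderFactory.get().binaryDecoder(out.toByteArray(), null));
        System.out.println(result); // prints: {"id": 1, "email": ""}
    }
}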

Forward Compatibility

Forward compatibility ensures that consumers using an older schema can read data produced with a newer schema. This is important when producers are upgraded before all of their consumers.

For forward compatibility to be maintained, schema changes should follow these rules:

  • New fields: Can be added freely; consumers on the old schema simply ignore fields they do not know.

  • Field removal: Only fields that have a default value in the old schema may be removed, so that old consumers can fall back to the default.

  • Field type changes: The old schema must still be able to resolve the new type (e.g., widening int to long breaks forward compatibility, because an old consumer expecting an int cannot read a long).

Example:

Old schema:

{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "int"},
    {"name": "email", "type": "string", "default": ""}
  ]
}

New schema:

{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "int"}
  ]
}

In this case, a consumer that is still using the old schema can read data written with the new schema: the removed email field is simply filled in from the default value declared in the old schema. (Mechanically, this is the same schema resolution as in the sketch above, with the writer and reader roles swapped.)

Full Compatibility

Full compatibility ensures that both forward and backward compatibility are preserved. This is the most stringent mode and guarantees that both old and new consumers and producers can communicate seamlessly.

To achieve full compatibility, schema changes must adhere to the rules for both backward and forward compatibility; in practice this means only adding or removing fields that have default values.

Best Practices for Achieving Compatibility

To ensure compatibility in both directions, adhere to the following best practices:

  • Use default values: Always provide default values for new fields, so that consumers on the new schema can still read old records that lack them.

  • Avoid removing fields: Try not to remove fields from the schema, especially if consumers may expect them. Instead, mark them as deprecated and avoid using them in new code.

  • Limit field type changes: Be cautious when changing field types. Promotions such as int to long are only backward compatible; an old consumer expecting an int cannot read a long, so type changes are rarely safe in both directions.

  • Schema evolution strategy: Adopt a robust schema evolution strategy that accounts for both backward and forward compatibility requirements based on your system’s needs.

  • Validate compatibility in CI/CD: Integrate schema compatibility checks into your CI/CD pipeline to prevent incompatible changes from being deployed, as in the sketch below. (Confluent also provides a Schema Registry Maven plugin with a test-compatibility goal for this purpose.)
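As a minimal sketch of such a gate, Avro's own SchemaCompatibility helper can check both directions; the file paths and class name here are made up for illustration, and a fully compatible change passes both checks:

CompatibilityGate
package com.hascode;

import org.apache.avro.Schema;
import org.apache.avro.SchemaCompatibility;
import org.apache.avro.SchemaCompatibility.SchemaCompatibilityType;

import java.io.File;

public class CompatibilityGate {

    public static void main(String[] args) throws Exception {
        Schema oldSchema = new Schema.Parser().parse(new File("schemas/User_old.avsc"));
        Schema proposed = new Schema.Parser().parse(new File("schemas/User_new.avsc"));

        // Backward: a reader on the proposed schema must understand data written with the old one.
        boolean backward = SchemaCompatibility
                .checkReaderWriterCompatibility(proposed, oldSchema)
                .getType() == SchemaCompatibilityType.COMPATIBLE;

        // Forward: a reader on the old schema must understand data written with the proposed one.
        boolean forward = SchemaCompatibility
                .checkReaderWriterCompatibility(oldSchema, proposed)
                .getType() == SchemaCompatibilityType.COMPATIBLE;

        System.out.printf("backward: %b, forward: %b%n", backward, forward);
        if (!backward || !forward) {
            System.exit(1); // fail the build on an incompatible change
        }
    }
}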

Schema Evolution Java Example

Now we want to take a look at how a schema evolves in a Java application that writes and reads data with specific versions of our schema.

This is our schema in three different versions:

Schema in Version 1: User_v1.avsc
{
  "type": "record",
  "name": "User",
  "namespace": "example.avro",
  "fields": [
    {"name": "name", "type": "string"}
  ]
}
Schema in Version 2: User_v2.avsc with compatible changes
{
  "type": "record",
  "name": "User",
  "namespace": "example.avro",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "age", "type": "int", "default": 0}
  ]
}
Schema in Version 3: User_v3.avsc with incompatible changes
{
  "type": "record",
  "name": "User",
  "namespace": "example.avro",
  "fields": [
    {"name": "fullName", "type": "string"}
  ]
}

We’re writing a little test in which:

  • Data written with schema 1 is read with schema 2 → OK

  • Data written with schema 2 is read with schema 1 → OK

  • Data written with schema 1 is read with schema 3 → Fails

AvroCompatibilityTest
package com.hascode;

import org.apache.avro.Schema;
import org.apache.avro.generic.*;
import org.apache.avro.io.*;

import java.io.*;

public class AvroCompatibilityTest {

    public static void main(String[] args) throws Exception {
        test("schemas/User_v1.avsc", "schemas/User_v2.avsc");
        test("schemas/User_v2.avsc", "schemas/User_v1.avsc");
        test("schemas/User_v1.avsc", "schemas/User_v3.avsc");
    }

    private static void test(String writerPath, String readerPath) throws Exception {
        Schema writerSchema = new Schema.Parser().parse(new File(writerPath));
        Schema readerSchema = new Schema.Parser().parse(new File(readerPath));

        // Build a record that conforms to the writer schema.
        GenericRecord record = new GenericData.Record(writerSchema);
        record.put("name", "Alice");
        if (writerSchema.getField("age") != null) {
            record.put("age", 42);
        }

        // Serialize the record to Avro binary using the writer schema.
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        DatumWriter<GenericRecord> writer = new GenericDatumWriter<>(writerSchema);
        Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        writer.write(record, encoder);
        encoder.flush();

        byte[] data = out.toByteArray();

        try {
            // Deserialize with both schemas; Avro resolves the writer schema against the reader schema.
            DatumReader<GenericRecord> reader = new GenericDatumReader<>(writerSchema, readerSchema);
            Decoder decoder = DecoderFactory.get().binaryDecoder(data, null);
            GenericRecord result = reader.read(null, decoder);
            System.out.printf("✅ Read successful (Writer: %s → Reader: %s)%n", writerPath, readerPath);
            System.out.println("   ➤ Result: " + result);
        } catch (Exception e) {
            System.out.printf("❌ Read failed (Writer: %s → Reader: %s)%n", writerPath, readerPath);
            e.printStackTrace(System.out);
        }
    }
}

Running the tests produces the following output:

✅ Read successful (Writer: schemas/User_v1.avsc → Reader: schemas/User_v2.avsc)
   ➤ Result: {"name": "Alice", "age": 0}
✅ Read successful (Writer: schemas/User_v2.avsc → Reader: schemas/User_v1.avsc)
   ➤ Result: {"name": "Alice"}
❌ Read failed (Writer: schemas/User_v1.avsc → Reader: schemas/User_v3.avsc)
org.apache.avro.AvroTypeException: Found example.avro.User, expecting example.avro.User, missing required field fullName

The full test project may be checked out from GitHub:

git clone https://github.com/hascode/avro-schema-compatibility-demo.git

Confluent Schema Registry

Quick setup of Kafka (required by the Schema Registry), using KRaft to avoid ZooKeeper, plus the Schema Registry itself:

docker-compose.yaml
version: "3.8"
services:
  kafka:
    image: bitnami/kafka:3.4
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      - KAFKA_ENABLE_KRAFT=yes
      - KAFKA_BROKER_ID=1
      - KAFKA_CFG_NODE_ID=1
      - KAFKA_CFG_PROCESS_ROLES=broker,controller
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka:9093
      - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
      - ALLOW_PLAINTEXT_LISTENER=yes

  schema-registry:
    image: confluentinc/cp-schema-registry:latest
    container_name: schema-registry
    depends_on:
      - kafka
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka:9092

We may now start Kafka and the Schema Registry like this:

Starting Kafka and the Schema Registry
docker compose up -d

Registering a Schema and Breaking Schema Compatibility

Similar to the Java example above, we want to register schemas in our Schema Registry, and then we’ll try to register an incompatible schema change.

First, we’re going to register the following schema:

{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "int"}
  ]
}

Registration of a new schema can be done with a single curl call:

Registering a schema
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    --data '{"schema": "{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"}]}"}' \
    http://localhost:8081/subjects/User-value/versions

{"id":1} (1)
1 This is the version of our registered schema

Now we may try to update our schema with an incompatible change:

Updating the schema with an incompatible change
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    --data '{"schema": "{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"email\",\"type\":\"string\"}]}"}' \
    http://localhost:8081/subjects/User-value/versions

{"error_code":409,"message":"Schema being registered is incompatible with an earlier schema for subject \"User-value\", details: [{errorType:'READER_FIELD_MISSING_DEFAULT_VALUE', description:'The field 'email' at path '/fields/0' in the new schema has no default value and is missing in the old schema', additionalInfo:'email'}, {oldSchemaVersion: 1}, {oldSchema: '{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"}]}'}, {validateFields: 'false', compatibility: 'BACKWARD'}]"}

The error returned gives us a good hint about the incompatibilities detected.

Another nice feature of the Schema Registry is that the compatibility level may be configured at either the global or the subject-specific level (see Set compatibility mode for the Schema of a Subject below).
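In a real application, the registry is usually exercised through Confluent's serializers rather than curl. The following is a minimal producer sketch, assuming the io.confluent:kafka-avro-serializer dependency is on the classpath; the topic name users is just an example, and with the compose file above you may need to adjust the advertised listener so the broker is reachable as localhost:9092 from the host:

UserProducer
package com.hascode;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class UserProducer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Confluent's Avro serializer talks to the Schema Registry for us.
        props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        props.put("schema.registry.url", "http://localhost:8081");

        Schema schema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"}]}");
        GenericRecord user = new GenericData.Record(schema);
        user.put("id", 1);

        try (KafkaProducer<String, GenericRecord> producer = new KafkaProducer<>(props)) {
            // On the first send, the serializer registers (or looks up) the schema
            // under the subject "users-value" and embeds the returned ID in the message.
            producer.send(new ProducerRecord<>("users", "user-1", user));
        }
    }
}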

Schema Registry Operations

Using curl or any similar tool, we may run the following operations on our schema registry:

Get Subjects

curl -X GET http://localhost:8081/subjects

["User-value"]

Get versions of a Subject

curl -X GET http://localhost:8081/subjects/User-value/versions

[1]

Get latest Schema Version for a Subject

curl -X GET http://localhost:8081/subjects/User-value/versions/latest

{"subject":"User-value","version":1,"id":1,"schema":"{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"}]}"}

Get specific Schema Version for a Subject

curl -X GET http://localhost:8081/subjects/User-value/versions/1

{"subject":"User-value","version":1,"id":1,"schema":"{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"}]}"}

Get Schema by ID

curl -X GET http://localhost:8081/schemas/ids/1

{"schema":"{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"}]}"}

Test Schema compatibility

curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{
  "schema": "{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"email\",\"type\":\"string\",\"default\":\"\"}]}"
}' \
http://localhost:8081/compatibility/subjects/User-value/versions/latest

{"is_compatible":true}

Get global compatibility mode

curl -X GET http://localhost:8081/config

{"compatibilityLevel":"BACKWARD"}

Set compatibility mode for the Schema of a Subject

curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"compatibility": "FULL"}' \
http://localhost:8081/config/User-value

{"compatibility":"FULL"}

Delete a specific schema version of a subject

curl -X DELETE http://localhost:8081/subjects/User-value/versions/1

1

Check schema existence

curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{
  "schema": "{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"}]}"
}' \
http://localhost:8081/subjects/User-value

{"subject":"User-value","version":1,"id":1,"schema":"{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"}]}"}