Why Data Contracts Matter
In distributed data systems, the interface between a producer and a consumer is an implicit agreement. A Kafka topic, a REST endpoint, a database table exposed to a dbt model: each carries assumptions about field names, types, nullability, and semantics. When those assumptions drift silently between teams, the result is broken pipelines, failed analytics, and production incidents. These incidents are hard to trace because no declared interface ever changed. The producer added a field and renamed an enum value, and the consumer kept assuming the old shape.
Data contracts make the implicit explicit. A data contract is a formal, versioned, machine-readable specification of what data a producer will supply — including schema, semantics, SLAs for freshness and completeness, and the ownership metadata that tells every consumer whom to contact when things break. The contract is agreed upon before data starts flowing, not inferred after pipelines fail. It shifts the interface from convention to commitment.
This article covers the technical layer of data contracts: how to version schemas with Avro, Protobuf, and JSON Schema, how to enforce compatibility rules in Confluent Schema Registry, how to test producer-consumer agreements in CI/CD with Pact, and practical patterns for evolving shared schemas without coordinating synchronized deployments across every downstream team.
Note
… confluent-kafka library, and Confluent Schema Registry 7.x.
Anatomy of a Data Contract
A complete data contract bundles four concerns that are otherwise scattered across wikis, Slack threads, and tribal knowledge:
- Schema — the structural definition: field names, data types, nullability, nesting, and any format constraints (e.g., ISO 8601 timestamps, UUID v4 identifiers).
- Semantics — what the data means: which field carries the business key, what the event type enum values represent, and which fields are personally identifiable information (PII) subject to masking.
- SLAs — freshness guarantees (e.g., events within 60 seconds of the source transaction), completeness thresholds, and the incident escalation path when SLAs are breached.
- Ownership — the team or individual responsible for maintaining the contract, the Slack channel for questions, and the approval process for breaking changes.
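SLAs are only useful if they can be checked mechanically. Below is a minimal sketch, not tied to any particular tool. It measures freshness as the gap between an event's created_at and the time it was observed, and completeness as the share of expected events that actually arrived. The 60-second and 99.99% thresholds mirror the example contract below. The function names are hypothetical.

```python
from datetime import datetime, timedelta, timezone

# Thresholds mirroring the example contract's serviceLevel section.
FRESHNESS_THRESHOLD = timedelta(seconds=60)   # PT60S
COMPLETENESS_THRESHOLD = 0.9999               # "99.99%"


def parse_rfc3339(ts: str) -> datetime:
    # datetime.fromisoformat accepts a trailing "Z" only from Python 3.11 on
    return datetime.fromisoformat(ts.replace("Z", "+00:00"))


def is_fresh(created_at: str, observed_at: datetime,
             threshold: timedelta = FRESHNESS_THRESHOLD) -> bool:
    """True if the event was observed within the freshness SLA of its source time."""
    return observed_at - parse_rfc3339(created_at) <= threshold


def meets_completeness(received: int, expected: int,
                       threshold: float = COMPLETENESS_THRESHOLD) -> bool:
    """True if the fraction of expected events that arrived meets the SLA."""
    if expected == 0:
        return True  # nothing was due in this window
    return received / expected >= threshold


observed = datetime(2026, 5, 26, 10, 0, 45, tzinfo=timezone.utc)
print(is_fresh("2026-05-26T10:00:00Z", observed))   # True: 45 s lag
print(meets_completeness(99_985, 100_000))          # False: 99.985% < 99.99%
```

In practice the expected count would come from the source system (for example, rows committed in the order database per hour). This sketch simply takes that count as an argument.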
The Data Contract Specification (an open standard) encodes all four concerns in a single YAML file that tools like datacontract-cli can validate, test, and publish to data catalogs. Below is a minimal contract for an order events topic.
Data Contract Specification — Order Events
# datacontract.yaml: checked into the producer's repository
dataContractSpecification: 0.9.3
id: urn:datacontract:com.acme:orders:v2
info:
  title: Order Events
  version: 2.0.0
  description: >
    Domain events emitted by the Order Service on every order lifecycle
    transition. Consumers must not rely on event ordering within a partition.
  owner: platform-data-team
  contact:
    name: Platform Data Team
    url: https://slack.acme.com/channels/platform-data
    email: data-platform@acme.com
servers:
  production:
    type: kafka
    host: kafka.acme.com:9092
    topic: com.acme.orders.v2
    format: avro
    schemaUrl: https://schema-registry.acme.com/subjects/com.acme.orders.v2-value
models:
  OrderEvent:
    description: Emitted on every order state transition.
    fields:
      order_id:
        type: string
        format: uuid
        required: true
        description: Immutable UUID assigned at order creation.
      event_type:
        type: string
        required: true
        enum: [ORDER_CREATED, ORDER_CONFIRMED, ORDER_SHIPPED, ORDER_DELIVERED, ORDER_CANCELLED]
        description: Lifecycle state reached by this event.
      customer_id:
        type: string
        required: true
        pii: true
        description: Internal customer identifier. PII; mask before writing to cold storage.
      total_amount_cents:
        type: integer
        required: true
        minimum: 0
        description: Order total in minor currency units (cents). Always non-negative.
      currency_code:
        type: string
        required: true
        pattern: "^[A-Z]{3}$"
        description: ISO 4217 three-letter currency code.
      created_at:
        type: string
        format: date-time
        required: true
        description: UTC timestamp of the source transaction (RFC 3339).
      metadata:
        type: object
        required: false
        description: Optional key-value bag for extensibility. Not guaranteed stable.
serviceLevel:
  freshness:
    description: Events delivered within 60 seconds of the source transaction.
    threshold: PT60S
  completeness:
    description: Less than 0.01% of order events may be missing in any 1-hour window.
    threshold: "99.99%"
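A machine-readable contract can be checked directly against the data it describes. The minimal, hypothetical validator below checks a single record against the OrderEvent field rules above: required, type, enum, pattern, minimum, and the uuid and date-time formats. It assumes the YAML has already been parsed into a dict, for example with PyYAML's yaml.safe_load. Real tooling such as datacontract-cli covers far more than this sketch.

```python
import re
import uuid
from datetime import datetime

# The OrderEvent "fields" mapping from datacontract.yaml as a parsed dict
# (abridged: descriptions and pii flags omitted).
ORDER_EVENT_FIELDS = {
    "order_id": {"type": "string", "format": "uuid", "required": True},
    "event_type": {"type": "string", "required": True,
                   "enum": ["ORDER_CREATED", "ORDER_CONFIRMED", "ORDER_SHIPPED",
                            "ORDER_DELIVERED", "ORDER_CANCELLED"]},
    "customer_id": {"type": "string", "required": True},
    "total_amount_cents": {"type": "integer", "required": True, "minimum": 0},
    "currency_code": {"type": "string", "required": True, "pattern": "^[A-Z]{3}$"},
    "created_at": {"type": "string", "format": "date-time", "required": True},
    "metadata": {"type": "object", "required": False},
}

PY_TYPES = {"string": str, "integer": int, "object": dict}


def _format_ok(value: str, fmt: str) -> bool:
    try:
        if fmt == "uuid":
            uuid.UUID(value)
        elif fmt == "date-time":
            datetime.fromisoformat(value.replace("Z", "+00:00"))
        return True
    except ValueError:
        return False


def validate(record: dict, fields: dict) -> list[str]:
    """Return human-readable contract violations (an empty list means valid)."""
    errors = []
    for name, rule in fields.items():
        if record.get(name) is None:
            if rule.get("required"):
                errors.append(f"{name}: required field missing")
            continue
        value = record[name]
        # bool is a subclass of int in Python, so reject it explicitly
        if not isinstance(value, PY_TYPES[rule["type"]]) or isinstance(value, bool):
            errors.append(f"{name}: expected {rule['type']}")
            continue
        if "enum" in rule and value not in rule["enum"]:
            errors.append(f"{name}: {value!r} not in enum")
        if "pattern" in rule and not re.fullmatch(rule["pattern"], value):
            errors.append(f"{name}: {value!r} does not match {rule['pattern']}")
        if "minimum" in rule and value < rule["minimum"]:
            errors.append(f"{name}: {value} below minimum {rule['minimum']}")
        if "format" in rule and not _format_ok(value, rule["format"]):
            errors.append(f"{name}: {value!r} is not a valid {rule['format']}")
    return errors
```

Returning every violation rather than stopping at the first one makes the output usable as a CI report. A record with a lowercase currency code, a negative total, and an unknown event type yields three separate messages.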
Schema Formats: Avro, Protobuf, and JSON Schema
The choice of schema format affects serialisation efficiency, language support, and the tooling available for compatibility enforcement. Each format has a different sweet spot.
Apache Avro
Avro is the most established schema format in the Kafka ecosystem and the first one Confluent Schema Registry supported. Its binary encoding carries no field names, only values written in schema order. In Confluent's wire format, each message is also prefixed with the ID of the schema that wrote it. For high-throughput topics this noticeably reduces Kafka storage and network cost compared with JSON. Avro schemas are themselves defined in JSON, and the fastavro library handles serialisation in Python. Confluent Schema Registry checks compatibility automatically every time a new Avro schema version is registered.
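To make "no field names in the wire format" concrete, the sketch below hand-encodes two fields the way Avro's binary encoding does. A long is written as a zigzag variable-length integer, and a string is written as a length prefix followed by UTF-8 bytes. The sketch then adds Confluent's 5-byte framing: a zero magic byte followed by the 4-byte big-endian schema ID. It uses only the standard library for illustration. The two-field record and schema ID 42 are hypothetical, and real producers should rely on fastavro or AvroSerializer.

```python
import json
import struct


def encode_long(n: int) -> bytes:
    """Avro int/long encoding: zigzag, then base-128 varint (low 7 bits first)."""
    z = (n << 1) ^ (n >> 63)  # zigzag maps 0, -1, 1, -2, ... to 0, 1, 2, 3, ...
    out = bytearray()
    while z > 0x7F:
        out.append((z & 0x7F) | 0x80)  # set the continuation bit
        z >>= 7
    out.append(z)
    return bytes(out)


def encode_string(s: str) -> bytes:
    """Avro string encoding: byte length as a long, then the UTF-8 bytes."""
    data = s.encode("utf-8")
    return encode_long(len(data)) + data


def confluent_frame(schema_id: int, payload: bytes) -> bytes:
    """Confluent wire format: magic byte 0, 4-byte big-endian schema ID, Avro payload."""
    return struct.pack(">bI", 0, schema_id) + payload


def parse_frame(message: bytes) -> tuple[int, bytes]:
    magic, schema_id = struct.unpack(">bI", message[:5])
    if magic != 0:
        raise ValueError("not a Confluent-framed message")
    return schema_id, message[5:]


# Hypothetical record {currency_code: string, total_amount_cents: long}:
# fields are concatenated in schema order, with no names or delimiters.
payload = encode_string("USD") + encode_long(4999)
message = confluent_frame(42, payload)
as_json = json.dumps({"currency_code": "USD", "total_amount_cents": 4999}).encode()
print(len(message), len(as_json))  # prints: 11 52
```

The consumer can only decode the payload because it can fetch the writer's schema by ID from the registry. This is why the schema ID, rather than the field names, travels with every message.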
Contents of schemas/order_event_v2.avsc. Avro schema files are plain JSON and cannot contain comments, so keep notes in "doc" attributes. Fields added after the first version need default values to remain backward-compatible.
{
"type": "record",
"namespace": "com.acme.orders",
"name": "OrderEvent",
"doc": "Emitted on every order lifecycle transition.",
"fields": [
{
"name": "order_id",
"type": "string",
"doc": "Immutable UUID assigned at order creation."
},
{
"name": "event_type",
"type": {
"type": "enum",
"name": "EventType",
"symbols": ["ORDER_CREATED", "ORDER_CONFIRMED", "ORDER_SHIPPED", "ORDER_DELIVERED", "ORDER_CANCELLED"]
}
},
{
"name": "customer_id",
"type": "string"
},
{
"name": "total_amount_cents",
"type": "long"
},
{
"name": "currency_code",
"type": "string"
},
{
"name": "created_at",
"type": "string",
"doc": "UTC timestamp of the source transaction as an RFC 3339 string. Avro's timestamp logical types annotate long, not string."
},
{
"name": "metadata",
"type": ["null", {"type": "map", "values": "string"}],
"default": null,
"doc": "Optional extensibility bag. Not guaranteed stable across versions."
},
{
"name": "shipping_address_id",
"type": ["null", "string"],
"default": null,
"doc": "Added in v2.1 — null for events produced before this field existed."
}
]
}
Protocol Buffers
Protobuf is more forgiving of renames than Avro. Fields are identified on the wire by number rather than by name, so renaming a field does not break the binary format as long as its number and type are preserved. The proto3 JSON mapping does use field names, so renames still break JSON consumers. Protobuf is the preferred format for gRPC APIs and polyglot pipelines. The generated code enforces the contract at the language level. A generated Python class raises an error if code sets a field the schema does not define. Unknown field numbers arriving on the wire, by contrast, are preserved and skipped rather than breaking deserialisation.
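The rename rule follows from how Protobuf encodes a field. Each value is preceded by a tag, (field_number << 3) | wire_type, and the field name never appears. The stdlib-only sketch below encodes two OrderEvent fields by hand: an int64 with the varint wire type (0) and a string with the length-delimited wire type (2). It handles non-negative integers only and is meant as an illustration, not a replacement for the generated code.

```python
def varint(n: int) -> bytes:
    """Protobuf base-128 varint for a non-negative integer."""
    if n < 0:
        raise ValueError("this sketch handles non-negative values only")
    out = bytearray()
    while n > 0x7F:
        out.append((n & 0x7F) | 0x80)  # continuation bit: more bytes follow
        n >>= 7
    out.append(n)
    return bytes(out)


def tag(field_number: int, wire_type: int) -> bytes:
    return varint((field_number << 3) | wire_type)


def encode_int64(field_number: int, value: int) -> bytes:
    return tag(field_number, 0) + varint(value)              # wire type 0: VARINT


def encode_string(field_number: int, value: str) -> bytes:
    data = value.encode("utf-8")
    return tag(field_number, 2) + varint(len(data)) + data   # wire type 2: LEN


# Fields 4 (total_amount_cents) and 5 (currency_code) from order_event.proto.
# Renaming either field in the .proto would not change a single byte here.
wire = encode_int64(4, 4999) + encode_string(5, "USD")
print(wire.hex())  # prints: 2087272a03555344
```

The same reasoning explains the "never reuse a field number" rule in the schema below. An old reader would interpret bytes tagged with a recycled number using the old field's meaning.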
// proto/order_event/v2/order_event.proto
syntax = "proto3";
package com.acme.orders.v2;
option java_package = "com.acme.orders.v2";
option java_outer_classname = "OrderEventProto";
// Field numbers are permanent identifiers — never reuse a retired field number.
// Reserve retired field numbers with the `reserved` keyword to prevent accidental reuse.
message OrderEvent {
  string order_id = 1;
  EventType event_type = 2;
  string customer_id = 3;
  int64 total_amount_cents = 4;
  string currency_code = 5;
  string created_at = 6;
  // map fields require a key type and value type
  map<string, string> metadata = 7;
  // Added in v2.1: optional string with explicit presence (proto3 optional)
  optional string shipping_address_id = 8;
  enum EventType {
    EVENT_TYPE_UNSPECIFIED = 0; // proto3 default; always define a zero value
    ORDER_CREATED = 1;
    ORDER_CONFIRMED = 2;
    ORDER_SHIPPED = 3;
    ORDER_DELIVERED = 4;
    ORDER_CANCELLED = 5;
    // To remove ORDER_CANCELLED in a future version:
    // reserved 5; reserved "ORDER_CANCELLED";
  }
}
Note
Always give every proto3 enum a dedicated zero value (here, EVENT_TYPE_UNSPECIFIED = 0). In proto3 the first enum value must be 0, and a message that omits the field deserialises to that zero value. Without a dedicated "unspecified" member, a missing event_type would be indistinguishable from a real lifecycle state, so handle the unspecified case explicitly in your switch statements. The Buf CLI enforces this and other proto best practices in CI with buf lint.
Compatibility Modes: Backward, Forward, and Full
Confluent Schema Registry enforces schema evolution rules by checking each new schema version against the previous (or all previous) versions before allowing registration. The compatibility mode controls which evolution operations are permitted.
Compatibility Mode Reference
# Compatibility mode summary for Confluent Schema Registry
#
# BACKWARD (default)
# New schema can read data written with the PREVIOUS schema.
# Safe operations: add optional field with default, remove field, add enum symbol.
# Unsafe: rename field, change field type, remove enum value, add required field.
# Consumer upgrades FIRST, then producer deploys new schema.
#
# BACKWARD_TRANSITIVE
# New schema can read data written with ANY PREVIOUS schema version.
# Same safe/unsafe as BACKWARD but checked against all history.
# Required when consumers might replay from the very beginning of the topic.
#
# FORWARD
# Old schema can read data written with the NEW schema.
# Safe operations: add field, remove field that has a default.
# Unsafe: add enum symbol (old readers cannot decode it unless their enum declares a default).
# Producer deploys new schema FIRST, then consumers upgrade.
#
# FORWARD_TRANSITIVE
# Any old schema version can read data written with the new schema.
#
# FULL
# Both BACKWARD and FORWARD simultaneously.
# Only adding or removing fields that have defaults is safe.
# Use when consumer upgrade order cannot be guaranteed.
#
# FULL_TRANSITIVE
# FULL checked against all previous versions. Recommended for long-lived topics.
#
# NONE
# No compatibility checking. Only for development topics — never for production.
# Set compatibility per-subject via the REST API:
# PUT /config/{subject}
# {"compatibility": "FULL_TRANSITIVE"}
# Or globally (applies to all new subjects):
# PUT /config
# {"compatibility": "BACKWARD_TRANSITIVE"}
For most production Kafka topics, BACKWARD_TRANSITIVE is the right default. It enables new consumers that replay from the beginning of the topic to deserialise messages written with every historical schema version. Use FULL_TRANSITIVE for topics that are both long-lived and shared across teams with independent deployment schedules. The stronger guarantee prevents surprises when a consumer is five schema versions behind and suddenly needs to catch up.
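The modes differ only in which direction is checked and how many past versions are included. The sketch below models that with a deliberately simplified version of Avro's record resolution rule. Under this rule, a reader can decode a writer's data if every reader field either exists in the writer schema with the same type or declares a default. It ignores type promotion, aliases, enums, and nested records, so treat it as an illustration of the semantics rather than a substitute for the registry's own check. The field lists are hypothetical.

```python
def can_read(reader: list[dict], writer: list[dict]) -> bool:
    """Simplified Avro resolution: can `reader` decode data written with `writer`?"""
    writer_types = {f["name"]: f["type"] for f in writer}
    for field in reader:
        if field["name"] not in writer_types:
            if "default" not in field:   # reader needs a value the writer never wrote
                return False
        elif writer_types[field["name"]] != field["type"]:
            return False                 # any type change is treated as breaking here
    return True                          # writer-only fields are simply skipped


def is_compatible(new: list[dict], history: list[list[dict]], mode: str) -> bool:
    """Mimic Schema Registry modes; `history` is ordered oldest to newest."""
    if mode == "NONE":
        return True
    base = mode.removesuffix("_TRANSITIVE")
    # Non-transitive modes compare only against the latest registered version
    targets = history if mode.endswith("_TRANSITIVE") else history[-1:]
    for old in targets:
        if base in ("BACKWARD", "FULL") and not can_read(new, old):
            return False  # consumers on the new schema could not read old data
        if base in ("FORWARD", "FULL") and not can_read(old, new):
            return False  # consumers on the old schema could not read new data
    return True


V1 = [{"name": "order_id", "type": "string"},
      {"name": "total_amount_cents", "type": "long"}]
V2 = V1 + [{"name": "shipping_address_id", "type": ["null", "string"], "default": None}]
# Same new field without a default: fine against V2, not against V1
V3 = V1 + [{"name": "shipping_address_id", "type": ["null", "string"]}]

print(is_compatible(V2, [V1], "FULL"))                     # True
print(is_compatible(V3, [V1, V2], "BACKWARD"))             # True
print(is_compatible(V3, [V1, V2], "BACKWARD_TRANSITIVE"))  # False
```

The last two lines show why transitive modes matter. V3 is compatible with its immediate predecessor, but a consumer on V3 replaying the topic would fail on the oldest messages.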
Registering and Checking Schemas in Python
# producer/schema_registry.py
# Wrapper around the Confluent Schema Registry REST API.
# Uses the confluent-kafka[avro] package.
import json
from confluent_kafka.schema_registry import Schema, SchemaRegistryClient
SCHEMA_REGISTRY_URL = "https://schema-registry.acme.com"
SUBJECT = "com.acme.orders.v2-value"
def load_schema(path: str) -> str:
    # Round-trip through json so a malformed .avsc file fails fast
    with open(path) as f:
        return json.dumps(json.load(f))
def register_schema(schema_str: str) -> int:
    """Register a new schema version. Raises SchemaRegistryError if incompatible."""
    client = SchemaRegistryClient({"url": SCHEMA_REGISTRY_URL})
    schema = Schema(schema_str, schema_type="AVRO")
    return client.register_schema(SUBJECT, schema)
def check_compatibility(schema_str: str) -> bool:
    """Return True if the schema is compatible with the subject's latest version
    under the subject's configured compatibility mode."""
    client = SchemaRegistryClient({"url": SCHEMA_REGISTRY_URL})
    schema = Schema(schema_str, schema_type="AVRO")
    return client.test_compatibility(SUBJECT, schema)
if __name__ == "__main__":
    schema_str = load_schema("schemas/order_event_v2.avsc")
    if check_compatibility(schema_str):
        schema_id = register_schema(schema_str)
        print(f"Schema registered with ID {schema_id}")
    else:
        print("Schema is INCOMPATIBLE with the current subject version.")
        raise SystemExit(1)
# producer/order_producer.py
# Avro-serialised Kafka producer with Schema Registry integration.
# The AvroSerializer looks up the schema ID in the registry and prepends Confluent's
# 5-byte header (magic byte 0 plus the 4-byte schema ID) to every message.
import uuid
from datetime import datetime, timezone
from pathlib import Path
from confluent_kafka import Producer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.serialization import MessageField, SerializationContext
KAFKA_BOOTSTRAP = "kafka.acme.com:9092"
SCHEMA_REGISTRY_URL = "https://schema-registry.acme.com"
TOPIC = "com.acme.orders.v2"
SCHEMA_STR = Path("schemas/order_event_v2.avsc").read_text()
def order_to_dict(order: dict, ctx: SerializationContext) -> dict:
    # Events are already plain dicts, so no conversion is needed
    return order
def on_delivery(err, msg) -> None:
    # Invoked from flush()/poll() once the broker acknowledges or rejects the write
    if err is not None:
        print(f"Delivery failed: {err}")
    else:
        print(f"Delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}")
def main() -> None:
    schema_registry_client = SchemaRegistryClient({"url": SCHEMA_REGISTRY_URL})
    avro_serializer = AvroSerializer(
        schema_registry_client,
        SCHEMA_STR,
        order_to_dict,
        conf={"auto.register.schemas": False},  # never auto-register in production
    )
    producer = Producer({"bootstrap.servers": KAFKA_BOOTSTRAP})
    order_event = {
        "order_id": str(uuid.uuid4()),
        "event_type": "ORDER_CREATED",
        "customer_id": "cust-8821",
        "total_amount_cents": 4999,
        "currency_code": "USD",
        "created_at": datetime.now(timezone.utc).isoformat(),
        "metadata": {"channel": "web", "promo_code": "SUMMER10"},
        "shipping_address_id": None,
    }
    producer.produce(
        topic=TOPIC,
        key=order_event["order_id"].encode(),
        value=avro_serializer(
            order_event,
            SerializationContext(TOPIC, MessageField.VALUE),
        ),
        on_delivery=on_delivery,
    )
    producer.flush()
if __name__ == "__main__":
    main()
Note
auto.register.schemas: False is essential in production producers. With auto-registration enabled, the serializer registers whatever schema the running code was built with the first time it produces a message. The registry still applies its compatibility check, but a compatible yet unreviewed change (a dropped field, a new optional field) goes live without review. An incompatible change surfaces as a serialisation error in production instead of a failed CI check. Treat the Schema Registry as a deployment gate, not a runtime auto-discovery mechanism.
Safe Schema Evolution Patterns
Most schema evolution problems come down to a few recurring failure modes: removing a field that consumers depend on, changing the type of an existing field, or adding a required field without a default. The following patterns give you a safe path for each class of change.
Adding a New Field (Safe with a Default)
Adding a field is backward-compatible in Avro as long as the field has a default value. Old consumers that do not know the field will ignore it; new consumers that read old messages will receive the default. The default must be specified in the Avro schema — it cannot be left to application code.
# Avro: adding an optional field — always backward-compatible
# Old schema (v2.0):
{"name": "metadata", "type": ["null", {"type": "map", "values": "string"}], "default": null}
# New schema (v2.1) — adds shipping_address_id:
{"name": "metadata", "type": ["null", {"type": "map", "values": "string"}], "default": null},
{"name": "shipping_address_id", "type": ["null", "string"], "default": null,
"doc": "Added in v2.1. Null for events created before this field was introduced."}
# This passes BACKWARD and FORWARD compatibility checks:
# - Old consumers reading v2.1 messages: shipping_address_id is present but unknown → ignored (FORWARD ok)
# - New consumers reading v2.0 messages: shipping_address_id absent → default null applied (BACKWARD ok)
Renaming a Field (Avro Aliases)
Field renaming breaks backward compatibility in Avro unless you use the aliases keyword. An alias tells the Avro decoder to match an old field name to the new name when deserialising old data, giving consumers a migration window without a coordinated cutover.
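Aliases and defaults are both applied on the reader side during schema resolution. The stdlib sketch below is a simplified, hypothetical model of that step for flat records. It takes a record decoded with the writer's schema and projects it onto the reader's field list. Fields are matched by name or alias, defaults fill in missing fields, and fields the reader does not know are dropped. Real Avro libraries perform this during decoding. The reader field list mirrors the rename example that follows.

```python
_MISSING = object()


def resolve(datum: dict, reader_fields: list[dict]) -> dict:
    """Project a writer-side record onto the reader schema (simplified Avro rules)."""
    result = {}
    for field in reader_fields:
        value = _MISSING
        # Try the reader's own name first, then each alias (former names)
        for candidate in [field["name"], *field.get("aliases", [])]:
            if candidate in datum:
                value = datum[candidate]
                break
        if value is _MISSING:
            if "default" not in field:
                raise ValueError(f"no value or default for field {field['name']!r}")
            value = field["default"]
        result[field["name"]] = value
    return result  # writer fields unknown to the reader are dropped


READER_V22 = [
    {"name": "order_id", "type": "string"},
    {"name": "amount_cents", "type": "long", "aliases": ["total_amount_cents"]},
    {"name": "shipping_address_id", "type": ["null", "string"], "default": None},
]
# Hypothetical message written before the rename, with one writer-only field
old_message = {"order_id": "o-1", "total_amount_cents": 4999, "legacy_flag": True}
print(resolve(old_message, READER_V22))
# {'order_id': 'o-1', 'amount_cents': 4999, 'shipping_address_id': None}
```

The alias only helps the reader that carries it. A reader still using the old schema has no way to map a field named amount_cents back to total_amount_cents.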
# Renaming total_amount_cents → amount_cents using Avro aliases
# Old schema field:
{
"name": "total_amount_cents",
"type": "long"
}
# New schema field — rename with alias for backward compatibility:
{
"name": "amount_cents",
"type": "long",
"aliases": ["total_amount_cents"],
"doc": "Renamed from total_amount_cents in v2.2. Aliases enable reading old messages."
}
# Avro schema resolution uses the alias to match the writer schema's
# 'total_amount_cents' field to the reader's 'amount_cents' field. Consumers using
# the new schema can read both old and new messages without modification.
# Caveat: the alias lives in the reader's schema, so this rename is only BACKWARD
# compatible. Consumers still on the old schema cannot read messages written with
# the new name, and the change fails FORWARD, FULL, and FULL_TRANSITIVE checks.
# Keep the alias for as long as messages written with the old field name can still
# be read from the topic, i.e., until they have aged out of retention.
Breaking Changes and Topic Versioning
Some changes are genuinely incompatible: removing an enum value that consumers rely on in switch statements, changing a field type from string to long, or restructuring nested records. The only safe approach for breaking changes is a new topic version. Run the old and new topics in parallel (v2 and v3 in the example below), migrate consumers incrementally, then decommission the old topic once every consumer has moved.
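The dual-write step is where a migration is most likely to go wrong quietly, so here is a minimal sketch of it. It assumes a hypothetical upgrade function that maps a v2 event onto a v3 shape. Here the v3 shape nests the amount and currency into one record, one of the restructurings mentioned above. The produce call is taken as a plain callable so the logic can be exercised without a broker. With confluent-kafka you would pass serialised keys and values to Producer.produce instead.

```python
from typing import Callable

Produce = Callable[[str, bytes, dict], None]  # (topic, key, value)


def upgrade_v2_to_v3(event: dict) -> dict:
    """Hypothetical v3 shape: amount and currency nested into one record."""
    v3 = {k: v for k, v in event.items()
          if k not in ("total_amount_cents", "currency_code")}
    v3["amount"] = {"cents": event["total_amount_cents"],
                    "currency": event["currency_code"]}
    return v3


def dual_write(produce: Produce, event: dict) -> None:
    key = event["order_id"].encode()
    # Using the same key on both topics keeps every event for one order in a
    # single partition of each topic, so per-order ordering holds on v2 and v3.
    produce("com.acme.orders.v2", key, event)                    # legacy consumers
    produce("com.acme.orders.v3", key, upgrade_v2_to_v3(event))  # migrated consumers


sent = []
dual_write(lambda topic, key, value: sent.append((topic, key, value)),
           {"order_id": "o-1", "event_type": "ORDER_CREATED",
            "total_amount_cents": 4999, "currency_code": "USD"})
```

Deriving v3 from the v2 event in one place keeps both topics consistent during the migration window. When v2 is decommissioned, only the second call remains.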
# Topic versioning strategy for breaking changes
# Step 1: Create the new topic and register the new schema
kafka-topics.sh --create \
--topic com.acme.orders.v3 \
--partitions 12 \
--replication-factor 3 \
--bootstrap-server kafka.acme.com:9092
# Register the new (incompatible) schema under a new subject
# POST /subjects/com.acme.orders.v3-value/versions
# {"schema": "<v3 schema JSON>"}
# Step 2: Deploy the dual-write producer (writes to v2 AND v3 simultaneously)
# This ensures no consumer misses events during the migration window.
# producer/dual_write_producer.py (Python excerpt):
#   producer.produce(topic="com.acme.orders.v2", ...)  # legacy consumers
#   producer.produce(topic="com.acme.orders.v3", ...)  # migrated consumers
# The two produce() calls are independent, so one can succeed while the other fails;
# a transactional producer can make the pair atomic if consumers cannot tolerate gaps.
# Step 3: Migrate consumers to com.acme.orders.v3 one team at a time.
# Track migration status in the data catalog or contract registry.
# Step 4: Once all consumers are on v3, remove the dual-write and decommission v2.
# Give v2 a finite retention period (7 days = 604800000 ms) so stragglers can
# drain before the topic is deleted:
kafka-configs.sh --alter \
--entity-type topics \
--entity-name com.acme.orders.v2 \
--add-config retention.ms=604800000 \
--bootstrap-server kafka.acme.com:9092
REST API Contracts with OpenAPI and Pact
For REST-based producer-consumer pairs, OpenAPI 3.x serves as the schema layer, and Pact provides consumer-driven contract testing. Consumer-driven contracts invert the usual testing model: the consumer defines what it needs from the API, generates a Pact file describing those expectations, and the provider verifies its implementation against every consumer's expectations independently. This means producers can evolve their API safely without a manual integration test run against every consumer.
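The core idea of consumer-driven verification fits in a few lines. In the hypothetical sketch below, the consumer's expectation is an example response body. The provider's real response passes if it contains every expected key with a value of the same type, checked recursively. Extra fields that the consumer never asked for are allowed. This roughly mirrors Pact's type-based Like matching. Actual Pact verification also covers request paths, headers, status codes, and provider states.

```python
def satisfies(expected, actual, path: str = "$") -> list[str]:
    """Return mismatches between a consumer's example body and a provider response."""
    if isinstance(expected, dict):
        if not isinstance(actual, dict):
            return [f"{path}: expected object"]
        problems = []
        for key, sub in expected.items():
            if key not in actual:
                problems.append(f"{path}.{key}: missing")
            else:
                problems.extend(satisfies(sub, actual[key], f"{path}.{key}"))
        return problems  # keys that only the provider sends are fine
    # Leaf values (and lists, in this sketch) are compared by type only
    if type(expected) is not type(actual):
        return [f"{path}: expected {type(expected).__name__}, got {type(actual).__name__}"]
    return []


expected = {"order_id": "3fa85f64", "total_amount_cents": 4999, "currency_code": "USD"}
provider_response = {"order_id": "x", "total_amount_cents": 10,
                     "currency_code": "EUR", "shipping_address_id": None}
print(satisfies(expected, provider_response))  # []
```

Because only the fields a consumer actually reads are checked, the provider stays free to add fields. Renaming or retyping a field that some consumer depends on, however, fails that consumer's check.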
Consumer Pact Test (Python)
# consumer/tests/test_orders_pact.py
# Consumer-driven contract test using the pact-python library.
# This test defines what the OrderDashboard consumer expects from the OrderService API.
# The resulting pact JSON file is published to the Pact Broker and verified by the provider.
import httpx
import pytest
from pact import Consumer, Provider
PACT_DIR = "pacts"
PACT_MOCK_HOST = "localhost"
PACT_MOCK_PORT = 1234
ORDER_ID = "3fa85f64-5717-4562-b3fc-2c963f66afa6"
@pytest.fixture(scope="session")
def pact():
    pact = Consumer("OrderDashboard").has_pact_with(
        Provider("OrderService"),
        host_name=PACT_MOCK_HOST,
        port=PACT_MOCK_PORT,
        pact_dir=PACT_DIR,
    )
    pact.start_service()
    yield pact
    pact.stop_service()
def test_get_order_returns_expected_shape(pact):
    # Literal values require the provider to return exactly these values;
    # pact-python's Like and Term matchers relax this to type or regex matching.
    expected_body = {
        "order_id": ORDER_ID,
        "event_type": "ORDER_CREATED",
        "total_amount_cents": 4999,
        "currency_code": "USD",
        "created_at": "2026-05-26T10:00:00Z",
    }
    (
        pact.given("Order 3fa85f64 exists")
        .upon_receiving("a request for order 3fa85f64")
        .with_request("GET", f"/orders/{ORDER_ID}")
        .will_respond_with(
            200,
            headers={"Content-Type": "application/json"},
            body=expected_body,
        )
    )
    # Entering the context registers the interaction; exiting verifies the mock received it
    with pact:
        response = httpx.get(f"http://{PACT_MOCK_HOST}:{PACT_MOCK_PORT}/orders/{ORDER_ID}")
        assert response.status_code == 200
        data = response.json()
        assert data["order_id"] == ORDER_ID
        assert data["total_amount_cents"] == 4999
Provider Pact Verification
# provider/tests/test_pact_verification.py
# Provider-side Pact verification. Fetches all consumer pacts from the Pact Broker
# and verifies that the provider implementation satisfies every one.
import os
from pact import Verifier
def test_provider_honours_pact_with_order_dashboard():
    verifier = Verifier(
        provider="OrderService",
        provider_base_url="http://localhost:8000",
    )
    in_ci = "GITHUB_SHA" in os.environ
    success, _logs = verifier.verify_with_broker(
        broker_url="https://pact-broker.acme.com",
        broker_token=os.environ["PACT_BROKER_TOKEN"],  # injected from CI secrets
        # Publish results only from CI, under the same version can-i-deploy checks (the git SHA)
        publish_verification_results=in_ci,
        publish_version=os.environ.get("GITHUB_SHA"),
        provider_tags=["main"],
        provider_states_setup_url="http://localhost:8000/_pact/provider_states",
        enable_pending=True,  # don't fail on pending pacts from new consumers
    )
    assert success == 0, "Pact verification failed; see the verifier logs for consumer failures"
# Provider state handlers tell the verifier how to set up test fixtures
# for each "given(...)" clause in the consumer pacts.
# They are registered as a Flask/FastAPI endpoint on the provider's test server,
# at the URL passed as provider_states_setup_url above.
# @app.post("/_pact/provider_states")
# async def provider_states(body: dict):
#     state = body.get("state")
#     if state == "Order 3fa85f64 exists":
#         await db.insert_test_order(order_id="3fa85f64-5717-4562-b3fc-2c963f66afa6")
#     return {"result": "ok"}
Note
The Pact Broker also provides the can-i-deploy CLI command, which CI pipelines use to block a release if any consumer's contract is not yet verified by the target provider version.
CI/CD Integration for Contract Enforcement
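Data contracts are only enforceable if they are validated in the deployment pipeline, not just documented in a wiki. The following GitHub Actions workflow runs on every pull request that touches an Avro schema, a .proto file, or the contract itself. It checks Avro compatibility against the Schema Registry, lints the Protobuf definitions and detects breaking changes with Buf, and validates and tests the data contract. It also asks the Pact Broker whether the provider can be deployed. With these jobs marked as required status checks, any failure blocks the merge. Registering the new schema is left to the release pipeline once the change has merged.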
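The first job calls scripts/check_compatibility.py, which this article does not show. Below is one minimal way to write it. It assumes the registry's REST endpoint POST /compatibility/subjects/{subject}/versions/latest, which answers with an is_compatible flag. It also assumes basic-auth credentials from the SCHEMA_REGISTRY_KEY and SCHEMA_REGISTRY_SECRET variables the workflow sets, and a subject name hard-coded for this one schema. It uses only the standard library. Treat it as a sketch under those assumptions.

```python
import base64
import json
import os
import sys
import urllib.parse
import urllib.request

SUBJECT = "com.acme.orders.v2-value"  # assumption: one subject per schema file


def build_request(base_url: str, subject: str, schema_str: str,
                  key: str, secret: str) -> urllib.request.Request:
    url = (f"{base_url.rstrip('/')}/compatibility/subjects/"
           f"{urllib.parse.quote(subject, safe='')}/versions/latest")
    body = json.dumps({"schema": schema_str, "schemaType": "AVRO"}).encode()
    token = base64.b64encode(f"{key}:{secret}".encode()).decode()
    return urllib.request.Request(
        url, data=body, method="POST",
        headers={"Content-Type": "application/vnd.schemaregistry.v1+json",
                 "Authorization": f"Basic {token}"})


def main(path: str) -> int:
    with open(path) as f:
        schema_str = json.dumps(json.load(f))  # fails fast on invalid JSON
    req = build_request(os.environ["SCHEMA_REGISTRY_URL"], SUBJECT, schema_str,
                        os.environ["SCHEMA_REGISTRY_KEY"],
                        os.environ["SCHEMA_REGISTRY_SECRET"])
    # HTTP errors (e.g., an unknown subject) raise and fail the job
    with urllib.request.urlopen(req, timeout=30) as resp:
        result = json.load(resp)
    if result.get("is_compatible"):
        print(f"{path}: compatible with latest {SUBJECT}")
        return 0
    print(f"{path}: INCOMPATIBLE with latest {SUBJECT}: {result}", file=sys.stderr)
    return 1


if __name__ == "__main__" and len(sys.argv) > 1:
    sys.exit(main(sys.argv[1]))
```

A nonzero exit code is all GitHub Actions needs to mark the step, and therefore the pull request check, as failed.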
# .github/workflows/schema-contract.yml
name: Schema Contract CI
on:
  pull_request:
    paths:
      - "schemas/**/*.avsc"
      - "proto/**/*.proto"
      - "datacontract.yaml"
jobs:
  avro-compatibility:
    name: Avro Schema Compatibility Check
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: "3.11"
      - name: Install dependencies
        run: pip install "confluent-kafka[avro]" fastavro
      - name: Check Avro compatibility against Schema Registry
        env:
          SCHEMA_REGISTRY_URL: ${{ secrets.SCHEMA_REGISTRY_URL }}
          SCHEMA_REGISTRY_KEY: ${{ secrets.SCHEMA_REGISTRY_KEY }}
          SCHEMA_REGISTRY_SECRET: ${{ secrets.SCHEMA_REGISTRY_SECRET }}
        run: python scripts/check_compatibility.py schemas/order_event_v2.avsc
  buf-lint-proto:
    name: Buf Proto Lint and Breaking Change Detection
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Install Buf CLI
        uses: bufbuild/buf-setup-action@v1
      - name: Buf lint
        run: buf lint
      - name: Buf breaking change detection
        # actions/checkout makes a shallow clone with no local main branch,
        # so compare against main on the remote rather than '.git#branch=main'.
        run: buf breaking --against "https://github.com/${{ github.repository }}.git#branch=main"
  datacontract-validate:
    name: Data Contract Specification Validation
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: "3.11"
      - name: Install datacontract-cli
        run: pip install datacontract-cli
      - name: Validate contract schema and metadata
        run: datacontract lint datacontract.yaml
      - name: Run contract quality checks
        run: datacontract test datacontract.yaml
        env:
          KAFKA_BOOTSTRAP: ${{ secrets.KAFKA_BOOTSTRAP }}
          SCHEMA_REGISTRY_URL: ${{ secrets.SCHEMA_REGISTRY_URL }}
  pact-can-i-deploy:
    name: Pact Can-I-Deploy Check
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Install Pact CLI
        run: |
          # 1.x.x is a placeholder: pin an actual pact-ruby-standalone release
          # whose asset name matches this file name exactly.
          curl -LO https://github.com/pact-foundation/pact-ruby-standalone/releases/latest/download/pact-1.x.x-linux-x86_64.tar.gz
          tar xzf pact-*.tar.gz
          echo "$(pwd)/pact/bin" >> $GITHUB_PATH
      - name: Can I deploy this OrderService commit?
        run: |
          pact-broker can-i-deploy \
            --pacticipant OrderService \
            --version ${{ github.sha }} \
            --to-environment production \
            --broker-base-url ${{ secrets.PACT_BROKER_URL }} \
            --broker-token ${{ secrets.PACT_BROKER_TOKEN }}
Schema Registry in Production
Running Confluent Schema Registry in production requires attention to high availability, access control, and multi-tenant subject naming.
Subject Naming Strategies
Schema Registry subjects are keyed by name. The naming strategy determines how a topic name maps to a subject name. The TopicNameStrategy (default) maps my-topic to subjects my-topic-key and my-topic-value. This is fine for single-schema topics. The RecordNameStrategy maps the fully-qualified Avro record name (e.g., com.acme.orders.OrderEvent) to the subject, enabling multiple schemas on the same topic or reuse of the same schema across topics.
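The strategies are easiest to compare side by side. The stdlib sketch below reproduces the subject name each one derives. It also includes TopicRecordNameStrategy, which combines the topic and record names. The function names are illustrative. In confluent-kafka-python the equivalent built-ins are topic_subject_name_strategy, record_subject_name_strategy, and topic_record_subject_name_strategy in confluent_kafka.schema_registry, passed to a serializer through its "subject.name.strategy" setting.

```python
def topic_name_strategy(topic: str, record_fullname: str, is_key: bool) -> str:
    # Default: one subject per topic and key/value side
    return f"{topic}-{'key' if is_key else 'value'}"


def record_name_strategy(topic: str, record_fullname: str, is_key: bool) -> str:
    # One subject per record type, shared by every topic that carries it
    return record_fullname


def topic_record_name_strategy(topic: str, record_fullname: str, is_key: bool) -> str:
    # One subject per (topic, record type) pair: several types per topic, scoped to it
    return f"{topic}-{record_fullname}"


for strategy in (topic_name_strategy, record_name_strategy, topic_record_name_strategy):
    subject = strategy("com.acme.orders.v2", "com.acme.orders.OrderEvent", False)
    print(f"{strategy.__name__}: {subject}")
```

The choice matters for compatibility enforcement. Because checks run per subject, RecordNameStrategy evolves OrderEvent under one history across all topics, while TopicNameStrategy gives each topic its own history.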
# docker-compose.yml excerpt: Schema Registry with HTTPS and HTTP Basic authentication
version: "3.9"
services:
  schema-registry:
    image: confluentinc/cp-schema-registry:7.6.0
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:9092
      SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL: SASL_SSL
      SCHEMA_REGISTRY_KAFKASTORE_SASL_MECHANISM: PLAIN
      SCHEMA_REGISTRY_KAFKASTORE_SASL_JAAS_CONFIG: >
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="${SR_KAFKA_USER}"
        password="${SR_KAFKA_PASS}";
      # TLS for client connections
      SCHEMA_REGISTRY_LISTENERS: https://0.0.0.0:8443
      SCHEMA_REGISTRY_SSL_KEYSTORE_LOCATION: /etc/ssl/schema-registry.jks
      SCHEMA_REGISTRY_SSL_KEYSTORE_PASSWORD: ${SR_KEYSTORE_PASS}
      SCHEMA_REGISTRY_SSL_TRUSTSTORE_LOCATION: /etc/ssl/truststore.jks
      SCHEMA_REGISTRY_SSL_TRUSTSTORE_PASSWORD: ${SR_TRUSTSTORE_PASS}
      # HTTP Basic authentication. jaas.conf must define a "SchemaRegistry" section that uses
      # Jetty's PropertyFileLoginModule with file="/etc/schema-registry/password.properties".
      SCHEMA_REGISTRY_AUTHENTICATION_METHOD: BASIC
      SCHEMA_REGISTRY_AUTHENTICATION_ROLES: admin,write,read
      SCHEMA_REGISTRY_AUTHENTICATION_REALM: SchemaRegistry
      SCHEMA_REGISTRY_OPTS: -Djava.security.auth.login.config=/etc/schema-registry/jaas.conf
      # Global default compatibility; override per-subject as needed
      SCHEMA_REGISTRY_SCHEMA_COMPATIBILITY_LEVEL: BACKWARD_TRANSITIVE
    volumes:
      - ./ssl:/etc/ssl:ro
      - ./etc/schema-registry/password.properties:/etc/schema-registry/password.properties:ro
      - ./etc/schema-registry/jaas.conf:/etc/schema-registry/jaas.conf:ro
    ports:
      - "8443:8443"
    healthcheck:
      # With BASIC auth enabled, this unauthenticated probe receives HTTP 401 and curl -f
      # fails; add credentials (curl -u user:pass) for the check to pass.
      test: ["CMD", "curl", "-f", "-k", "https://localhost:8443/subjects"]
      interval: 10s
      timeout: 5s
      retries: 5
Managing Compatibility Per-Subject
#!/bin/bash
# scripts/manage_subject_compatibility.sh
# Override the global compatibility mode for specific subjects.
# Requires: curl, jq, and SR credentials in the environment.
set -euo pipefail
SR_URL="${SCHEMA_REGISTRY_URL}"
SR_AUTH="${SCHEMA_REGISTRY_USER}:${SCHEMA_REGISTRY_PASS}"
set_compatibility() {
  local subject="$1"
  local mode="$2"
  # Let jq build the JSON body so the quotes are escaped correctly
  curl -s -X PUT \
    --user "$SR_AUTH" \
    -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    -d "$(jq -n -c --arg mode "$mode" '{compatibility: $mode}')" \
    "$SR_URL/config/$subject"
  echo
}
list_subjects_by_compatibility() {
  local mode="$1"
  curl -s --user "$SR_AUTH" "$SR_URL/subjects" | jq -r '.[]' | while read -r subject; do
    # Subjects without their own override return an error body, reported here as GLOBAL
    config=$(curl -s --user "$SR_AUTH" "$SR_URL/config/$subject" | jq -r '.compatibilityLevel // "GLOBAL"')
    if [ "$config" = "$mode" ]; then
      echo "$subject: $config"
    fi
  done
}
# Production topics: FULL_TRANSITIVE (safest)
set_compatibility "com.acme.orders.v2-value" "FULL_TRANSITIVE"
set_compatibility "com.acme.payments.v1-value" "FULL_TRANSITIVE"
# Internal dev topics: NONE (fastest iteration)
set_compatibility "dev.sandbox.events-value" "NONE"
echo "--- Subjects with FULL_TRANSITIVE ---"
list_subjects_by_compatibility "FULL_TRANSITIVE"
Decision Framework: Choosing Your Contract Stack
The right tool depends on your transport, team structure, and how much compatibility enforcement you need.
# Decision framework — data contract tooling selection
# STREAMING (Kafka / Redpanda / Pulsar)
# → Avro + Schema Registry → standard choice; compact wire format; best Kafka ecosystem support
# → Protobuf + Schema Registry → prefer for polyglot teams or gRPC reuse; stronger rename safety
# → JSON Schema + SR → easy migration from JSON; larger wire size; good for low-throughput topics
# REST / HTTP APIs
# → OpenAPI 3.x (spec) → always for documentation and SDK generation
# → Pact (consumer-driven) → add for testing when consumers have independent deployments
# → schemathesis → property-based API testing against OpenAPI spec; good complement to Pact
# BATCH / WAREHOUSE
# → Data Contract Spec (YAML) → works with dbt, Great Expectations, Soda Core
# → dbt contracts (v1.5+) → native dbt column-level contracts with enforced types
# → Delta/Iceberg schema evolution → table-level schema versioning at the lakehouse layer
# POLYGLOT (mixed transport)
# → Apicurio Registry → open-source alternative to Confluent SR; supports Avro, Protobuf, JSON Schema
# → Buf Schema Registry (BSR) → managed Protobuf registry with breaking change enforcement
# → AsyncAPI → OpenAPI equivalent for event-driven / messaging APIs
# Compatibility mode selection:
# Consumer controls deployment order → BACKWARD_TRANSITIVE
# Producer controls deployment order → FORWARD_TRANSITIVE
# Independent deployments → FULL_TRANSITIVE
# Long-lived topics (months+) → FULL_TRANSITIVE (non-negotiable)
# Breaking change required → new topic version + dual-write migration
Note
dbt 1.5 and later support model contracts. Setting contract: enforced: true in a model's YAML config makes dbt check the model's column names and data types at materialisation time. This gives warehouse-layer data producers the same safety guarantee that Avro and Protobuf give streaming producers: a breaking change (e.g., renaming a column) fails the dbt run before any downstream model is affected.
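The check behind a model contract is conceptually simple. The hypothetical Python sketch below compares the columns a model declares in its contract with the columns its SQL actually produces. It reports each missing, undeclared, or mistyped column, which is why an unannounced rename fails the build. It illustrates the idea rather than dbt's implementation, and it compares type names as case-insensitive strings.

```python
def contract_violations(declared: dict[str, str], actual: dict[str, str]) -> list[str]:
    """Compare declared {column: data_type} with the model's actual output columns."""
    problems = []
    for column, data_type in declared.items():
        if column not in actual:
            problems.append(f"missing column: {column}")
        elif actual[column].lower() != data_type.lower():
            problems.append(f"{column}: declared {data_type}, got {actual[column]}")
    # Columns the SQL produces but the contract never declared also break it
    for column in sorted(actual.keys() - declared.keys()):
        problems.append(f"undeclared column: {column}")
    return problems


declared = {"order_id": "varchar", "total_amount_cents": "bigint"}
renamed = {"order_id": "varchar", "amount_cents": "bigint"}
print(contract_violations(declared, renamed))
# ['missing column: total_amount_cents', 'undeclared column: amount_cents']
```

A rename shows up twice, once as a missing declared column and once as an undeclared new one. That mirrors the Avro case, where a rename without an alias looks like a removal plus an addition.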