Why Polling Falls Short as a Data Sync Strategy
The most common pattern for syncing data between systems is polling: run a query every N minutes that selects rows where updated_at > last_run_time and process the diff. It works until it doesn't. Polling misses deletes entirely — a deleted row has no updated_at to poll on. It's vulnerable to clock skew between the application and the database. It double-processes events when the job fails mid-run and restarts from the last checkpoint. And it introduces latency proportional to the polling interval, which often grows as teams add more tables to the query.
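For concreteness, here is the pattern at its simplest, sketched with psql against a hypothetical orders table and a checkpoint the job stores between runs. Note that a row deleted since the last run simply never appears in the result set:

# Naive polling: select everything modified since the last checkpoint
LAST_RUN='2024-05-01 00:00:00+00'
psql -h postgres.internal -d orders_db -c \
  "SELECT id, status, updated_at
     FROM public.orders
    WHERE updated_at > '${LAST_RUN}'
    ORDER BY updated_at;"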
Change Data Capture (CDC) solves these problems by reading from the database's own internal replication stream rather than querying data tables. PostgreSQL exposes changes through its Write-Ahead Log (WAL). MySQL uses its binary log (binlog). These logs record every INSERT, UPDATE, and DELETE in commit order, with sub-second latency and no dependence on application-maintained timestamps. CDC reads from this stream and publishes events downstream — to Kafka, message queues, data warehouses — with at-least-once delivery anchored to the WAL position, so consumers can deduplicate using the log offset.
Debezium is the de-facto open-source CDC platform. It runs as a set of Kafka Connect source connectors that capture changes from PostgreSQL, MySQL, MongoDB, SQL Server, Oracle, and others, publishing structured events to Kafka topics. This guide covers the full pipeline: PostgreSQL WAL configuration, Debezium connector setup, event envelope anatomy, Single Message Transforms, sink connectors for Elasticsearch and S3, delete handling, distributed Connect workers, and production monitoring.
Debezium Architecture: WAL to Kafka
Debezium connects to PostgreSQL as a logical replication client. PostgreSQL's logical decoding feature decodes WAL entries into a stream of row-level change records using an output plugin. Debezium uses pgoutput, which is built into PostgreSQL 10+ and requires no extension installation.
A replication slot maintains the consumer's position in the WAL. PostgreSQL will not remove WAL segments until all active replication slots have confirmed consuming past them. This is the most operationally critical aspect of CDC on PostgreSQL: a stalled or inactive Debezium connector will cause PostgreSQL to retain WAL segments indefinitely, eventually filling the disk and causing a database outage. Replication slot lag monitoring is a production prerequisite, not an optional dashboard.
Debezium publishes one Kafka topic per captured table, following the naming convention {topic.prefix}.{schema}.{table}. Each message key contains the row's primary key; the value contains the full change event with before and after states. The topic-per-table design lets downstream consumers independently subscribe to only the tables they need.
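Once the connector is running (configured later in this guide with topic.prefix=cdc), the per-table topics are easy to verify. A quick check, with an illustrative broker address:

# List the CDC topics Debezium created, one per captured table
kafka-topics.sh --bootstrap-server kafka:9092 --list | grep '^cdc\.public\.'
# Expected output, given the table include list used later in this guide:
#   cdc.public.customers
#   cdc.public.orders
#   cdc.public.products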
Configuring PostgreSQL for Logical Replication
Logical replication requires three configuration changes in postgresql.conf, a dedicated replication role with minimal privileges, and a publication that defines which tables to capture.
# postgresql.conf — enable logical replication
wal_level = logical # Required; default is 'replica' on most setups
max_wal_senders = 10 # Maximum concurrent replication connections
max_replication_slots = 10 # One slot per Debezium connector instance
wal_keep_size = 1024 # Retain 1GB of WAL as a safety buffer (value in MB)

Note
wal_level requires a PostgreSQL restart. Plan for a brief maintenance window when enabling CDC on an existing cluster. On managed services (RDS, Cloud SQL, AlloyDB), set the equivalent parameter group flags — these services handle the restart automatically but may require a blue/green failover to avoid downtime.

Create a dedicated replication role with the minimum privileges Debezium needs. It requires REPLICATION login, SELECT on captured tables for the initial snapshot, and access to replication-related system views and functions.
-- Create Debezium replication role
CREATE ROLE debezium WITH REPLICATION LOGIN PASSWORD 'use-vault-or-secrets-manager';
-- Grant SELECT on existing tables in each captured schema
GRANT SELECT ON ALL TABLES IN SCHEMA public TO debezium;
GRANT SELECT ON ALL TABLES IN SCHEMA orders TO debezium;
-- Auto-grant SELECT on tables created in the future
ALTER DEFAULT PRIVILEGES IN SCHEMA public
GRANT SELECT ON TABLES TO debezium;
-- Create a publication: defines exactly which tables Debezium captures
-- FOR ALL TABLES is convenient for development; use explicit lists in production
CREATE PUBLICATION debezium_pub
FOR TABLE public.orders, public.customers, public.products, public.inventory;
-- To add a table to an existing publication later:
-- ALTER PUBLICATION debezium_pub ADD TABLE public.new_table;

Use explicit table lists in publications for production. Adding all tables means newly created tables are automatically captured before their schemas are registered in Schema Registry, which can cause deserialization failures downstream. Adding tables to the publication explicitly forces a deliberate, coordinated schema registration step.
Deploying the Debezium PostgreSQL Connector
Connectors are deployed via the Kafka Connect REST API. A POST to /connectors with a JSON configuration creates and starts the connector. Debezium creates a replication slot on the PostgreSQL side and begins reading from the WAL. On first start with snapshot.mode=initial, it performs a consistent snapshot of all existing rows before streaming new changes.
# POST http://kafka-connect:8083/connectors
# Content-Type: application/json
{
"name": "postgres-orders-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": "postgres.internal",
"database.port": "5432",
"database.user": "debezium",
"database.password": "${file:/opt/kafka/secrets/db.properties:password}",
"database.dbname": "orders_db",
"topic.prefix": "cdc",
"plugin.name": "pgoutput",
"publication.name": "debezium_pub",
"slot.name": "debezium_orders_slot",
"table.include.list": "public.orders,public.customers,public.products",
"snapshot.mode": "initial",
"snapshot.isolation.mode": "repeatable_read",
"heartbeat.interval.ms": "10000",
"heartbeat.action.query":
"INSERT INTO public.debezium_heartbeat(ts) VALUES(NOW()) ON CONFLICT(id) DO UPDATE SET ts = NOW()",
"decimal.handling.mode": "string",
"binary.handling.mode": "base64",
"time.precision.mode": "connect",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"errors.tolerance": "all",
"errors.log.enable": "true",
"errors.log.include.messages": "true",
"errors.deadletterqueue.topic.name": "cdc.dlq.postgres-orders",
"errors.deadletterqueue.context.headers.enable": "true"
}
}

Note
tasks.max must be 1 for all Debezium source connectors. The WAL is a sequential stream and cannot be parallelized across multiple tasks. Throughput scaling happens on the consumer side — downstream Kafka Streams jobs, Flink, or sink connectors can all run with higher task counts.

The heartbeat.interval.ms and heartbeat.action.query settings address a subtle operational issue. On a low-traffic database, the connector's confirmed WAL position advances only when rows in captured tables change. If only non-captured tables are written to (e.g., internal audit tables), Debezium has no events to publish and no new LSN to confirm, so the replication slot pins old WAL segments. The heartbeat query forces a periodic write to a captured table, giving the connector an event to process and an LSN to confirm, which releases old WAL from the slot and prevents unbounded disk growth.
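To deploy, POST the JSON above to the Connect REST API. A minimal sketch, assuming the config is saved as postgres-orders-connector.json and a worker is reachable at kafka-connect:8083:

# Create the connector, then confirm the connector and its task are RUNNING
curl -s -X POST -H "Content-Type: application/json" \
  --data @postgres-orders-connector.json \
  http://kafka-connect:8083/connectors | jq .
curl -s http://kafka-connect:8083/connectors/postgres-orders-connector/status | jq '.tasks'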
The Debezium Event Envelope
Every change event published by Debezium follows the same envelope structure regardless of source database. Understanding it is essential for writing consumers, configuring sink connectors, and debugging pipeline issues.
// Example: INSERT on public.orders → topic: cdc.public.orders
// op field: "c" = create/insert, "u" = update, "d" = delete, "r" = read/snapshot
{
"payload": {
"before": null, // null on INSERT; populated on UPDATE and DELETE
"after": {
"id": 10042,
"customer_id": 789,
"status": "pending",
"total_amount": "149.99", // string due to decimal.handling.mode=string
"created_at": 1714569600000000, // microseconds since epoch
"updated_at": 1714569600000000
},
"source": {
"version": "2.7.1.Final",
"connector": "postgresql",
"name": "cdc",
"ts_ms": 1714569601234, // wall-clock time on the PG server (ms)
"db": "orders_db",
"schema": "public",
"table": "orders",
"txId": 4892, // PostgreSQL transaction ID
"lsn": 87234512, // WAL log sequence number
"xmin": null
},
"op": "c",
"ts_ms": 1714569601500 // wall-clock time on the Debezium connector (ms)
}
}

For UPDATE events, both before and after are populated only when REPLICA IDENTITY is set to FULL on the table. The default REPLICA IDENTITY includes only the primary key in the before payload. For most CDC use cases — change detection, audit logs, slowly changing dimensions — FULL replica identity is required.
-- Enable full replica identity to get complete before/after diffs on UPDATE and DELETE
ALTER TABLE public.orders REPLICA IDENTITY FULL;
ALTER TABLE public.customers REPLICA IDENTITY FULL;
ALTER TABLE public.inventory REPLICA IDENTITY FULL;
-- Verify replica identity setting
SELECT relname, relreplident
FROM pg_class
WHERE relname IN ('orders', 'customers', 'inventory');
-- relreplident values: 'd' = default (PK only), 'f' = full, 'i' = index, 'n' = nothing

Single Message Transforms: Flattening and Enriching Events
Kafka Connect's Single Message Transforms (SMTs) are lightweight, in-process transformations applied to each event before it reaches the sink. The most important SMT for Debezium pipelines is ExtractNewRecordState, which flattens the nested Debezium envelope into a plain record containing only the after fields — the flat format that most sink connectors expect.
// Connector config: chain SMTs with comma-separated names
"transforms": "unwrap,maskPII,addTimestamp",
// 1. Flatten the Debezium envelope — emit only the after-state fields
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.add.fields": "op,source.ts_ms:event_ts",
"transforms.unwrap.delete.handling.mode": "rewrite", // deletes become records with __deleted=true
"transforms.unwrap.drop.tombstones": "false", // keep tombstones for Kafka log compaction
// 2. Mask PII before it reaches the sink
"transforms.maskPII.type": "org.apache.kafka.connect.transforms.MaskField$Value",
"transforms.maskPII.fields": "email,phone_number,tax_id",
"transforms.maskPII.replacement": "REDACTED",
// 3. Stamp the processing time so consumers know when the event was written
"transforms.addTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.addTimestamp.timestamp.field": "etl_processed_at"Note
Sink Connectors: Writing CDC Events to Target Systems
Elasticsearch Sink Connector
The Confluent Elasticsearch Sink Connector writes Kafka events as Elasticsearch documents using the message key as the document ID. With write.method=upsert it uses the _update API with doc_as_upsert=true, making it safe to rerun even when documents already exist. Setting behavior.on.null.values=delete causes tombstone records to delete the corresponding ES document.
{
"name": "elasticsearch-orders-sink",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"tasks.max": "3",
"topics": "cdc.public.orders",
"connection.url": "https://elasticsearch:9200",
"connection.username": "elastic",
"connection.password": "${file:/opt/kafka/secrets/es.properties:password}",
"key.ignore": "false",
"schema.ignore": "true",
"write.method": "upsert",
"behavior.on.null.values": "delete",
"transforms": "unwrap,extractKey",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"transforms.unwrap.delete.handling.mode": "none",
"transforms.extractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractKey.field": "id",
"batch.size": "2000",
"max.in.flight.requests": "5",
"linger.ms": "50",
"errors.tolerance": "all",
"errors.log.enable": "true",
"errors.deadletterqueue.topic.name": "cdc.dlq.es-orders"
}
}

S3 Sink Connector
The S3 Sink Connector writes Kafka events to S3 in batches, partitioned by time or field value. Parquet output with Snappy compression gives the best query performance when downstream tools are Athena, Spark, Trino, or dbt on S3.
{
"name": "s3-orders-sink",
"config": {
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"tasks.max": "4",
"topics": "cdc.public.orders",
"s3.region": "us-east-1",
"s3.bucket.name": "data-lake-cdc",
"s3.part.size": "5242880",
"flush.size": "10000",
"rotate.interval.ms": "600000", // flush every 10 min if flush.size not reached
"rotate.schedule.interval.ms": "3600000", // force-flush hourly regardless
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
"parquet.codec": "snappy",
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
"locale": "en_US",
"timezone": "UTC",
"timestamp.extractor": "RecordField",
"timestamp.field": "created_at",
"schema.compatibility": "FULL",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.add.fields": "op,source.ts_ms:source_ts",
"transforms.unwrap.delete.handling.mode": "rewrite"
}
}

Handling Deletes and Tombstone Records
When a row is deleted in PostgreSQL, Debezium publishes two consecutive records to the Kafka topic. First, a delete event with op="d" containing the row's last-known state in the before field and a null after. Then a tombstone record — a message with a non-null key and a null value — which signals Kafka's log compaction to remove all previous records with that key from the compacted topic.
How downstream systems handle deletes depends on the sink. The Elasticsearch connector with behavior.on.null.values=delete deletes the document when it receives a tombstone. S3 sinks cannot delete files already written — model deletes as __deleted=true markers using delete.handling.mode=rewrite and handle them in downstream dbt or Spark jobs using slowly changing dimension patterns.
// 1. Delete event published FIRST — before state captures the full row
{
"payload": {
"before": { "id": 10042, "customer_id": 789, "status": "cancelled", ... },
"after": null,
"op": "d",
"source": { "lsn": 87299001, "ts_ms": 1714580000000 }
}
}
// 2. Tombstone published SECOND — null value signals log compaction to remove this key
// Key: { "id": 10042 }
// Value: null
// With delete.handling.mode=rewrite in ExtractNewRecordState,
// the delete event becomes a flat record with a __deleted marker:
{
"id": 10042,
"customer_id": 789,
"status": "cancelled",
"__deleted": "true",
"__op": "d",
"event_ts": 1714580000000
}
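Both records are easy to observe with a console consumer. A sketch, assuming the broker and topic names used throughout this guide (with the Avro converter configured above, substitute kafka-avro-console-consumer so values are decoded):

# Watch the delete event arrive, followed by its null-value tombstone
kafka-console-consumer.sh --bootstrap-server kafka:9092 \
  --topic cdc.public.orders \
  --property print.key=true \
  --property print.value=true \
  --from-beginning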
Running Kafka Connect in Distributed Mode
Distributed mode runs multiple Kafka Connect worker processes that coordinate task assignment via Kafka topics. If one worker fails, its tasks are rebalanced to the remaining workers. All connectors are managed through the REST API on any worker in the cluster — any worker proxies requests to the elected leader.
# worker.properties — identical on every node in the connect cluster
bootstrap.servers=kafka1:9092,kafka2:9092,kafka3:9092
group.id=connect-cluster-prod
# Internal coordination topics — Connect creates these on startup if broker permissions allow
offset.storage.topic=connect-offsets
offset.storage.replication.factor=3
offset.storage.partitions=25 # scale with number of connectors
config.storage.topic=connect-configs
config.storage.replication.factor=3
status.storage.topic=connect-status
status.storage.replication.factor=3
status.storage.partitions=5
# Default converters (overridden per-connector via REST API)
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
plugin.path=/usr/share/java,/opt/kafka/plugins
rest.advertised.host.name=${HOSTNAME}
rest.port=8083

For production, run at least 3 Kafka Connect workers behind a load balancer. Use the official Debezium Connect Docker image as your base — it bundles the PostgreSQL, MySQL, MongoDB, and SQL Server connectors with tested dependency versions. Add Confluent sink connectors by copying the JARs into the plugin.path directories, then deploy as a Kubernetes Deployment or StatefulSet depending on whether you need stable pod identities.
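After the workers start and join the group, verify that the cluster is healthy and every connector plugin is actually on the classpath. A quick check, assuming a worker at kafka-connect:8083:

# Worker liveness: returns the Connect version and Kafka cluster ID
curl -s http://kafka-connect:8083/ | jq .
# Installed plugins: expect io.debezium.connector.postgresql.PostgresConnector
# plus the Elasticsearch and S3 sink classes added under plugin.path
curl -s http://kafka-connect:8083/connector-plugins | jq '.[].class'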
Monitoring: Replication Lag, Consumer Lag, and Connector Health
CDC pipelines have two distinct lag metrics that are often confused. Replication slot lag measures how far behind Debezium is from the current PostgreSQL WAL position — this is the critical safety metric that indicates disk pressure risk. Kafka consumer lag measures how far behind sink connectors are from the events Debezium published — this determines end-to-end pipeline latency.
-- Monitor replication slot lag on PostgreSQL directly
-- Export via pg_exporter for Prometheus or query via cron
SELECT
slot_name,
active,
active_pid,
pg_size_pretty(
pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)
) AS consumer_lag,
pg_size_pretty(
pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)
) AS slot_retained_wal,
extract(epoch from (now() - r.reply_time)) AS lag_seconds -- seconds since the walsender last replied
FROM pg_replication_slots s
LEFT JOIN pg_stat_replication r ON r.pid = s.active_pid
WHERE s.slot_type = 'logical';

# Check Kafka consumer lag for sink connector consumer groups
kafka-consumer-groups.sh --bootstrap-server kafka:9092 --describe --group connect-elasticsearch-orders-sink
# Query connector and task status via the REST API
curl -s http://kafka-connect:8083/connectors/postgres-orders-connector/status | jq .
# Healthy response: connector.state = "RUNNING", tasks[*].state = "RUNNING"
# Restart a single failed task without touching the rest of the connector
curl -X POST http://kafka-connect:8083/connectors/postgres-orders-connector/tasks/0/restart
# Pause a connector (stop processing without losing the committed offset)
curl -X PUT http://kafka-connect:8083/connectors/postgres-orders-connector/pause
# Resume from where it left off
curl -X PUT http://kafka-connect:8083/connectors/postgres-orders-connector/resume

Export Kafka Connect JMX metrics to Prometheus using the JMX Exporter. The key metrics to alert on: kafka_connect_source_task_metrics_source_record_poll_rate (should track your insert rate and drop to 0 if the connector stalls), kafka_connect_connector_task_metrics_offset_commit_completion_rate (should be > 0), and kafka_consumer_fetch_manager_metrics_records_lag_max per sink consumer group. Alert when replication slot lag exceeds 2GB or when any connector task enters FAILED state.
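A lightweight complement to JMX alerting is a REST watchdog that pages on FAILED state. A sketch, assuming the usual worker address; the expand=status query parameter returns every connector's status in a single call:

#!/usr/bin/env bash
# Page if any connector or any of its tasks reports FAILED
FAILED=$(curl -s 'http://kafka-connect:8083/connectors?expand=status' \
  | jq -r '.[].status
           | select(.connector.state == "FAILED" or any(.tasks[]; .state == "FAILED"))
           | .name')
if [ -n "$FAILED" ]; then
  echo "FAILED connectors: $FAILED" >&2
  exit 1
fi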
Production Readiness Checklist
Before promoting a CDC pipeline to production, verify these practices are in place. They prevent the most common failure modes: WAL retention filling the disk, bad records blocking the entire pipeline, silent schema evolution failures, and delete events landing incorrectly in downstream sinks.
Replication slot lag is monitored with a hard alert
A stalled or inactive Debezium connector causes PostgreSQL to retain WAL segments indefinitely. Alert when slot_retained_wal exceeds 2GB, and page when active = false or lag_seconds > 300. If a slot is inactive for more than an hour on a busy database, it may be faster to drop the slot, perform a fresh snapshot with snapshot.mode=initial, and replay from the beginning rather than waiting for the connector to catch up; the recovery procedure is sketched below.
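The drop-and-resnapshot procedure, using the connector and slot names from earlier in this guide. Coordinate with downstream consumers first, since the fresh snapshot republishes every row:

# 1. Delete the stalled connector (by default this does NOT drop the slot)
curl -s -X DELETE http://kafka-connect:8083/connectors/postgres-orders-connector
# 2. Drop the slot so PostgreSQL can release the retained WAL
psql -h postgres.internal -d orders_db -c \
  "SELECT pg_drop_replication_slot('debezium_orders_slot');"
# 3. Re-create the connector; snapshot.mode=initial re-reads all existing rows
curl -s -X POST -H "Content-Type: application/json" \
  --data @postgres-orders-connector.json \
  http://kafka-connect:8083/connectors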
Dead letter queue is configured on every connector
Set errors.tolerance=all on every connector, and errors.deadletterqueue.topic.name on every sink connector (Kafka Connect routes dead letters for sink connectors only). Without a DLQ, a single malformed record causes the connector task to fail, blocking all subsequent records until manual intervention. With a DLQ, bad records are routed to a separate topic for inspection while the pipeline continues processing. Alert whenever new messages land on a DLQ topic, then inspect them as shown below.
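A sketch of that inspection, against the DLQ topic defined in the Elasticsearch sink config; with context headers enabled, each record carries headers identifying which connector, converter, or transform rejected it:

# Read dead-lettered records together with their error-context headers
kafka-console-consumer.sh --bootstrap-server kafka:9092 \
  --topic cdc.dlq.es-orders \
  --property print.headers=true \
  --from-beginning --max-messages 10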
REPLICA IDENTITY FULL is set on all captured tables
Without FULL replica identity, UPDATE events contain only the primary key in the before state. This breaks change-detection logic that compares before/after column values, prevents correct delete handling in sinks that need the full row state, and makes it impossible to build proper SCD Type 2 history from the event stream. Run ALTER TABLE t REPLICA IDENTITY FULL during off-peak hours; it is a fast catalog-only change, but it briefly takes an ACCESS EXCLUSIVE lock on the table.
Initial snapshot strategy is defined per table
snapshot.mode=initial reads all existing rows before streaming new changes. On tables with tens of millions of rows, this can take hours and significantly increase database load. For large tables, use snapshot.mode=never and backfill the sink separately via a bulk export (pg_dump, COPY, or a Spark job) before enabling CDC streaming. The bulk load and CDC stream handoff should be coordinated at a known LSN.
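A sketch of that handoff, assuming psql access to the database used throughout this guide. The LSN is recorded before the export, so CDC events overlapping the export window are applied twice; deduplicating by primary-key upsert in the warehouse absorbs the overlap:

# 1. Record the WAL position that marks the start of the CDC stream
psql -h postgres.internal -d orders_db -Atc "SELECT pg_current_wal_lsn();" > handoff_lsn.txt
# 2. Bulk-export the table; COPY is far cheaper than snapshotting through Debezium
psql -h postgres.internal -d orders_db -c \
  "\copy public.orders TO 'orders_backfill.csv' WITH (FORMAT csv, HEADER)"
# 3. Load the CSV into the sink, then start the connector with snapshot.mode=never
#    and apply CDC events from handoff_lsn.txt onward, upserting by primary key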
Schema Registry is enforcing compatibility mode
Set the Schema Registry subject compatibility to BACKWARD or FULL_TRANSITIVE. Without it, a column drop or type change in PostgreSQL can silently publish events that break downstream consumers still deserializing with the old schema. Debezium automatically registers Avro schemas for each topic; Registry compatibility enforcement means that breaking schema changes are rejected before events are published.
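Compatibility is set per subject through the Schema Registry REST API. A sketch for the orders value subject; repeat per subject, or PUT to /config to set the global default:

# Enforce transitive full compatibility on the subject
curl -s -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"compatibility": "FULL_TRANSITIVE"}' \
  http://schema-registry:8081/config/cdc.public.orders-value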
Connector configurations are managed as code
Store connector JSON in version control and apply it via CI/CD using tools like kafka-connect-tools or the Terraform Kafka Connect provider. Manual curl-based connector management leads to configuration drift between environments, which is one of the most common root causes of CDC pipeline bugs that are difficult to reproduce locally.
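Even with plain curl in CI, prefer the idempotent PUT /connectors/{name}/config endpoint: it creates the connector when absent and updates it otherwise, so the same pipeline step handles first deploys and config changes. Note the request body is the bare config object, without the name wrapper:

# Create-or-update a connector from version-controlled JSON
jq '.config' postgres-orders-connector.json > /tmp/orders-config.json
curl -s -X PUT -H "Content-Type: application/json" \
  --data @/tmp/orders-config.json \
  http://kafka-connect:8083/connectors/postgres-orders-connector/config | jq '.name'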
Building a CDC pipeline or migrating from polling to event-driven data sync?
We design and implement production-grade CDC architectures — from Debezium connector configuration and Kafka Connect cluster setup to sink connector design, schema evolution, and replication slot monitoring. Let’s talk.
Get in Touch