Tags: Delta Lake, Apache Iceberg, Apache Hudi, Lakehouse, Data Engineering, Spark, Open Table Formats, Data Lakes

Delta Lake vs Iceberg vs Hudi — Choosing the Right Open Table Format for Your Lakehouse

A comprehensive comparison of Delta Lake, Apache Iceberg, and Apache Hudi: ACID transaction models (optimistic concurrency control, OCC), time travel implementations (version/snapshot/instant), schema evolution capabilities (add, rename, type widening, drop columns), partition evolution with Iceberg’s hidden partitioning transforms (days, hours, bucket, truncate), record-level indexing in Hudi (bloom filter, HBase, record index) vs file-level filtering in Delta/Iceberg, copy-on-write vs merge-on-read storage types for upsert efficiency, multi-engine query support (Spark, Trino, Amazon Athena, DuckDB, Snowflake), Python clients and ingestion tools (delta-rs, PyIceberg, HoodieStreamer), table maintenance procedures (OPTIMIZE/rewrite_data_files, VACUUM/expire_snapshots, remove_orphan_files), Spark Structured Streaming to Delta, Flink SQL to Iceberg, Kafka to Hudi via HoodieStreamer (formerly DeltaStreamer), Delta Lake UniForm for cross-format compatibility, and a decision framework for choosing the right open table format for your lakehouse architecture.

2026-06-03

The Raw Parquet Problem — Why Data Lakes Need a Table Layer

The original data lake promise was simple: dump all your data into object storage as Parquet files and query it with Spark, Presto, or Athena. Parquet solved the storage problem brilliantly — columnar layout, efficient compression, predicate pushdown at the file level. But raw Parquet is not a table. It is a collection of files with no shared state, no atomicity guarantee, and no way to reason about consistency across concurrent writers.

Five concrete failures make this real in production. First, no ACID transactions: a Spark job that writes 200 files over 20 minutes and fails at file 150 leaves readers seeing a partially written dataset. Second, no schema enforcement: nothing stops a producer from adding a column or changing a type, which silently breaks downstream consumers. Third, no efficient upserts: updating a single record means rewriting every file that might contain it, which in practice usually means overwriting the whole partition. Fourth, no time travel: once files are overwritten, the previous state is gone. Fifth, slow listing: S3 prefix listings on tables with millions of files can take minutes before a single byte of data is read.

Open table formats solve all five problems by adding a metadata layer — a transaction log or manifest structure — on top of Parquet files. The three dominant formats today are Delta Lake, Apache Iceberg, and Apache Hudi. They share the same core goals but make different architectural trade-offs that matter significantly in production.

Delta Lake Architecture — The Transaction Log and Write-Ahead Journal

Delta Lake, originally developed at Databricks and donated to the Linux Foundation in 2019, stores its state in a _delta_log/ directory alongside the data files. Each committed transaction produces a zero-padded, numbered JSON file (00000000000000000000.json, 00000000000000000001.json, …) containing actions: add (a new data file path plus its column statistics), remove (a logical delete of an existing file, which stays on storage until VACUUM), metaData (schema and table-property changes), and protocol (minimum reader/writer versions). By default, every 10 commits Delta writes a Parquet checkpoint that summarizes the full table state, so readers do not have to replay the entire log.

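This design gives Delta Lake optimistic concurrency control (OCC): each writer reads the current version, computes its changes, then attempts to create the next log file. The commit succeeds only if that file does not already exist, which requires an atomic put-if-absent (or rename) on the storage layer. If another writer committed first, Delta checks whether the winning commit conflicts with its own changes; if not, it retries at the next version, otherwise the transaction fails with a concurrent modification error. Readers always see a consistent snapshot by loading the latest checkpoint and replaying the JSON commits after it, so they never see partial writes.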
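To make log replay and OCC concrete, here is a minimal local-filesystem sketch. It is a toy model under stated assumptions, not Delta's actual file format: commits are JSON-lines files of add/remove actions, os.link stands in for the storage layer's atomic put-if-absent, and the logical conflict check Delta performs before retrying is only noted in a comment. All function names are hypothetical.

```python
from __future__ import annotations

import json
import os
import tempfile
from typing import Optional

# Toy Delta-style log: one JSON-lines file per commit, named by a 20-digit version.
# Illustrative only; this is not the real Delta protocol or its file contents.


def log_path(log_dir: str, version: int) -> str:
    return os.path.join(log_dir, f"{version:020d}.json")


def latest_version(log_dir: str) -> int:
    """Highest committed version, or -1 for an empty log."""
    versions = [int(n[:-5]) for n in os.listdir(log_dir) if n.endswith(".json")]
    return max(versions, default=-1)


def snapshot(log_dir: str, as_of: Optional[int] = None) -> set[str]:
    """Replay add/remove actions up to `as_of` to get the live data-file set."""
    end = latest_version(log_dir) if as_of is None else as_of
    live: set[str] = set()
    for v in range(end + 1):
        with open(log_path(log_dir, v)) as f:
            for line in f:
                action = json.loads(line)
                if "add" in action:
                    live.add(action["add"]["path"])
                elif "remove" in action:
                    live.discard(action["remove"]["path"])
    return live


def try_commit(log_dir: str, version: int, actions: list[dict]) -> bool:
    """Publish a fully written commit file only if `version` is still free."""
    with tempfile.NamedTemporaryFile("w", dir=log_dir, suffix=".tmp", delete=False) as tmp:
        tmp.write("".join(json.dumps(a) + "\n" for a in actions))
    try:
        # os.link fails if the target exists: an atomic put-if-absent on POSIX filesystems.
        os.link(tmp.name, log_path(log_dir, version))
        return True
    except FileExistsError:
        return False
    finally:
        os.remove(tmp.name)


def commit(log_dir: str, actions: list[dict], max_retries: int = 10) -> int:
    """Optimistic commit: target latest+1 and retry if another writer won the race."""
    for _ in range(max_retries):
        version = latest_version(log_dir) + 1
        # Real Delta would first check whether the winning commits conflict
        # logically with ours (e.g. removed the same files) and abort if so.
        if try_commit(log_dir, version, actions):
            return version
    raise RuntimeError("gave up after repeated commit conflicts")
```

Because try_commit publishes each commit with a single link call, a reader either sees the complete commit file or none of it, which is the property that keeps partial writes invisible.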

Delta Lake's key strength is its tight integration with Databricks and Spark. The Spark connector is the reference implementation and natively supports DML operations (MERGE INTO, UPDATE, DELETE), Change Data Feed (CDF), liquid clustering, and Z-ordering. Outside the JVM, delta-rs provides a native Rust implementation with Python bindings (the deltalake package), so no Spark cluster is required.

# Install delta-rs (Rust core with Python bindings, no Spark required):
#   pip install deltalake polars

from datetime import datetime, timezone

import polars as pl
import pyarrow as pa
from deltalake import DeltaTable, write_deltalake

TABLE_URI = "s3://my-lake/orders"

# On S3, delta-rs needs a locking provider or this explicit opt-out.
# The unsafe flag is only safe with a single writer (see the note below).
STORAGE_OPTIONS = {
    "AWS_REGION": "eu-west-1",
    "AWS_S3_ALLOW_UNSAFE_RENAME": "true",
}

# ── Create a Delta table from an Arrow table ──
data = pa.table({
    "order_id":  pa.array([1, 2, 3], type=pa.int64()),
    "customer":  pa.array(["alice", "bob", "carol"]),
    "amount":    pa.array([100.0, 250.0, 75.0]),
    "status":    pa.array(["pending", "shipped", "delivered"]),
})

write_deltalake(
    TABLE_URI,
    data,
    mode="overwrite",           # "overwrite" | "append" | "error" | "ignore"
    partition_by=["status"],    # Hive-style partitioning
    storage_options=STORAGE_OPTIONS,
)

# ── Open the table (loads the latest version of the transaction log) ──
dt = DeltaTable(TABLE_URI, storage_options=STORAGE_OPTIONS)

# Read as a PyArrow table; filters prune partitions and skip files using log statistics
arrow_tbl = dt.to_pyarrow_table(
    filters=[("status", "=", "pending"), ("amount", ">", 50.0)]
)

# Read as a Polars LazyFrame (Arrow-backed)
lf = pl.scan_delta(TABLE_URI, storage_options=STORAGE_OPTIONS)

# ── Time travel: read a previous version ──
dt_v0 = DeltaTable(TABLE_URI, version=0, storage_options=STORAGE_OPTIONS)

# By timestamp: load_as_version() is available in recent deltalake releases;
# older releases use dt_ts.load_with_datetime("2026-01-01T00:00:00Z")
dt_ts = DeltaTable(TABLE_URI, storage_options=STORAGE_OPTIONS)
dt_ts.load_as_version(datetime(2026, 1, 1, tzinfo=timezone.utc))

# ── Upsert (MERGE) with delta-rs ──
updates = pa.table({
    "order_id": pa.array([2, 4], type=pa.int64()),
    "customer": pa.array(["bob", "dave"]),
    "amount":   pa.array([300.0, 90.0]),
    "status":   pa.array(["delivered", "pending"]),
})

(
    dt.merge(
        source=updates,
        predicate="target.order_id = source.order_id",
        source_alias="source",
        target_alias="target",
    )
    .when_matched_update_all()
    .when_not_matched_insert_all()
    .execute()
)

# ── Show transaction log history (one entry per commit) ──
print(dt.history())

Note

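With delta-rs on S3, commits need either a locking provider or the AWS_S3_ALLOW_UNSAFE_RENAME=true opt-out, because S3 has historically lacked the atomic rename / put-if-absent primitive that Delta's commit protocol relies on. The unsafe flag is only safe when a single writer touches the table. For concurrent writers, configure the delta-rs DynamoDB locking provider (AWS_S3_LOCKING_PROVIDER=dynamodb); Spark-based Delta offers the equivalent S3DynamoDBLogStore, and Databricks coordinates S3 commits itself. ADLS Gen2 and GCS provide the required atomic operations natively and do not need the flag.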
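For reference, a multi-writer configuration for delta-rs might look like the following sketch. The two locking keys are delta-rs storage options; the DynamoDB table name delta_log and the region are assumptions, and the lock table must already exist with the schema described in the delta-rs documentation.

```python
# Sketch: storage options for concurrent delta-rs writers on S3.
# "delta_log" is an assumed name for a pre-created DynamoDB lock table.
storage_options = {
    "AWS_REGION": "eu-west-1",
    "AWS_S3_LOCKING_PROVIDER": "dynamodb",
    "DELTA_DYNAMO_TABLE_NAME": "delta_log",
}

# Pass it wherever the example above passes storage options, e.g.:
# write_deltalake("s3://my-lake/orders", data, mode="append",
#                 storage_options=storage_options)
```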

Apache Iceberg Architecture — Manifest Files, Snapshot Isolation, and Hidden Partitioning

Apache Iceberg, originally developed at Netflix and now a top-level Apache project, takes a different architectural approach. Instead of a sequential transaction log, Iceberg uses a catalog plus a three-tier metadata hierarchy. The catalog maps each table name to its current metadata.json file. Below it sit metadata JSON files (schema, partition spec, and the list of snapshots), manifest list files (one per snapshot, listing its manifest files with partition-level summaries), and manifest files (listing the actual data files with column-level statistics for predicate pushdown).

This design has three critical advantages over Delta Lake's log approach. First, manifest-level filtering: using the partition summaries in the manifest list, a query planner can skip entire manifest files before opening a single data file, which keeps planning tractable on tables with very large file counts. Second, hidden partitioning: the partition spec is stored as a transform of a source column (for example days(event_ts)), so queries filter on event_ts itself and the engine derives partition pruning automatically; users never reference a separate partition column. Third, partition evolution: the partition spec can be changed (e.g., from days(event_ts) to hours(event_ts)) without rewriting old data. Old files retain their original spec, new files use the updated one, and the engine plans across both transparently.
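A minimal sketch of the first two ideas follows, assuming simplified in-memory stand-ins for manifest lists and manifests (the DataFile and Manifest classes are hypothetical, not PyIceberg types). day_transform and truncate_int follow the Iceberg spec's definitions of the days and integer truncate transforms; plan_scan shows how a filter on event_ts is projected onto the day partition to skip whole manifests before falling back to per-file min/max statistics.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from datetime import datetime

EPOCH = datetime(1970, 1, 1)  # timestamps without zone, interpreted as UTC


def day_transform(ts: datetime) -> int:
    """days(ts): whole days since 1970-01-01, floored for pre-epoch values."""
    return (ts - EPOCH).days


def truncate_int(value: int, width: int) -> int:
    """truncate(W) for integers; the spec's v - (((v % W) + W) % W) equals v - v % W in Python."""
    return value - (value % width)


@dataclass
class DataFile:
    path: str
    min_ts: datetime  # column statistics kept in the manifest entry
    max_ts: datetime


@dataclass
class Manifest:
    lower_day: int  # partition summary kept in the manifest list
    upper_day: int
    files: list[DataFile] = field(default_factory=list)


def plan_scan(manifests: list[Manifest], ts_from: datetime) -> list[str]:
    """Data files that may hold rows with event_ts >= ts_from."""
    day_from = day_transform(ts_from)  # predicate projected onto the partition
    selected = []
    for m in manifests:
        if m.upper_day < day_from:
            continue  # whole manifest skipped without opening it
        for f in m.files:
            if f.max_ts >= ts_from:  # file-level min/max pruning
                selected.append(f.path)
    return selected
```

Note that the query only ever mentions event_ts; the day partition value is derived inside the planner, which is exactly what makes the partitioning "hidden".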

Iceberg's multi-engine support is also broader than Delta's. Native connectors exist for Spark, Trino, Flink, Amazon Athena, Snowflake (via Iceberg tables), DuckDB, Impala, and Hive. No single vendor controls the spec.

# Install PyIceberg (pure Python Iceberg client) with Arrow, Glue, and S3 support:
#   pip install "pyiceberg[pyarrow,glue,s3fs]"

from pyiceberg.catalog import load_catalog
from pyiceberg.schema import Schema
from pyiceberg.types import (
    NestedField, LongType, StringType, DoubleType, TimestampType,
)
from pyiceberg.partitioning import PartitionSpec, PartitionField
from pyiceberg.transforms import DayTransform, IdentityTransform
import pyarrow as pa

# ── Connect to catalog (AWS Glue) ──
catalog = load_catalog(
    "glue",
    **{
        "type": "glue",
        "warehouse": "s3://my-lake/warehouse",
    },
)

# ── Create an Iceberg table with schema + partition spec ──
schema = Schema(
    NestedField(1, "order_id",    LongType(),      required=True),
    NestedField(2, "customer_id", LongType(),      required=True),
    NestedField(3, "amount",      DoubleType(),    required=False),
    NestedField(4, "status",      StringType(),    required=False),
    NestedField(5, "event_ts",    TimestampType(), required=True),
)

partition_spec = PartitionSpec(
    PartitionField(
        source_id=5,                # field_id for event_ts
        field_id=1000,
        transform=DayTransform(),   # hidden partitioning: days(event_ts)
        name="event_date",
    )
)

table = catalog.create_table(
    "analytics.orders",
    schema=schema,
    partition_spec=partition_spec,
    properties={
        "write.parquet.compression-codec": "zstd",
        "write.metadata.delete-after-commit.enabled": "true",
        "write.metadata.previous-versions-max": "10",
    },
)

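# ── Append data (PyArrow table) ──
# Recent PyIceberg versions reject writes whose Arrow fields are nullable where
# the Iceberg field is required, so the Arrow schema is declared explicitly.
from datetime import datetime

arrow_schema = pa.schema([
    pa.field("order_id",    pa.int64(),         nullable=False),
    pa.field("customer_id", pa.int64(),         nullable=False),
    pa.field("amount",      pa.float64(),       nullable=True),
    pa.field("status",      pa.string(),        nullable=True),
    pa.field("event_ts",    pa.timestamp("us"), nullable=False),
])

arrow_data = pa.table({
    "order_id":    [1, 2, 3],
    "customer_id": [10, 11, 12],
    "amount":      [100.0, 250.0, 75.0],
    "status":      ["pending", "shipped", "delivered"],
    "event_ts":    [datetime(2026, 6, 1, 10), datetime(2026, 6, 1, 11), datetime(2026, 6, 2, 9)],
}, schema=arrow_schema)

table.append(arrow_data)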

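# ── Read with predicate pushdown ──
# Filter on event_ts itself: Iceberg derives day-partition pruning from the
# partition spec, so no separate partition column appears in the filter.
scan = table.scan(
    row_filter="event_ts >= '2026-06-01T00:00:00' AND amount > 50 AND status = 'shipped'",
    selected_fields=("order_id", "customer_id", "amount"),
    limit=1000,
)
result_df = scan.to_arrow().to_pandas()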

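# ── Time travel: scan an earlier snapshot ──
# history() lists snapshot-log entries oldest first; [0] is the first snapshot
# (with only one append so far, it is also the current one).
first_snapshot_id = table.history()[0].snapshot_id
table_at_snapshot = table.scan(snapshot_id=first_snapshot_id).to_arrow()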

# ── Schema evolution (safe: add nullable column) ──
with table.update_schema() as update:
    update.add_column("region", StringType())

# ── Partition evolution (no data rewrite needed) ──
with table.update_spec() as update:
    update.add_field("status", IdentityTransform(), "status_partition")

Apache Hudi Architecture — Record-Level Indexing and Low-Latency Upserts

Apache Hudi (Hadoop Upserts Deletes and Incrementals), originally developed at Uber and open-sourced in 2017, was designed for a specific problem that Delta and Iceberg handle less efficiently: high-frequency, record-level upserts on large tables with low end-to-end latency (minutes, not hours).

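Hudi's key differentiator is its record-level indexing. When a record is upserted, Hudi's index locates the file group that holds the existing version of that record, so only that file group is updated instead of the whole partition being scanned. Index options include per-file bloom filters (the long-standing default), HBase, a bucket index, a simple join-based index, and, since Hudi 0.14, a record-level index stored in the metadata table. Delta and Iceberg, by contrast, prune candidate files using file-level min/max statistics during a MERGE and then read every surviving file to find matching rows; when keys are not clustered, that can mean reading most of the table.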
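The lookup path of a bloom-filter index can be sketched in a few lines. This is a simplified model under stated assumptions: one filter per data file, SHA-256-derived bit positions, and a Python dict standing in for the keys stored in each file; Hudi's real filters, hashing, and key-range checks differ. Because a bloom filter can return false positives but never false negatives, candidate files are confirmed against their actual keys.

```python
from __future__ import annotations

import hashlib
from typing import Optional


class BloomFilter:
    def __init__(self, num_bits: int = 1024, num_hashes: int = 3) -> None:
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = 0  # a Python int used as a bit array

    def _positions(self, key: str):
        for i in range(self.num_hashes):
            digest = hashlib.sha256(f"{i}:{key}".encode()).digest()
            yield int.from_bytes(digest[:8], "big") % self.num_bits

    def add(self, key: str) -> None:
        for p in self._positions(key):
            self.bits |= 1 << p

    def might_contain(self, key: str) -> bool:
        return all((self.bits >> p) & 1 for p in self._positions(key))


def build_index(files: dict[str, list[str]]) -> dict[str, BloomFilter]:
    """One bloom filter per data file, built from the record keys it holds."""
    index = {}
    for path, keys in files.items():
        bf = BloomFilter()
        for k in keys:
            bf.add(k)
        index[path] = bf
    return index


def locate(key: str, index: dict[str, BloomFilter], files: dict[str, list[str]]) -> Optional[str]:
    """File holding `key`, or None (the upsert becomes an insert)."""
    for path, bf in index.items():
        # Only files whose filter matches are opened; confirm to rule out false positives.
        if bf.might_contain(key) and key in files[path]:
            return path
    return None
```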

Hudi exposes two storage types:

  • Copy-On-Write (COW): on each upsert, affected Parquet files are fully rewritten with the new record versions. Reads are always fast (no merging needed), but writes are expensive and produce write amplification. Best for read-heavy, low-upsert-frequency workloads.
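  • Merge-On-Read (MOR): upserts are first appended to small Avro log files, so write cost scales with the size of the change rather than the size of the base file. Snapshot queries merge each base Parquet file with its log files in memory at read time, while read-optimized queries read only the base files (faster, but possibly stale). Periodic compaction collapses log files back into base files. Best for high-frequency upsert workloads where write latency matters more than read latency.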
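The difference between the two storage types can be modeled with plain dictionaries keyed by record key. This is a toy sketch, not Hudi's implementation: MorFileGroup is a hypothetical class, later log blocks simply win (real Hudi orders versions by the precombine field), and files are dicts rather than Parquet and Avro.

```python
from __future__ import annotations


def cow_upsert(base: dict[str, dict], updates: list[dict], key: str) -> dict[str, dict]:
    """Copy-on-write: every upsert produces a rewritten copy of the file."""
    new_base = dict(base)  # the old version stays intact for time travel
    for row in updates:
        new_base[row[key]] = row
    return new_base


class MorFileGroup:
    """Merge-on-read: one base file plus append-only log blocks."""

    def __init__(self, base: dict[str, dict], key: str) -> None:
        self.base = base
        self.key = key
        self.logs: list[list[dict]] = []

    def upsert(self, updates: list[dict]) -> None:
        self.logs.append(list(updates))  # cheap append; base file untouched

    def snapshot_read(self) -> dict[str, dict]:
        merged = dict(self.base)
        for block in self.logs:  # later blocks win in this sketch
            for row in block:
                merged[row[self.key]] = row
        return merged

    def read_optimized(self) -> dict[str, dict]:
        return dict(self.base)  # base files only: fast but possibly stale

    def compact(self) -> None:
        self.base = self.snapshot_read()
        self.logs.clear()
```

The write cost asymmetry is visible directly: cow_upsert copies the whole file for every batch, while MorFileGroup.upsert only appends and defers the merge to readers and to compact().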

Hudi also supports incremental reads natively: you can query only the records that changed since a given commit timestamp, making it natural for CDC-to-lakehouse pipelines where you want to propagate only deltas downstream.
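An incremental query can be modeled as a filter over the commit timeline. In this hypothetical sketch each commit is an (instant, rows) pair; because Hudi instants use the fixed-width yyyyMMddHHmmssSSS format, plain string comparison orders them, and only commits strictly after the begin instant are returned, collapsed to the latest version of each key.

```python
from __future__ import annotations


def incremental_read(commits: list[tuple[str, list[dict]]], begin_instant: str, key: str) -> list[dict]:
    """Latest version of every record written by commits strictly after begin_instant."""
    changed: dict = {}
    # Fixed-width instant strings sort chronologically.
    for instant, rows in sorted(commits, key=lambda c: c[0]):
        if instant > begin_instant:
            for row in rows:
                changed[row[key]] = row
    return list(changed.values())
```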

# Hudi with PySpark (requires the Hudi Spark bundle; here pulled via spark.jars.packages)
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .config("spark.jars.packages",
            "org.apache.hudi:hudi-spark3.5-bundle_2.12:0.15.0")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.hudi.catalog.HoodieCatalog")
    .config("spark.sql.extensions",
            "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
    .getOrCreate()
)

HUDI_TABLE = "orders_hudi"
BASE_PATH  = "s3://my-lake/hudi/orders"

# ── Common Hudi options ──
hudi_options = {
    "hoodie.table.name":                    HUDI_TABLE,
    "hoodie.datasource.write.table.type":   "COPY_ON_WRITE",   # or MERGE_ON_READ
    "hoodie.datasource.write.recordkey.field":    "order_id",
    "hoodie.datasource.write.precombine.field":   "event_ts",  # latest wins on dedup
    "hoodie.datasource.write.partitionpath.field": "status",
    "hoodie.datasource.write.hive_style_partitioning": "true",
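    # Bloom filter index for fast file-level record lookup.
    # Caveat: BLOOM is partition-scoped, so if an upsert changes `status` (the
    # partition path), the new version lands in the new partition and the old
    # copy stays behind. Use GLOBAL_BLOOM with
    # hoodie.bloom.index.update.partition.path=true, or partition on an
    # immutable column, when keys can move between partitions.
    "hoodie.index.type":          "BLOOM",
    "hoodie.bloom.index.parallelism": "200",
    # Shuffle parallelism per write operation (fixed values; tune to data volume)
    "hoodie.bulkinsert.shuffle.parallelism": "200",
    "hoodie.insert.shuffle.parallelism":     "200",
    "hoodie.upsert.shuffle.parallelism":     "200",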
}

# ── Initial bulk insert ──
df = spark.read.parquet("s3://my-lake/raw/orders/")

(
    df.write.format("hudi")
    .options(**hudi_options)
    .option("hoodie.datasource.write.operation", "bulk_insert")
    .mode("overwrite")
    .save(BASE_PATH)
)

# ── Upsert (incremental CDC batch) ──
cdc_batch = spark.read.parquet("s3://my-lake/cdc/orders/batch-42/")

(
    cdc_batch.write.format("hudi")
    .options(**hudi_options)
    .option("hoodie.datasource.write.operation", "upsert")
    .mode("append")
    .save(BASE_PATH)
)

# ── Incremental read: only records changed after a given commit time ──
begin_ts = "20260601000000000"   # Hudi commit timestamp format

incremental_df = (
    spark.read.format("hudi")
    .option("hoodie.datasource.query.type", "incremental")
    .option("hoodie.datasource.read.begin.instanttime", begin_ts)
    .load(BASE_PATH)
)

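# ── Trigger compaction (MOR tables only) ──
# The table above was written by path rather than registered in a catalog,
# so the procedures address it by path.
# spark.sql(f"CALL run_compaction(op => 'schedule', path => '{BASE_PATH}')")
# spark.sql(f"CALL run_compaction(op => 'run',      path => '{BASE_PATH}')")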

Note

Hudi's precombine field (hoodie.datasource.write.precombine.field) is the tiebreaker column when multiple incoming records share the same record key in the same batch; with payload classes such as DefaultHoodieRecordPayload it is also compared against the version already stored. Always set it to a timestamp or monotonically increasing sequence number (for example, a CDC log position), never to a field that can regress (like an updated_at column written by hosts with unsynchronized wall clocks). A wrong precombine field causes silently incorrect upserts that are very difficult to diagnose.
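The batch-level deduplication described in the note can be sketched as follows. precombine is a hypothetical helper that keeps, per record key, the row with the largest ordering value; on ties this sketch keeps the later row, which is an assumption rather than documented Hudi behavior. The second call shows the failure mode: when the ordering column regresses, the stale row silently wins.

```python
from __future__ import annotations


def precombine(batch: list[dict], key: str, ordering: str) -> list[dict]:
    """Keep one row per key: the one with the largest `ordering` value."""
    winners: dict = {}
    for row in batch:
        current = winners.get(row[key])
        if current is None or row[ordering] >= current[ordering]:
            winners[row[key]] = row
    return list(winners.values())


batch = [
    # seq is a monotonically increasing CDC sequence number; updated_at came from
    # a host whose clock ran behind, so the newer event carries an older timestamp.
    {"order_id": 1, "seq": 5, "updated_at": "10:00:05", "status": "pending"},
    {"order_id": 1, "seq": 6, "updated_at": "10:00:01", "status": "shipped"},
]

print(precombine(batch, "order_id", "seq")[0]["status"])         # shipped
print(precombine(batch, "order_id", "updated_at")[0]["status"])  # pending (stale row wins)
```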

Feature Comparison — Delta Lake vs Iceberg vs Hudi

The three formats have largely converged on basic ACID and time travel, but meaningful differences remain in multi-engine support, upsert efficiency, partition evolution, and ecosystem alignment. Here is a practical comparison across the dimensions that matter most in production:

┌─────────────────────────────────┬─────────────────┬─────────────────┬─────────────────┐
│ Feature                         │ Delta Lake      │ Apache Iceberg  │ Apache Hudi     │
├─────────────────────────────────┼─────────────────┼─────────────────┼─────────────────┤
│ ACID transactions               │ ✓ OCC           │ ✓ OCC           │ ✓ OCC           │
│ Time travel                     │ ✓ version/ts    │ ✓ snapshot/ts   │ ✓ instant time  │
│ Schema evolution                │ ✓ (col mapping) │ ✓ full          │ ✓ additive      │
│ Schema enforcement              │ ✓               │ ✓               │ ✓               │
│ Partition evolution             │ ✗ requires      │ ✓ native        │ partial         │
│                                 │   rewrite       │ no rewrite      │                 │
│ Hidden partitioning             │ ✗               │ ✓               │ ✗               │
│ Record-level index              │ ✗ (file-level)  │ ✗ (file-level)  │ ✓ bloom/HBase   │
│ Merge-on-read mode              │ ✓ deletion vec. │ ✓ delete files  │ ✓ MOR native    │
│ Incremental reads               │ ✓ CDF           │ partial         │ ✓ native        │
│ Compaction                      │ OPTIMIZE        │ rewrite_data    │ async compaction│
│ Streaming ingest                │ ✓ Spark SS      │ ✓ Flink/Spark   │ ✓ Deltastreamer │
│ Multi-engine support            │ good            │ excellent       │ fair            │
│ Spark                           │ ✓ best-in-class │ ✓               │ ✓               │
│ Trino/Presto                    │ ✓ native        │ ✓ native        │ ✓ (limited)     │
│ Amazon Athena                   │ ✓ (v3)          │ ✓ native        │ ✓ (via Glue)    │
│ DuckDB                          │ ✓ (delta ext.)  │ ✓ (iceberg ext.)│ ✗               │
│ Snowflake                       │ ✓ via UniForm   │ ✓ native        │ ✗               │
│ Without Spark (Python only)     │ ✓ delta-rs      │ ✓ PyIceberg     │ partial hudi-rs │
│ Primary ecosystem               │ Databricks      │ AWS/Snowflake   │ AWS EMR/Uber    │
│ Governance layer                │ Unity Catalog   │ Polaris/Nessie  │ external        │
└─────────────────────────────────┴─────────────────┴─────────────────┴─────────────────┘

Multi-Engine Query Patterns — Spark, Trino, Athena, and DuckDB

One of the most important production considerations is which query engines your team needs to run against the same tables. Delta Lake's ecosystem is deepest with Databricks but has broadened significantly. Iceberg was designed from the start as an open, multi-engine format and tends to win when you need to span AWS Athena, Snowflake, and Trino simultaneously. Here are connector configurations for both:

-- ── Delta Lake: Trino connector (Trino's native delta_lake connector) ──
-- trino/catalog/delta.properties
connector.name=delta_lake
hive.metastore.uri=thrift://hive-metastore:9083
delta.enable-non-concurrent-writes=false

-- ── Delta Lake: Athena (engine v3, table registered in the Glue catalog) ──
-- No extra config needed: Athena engine v3 reads Delta Lake tables natively
-- (no symlink manifest); register the table location with table_type = 'DELTA'.

CREATE EXTERNAL TABLE orders_delta
LOCATION 's3://my-lake/orders/'
TBLPROPERTIES ('table_type' = 'DELTA');

-- ── Iceberg: Trino connector ──
-- trino/catalog/iceberg.properties
connector.name=iceberg
iceberg.catalog.type=glue
hive.metastore.glue.region=eu-west-1
iceberg.file-format=PARQUET
iceberg.compression-codec=ZSTD

-- Query with Trino — partition pruning happens automatically
SELECT
    DATE_TRUNC('day', event_ts)  AS day,
    status,
    SUM(amount)                  AS revenue
FROM iceberg.analytics.orders
WHERE event_ts >= TIMESTAMP '2026-01-01 00:00:00'   -- hidden partition pruning
  AND amount > 0
GROUP BY 1, 2
ORDER BY 1, revenue DESC;

-- ── Iceberg: Amazon Athena (Iceberg v2 native) ──
CREATE TABLE orders_iceberg
WITH (
    table_type  = 'ICEBERG',
    is_external = false,
    location    = 's3://my-lake/warehouse/analytics/orders/',
    format      = 'PARQUET',
    partitioning = ARRAY['day(event_ts)']
) AS
SELECT * FROM raw_orders;

-- Time travel in Athena (engine v3 syntax)
SELECT * FROM orders_iceberg FOR TIMESTAMP AS OF TIMESTAMP '2026-05-01 00:00:00 UTC';

-- ── DuckDB: read Delta Lake table locally ──
INSTALL delta;
LOAD delta;

SELECT status, SUM(amount)
FROM delta_scan('s3://my-lake/orders/')
GROUP BY status;

-- ── DuckDB: read Iceberg table (directly from a metadata.json file) ──
INSTALL iceberg;
LOAD iceberg;

SELECT * FROM iceberg_scan(
    'path/to/iceberg/metadata.json',
    allow_moved_paths=true
);

Schema Evolution — Safe Changes vs Breaking Changes

All three formats support additive schema changes (adding nullable columns) without rewriting data. The differences emerge with more complex changes: column renaming, type widening, and dropping columns. Iceberg has the most complete schema evolution story, treating the schema as a versioned artifact with field IDs that persist across renames.
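The type widening rules Iceberg allows (format versions 1 and 2) are small enough to write down directly. This is a hypothetical helper over type-name strings, not a PyIceberg API: int may widen to long, float to double, and decimal(P, S) to decimal(P', S) with P' at least P. Anything else, including double to decimal, is rejected.

```python
from __future__ import annotations

import re

_DECIMAL = re.compile(r"decimal\((\d+),\s*(\d+)\)")


def is_allowed_promotion(old: str, new: str) -> bool:
    """Iceberg v1/v2 type promotion rules, over simple type-name strings."""
    if old == new:
        return True
    if (old, new) in {("int", "long"), ("float", "double")}:
        return True
    m_old, m_new = _DECIMAL.fullmatch(old), _DECIMAL.fullmatch(new)
    if m_old and m_new:
        p_old, s_old = (int(g) for g in m_old.groups())
        p_new, s_new = (int(g) for g in m_new.groups())
        # Scale must stay fixed; only precision may grow.
        return s_old == s_new and p_new >= p_old
    return False
```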

-- ── Delta Lake schema evolution with Spark SQL ──
-- Add column (safe — no rewrite)
ALTER TABLE delta.`s3://my-lake/orders` ADD COLUMNS (region STRING);

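-- Rename column: requires column mapping (Delta 1.2+), which upgrades the table protocol
ALTER TABLE delta.`s3://my-lake/orders` SET TBLPROPERTIES (
    'delta.columnMapping.mode' = 'name',
    'delta.minReaderVersion'   = '2',
    'delta.minWriterVersion'   = '5'
);
ALTER TABLE delta.`s3://my-lake/orders` RENAME COLUMN customer TO customer_name;

-- Change type: only through type widening (Delta 3.2+, enabled per table with
-- 'delta.enableTypeWidening' = 'true'); the allowed widenings depend on the Delta version
ALTER TABLE delta.`s3://my-lake/orders` ALTER COLUMN amount TYPE DOUBLE;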

-- Drop column (Delta 2.0+ with column mapping enabled)
ALTER TABLE delta.`s3://my-lake/orders` DROP COLUMN internal_flag;

-- ── Iceberg schema evolution with Spark SQL ──
-- Add column (safe — backward compatible by default)
ALTER TABLE iceberg.analytics.orders ADD COLUMN region STRING;

-- Rename column (field ID preserved, so existing data files resolve under the new name)
ALTER TABLE iceberg.analytics.orders RENAME COLUMN customer_id TO customer_key;

-- Type promotion: only INT → BIGINT, FLOAT → DOUBLE, or a DECIMAL precision increase
-- (quantity is a hypothetical INT column; DOUBLE → DECIMAL is not an allowed promotion)
ALTER TABLE iceberg.analytics.orders ALTER COLUMN quantity TYPE BIGINT;

-- Drop column (data not deleted from files — column omitted from reads)
ALTER TABLE iceberg.analytics.orders DROP COLUMN internal_flag;

-- ── PyIceberg: schema evolution API ──
from pyiceberg.types import StringType, DoubleType

table = catalog.load_table("analytics.orders")

with table.update_schema() as update:
    update.add_column("region",  StringType())              # new nullable column (skip if already added)
    update.rename_column("customer_id", "customer_key")     # rename, field ID preserved
    update.make_column_optional("event_ts")                 # relax a required constraint
    # update.delete_column("internal_flag")                 # drop column (data files untouched)

Note

Iceberg column renames are non-breaking because columns are identified internally by their field ID, not their name. Iceberg writes each column's field ID into the Parquet file schema, so after renaming customer_id to customer_key, readers resolve the new name to the same field ID and read the existing files correctly without any rewrite. Delta Lake offers the same behavior through column mapping (introduced in Delta 1.2, with column drops added in 2.0), but it must be enabled explicitly per table.
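The difference between resolving columns by ID and by name fits in a short sketch. The dictionaries below are hypothetical stand-ins for a data file's columns and the current table schema; read_by_id mirrors the Iceberg behavior described in the note (IDs missing from the file read as nulls), while read_by_name shows what a purely name-based reader would return after the rename.

```python
from __future__ import annotations


def _row_count(columns: dict) -> int:
    return len(next(iter(columns.values()), []))


def read_by_id(file_columns: dict[int, list], table_schema: dict[int, str]) -> dict[str, list]:
    """Project a file onto the current schema by field ID (Iceberg-style)."""
    n = _row_count(file_columns)
    # IDs absent from the file (columns added later) read as nulls;
    # IDs absent from the schema (dropped columns) are simply not projected.
    return {name: file_columns.get(fid, [None] * n) for fid, name in table_schema.items()}


def read_by_name(file_columns: dict[str, list], table_columns: list[str]) -> dict[str, list]:
    """Project a file onto the current schema by column name."""
    n = _row_count(file_columns)
    return {name: file_columns.get(name, [None] * n) for name in table_columns}


# A file written before the rename: field 2 was called customer_id at write time.
old_file_by_id = {1: [1, 2], 2: [10, 11]}
old_file_by_name = {"order_id": [1, 2], "customer_id": [10, 11]}

# Current schema after renaming field 2 to customer_key and adding field 6 (region).
current = {1: "order_id", 2: "customer_key", 6: "region"}

print(read_by_id(old_file_by_id, current))
# {'order_id': [1, 2], 'customer_key': [10, 11], 'region': [None, None]}
print(read_by_name(old_file_by_name, list(current.values())))
# {'order_id': [1, 2], 'customer_key': [None, None], 'region': [None, None]}
```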

Table Maintenance — Compaction, Vacuuming, and Metadata Pruning

Open table formats accumulate small files and stale metadata over time. Without regular maintenance, query planning slows as the number of manifest entries and data files grows. Each format has its own maintenance API — automating these via Airflow or dbt is essential in production.
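Both OPTIMIZE and rewrite_data_files start by grouping small files into rewrite tasks. The sketch below is a simplified first-fit-decreasing bin-packing planner under stated assumptions; plan_compaction and its thresholds are hypothetical, and the real Delta and Iceberg planners apply more criteria (partition boundaries, delete files, maximum group sizes).

```python
from __future__ import annotations


def plan_compaction(
    file_sizes: dict[str, int], target: int, min_size: int, min_input_files: int = 2
) -> list[list[str]]:
    """Group files smaller than min_size into rewrite groups of at most `target` bytes."""
    small = sorted(
        (p for p, s in file_sizes.items() if s < min_size),
        key=lambda p: file_sizes[p],
        reverse=True,  # largest first packs bins more tightly
    )
    groups: list[list[str]] = []
    totals: list[int] = []
    for path in small:
        size = file_sizes[path]
        for i, total in enumerate(totals):
            if total + size <= target:  # first group with room wins
                groups[i].append(path)
                totals[i] += size
                break
        else:
            groups.append([path])
            totals.append(size)
    # Rewriting a lone file into itself gains nothing.
    return [g for g in groups if len(g) >= min_input_files]
```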

-- ── Delta Lake maintenance ──

-- OPTIMIZE: compact small files into larger ones (default target: 1 GB per file)
OPTIMIZE delta.`s3://my-lake/orders`;

-- OPTIMIZE with Z-order: cluster rows across files on several columns at once,
-- so each file's min/max statistics cover narrower ranges and more files are skipped
OPTIMIZE delta.`s3://my-lake/orders` ZORDER BY (customer_id, event_ts);

-- VACUUM: delete files no longer referenced by the transaction log
-- Default retention: 7 days (do NOT lower below 7 days in production)
VACUUM delta.`s3://my-lake/orders` RETAIN 168 HOURS;

-- Auto-optimize (Databricks-only): enable automatic file compaction
ALTER TABLE orders SET TBLPROPERTIES (
    'delta.autoOptimize.optimizeWrite' = 'true',
    'delta.autoOptimize.autoCompact'   = 'true'
);

-- ── Iceberg maintenance (via Spark + Iceberg procedures) ──

-- Rewrite small data files (equivalent to OPTIMIZE)
CALL iceberg.system.rewrite_data_files(
    table => 'analytics.orders',
    strategy => 'sort',
    sort_order => 'event_ts ASC NULLS LAST',
    options => map(
        'target-file-size-bytes', '134217728',   -- 128 MB target
        'min-file-size-bytes',    '8388608',      -- skip files already ≥ 8 MB
        'min-input-files',        '5'             -- a group with ≥ 5 candidate files is rewritten
    )
);

-- Rewrite manifest files (reduces planning overhead on large tables)
CALL iceberg.system.rewrite_manifests('analytics.orders');

-- Expire old snapshots (defaults: older_than = 5 days ago, retain_last = 1)
CALL iceberg.system.expire_snapshots(
    table               => 'analytics.orders',
    older_than          => TIMESTAMP '2026-05-27 00:00:00',
    retain_last         => 5
);

-- Remove orphan files (files under the table location not referenced by any snapshot)
CALL iceberg.system.remove_orphan_files(
    table            => 'analytics.orders',
    older_than       => TIMESTAMP '2026-05-27 00:00:00'
);

-- ── PyIceberg maintenance ──
from pyiceberg.catalog import load_catalog

catalog = load_catalog("glue", **{"type": "glue"})
table   = catalog.load_table("analytics.orders")

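# Expire snapshots older than 7 days.
# NOTE: snapshot expiration in PyIceberg is recent and its API has changed between
# releases; this call mirrors the Java API and may need adapting to your version
# (or use the Spark expire_snapshots procedure above).
from datetime import datetime, timedelta, timezone
cutoff_ms = int((datetime.now(timezone.utc) - timedelta(days=7)).timestamp() * 1000)
table.expire_snapshots().expire_older_than(cutoff_ms).commit()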
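What snapshot expiration decides can be modeled as follows, assuming a linear snapshot history (no branches or tags) and hypothetical Snapshot records carrying the set of data files each one references. A snapshot is expired when it is older than the cutoff and not among the retain_last newest; a data file becomes deletable only when no surviving snapshot still references it, which is why expiring snapshots does not by itself guarantee that storage shrinks.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class Snapshot:
    snapshot_id: int
    timestamp_ms: int
    files: frozenset[str]


def snapshots_to_expire(snapshots: list[Snapshot], older_than_ms: int, retain_last: int) -> list[int]:
    """IDs older than the cutoff, never including the `retain_last` newest snapshots."""
    newest_first = sorted(snapshots, key=lambda s: s.timestamp_ms, reverse=True)
    protected = {s.snapshot_id for s in newest_first[:retain_last]}
    return [
        s.snapshot_id
        for s in newest_first
        if s.timestamp_ms < older_than_ms and s.snapshot_id not in protected
    ]


def files_to_delete(snapshots: list[Snapshot], expired: set[int]) -> set[str]:
    """Files referenced only by expired snapshots."""
    kept = set().union(*(s.files for s in snapshots if s.snapshot_id not in expired))
    gone = set().union(*(s.files for s in snapshots if s.snapshot_id in expired))
    return gone - kept
```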

Streaming Ingestion — Kafka to Lakehouse in Near-Real-Time

All three formats support streaming ingestion, but the recommended patterns differ. Delta Lake integrates tightly with Spark Structured Streaming via its writeStream API. Iceberg has first-class Flink integration and a Kafka Connect sink. Hudi provides DeltaStreamer (now HoodieStreamer), a standalone Spark job designed specifically for continuous Kafka-to-Hudi ingestion with schema registry support.
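Exactly-once delivery in these integrations broadly rests on the same idea: the sink records which micro-batch or checkpoint it has already committed, atomically with the data, so a replay after a crash is skipped. A toy model of that idea follows. IdempotentSink is hypothetical; in Delta the equivalent marker is the txn action recorded in the log (exposed to foreachBatch writers through the txnAppId and txnVersion options).

```python
from __future__ import annotations


class IdempotentSink:
    """Toy table sink that remembers the last committed batch per streaming app."""

    def __init__(self) -> None:
        self.rows: list[dict] = []
        self.last_batch: dict[str, int] = {}  # app_id -> highest committed batch id

    def write_batch(self, app_id: str, batch_id: int, rows: list[dict]) -> bool:
        # A restarted query replays the same batch_id; skip it instead of duplicating.
        if batch_id <= self.last_batch.get(app_id, -1):
            return False
        # In a real table format, the rows and the (app_id, batch_id) marker
        # are committed together in one atomic transaction.
        self.rows.extend(rows)
        self.last_batch[app_id] = batch_id
        return True
```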

# ── Delta Lake: Spark Structured Streaming from Kafka ──
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, LongType, DoubleType, StringType, TimestampType

# Assumes the delta-spark and spark-sql-kafka packages are on the classpath
spark = SparkSession.builder.getOrCreate()

schema = StructType([
    StructField("order_id",    LongType()),
    StructField("customer_id", LongType()),
    StructField("amount",      DoubleType()),
    StructField("status",      StringType()),
    StructField("event_ts",    TimestampType()),
])

kafka_stream = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "orders")
    .option("startingOffsets", "latest")
    .load()
    .select(from_json(col("value").cast("string"), schema).alias("data"))
    .select("data.*")
)

# Write to Delta table with exactly-once semantics
query = (
    kafka_stream.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "s3://my-lake/checkpoints/orders")
    .option("mergeSchema", "true")
    .partitionBy("status")
    .start("s3://my-lake/orders/")
)

query.awaitTermination()

# ── Iceberg: Apache Flink SQL streaming from Kafka ──
# (run in Flink SQL CLI or via PyFlink Table API)
"""
CREATE TABLE kafka_orders (
    order_id    BIGINT,
    customer_id BIGINT,
    amount      DOUBLE,
    status      STRING,
    event_ts    TIMESTAMP(3),
    WATERMARK FOR event_ts AS event_ts - INTERVAL '5' SECOND
) WITH (
    'connector'               = 'kafka',
    'topic'                   = 'orders',
    'properties.bootstrap.servers' = 'kafka:9092',
    'format'                  = 'json',
    'scan.startup.mode'       = 'latest-offset'
);

CREATE TABLE iceberg_orders (
    order_id    BIGINT,
    customer_id BIGINT,
    amount      DOUBLE,
    status      STRING,
    event_ts    TIMESTAMP(3)
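) WITH (
    'connector'            = 'iceberg',
    'catalog-name'         = 'glue_catalog',
    -- Flink's catalog-type accepts hive/hadoop/rest; Glue is loaded via catalog-impl
    'catalog-impl'         = 'org.apache.iceberg.aws.glue.GlueCatalog',
    'io-impl'              = 'org.apache.iceberg.aws.s3.S3FileIO',
    'warehouse'            = 's3://my-lake/warehouse',
    'catalog-database'     = 'analytics',
    'catalog-table'        = 'orders',
    -- 'true' gives upsert semantics; it requires format-version 2 and a
    -- PRIMARY KEY (...) NOT ENFORCED clause on the table
    'write.upsert.enabled' = 'false',
    'format-version'       = '2'
);

-- The Iceberg sink commits data files on each Flink checkpoint
SET 'execution.checkpointing.interval' = '60s';

INSERT INTO iceberg_orders SELECT * FROM kafka_orders;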
"""

Decision Framework — Choosing the Right Open Table Format

The right choice depends on your primary query engine, upsert frequency, vendor preferences, and governance requirements. Here is a practical decision tree based on the most common real-world configurations:

Decision tree for open table format selection (2026)

1. Are you primarily on Databricks or planning to use Unity Catalog?
   → YES: Delta Lake. Unity Catalog is the governance layer; the Databricks
          engine is the deepest and most battle-tested Delta integration.
          delta-rs gives you Python access outside of Databricks.

2. Do you need broad multi-engine queries across Athena + Snowflake + Trino
   (or any two non-Databricks engines simultaneously)?
   → YES: Apache Iceberg. It has the most mature native support across all
          three. Iceberg tables in Snowflake are first-class citizens, while
          Delta tables are read through external tables or UniForm's Iceberg
          metadata.

3. Do you have CDC pipelines that land tens of thousands of upserts per minute
   with a latency requirement under 5 minutes, and the primary access pattern
   is writes-heavy with periodic full scans?
   → YES: Apache Hudi (MOR). The record-level index and merge-on-read write
          path typically outperform Delta/Iceberg MERGE operations, which must
          read every candidate file that file-level statistics cannot rule out.

4. Do you run primarily on AWS without Databricks (EMR, Glue, Athena)?
   → YES: Apache Iceberg. AWS has invested heavily in native Glue + Athena
          Iceberg support, and its managed S3 Tables service (launched in late
          2024) stores data in Iceberg format.

5. Do you need Python-only access without a JVM / Spark cluster?
   → BOTH delta-rs (Delta) and PyIceberg (Iceberg) work well.
      Hudi's Python support is still maturing (hudi-rs is experimental).

Summary matrix:
  Databricks-first:              Delta Lake
  AWS-first (no Databricks):     Apache Iceberg
  High-frequency CDC upserts:    Apache Hudi (MOR)
  Multi-cloud / multi-engine:    Apache Iceberg
  Tight Spark Streaming:         Delta Lake or Iceberg
  Python-only pipelines:         Delta Lake (delta-rs) or Iceberg (PyIceberg)
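The same tree can be written as a small function, which is handy for documenting the decision in an architecture review. It is a hypothetical helper that checks rules 1 to 4 in order; the fallback for "none of the above" is an assumption, since rule 5 only says that delta-rs and PyIceberg both work.

```python
from __future__ import annotations


def choose_table_format(
    databricks_or_unity: bool,
    non_databricks_engines: int,
    high_freq_cdc_upserts: bool,
    aws_without_databricks: bool,
) -> str:
    """Apply the decision tree's rules 1-4 in order; first match wins."""
    if databricks_or_unity:                 # rule 1
        return "Delta Lake"
    if non_databricks_engines >= 2:         # rule 2
        return "Apache Iceberg"
    if high_freq_cdc_upserts:               # rule 3
        return "Apache Hudi (MOR)"
    if aws_without_databricks:              # rule 4
        return "Apache Iceberg"
    # Assumed fallback: either format works for Python-only pipelines (rule 5).
    return "Delta Lake or Apache Iceberg"
```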

Note

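Delta Lake's UniForm feature (Universal Format, built by Databricks and generally available for Iceberg in 2024) generates Iceberg and Hudi metadata alongside Delta log entries, allowing non-Delta engines to read Delta tables without format conversion. If you are on Databricks but need Athena or Snowflake read access, UniForm largely removes the need to choose between Delta and Iceberg. The caveats: other engines get read-only access, and some Delta features, such as deletion vectors, have historically been incompatible with UniForm tables.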

Production Patterns — Catalog Setup, Monitoring, and Airflow Maintenance Jobs

In production, open table format tables need three operational components beyond the initial setup: a catalog for table discovery and governance, monitoring for file count and snapshot size growth, and automated maintenance jobs that run on a schedule. Here is a complete Airflow-based maintenance pattern for Iceberg on AWS:

# dags/iceberg_maintenance.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.amazon.aws.operators.emr import EmrServerlessStartJobRunOperator

# Contents of s3://my-lake/scripts/iceberg_maintenance.py. Upload it there before
# the DAG runs; the DAG below references that S3 path, not this string.
MAINTENANCE_SCRIPT = """
from datetime import datetime, timedelta, timezone
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("iceberg-maintenance").getOrCreate()
# TIMESTAMP literals below are interpreted in the session time zone
spark.conf.set("spark.sql.session.timeZone", "UTC")

# Assumes every table listed here has an event_ts column (used as sort key)
TABLES = [
    ("analytics", "orders"),
    ("analytics", "events"),
    ("analytics", "users"),
]

now = datetime.now(timezone.utc)
expire_cutoff = (now - timedelta(days=7)).strftime("%Y-%m-%d %H:%M:%S")
orphan_cutoff = (now - timedelta(days=3)).strftime("%Y-%m-%d %H:%M:%S")

for db, tbl in TABLES:
    ident = f"{db}.{tbl}"  # table identifier within the glue catalog
    print(f"Maintaining glue.{ident}")

    # 1. Compact small files (sort by event_ts for better data skipping)
    spark.sql(
        "CALL glue.system.rewrite_data_files("
        f"table => '{ident}', strategy => 'sort', "
        "sort_order => 'event_ts ASC NULLS LAST', "
        "options => map('target-file-size-bytes', '134217728'))"
    )

    # 2. Rewrite manifests
    spark.sql(f"CALL glue.system.rewrite_manifests('{ident}')")

    # 3. Expire snapshots older than 7 days, keep at least 3
    spark.sql(
        "CALL glue.system.expire_snapshots("
        f"table => '{ident}', older_than => TIMESTAMP '{expire_cutoff}', retain_last => 3)"
    )

    # 4. Remove orphan files (the 3-day margin protects in-flight writes)
    spark.sql(
        "CALL glue.system.remove_orphan_files("
        f"table => '{ident}', older_than => TIMESTAMP '{orphan_cutoff}')"
    )

spark.stop()
"""

with DAG(
    dag_id="iceberg_maintenance",
    start_date=datetime(2026, 1, 1),
    schedule="0 3 * * *",     # 03:00 UTC daily
    catchup=False,
    default_args={"retries": 2, "retry_delay": timedelta(minutes=5)},
) as dag:

    maintain = EmrServerlessStartJobRunOperator(
        task_id="run_maintenance",
        application_id="{{ var.value.EMR_SERVERLESS_APP_ID }}",
        execution_role_arn="{{ var.value.EMR_EXECUTION_ROLE_ARN }}",
        job_driver={
            "sparkSubmit": {
                "entryPoint": "s3://my-lake/scripts/iceberg_maintenance.py",
                "sparkSubmitParameters": (
                    "--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions "
                    "--conf spark.sql.catalog.glue=org.apache.iceberg.spark.SparkCatalog "
                    "--conf spark.sql.catalog.glue.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog "
                    "--conf spark.sql.catalog.glue.warehouse=s3://my-lake/warehouse "
                    "--packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.2,"
                    "software.amazon.awssdk:bundle:2.20.18"
                ),
            }
        },
        configuration_overrides={
            "monitoringConfiguration": {
                "s3MonitoringConfiguration": {
                    "logUri": "s3://my-lake/emr-logs/"
                }
            }
        },
    )
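The monitoring component mentioned earlier can start as a simple threshold check run alongside the maintenance job. The sketch below is hypothetical: the thresholds are placeholders to tune per table, and the inputs are plain lists, which you could populate from a table's file listing (for example the file_size_in_bytes column of PyIceberg's table.inspect.files() in recent releases) and its snapshot count.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class TableHealth:
    file_count: int
    small_file_ratio: float
    snapshot_count: int
    needs_compaction: bool
    needs_expiration: bool


def assess(
    file_sizes: list[int],
    snapshot_count: int,
    small_file_bytes: int = 32 * 1024 * 1024,  # placeholder threshold
    max_small_ratio: float = 0.3,               # placeholder threshold
    max_snapshots: int = 100,                   # placeholder threshold
) -> TableHealth:
    """Flag tables whose small-file share or snapshot count has drifted too high."""
    n = len(file_sizes)
    small = sum(1 for s in file_sizes if s < small_file_bytes)
    ratio = small / n if n else 0.0
    return TableHealth(
        file_count=n,
        small_file_ratio=ratio,
        snapshot_count=snapshot_count,
        needs_compaction=ratio > max_small_ratio,
        needs_expiration=snapshot_count > max_snapshots,
    )
```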

Work with us

Evaluating Delta Lake, Iceberg, or Hudi for your lakehouse and unsure which fits your architecture?

We design and implement lakehouse architectures — from Delta Lake on Databricks with Unity Catalog and Z-ordering to Apache Iceberg on AWS with Glue catalog, hidden partitioning, Athena integration, and PyIceberg Python pipelines, to Apache Hudi MOR tables for high-frequency CDC upserts with DeltaStreamer, bloom-filter indexing, and async compaction. We also handle migrations between formats and set up automated maintenance pipelines with Airflow or dbt. Let’s talk.



DataSOps Consulting

Need help implementing this in production?

We build and operate data pipelines, AI systems, and observability stacks for engineering teams. Reach out for a free 30-minute architecture review.