Apache Iceberg · Lakehouse · Data Engineering · Spark · Time Travel · Schema Evolution · Trino · PyIceberg

Apache Iceberg in Production — Time Travel, Schema Evolution, and Lakehouse Architecture

A practical guide to Apache Iceberg in production: table format architecture (catalog, metadata.json, manifest lists, manifest files, data files), creating Iceberg tables with Spark and the REST catalog, safe schema evolution (add/rename/drop/alter column type without data rewrites), partition evolution with hidden partitioning transforms (years, months, days, hours, bucket, truncate), time travel with VERSION AS OF / TIMESTAMP AS OF and the PyIceberg snapshot API, row-level DML (MERGE INTO upserts, DELETE WHERE, UPDATE) with copy-on-write vs merge-on-read trade-offs, catalog options (REST, AWS Glue, Hive Metastore, Project Nessie with git-like branching), query engine connectivity (Trino connector, Flink table API, DuckDB iceberg extension, PyIceberg), and table maintenance procedures (rewrite_data_files, rewrite_manifests, expire_snapshots, delete_orphan_files).

2026-05-27

Why Iceberg? The Problems With Hive Tables

The Hive table format — directories of Parquet or ORC files governed by a Hive Metastore — defined the first generation of data lakes. It works, but it carries four serious production problems that become increasingly painful at scale.

  • No ACID transactions. Concurrent writers can clobber each other's files in the same partition directory, readers see partial results during long-running ingestion jobs, and rolling back a failed load means manually deleting files.
  • No safe schema evolution. Hive maps file columns to table columns by name or by position (depending on file format and settings), and adding, renaming, or dropping a column does not rewrite existing Parquet files. Depending on the reader, renamed or reordered columns either fail or silently come back as nulls or as values from the wrong column.
  • Expensive query planning. The Hive Metastore tracks partitions but not files, so planning a query still means listing the files in every matching partition directory on object storage. Those LIST calls are slow and get slower as partition counts grow into the millions.
  • No time travel. Once data is overwritten, the previous state is gone unless you manage your own S3 versioning or snapshot copies.

Apache Iceberg solves all four. It is an open table format specification — not a storage engine — that sits between your query engines (Spark, Trino, Flink, DuckDB) and your object storage (S3, GCS, ADLS). The format defines a rich metadata layer that enables ACID transactions, safe schema and partition evolution, snapshot-based time travel, and engine-independent table definitions.

Note

This article covers Apache Iceberg format version 2, the specification revision most widely supported across engines. Format version 2 adds row-level deletes (position and equality delete files), which enable efficient MERGE, UPDATE, and DELETE operations with merge-on-read semantics. All code examples use Apache Spark 3.5 with the iceberg-spark-runtime JAR and PyIceberg 0.7+ for Python examples.

Iceberg Table Format Architecture

Every Iceberg table is described by a hierarchy of metadata files stored alongside the data on object storage. Understanding this hierarchy is essential for reasoning about performance, consistency, and time travel.

Metadata Layer Hierarchy

# Iceberg metadata layer — from catalog to data files
#
# 1. CATALOG
#    The catalog maps table names to their current metadata.json pointer.
#    Catalogs: REST (open spec), AWS Glue, Hive Metastore, Project Nessie.
#    The catalog is the only mutable, globally consistent component.
#
# 2. metadata.json  (the catalog points at the current one)
#    Contains: schema history, partition spec history, sort orders,
#              snapshots and snapshot log, current-snapshot-id, properties.
#    Immutable once written. Every commit (data or schema change) writes a
#    new metadata.json; not every commit creates a new snapshot.
#
# 3. MANIFEST LIST  (snap-<snapshot-id>-<attempt>-<uuid>.avro)
#    One per snapshot. Lists all manifest files that make up this snapshot.
#    Each entry in the manifest list contains:
#      - manifest file path
#      - partition_spec_id (enables partition evolution)
#      - added_files_count, existing_files_count, deleted_files_count
#      - partition summary (min/max per partition field — used for partition pruning)
#
# 4. MANIFEST FILES  (*.avro)
#    Each manifest lists a subset of data files and delete files.
#    Per data file entry: file path, format, partition values,
#                         record count, file size, column-level stats
#                         (null counts, lower bounds, upper bounds).
#    Column stats drive data skipping (eliminates files before reading).
#
# 5. DATA FILES  (*.parquet / *.orc / *.avro)
#    The actual data. Immutable. New writes create new files; under
#    merge-on-read, deletes are recorded in separate delete files, while
#    copy-on-write rewrites the affected data files.
#
# Physical layout on S3:
# s3://bucket/warehouse/db/orders/
# ├── metadata/
# │   ├── 00000-<uuid>.metadata.json     ← initial table creation
# │   ├── 00001-<uuid>.metadata.json     ← after first INSERT
# │   ├── snap-<id>-1-<uuid>.avro        ← manifest list for snapshot 1
# │   └── <uuid>-m0.avro                ← manifest file
# └── data/
#     └── created_at_day=2026-05-27/          ← days(created_at)
#         └── region=us-east/                 ← identity(region)
#             └── 00000-<task-id>-<uuid>.parquet

The key insight is that every commit (INSERT, DELETE, MERGE) creates a new immutable snapshot: a new metadata.json pointing to a new manifest list. Old snapshots are retained until explicitly expired, which is what enables time travel. Readers get a consistent point-in-time view because a query resolves the catalog pointer once, reads one snapshot from that metadata.json, and touches only the files that snapshot references; a commit that lands mid-query does not change what the query sees.
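A minimal in-memory sketch of that read path, in plain Python with no Iceberg libraries (all class and field names are illustrative and much simpler than the spec's): each commit builds new, immutable metadata that reuses the current snapshot's manifests, the catalog entry is the only thing that changes, and planning walks snapshot → manifest list → manifests → data files, skipping files whose column upper bound rules out the predicate.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class DataFile:
    path: str
    record_count: int
    lower: dict  # column -> min value (real Iceberg stats are keyed by field ID)
    upper: dict  # column -> max value


@dataclass(frozen=True)
class Manifest:
    files: tuple  # DataFile entries tracked by this manifest


@dataclass(frozen=True)
class Snapshot:
    snapshot_id: int
    manifest_list: tuple  # Manifest entries that make up this snapshot


@dataclass(frozen=True)
class TableMetadata:
    snapshots: tuple  # all retained snapshots, oldest first
    current_snapshot_id: int

    def snapshot(self, snapshot_id: int) -> Snapshot:
        return next(s for s in self.snapshots if s.snapshot_id == snapshot_id)


def commit_append(base: Optional[TableMetadata], new_files: list, snapshot_id: int) -> TableMetadata:
    """Build NEW metadata; `base` is never modified (metadata files are immutable)."""
    if base is None:
        return TableMetadata((Snapshot(snapshot_id, (Manifest(tuple(new_files)),)),), snapshot_id)
    current = base.snapshot(base.current_snapshot_id)
    # The new snapshot reuses every existing manifest and adds one for the new files.
    snap = Snapshot(snapshot_id, current.manifest_list + (Manifest(tuple(new_files)),))
    return TableMetadata(base.snapshots + (snap,), snapshot_id)


def plan_files(meta: TableMetadata, column: str, min_value, snapshot_id: Optional[int] = None) -> list:
    """Return data files that may contain rows with `column >= min_value`."""
    snap = meta.snapshot(meta.current_snapshot_id if snapshot_id is None else snapshot_id)
    return [
        f.path
        for m in snap.manifest_list
        for f in m.files
        # A file without stats for the column cannot be skipped.
        if f.upper.get(column, min_value) >= min_value
    ]


catalog = {}  # table name -> current metadata: the only mutable piece
m1 = commit_append(None, [DataFile("a.parquet", 10, {"amount_cents": 100}, {"amount_cents": 900})], 1)
catalog["db.orders"] = m1
m2 = commit_append(catalog["db.orders"], [DataFile("b.parquet", 5, {"amount_cents": 2000}, {"amount_cents": 5000})], 2)
catalog["db.orders"] = m2  # the commit itself is this pointer swap

print(plan_files(catalog["db.orders"], "amount_cents", 1000))              # → ['b.parquet']
print(plan_files(catalog["db.orders"], "amount_cents", 0, snapshot_id=1))  # → ['a.parquet'] (time travel)
```

Because `m1` is never mutated, a reader that resolved the pointer before the second commit keeps seeing exactly one file.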

Getting Started: Creating an Iceberg Table with Spark

# spark-defaults.conf (or SparkSession config)
# Use the REST catalog — works with any catalog server implementing the Iceberg REST spec.

spark.sql.extensions                     org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
spark.sql.catalog.iceberg                org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.iceberg.type           rest
spark.sql.catalog.iceberg.uri            http://iceberg-catalog:8181
spark.sql.catalog.iceberg.warehouse      s3://my-bucket/warehouse
spark.sql.catalog.iceberg.io-impl        org.apache.iceberg.aws.s3.S3FileIO
spark.sql.catalog.iceberg.client.region  us-east-1

# For Glue catalog instead:
# spark.sql.catalog.iceberg.type           glue
# spark.sql.catalog.iceberg.warehouse      s3://my-bucket/warehouse

-- Create a partitioned Iceberg table with Spark SQL
-- Note: Iceberg uses USING iceberg, not STORED AS PARQUET

CREATE TABLE iceberg.db.orders (
    order_id        STRING        NOT NULL,
    customer_id     STRING        NOT NULL,
    event_type      STRING        NOT NULL,
    amount_cents    BIGINT        NOT NULL,
    currency_code   STRING        NOT NULL,
    created_at      TIMESTAMP     NOT NULL,
    region          STRING
)
USING iceberg
PARTITIONED BY (days(created_at), region)
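TBLPROPERTIES (
    'format-version'                     = '2',          -- v2 is required for merge-on-read row-level deletes
    'write.format.default'               = 'parquet',
    'write.parquet.compression-codec'    = 'snappy',
    'write.target-file-size-bytes'       = '134217728',  -- 128 MB target
    'write.merge.mode'                   = 'merge-on-read',
    -- 7 days: the default age cutoff used by expire_snapshots (nothing expires automatically)
    'history.expire.max-snapshot-age-ms' = '604800000'
);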

-- Insert sample data
INSERT INTO iceberg.db.orders VALUES
    ('ord-001', 'cust-101', 'ORDER_CREATED', 4999, 'USD', TIMESTAMP '2026-05-27 10:00:00', 'us-east'),
    ('ord-002', 'cust-102', 'ORDER_SHIPPED',  2500, 'EUR', TIMESTAMP '2026-05-27 11:30:00', 'eu-west');

-- Verify the table and its snapshots
SELECT * FROM iceberg.db.orders.snapshots;
SELECT * FROM iceberg.db.orders.files;
SELECT * FROM iceberg.db.orders.manifests;

Note

The days(created_at) partition transform is Iceberg hidden partitioning. No extra partition column is added to the table schema; Iceberg derives the partition value from created_at at write time. Queries that filter on created_at itself (for example WHERE created_at >= TIMESTAMP '2026-05-27 00:00:00') get partition pruning automatically, without the caller needing to know how the table is partitioned.
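Hidden partitioning works because each transform is a deterministic function of the source column. A plain-Python sketch of the time transforms as the spec defines them (whole years, months, days, or hours since the Unix epoch), assuming timezone-aware UTC timestamps, plus the pruning test that a `created_at >= ...` filter turns into for a days() partition; the helper names are illustrative:

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def _utc(ts: datetime) -> datetime:
    if ts.tzinfo is None:
        raise ValueError("expected a timezone-aware timestamp")
    return ts.astimezone(timezone.utc)


def years(ts: datetime) -> int:
    return _utc(ts).year - 1970


def months(ts: datetime) -> int:
    u = _utc(ts)
    return (u.year - 1970) * 12 + (u.month - 1)


def days(ts: datetime) -> int:
    # timedelta // timedelta floors, so pre-1970 timestamps map to negative ordinals.
    return (_utc(ts) - EPOCH) // timedelta(days=1)


def hours(ts: datetime) -> int:
    return (_utc(ts) - EPOCH) // timedelta(hours=1)


def day_partition_may_match(partition_day: int, created_at_min: datetime) -> bool:
    """Pruning test for WHERE created_at >= created_at_min on a days(created_at) partition."""
    return partition_day >= days(created_at_min)


ts = datetime(2026, 5, 27, 10, 0, tzinfo=timezone.utc)
print(years(ts), months(ts), days(ts), hours(ts))  # → 56 676 20600 494410
```

Engines store these integer ordinals in manifests and render them as dates (for example `created_at_day=2026-05-27`) only in directory names and metadata tables.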

Schema Evolution

Iceberg schema evolution is safe: every DDL operation updates only the metadata layer. No data files are rewritten. Columns are identified internally by unique numeric IDs, not names, so renaming a column does not break existing data readers.
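Why renames are safe: Iceberg writers record each column's field ID in the data file (for Parquet, as the `field_id` in the file schema), and readers match columns by ID. A plain-Python sketch with hypothetical field IDs, contrasting ID-based resolution with the name-based matching a Hive-style reader does:

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Field:
    field_id: int
    name: str


# Hypothetical IDs. The data file was written when field 7 was called "region".
WRITE_SCHEMA = [Field(1, "order_id"), Field(7, "region")]
# Current schema after RENAME COLUMN region TO sales_region and an ADD COLUMN (new ID 9).
READ_SCHEMA = [Field(1, "order_id"), Field(7, "sales_region"), Field(9, "loyalty_tier")]


def read_by_field_id(stored: dict, read_schema: list) -> dict:
    """Iceberg-style: match columns by ID; IDs absent from the file read as None."""
    return {f.name: stored.get(f.field_id) for f in read_schema}


def read_by_name(stored: dict, write_schema: list, read_schema: list) -> dict:
    """Hive-style: match columns by the name recorded in the file."""
    by_name = {f.name: stored[f.field_id] for f in write_schema}
    return {f.name: by_name.get(f.name) for f in read_schema}


stored_row = {1: "ord-001", 7: "us-east"}  # values as written, keyed by field ID
print(read_by_field_id(stored_row, READ_SCHEMA))
# → {'order_id': 'ord-001', 'sales_region': 'us-east', 'loyalty_tier': None}
print(read_by_name(stored_row, WRITE_SCHEMA, READ_SCHEMA))
# → {'order_id': 'ord-001', 'sales_region': None, 'loyalty_tier': None}
```

The same mechanism makes drops safe: a dropped ID simply never appears in the read schema, and a re-added column with the old name gets a fresh ID, so it cannot pick up stale values.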

DDL Operations in Spark SQL

-- ADD COLUMN — safe, always backward-compatible
-- Rows in existing data files read the new column as NULL.
ALTER TABLE iceberg.db.orders
ADD COLUMN shipping_address_id STRING
AFTER region;

-- Add a map column
ALTER TABLE iceberg.db.orders
ADD COLUMN metadata MAP<STRING, STRING>
COMMENT 'Optional key-value extensibility bag';

-- RENAME COLUMN — safe, no data rewrite
-- Existing data files stay readable because columns are resolved by field ID, not name.
ALTER TABLE iceberg.db.orders
RENAME COLUMN region TO sales_region;

-- DROP COLUMN — safe, existing files retain the bytes but the column is hidden
ALTER TABLE iceberg.db.orders
DROP COLUMN shipping_address_id;

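-- ALTER COLUMN TYPE: only widening promotions are allowed
-- (int→long, float→double, decimal(P,S)→decimal(P',S) with P' > P).
-- amount_cents is already BIGINT, so this shows the syntax on a hypothetical INT column:
-- ALTER TABLE iceberg.db.orders ALTER COLUMN item_count TYPE BIGINT;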

-- Make a column optional (remove NOT NULL constraint)
ALTER TABLE iceberg.db.orders
ALTER COLUMN currency_code DROP NOT NULL;
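Because ALTER COLUMN ... TYPE only accepts widening promotions, a pre-flight check can catch a bad migration before it reaches Spark. A sketch of the format-version 2 promotion rules (the function name is hypothetical, and types use Iceberg names, so Spark's INT and BIGINT are `int` and `long`):

```python
import re

_DECIMAL = re.compile(r"decimal\((\d+),\s*(\d+)\)")


def is_allowed_promotion(old_type: str, new_type: str) -> bool:
    """Format-version 2 type promotion rules, using Iceberg type names."""
    old, new = old_type.strip().lower(), new_type.strip().lower()
    if old == new:
        return True
    if (old, new) in {("int", "long"), ("float", "double")}:
        return True
    old_dec, new_dec = _DECIMAL.fullmatch(old), _DECIMAL.fullmatch(new)
    if old_dec and new_dec:
        old_p, old_s = (int(g) for g in old_dec.groups())
        new_p, new_s = (int(g) for g in new_dec.groups())
        # Precision may only grow; the scale must stay the same.
        return new_s == old_s and new_p > old_p
    return False


for old, new in [("int", "long"), ("long", "int"), ("decimal(10,2)", "decimal(12,2)"), ("decimal(10,2)", "decimal(12,3)")]:
    print(f"{old} -> {new}: {is_allowed_promotion(old, new)}")
```

Anything outside these rules (narrowing, string to int, changing a decimal's scale) requires adding a new column and backfilling it rather than altering the type in place.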

Schema Evolution with PyIceberg

# PyIceberg schema evolution API
# pip install "pyiceberg[pyarrow,glue]"  or  "pyiceberg[pyarrow]" (the REST catalog needs no extra)

from pyiceberg.catalog import load_catalog
from pyiceberg.types import StringType, LongType

catalog = load_catalog(
    "rest",
    **{
        "type": "rest",
        "uri": "http://iceberg-catalog:8181",
        "warehouse": "s3://my-bucket/warehouse",
        "s3.region": "us-east-1",
    },
)

table = catalog.load_table("db.orders")

# Update schema using the fluent update_schema() API
with table.update_schema() as update:
    update.add_column(
        path="loyalty_tier",
        field_type=StringType(),
        doc="Customer loyalty tier at time of order: BRONZE, SILVER, GOLD, PLATINUM",
    )
    update.rename_column("sales_region", "region_code")
    update.make_column_optional("metadata")

print(table.schema())  # reflects the new schema immediately

Partition Evolution

One of Iceberg's most powerful features is partition evolution: changing the partition strategy without rewriting existing data. Old data files retain their original partition layout; new files use the new partition spec. Iceberg tracks which partition spec each manifest file was written with and routes queries correctly across both.

Changing Partition Spec

-- Current partition spec: days(created_at), region_code
-- After high-volume growth, switch to hours(created_at) for finer granularity.
-- Existing daily-partitioned files are NOT rewritten.

ALTER TABLE iceberg.db.orders
REPLACE PARTITION FIELD days(created_at) WITH hours(created_at);

-- Or add a bucket partition on customer_id to improve join performance:
ALTER TABLE iceberg.db.orders
ADD PARTITION FIELD bucket(16, customer_id);

-- Remove a partition field (old files retain it, new files don't):
ALTER TABLE iceberg.db.orders
DROP PARTITION FIELD region_code;

-- Hidden partitioning transform reference:
-- years(ts)        → partition by year   (e.g., 2026)
-- months(ts)       → partition by month  (e.g., 2026-05)
-- days(ts)         → partition by day    (e.g., 2026-05-27)
-- hours(ts)        → partition by hour   (e.g., 2026-05-27-10)
-- bucket(N, col)   → hash bucket into N buckets (good for joins, high-cardinality keys)
-- truncate(W, col) → truncate string/int to width W (good for prefix-based filtering)
-- identity(col)    → explicit value-based partition (classic Hive-style)
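bucket and truncate are specified precisely so that every engine computes identical partition values. A self-contained sketch following the spec's definitions, covering only int/long and string inputs (other types have their own byte encodings in the spec): the hash is 32-bit Murmur3 x86 with seed 0, ints and longs are hashed as 8-byte little-endian longs, strings as UTF-8 bytes, and bucket is `(hash & Integer.MAX_VALUE) % N`.

```python
import struct

MASK32 = 0xFFFFFFFF


def _rotl32(x: int, r: int) -> int:
    return ((x << r) | (x >> (32 - r))) & MASK32


def murmur3_32(data: bytes, seed: int = 0) -> int:
    """MurmurHash3 x86 32-bit, returned as a signed int like Java's."""
    c1, c2 = 0xCC9E2D51, 0x1B873593
    h = seed & MASK32
    n_blocks = len(data) // 4
    for i in range(n_blocks):
        k = int.from_bytes(data[4 * i : 4 * i + 4], "little")
        k = _rotl32((k * c1) & MASK32, 15)
        h ^= (k * c2) & MASK32
        h = (_rotl32(h, 13) * 5 + 0xE6546B64) & MASK32
    tail = data[4 * n_blocks :]
    k = 0
    if len(tail) == 3:
        k ^= tail[2] << 16
    if len(tail) >= 2:
        k ^= tail[1] << 8
    if len(tail) >= 1:
        k ^= tail[0]
        k = _rotl32((k * c1) & MASK32, 15)
        h ^= (k * c2) & MASK32
    # Finalization mix
    h ^= len(data)
    h ^= h >> 16
    h = (h * 0x85EBCA6B) & MASK32
    h ^= h >> 13
    h = (h * 0xC2B2AE35) & MASK32
    h ^= h >> 16
    return h - (1 << 32) if h & 0x80000000 else h


def iceberg_hash(value) -> int:
    """int/long hash as an 8-byte little-endian long; str hashes its UTF-8 bytes."""
    if isinstance(value, bool):
        raise TypeError("booleans cannot be bucketed")
    if isinstance(value, int):
        return murmur3_32(struct.pack("<q", value))
    if isinstance(value, str):
        return murmur3_32(value.encode("utf-8"))
    raise TypeError(f"unsupported type in this sketch: {type(value).__name__}")


def bucket(n: int, value) -> int:
    return (iceberg_hash(value) & 0x7FFFFFFF) % n


def truncate(width: int, value):
    if isinstance(value, int) and not isinstance(value, bool):
        # Python's % is a floor modulo, matching the spec: truncate(10, -1) == -10.
        return value - (value % width)
    if isinstance(value, str):
        return value[:width]  # width counts Unicode code points
    raise TypeError(f"unsupported type in this sketch: {type(value).__name__}")


print(bucket(16, "cust-101"), truncate(10, 1234), truncate(3, "iceberg"))
```

Because bucket() is a hash, it supports pruning only for equality and IN predicates; range filters cannot be projected onto bucket values, while truncate() preserves ordering and can prune ranges.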

Note

Partition evolution is tracked by partition_spec_id in each manifest list entry. When a query engine plans a scan, it reads the manifest list, sees the spec ID for each manifest, applies the correct partition pruning logic per spec, and only opens the relevant manifest files. This is why mixed-spec tables are fully transparent to query engines — they do not need to know about the evolution history.
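A plain-Python sketch of that per-spec planning for `WHERE created_at >= X` on a table that moved from days() to hours() (spec IDs, manifest names, and bounds are hypothetical, and the transform helpers are restated so the block stands alone). The predicate is projected into each manifest's own spec units before comparing against its partition summary, which is valid because the time transforms are monotonic:

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def days(ts: datetime) -> int:
    return (ts - EPOCH) // timedelta(days=1)


def hours(ts: datetime) -> int:
    return (ts - EPOCH) // timedelta(hours=1)


# Hypothetical spec history for created_at: spec 0 = days(), spec 1 = hours().
SPEC_TRANSFORMS = {0: days, 1: hours}


@dataclass(frozen=True)
class ManifestListEntry:
    manifest_path: str
    partition_spec_id: int
    lower: int  # partition summary lower bound, in this spec's units
    upper: int  # partition summary upper bound, in this spec's units


def manifests_to_open(manifest_list: list, created_at_min: datetime) -> list:
    """Plan WHERE created_at >= created_at_min across manifests written with different specs."""
    keep = []
    for entry in manifest_list:
        # ts >= x implies f(ts) >= f(x) for a monotonic transform f.
        bound = SPEC_TRANSFORMS[entry.partition_spec_id](created_at_min)
        if entry.upper >= bound:
            keep.append(entry.manifest_path)
    return keep


def utc(*args) -> datetime:
    return datetime(*args, tzinfo=timezone.utc)


manifest_list = [
    ManifestListEntry("m-april.avro", 0, days(utc(2026, 4, 1)), days(utc(2026, 4, 30))),
    ManifestListEntry("m-may.avro", 0, days(utc(2026, 5, 1)), days(utc(2026, 5, 31))),
    ManifestListEntry("m-june-hourly.avro", 1, hours(utc(2026, 6, 1)), hours(utc(2026, 6, 3, 23))),
]
print(manifests_to_open(manifest_list, utc(2026, 5, 27)))
# → ['m-may.avro', 'm-june-hourly.avro']
```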

Time Travel and Snapshot Management

Every Iceberg commit produces an immutable snapshot. Time travel queries read the table as it existed at a specific snapshot ID or timestamp without touching current data. This enables audit trails, reproducing model training datasets, and rollback workflows.

Time Travel Queries in Spark SQL

-- List all snapshots and their timestamps
SELECT snapshot_id, committed_at, operation, summary
FROM iceberg.db.orders.snapshots
ORDER BY committed_at DESC;

-- Time travel by snapshot ID
SELECT COUNT(*) FROM iceberg.db.orders
VERSION AS OF 8765432109876543;

-- Time travel by timestamp: reads the latest snapshot at or before this time
-- (the literal is interpreted in the Spark session time zone)
SELECT * FROM iceberg.db.orders
TIMESTAMP AS OF '2026-05-26 18:00:00';

-- Compare two snapshots: rows present in both whose event_type changed (audit / debugging)
SELECT a.order_id, a.event_type AS old_type, b.event_type AS new_type
FROM (
    SELECT order_id, event_type
    FROM iceberg.db.orders VERSION AS OF 8765432109876543
) a
JOIN (
    SELECT order_id, event_type
    FROM iceberg.db.orders VERSION AS OF 9876543210987654
) b ON a.order_id = b.order_id
WHERE a.event_type != b.event_type;

-- Rollback table to a previous snapshot (updates catalog pointer only)
CALL iceberg.system.rollback_to_snapshot('db.orders', 8765432109876543);

-- Or rollback to a timestamp
CALL iceberg.system.rollback_to_timestamp('db.orders', TIMESTAMP '2026-05-26 18:00:00');
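TIMESTAMP AS OF is resolved from the table's snapshot log: the engine picks the last snapshot that was current at or before the requested time. A sketch of that lookup in plain Python (snapshot IDs and times are hypothetical). Because the log records which snapshot was current, a rollback also shows up in it, so resolving against the log rather than raw commit times respects rollbacks.

```python
import bisect
from datetime import datetime, timezone


def to_ms(dt: datetime) -> int:
    return int(dt.timestamp() * 1000)


def snapshot_id_as_of(snapshot_log: list, as_of_ms: int) -> int:
    """snapshot_log: (timestamp_ms, snapshot_id) pairs, oldest first, like the
    snapshot-log in metadata.json. Returns the snapshot current at as_of_ms."""
    timestamps = [ts for ts, _ in snapshot_log]
    idx = bisect.bisect_right(timestamps, as_of_ms) - 1  # last entry with ts <= as_of_ms
    if idx < 0:
        raise ValueError("no snapshot was current at or before the requested time")
    return snapshot_log[idx][1]


def utc(*args) -> datetime:
    return datetime(*args, tzinfo=timezone.utc)


snapshot_log = [
    (to_ms(utc(2026, 5, 25, 9, 0)), 1111),
    (to_ms(utc(2026, 5, 26, 12, 0)), 2222),
    (to_ms(utc(2026, 5, 27, 8, 0)), 3333),
]
print(snapshot_id_as_of(snapshot_log, to_ms(utc(2026, 5, 26, 18, 0))))  # → 2222
```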

Reading Snapshots with PyIceberg

# Time travel and snapshot inspection with PyIceberg

from datetime import datetime, timezone

from pyiceberg.catalog import load_catalog
import pyarrow as pa

catalog = load_catalog("rest", **{"type": "rest", "uri": "http://iceberg-catalog:8181",
                                   "warehouse": "s3://my-bucket/warehouse"})

table = catalog.load_table("db.orders")

# List all snapshots
for snapshot in table.snapshots():
    operation = snapshot.summary.operation.value if snapshot.summary else None
    print(f"snapshot_id={snapshot.snapshot_id} "
          f"committed_at_ms={snapshot.timestamp_ms} "
          f"operation={operation}")

# Read a specific snapshot by ID
snapshot_id = 8765432109876543
arrow_table: pa.Table = table.scan(snapshot_id=snapshot_id).to_arrow()
print(f"Row count at snapshot {snapshot_id}: {len(arrow_table)}")

# Read the table as of a point in time: resolve the UTC timestamp (ms since epoch)
# to the snapshot that was current then, and scan that snapshot.
ts_ms = int(datetime(2026, 5, 26, 18, 0, 0, tzinfo=timezone.utc).timestamp() * 1000)
snapshot = table.snapshot_as_of_timestamp(ts_ms)
if snapshot is None:
    raise ValueError("no snapshot at or before 2026-05-26 18:00:00 UTC")
df = table.scan(snapshot_id=snapshot.snapshot_id).to_arrow()
print(f"Row count at 2026-05-26 18:00:00 UTC: {len(df)}")

Row-Level Operations: MERGE, UPDATE, DELETE

Iceberg v2 supports row-level DML through two write modes that trade off write amplification against read amplification:

  • Copy-on-Write (CoW): On UPDATE/DELETE/MERGE, entire affected data files are rewritten with the changes applied. Reads are fast (no delete files to apply). Writes are expensive. Best for low-frequency bulk corrections on large files.
  • Merge-on-Read (MoR): On UPDATE/DELETE/MERGE, small delete files (position or equality delete files) are written instead of rewriting data files. Writes are cheap. Reads must merge the delete files on the fly. Best for high-frequency upserts with many small changes.
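To make the trade-off concrete, here is a toy model of both modes handling a single-row DELETE against one 1,000-row data file (plain Python; names are illustrative, and real Iceberg writes Parquet/Avro files rather than in-memory lists). It counts rows written, which is where the two modes differ:

```python
from dataclasses import dataclass, field


@dataclass
class Table:
    data_files: dict                                    # path -> list of rows
    position_deletes: set = field(default_factory=set)  # (path, row position) pairs
    rows_written: int = 0                               # write amplification counter


def delete_copy_on_write(table: Table, pred) -> None:
    """Rewrite every data file that contains a matching row."""
    for path, rows in list(table.data_files.items()):
        if any(pred(r) for r in rows):
            kept = [r for r in rows if not pred(r)]
            del table.data_files[path]
            if kept:
                table.data_files[path + ".rewritten"] = kept
            table.rows_written += len(kept)


def delete_merge_on_read(table: Table, pred) -> None:
    """Record (file, position) pairs in a delete file; data files stay untouched."""
    hits = {
        (path, pos)
        for path, rows in table.data_files.items()
        for pos, row in enumerate(rows)
        if pred(row)
    }
    table.position_deletes |= hits
    table.rows_written += len(hits)


def scan(table: Table) -> list:
    """Readers skip deleted positions (nothing to skip after copy-on-write)."""
    return sorted(
        row
        for path, rows in table.data_files.items()
        for pos, row in enumerate(rows)
        if (path, pos) not in table.position_deletes
    )


def make_table() -> Table:
    return Table({"f1.parquet": [f"ord-{i:04d}" for i in range(1000)]})


cow, mor = make_table(), make_table()
delete_copy_on_write(cow, lambda r: r == "ord-0007")
delete_merge_on_read(mor, lambda r: r == "ord-0007")
print(scan(cow) == scan(mor), cow.rows_written, mor.rows_written)  # → True 999 1
```

Both tables return the same rows, but copy-on-write paid 999 rewritten rows up front, while merge-on-read wrote one delete entry and pushed the cost of filtering onto every later scan until compaction.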

MERGE INTO — Upsert Pattern

-- MERGE INTO upsert: insert new rows, update existing ones
-- Source table: staging area with CDC records

MERGE INTO iceberg.db.orders AS target
USING iceberg.db.orders_staging AS source
ON target.order_id = source.order_id
WHEN MATCHED AND source.event_type != target.event_type THEN
    UPDATE SET
        target.event_type   = source.event_type,
        target.amount_cents = source.amount_cents,
        target.created_at   = source.created_at
WHEN NOT MATCHED THEN
    INSERT (order_id, customer_id, event_type, amount_cents,
            currency_code, created_at, region_code)
    VALUES (source.order_id, source.customer_id, source.event_type,
            source.amount_cents, source.currency_code,
            source.created_at, source.region_code);

-- DELETE with predicate — removes rows and writes a delete file (MoR) or rewrites (CoW)
DELETE FROM iceberg.db.orders
WHERE created_at < TIMESTAMP '2024-01-01 00:00:00'
  AND event_type = 'ORDER_CANCELLED';

-- UPDATE — same write-mode trade-off as DELETE
UPDATE iceberg.db.orders
SET event_type = 'ORDER_DELIVERED'
WHERE order_id = 'ord-001'
  AND event_type = 'ORDER_SHIPPED';
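The MERGE above is easiest to reason about as a keyed upsert. A plain-Python model of its semantics (names hypothetical; rows are dicts keyed by order_id): matched rows are updated only when event_type differs, matched rows that fail that extra condition are left alone rather than inserted, and unmatched source rows are inserted. The sketch rejects duplicate source keys up front because Spark fails a MERGE in which one target row matches several source rows, and unmatched duplicates would be inserted twice.

```python
UPDATED_COLUMNS = ("event_type", "amount_cents", "created_at")


def merge_upsert(target: dict, source: list) -> dict:
    """Model of the MERGE INTO above. target maps order_id -> row; returns a new target."""
    keys = [row["order_id"] for row in source]
    duplicates = sorted({k for k in keys if keys.count(k) > 1})
    if duplicates:
        raise ValueError(f"duplicate source keys, deduplicate staging first: {duplicates}")
    result = dict(target)
    for src in source:
        key = src["order_id"]
        if key in target:
            tgt = target[key]
            # WHEN MATCHED AND source.event_type != target.event_type THEN UPDATE SET ...
            if src["event_type"] != tgt["event_type"]:
                result[key] = {**tgt, **{c: src[c] for c in UPDATED_COLUMNS}}
            # Matched rows failing the extra condition stay as they are (never inserted).
        else:
            # WHEN NOT MATCHED THEN INSERT
            result[key] = dict(src)
    return result


target = {
    "ord-001": {"order_id": "ord-001", "event_type": "ORDER_CREATED", "amount_cents": 4999, "created_at": "2026-05-27 10:00:00"},
    "ord-002": {"order_id": "ord-002", "event_type": "ORDER_SHIPPED", "amount_cents": 2500, "created_at": "2026-05-27 11:30:00"},
}
source = [
    {"order_id": "ord-001", "event_type": "ORDER_SHIPPED", "amount_cents": 4999, "created_at": "2026-05-28 09:00:00"},
    {"order_id": "ord-002", "event_type": "ORDER_SHIPPED", "amount_cents": 1, "created_at": "2026-05-28 09:00:00"},
    {"order_id": "ord-003", "event_type": "ORDER_CREATED", "amount_cents": 1200, "created_at": "2026-05-28 09:05:00"},
]
merged = merge_upsert(target, source)
print({k: (v["event_type"], v["amount_cents"]) for k, v in merged.items()})
# → {'ord-001': ('ORDER_SHIPPED', 4999), 'ord-002': ('ORDER_SHIPPED', 2500), 'ord-003': ('ORDER_CREATED', 1200)}
```

Note that ord-002 keeps its amount of 2500: the source row matched, but the extra `event_type !=` condition filtered out the update.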

Configuring Write Mode Per Table

-- Set write mode at the table level via TBLPROPERTIES
-- copy-on-write (the default when these properties are not set):
ALTER TABLE iceberg.db.orders SET TBLPROPERTIES (
    'write.merge.mode'  = 'copy-on-write',
    'write.update.mode' = 'copy-on-write',
    'write.delete.mode' = 'copy-on-write'
);

-- merge-on-read (better for high-frequency streaming upserts):
ALTER TABLE iceberg.db.orders_streaming SET TBLPROPERTIES (
    'write.merge.mode'  = 'merge-on-read',
    'write.update.mode' = 'merge-on-read',
    'write.delete.mode' = 'merge-on-read'
);

-- After accumulating many delete files under MoR, run compaction
-- (see Table Maintenance section) to restore read performance.

Iceberg Catalogs: REST, Glue, Hive, Nessie

The catalog is the authoritative registry that maps table names to their current metadata file pointer. Every Iceberg write is a catalog commit: an atomic swap of the metadata pointer. Choosing the right catalog determines concurrency guarantees, multi-engine access, and operational complexity.
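The atomic swap is what makes concurrent writers safe. A minimal sketch of the optimistic pattern in plain Python (the `PointerCatalog` class is hypothetical, and tuples of file names stand in for metadata files): a writer reads the current pointer, builds new metadata on top of it, and swaps only if the pointer has not moved; on conflict it re-reads and re-applies its change. Real Iceberg writers also re-validate on retry (an overwrite, for example, checks that no conflicting files were committed in between), while plain appends can always be re-applied.

```python
import threading


class CommitConflict(Exception):
    """Raised when the metadata pointer moved since the writer read it."""


class PointerCatalog:
    """Hypothetical catalog: table name -> current metadata, with atomic compare-and-swap."""

    def __init__(self):
        self._lock = threading.Lock()
        self._current = {}

    def load(self, table: str):
        with self._lock:
            return self._current.get(table)

    def swap(self, table: str, expected, new) -> None:
        with self._lock:
            if self._current.get(table) != expected:
                raise CommitConflict(table)
            self._current[table] = new


def commit_with_retry(catalog: PointerCatalog, table: str, apply_changes, max_attempts: int = 4):
    """Optimistic commit: read base, build new metadata on it, swap; retry on conflict."""
    for _ in range(max_attempts):
        base = catalog.load(table)
        new = apply_changes(base)  # re-applied to the refreshed base on every attempt
        try:
            catalog.swap(table, base, new)
            return new
        except CommitConflict:
            continue
    raise CommitConflict(f"{table}: gave up after {max_attempts} attempts")


catalog = PointerCatalog()
catalog.swap("db.orders", None, ("file-0",))  # table creation
attempts = []


def append_b(base):
    return base + ("file-b",)


def append_a(base):
    attempts.append(base)
    if len(attempts) == 1:
        # Simulate writer B committing while writer A is still writing.
        commit_with_retry(catalog, "db.orders", append_b)
    return base + ("file-a",)


print(commit_with_retry(catalog, "db.orders", append_a))  # → ('file-0', 'file-b', 'file-a')
```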

Catalog Comparison

# Catalog comparison (Iceberg REST Spec / Glue / Hive Metastore / Nessie)
#
# REST Catalog (open spec)
#   Best for:    Multi-engine environments; decoupled from cloud vendor
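#   Concurrency: Optimistic; each commit carries requirements (e.g. assert-ref-snapshot-id)
#                that the server checks atomically, returning 409 Conflict so the client retries
#   Tools:       Apache Polaris, Project Nessie, Lakekeeper, Unity Catalog,
#                AWS Glue's Iceberg REST endpoint, custom implementations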
#   Config:      spark.sql.catalog.X.type = rest
#                spark.sql.catalog.X.uri  = http://catalog-server:8181
#
# AWS Glue Data Catalog
#   Best for:    AWS-native stacks (EMR, Glue Jobs, Athena, SageMaker)
#   Concurrency: Optimistic locking via Glue API; conditional updates
#   Tools:       Native AWS — no extra service to operate
#   Config:      spark.sql.catalog.X.type    = glue
#                spark.sql.catalog.X.warehouse = s3://bucket/warehouse
#                (uses AWS SDK credentials from environment)
#
# Hive Metastore
#   Best for:    Migration from existing Hive/Spark stacks
#   Concurrency: HMS table lock held around each metadata-pointer swap
#   Limitation:  Not ideal for high-concurrency writers; HMS is a bottleneck
#   Config:      spark.sql.catalog.X.type           = hive
#                spark.sql.catalog.X.uri             = thrift://hms:9083
#
# Project Nessie (git-like catalog)
#   Best for:    Multi-branch workflows, isolated experiments, data-as-code
#   Concurrency: Git-like branching with merge/diff/compare; conflict detection
#   Tools:       Dremio (Nessie maintainer), Docker image, Kubernetes Helm chart
#   Config:      spark.sql.catalog.X.type             = nessie
#                spark.sql.catalog.X.uri              = http://nessie:19120/api/v1
#                spark.sql.catalog.X.ref              = main

REST Catalog Setup (Docker)

# docker-compose.yml: Iceberg REST catalog for development. The tabulario reference
# image keeps catalog state in a local SQLite database; table data goes to S3.
# For production, run a maintained REST catalog (e.g. Apache Polaris, Nessie, Lakekeeper).

version: "3.9"
services:
  iceberg-rest:
    image: tabulario/iceberg-rest:0.11.0
    ports:
      - "8181:8181"
    environment:
      AWS_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID}
      AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY}
      AWS_REGION: us-east-1
      CATALOG_WAREHOUSE: s3://my-iceberg-bucket/warehouse
      CATALOG_IO__IMPL: org.apache.iceberg.aws.s3.S3FileIO
      CATALOG_S3_ENDPOINT: https://s3.us-east-1.amazonaws.com

Nessie: Git-Like Branching for Data

Project Nessie is a transactional catalog with Git-style branching (it also serves the Iceberg REST API): create a branch, run transformations in isolation, then merge the branch back to main. This is useful for testing data migrations without affecting production readers.

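# Nessie branching workflow with PyIceberg
# Assumptions, verify against your versions:
#   - PyIceberg has no "nessie" catalog type, so it talks to Nessie through Nessie's
#     Iceberg REST endpoint; the branch-in-URI layout below is an assumption.
#   - pynessie handles branch management; it is deprecated upstream in favour of the
#     Nessie CLI and REST API, and its method signatures vary between releases.
from pynessie import init
from pyiceberg.catalog import load_catalog
from pyiceberg.types import StringType

nessie_client = init(config_dict={"endpoint": "http://nessie:19120/api/v1"})

# Create a feature branch from the current head of main
main_hash = nessie_client.get_reference("main").hash_
nessie_client.create_branch("feature/add-loyalty-tier", ref="main", hash_=main_hash)

# Load the catalog pointing to the feature branch
catalog = load_catalog(
    "nessie",
    **{
        "type": "rest",
        "uri": "http://nessie:19120/iceberg/feature/add-loyalty-tier",  # assumed URL layout
        "warehouse": "s3://my-iceberg-bucket/warehouse",
    },
)

# Perform table changes on the branch (schema evolution, data load)
table = catalog.load_table("db.orders")
with table.update_schema() as update:
    update.add_column("loyalty_tier", field_type=StringType())

# After validation, merge the feature branch into main.
# pynessie's merge(branch, from_ref, ...) takes the TARGET branch first.
nessie_client.merge("main", "feature/add-loyalty-tier")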

Query Engines: Trino, Flink, DuckDB, PyIceberg

Trino Connector

Trino's Iceberg connector supports REST, Glue, and Hive Metastore catalogs, full time travel, and pushdown of partition pruning and column statistics to minimize data scanned.

# etc/catalog/iceberg.properties (Trino catalog configuration)
connector.name=iceberg
iceberg.catalog.type=rest
iceberg.rest-catalog.uri=http://iceberg-catalog:8181
iceberg.rest-catalog.warehouse=s3://my-bucket/warehouse

# S3 file system config
fs.native-s3.enabled=true
s3.region=us-east-1
s3.aws-access-key=${ENV:AWS_ACCESS_KEY_ID}
s3.aws-secret-key=${ENV:AWS_SECRET_ACCESS_KEY}

# Optional: tune for analytics workloads
iceberg.file-format=PARQUET
iceberg.compression-codec=SNAPPY
iceberg.max-partitions-per-writer=100

-- Trino: time travel and metadata queries
USE iceberg.db;

-- Time travel by snapshot ID
SELECT COUNT(*) FROM orders FOR VERSION AS OF 8765432109876543;

-- Time travel by timestamp
SELECT SUM(amount_cents) FROM orders
FOR TIMESTAMP AS OF TIMESTAMP '2026-05-26 18:00:00 UTC';

-- Inspect table history
SELECT * FROM "orders$snapshots" ORDER BY committed_at DESC LIMIT 10;
SELECT * FROM "orders$files"     LIMIT 20;
SELECT * FROM "orders$manifests" LIMIT 10;

Apache Flink Table API

-- Flink SQL: register an Iceberg REST catalog, then stream into an Iceberg table with exactly-once semantics
-- Requires: the iceberg-flink-runtime JAR matching your Flink version (plus iceberg-aws-bundle for S3FileIO)

CREATE CATALOG iceberg_catalog WITH (
    'type'            = 'iceberg',
    'catalog-type'    = 'rest',
    'uri'             = 'http://iceberg-catalog:8181',
    'warehouse'       = 's3://my-bucket/warehouse',
    'io-impl'         = 'org.apache.iceberg.aws.s3.S3FileIO'
);

USE CATALOG iceberg_catalog;

-- Streaming insert from a Kafka source into Iceberg (exactly-once)
INSERT INTO db.orders
SELECT
    order_id,
    customer_id,
    event_type,
    amount_cents,
    currency_code,
    TO_TIMESTAMP_LTZ(created_at_epoch_ms, 3) AS created_at,
    region_code
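FROM default_catalog.default_database.kafka_orders_source;  -- Kafka-backed table, assumed to be defined in Flink's default catalog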

-- The Iceberg sink commits on each successful Flink checkpoint, so the checkpoint
-- interval sets the snapshot frequency. Without checkpointing enabled, nothing is committed.

DuckDB + Iceberg Extension

-- DuckDB: query Iceberg tables directly from S3 (no Spark needed)
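-- Requires DuckDB >= 0.10 with the iceberg, httpfs, and aws extensions

INSTALL iceberg;
INSTALL httpfs;
INSTALL aws;
LOAD iceberg;
LOAD httpfs;
LOAD aws;

-- DuckDB does not expand ${...} placeholders. Pull credentials from the standard
-- AWS credential chain (environment variables, profile, or instance role) instead.
CREATE SECRET (
    TYPE s3,
    PROVIDER credential_chain,
    REGION 'us-east-1'
);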

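-- Read the current snapshot. Tables managed by a REST catalog usually have no
-- version-hint.text, so point iceberg_scan at a specific metadata.json file.
SELECT COUNT(*), SUM(amount_cents)
FROM iceberg_scan('s3://my-bucket/warehouse/db/orders/metadata/<version>-<uuid>.metadata.json');

-- Time travel by snapshot ID (snapshot_from_id is available in recent iceberg extension releases)
SELECT *
FROM iceberg_scan(
    's3://my-bucket/warehouse/db/orders/metadata/<version>-<uuid>.metadata.json',
    snapshot_from_id = 8765432109876543
);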

PyIceberg: Python-Native Reads

# PyIceberg: read Iceberg tables into pandas or polars without a JVM
# pip install "pyiceberg[pyarrow,pandas]"

from pyiceberg.catalog import load_catalog

catalog = load_catalog("rest", **{
    "type": "rest",
    "uri": "http://iceberg-catalog:8181",
    "warehouse": "s3://my-bucket/warehouse",
})

table = catalog.load_table("db.orders")

# Push-down filter — Iceberg applies it at the manifest / file level (data skipping)
result = (
    table.scan(
        row_filter="created_at >= '2026-05-01T00:00:00+00:00' AND region_code = 'us-east'",
        selected_fields=("order_id", "event_type", "amount_cents"),
        limit=10_000,
    )
    .to_arrow()
)

import pandas as pd
df: pd.DataFrame = result.to_pandas()
print(df.groupby("event_type")["amount_cents"].sum())

File Compaction and Table Maintenance

Iceberg tables accumulate small files from streaming writes and delete files from merge-on-read operations. Without maintenance, this degrades query performance. Iceberg ships four core maintenance procedures that should be scheduled regularly.
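Compaction with the binpack strategy boils down to grouping undersized files so that each group rewrites into roughly one target-sized file. A simplified first-fit-decreasing sketch in plain Python (function and parameter names are hypothetical; Iceberg's real planner has more knobs, such as its `min-input-files` option and file-size thresholds relative to the target):

```python
def plan_binpack_groups(file_sizes: dict, target_bytes: int, min_input_files: int = 2) -> list:
    """Group undersized files into rewrite groups of at most target_bytes (first-fit decreasing)."""
    candidates = sorted(
        ((size, path) for path, size in file_sizes.items() if size < target_bytes),
        reverse=True,
    )
    groups = []  # each entry: [total_bytes, [paths]]
    for size, path in candidates:
        for group in groups:
            if group[0] + size <= target_bytes:
                group[0] += size
                group[1].append(path)
                break
        else:
            groups.append([size, [path]])
    # A group holding a single file would not reduce the file count, so skip it.
    return [paths for _, paths in groups if len(paths) >= min_input_files]


MB = 1 << 20
files = {
    "a.parquet": 60 * MB,
    "b.parquet": 50 * MB,
    "c.parquet": 40 * MB,
    "d.parquet": 20 * MB,
    "e.parquet": 200 * MB,  # already larger than the target: left alone
}
print(plan_binpack_groups(files, target_bytes=128 * MB))
# → [['a.parquet', 'b.parquet'], ['c.parquet', 'd.parquet']]
```

Four small files become two rewrite groups, so a scan opens two files instead of four; the sort strategy additionally reorders rows while rewriting, which costs more but tightens column bounds for data skipping.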

Spark Maintenance Procedures

-- 1. rewrite_data_files: compact small files into larger ones
--    Also applies pending MoR delete files, restoring CoW read performance.
CALL iceberg.system.rewrite_data_files(
    table             => 'db.orders',
    strategy          => 'binpack',            -- 'binpack' or 'sort'
    options           => map(
        'target-file-size-bytes', '134217728', -- 128 MB target
        'max-concurrent-file-group-rewrites', '10'
    ),
    -- Only compact files in the last 7 days' partitions (avoid full table scan)
    where             => 'created_at >= current_date - interval 7 days'
);

-- 2. rewrite_manifests: merge small manifests into fewer, larger ones
--    Reduces the number of manifest files a scan must open.
CALL iceberg.system.rewrite_manifests(
    table => 'db.orders',
    use_caching => true
);

-- 3. expire_snapshots: delete snapshot metadata and unreferenced data files
--    Retains snapshots newer than the specified timestamp.
CALL iceberg.system.expire_snapshots(
    table               => 'db.orders',
    older_than          => TIMESTAMP '2026-05-20 00:00:00',
    retain_last         => 5,                  -- always keep at least 5 snapshots
    max_concurrent_deletes => 10
);

-- 4. delete_orphan_files: remove files on storage with no metadata reference
--    Safe to run after expire_snapshots. Use a time buffer to avoid
--    deleting files from in-progress writes.
CALL iceberg.system.delete_orphan_files(
    table          => 'db.orders',
    older_than     => TIMESTAMP '2026-05-20 00:00:00',
    location       => 's3://my-bucket/warehouse/db/orders'
);
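Orphan detection is essentially a set difference with a safety buffer. A plain-Python sketch of the idea (names and paths are hypothetical): list the table location, subtract every path reachable from retained metadata, and only report files older than the cutoff, because a newer unreferenced file may belong to a write that has not committed yet. In practice, paths also have to be compared in a normalized form (for example `s3://` versus `s3a://`), or live files can look orphaned.

```python
from datetime import datetime, timedelta, timezone


def find_orphan_files(listed: dict, referenced: set, older_than: datetime) -> list:
    """listed: path -> last-modified time from a storage listing of the table location.
    referenced: every path reachable from the metadata of all retained snapshots.
    Files modified after older_than are skipped: they may be from an in-flight write."""
    return sorted(
        path for path, modified in listed.items()
        if path not in referenced and modified < older_than
    )


now = datetime(2026, 5, 27, 2, 0, tzinfo=timezone.utc)
listed = {
    "data/f1.parquet": now - timedelta(days=30),    # referenced by a live snapshot
    "data/f2.parquet": now - timedelta(days=30),    # left behind by a failed job
    "data/f3.parquet": now - timedelta(minutes=5),  # in-flight write, not committed yet
}
referenced = {"data/f1.parquet"}
print(find_orphan_files(listed, referenced, older_than=now - timedelta(days=3)))
# → ['data/f2.parquet']
```

This is also why expire_snapshots should run first: files referenced only by expired snapshots drop out of `referenced`, and anything expiration did not delete becomes eligible here.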

PyIceberg Maintenance API

# PyIceberg maintenance operations — useful for orchestrating from Airflow or dbt

from pyiceberg.catalog import load_catalog
from datetime import datetime, timedelta, timezone

catalog = load_catalog("rest", **{
    "type": "rest",
    "uri": "http://iceberg-catalog:8181",
    "warehouse": "s3://my-bucket/warehouse",
})

table = catalog.load_table("db.orders")

# Expire snapshots older than 7 days.
# Assumption: snapshot expiration is a recent PyIceberg addition (not available in
# older releases such as 0.7); newer versions expose it under table.maintenance.
# Check the method names against the docs for your installed version.
cutoff = datetime.now(timezone.utc) - timedelta(days=7)
table.maintenance.expire_snapshots().older_than(cutoff).commit()

# Compaction (the equivalent of rewrite_data_files) is not part of PyIceberg's
# stable API at the time of writing; run the Spark SQL CALL procedures above,
# for example from a scheduled Spark job.

Airflow DAG for Scheduled Maintenance

# airflow/dags/iceberg_maintenance.py
# Runs nightly: compaction → manifest rewrite → snapshot expiry → orphan cleanup

from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

default_args = {
    "owner": "data-platform",
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

# Every task runs its own spark-submit, so each one needs the Iceberg catalog config.
ICEBERG_CONF = {
    "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
    "spark.sql.catalog.iceberg": "org.apache.iceberg.spark.SparkCatalog",
    "spark.sql.catalog.iceberg.type": "rest",
    "spark.sql.catalog.iceberg.uri": "http://iceberg-catalog:8181",
    "spark.sql.catalog.iceberg.warehouse": "s3://my-bucket/warehouse",
    "spark.sql.catalog.iceberg.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
    "spark.sql.session.timeZone": "UTC",  # TIMESTAMP literals in CALL statements are UTC
}
MAINTENANCE_JOB = "s3://my-bucket/jobs/iceberg_maintenance.py"

with DAG(
    dag_id="iceberg_maintenance",
    default_args=default_args,
    schedule="0 2 * * *",   # 02:00 UTC daily
    start_date=datetime(2026, 1, 1),
    catchup=False,
    tags=["iceberg", "maintenance"],
) as dag:

    compact = SparkSubmitOperator(
        task_id="compact_data_files",
        application=MAINTENANCE_JOB,
        application_args=["--procedure", "rewrite_data_files", "--table", "db.orders"],
        conf=ICEBERG_CONF,
    )

    rewrite_manifests = SparkSubmitOperator(
        task_id="rewrite_manifests",
        application=MAINTENANCE_JOB,
        application_args=["--procedure", "rewrite_manifests", "--table", "db.orders"],
        conf=ICEBERG_CONF,
    )

    expire = SparkSubmitOperator(
        task_id="expire_snapshots",
        application=MAINTENANCE_JOB,
        application_args=["--procedure", "expire_snapshots", "--table", "db.orders",
                          "--retain-days", "7"],
        conf=ICEBERG_CONF,
    )

    orphan_cleanup = SparkSubmitOperator(
        task_id="delete_orphan_files",
        application=MAINTENANCE_JOB,
        application_args=["--procedure", "delete_orphan_files", "--table", "db.orders"],
        conf=ICEBERG_CONF,
    )

    compact >> rewrite_manifests >> expire >> orphan_cleanup
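The DAG submits s3://my-bucket/jobs/iceberg_maintenance.py, which the article does not show. A hypothetical sketch of such a job: it accepts the same `--procedure`, `--table`, and `--retain-days` arguments the DAG passes, builds the matching Spark SQL CALL statement, and runs it. It assumes the Spark session time zone is UTC so that the computed cutoff literal means what it says, and it keeps the SQL builder free of Spark so it can be unit-tested.

```python
import argparse
from datetime import datetime, timedelta, timezone
from typing import Optional

PROCEDURES = ("rewrite_data_files", "rewrite_manifests", "expire_snapshots", "delete_orphan_files")


def build_call(procedure: str, table: str, retain_days: int = 7,
               now: Optional[datetime] = None, catalog: str = "iceberg") -> str:
    """Build the Spark SQL CALL statement for one maintenance procedure."""
    if procedure not in PROCEDURES:
        raise ValueError(f"unknown procedure: {procedure}")
    now = now or datetime.now(timezone.utc)
    # Rendered in UTC; assumes spark.sql.session.timeZone=UTC for the TIMESTAMP literal.
    cutoff = (now - timedelta(days=retain_days)).astimezone(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
    args = [f"table => '{table}'"]
    if procedure == "rewrite_data_files":
        args.append("strategy => 'binpack'")
    elif procedure == "expire_snapshots":
        args += [f"older_than => TIMESTAMP '{cutoff}'", "retain_last => 5"]
    elif procedure == "delete_orphan_files":
        args.append(f"older_than => TIMESTAMP '{cutoff}'")
    return f"CALL {catalog}.system.{procedure}({', '.join(args)})"


def main(argv=None) -> None:
    parser = argparse.ArgumentParser(description="Run one Iceberg maintenance procedure")
    parser.add_argument("--procedure", required=True, choices=PROCEDURES)
    parser.add_argument("--table", required=True)
    parser.add_argument("--retain-days", type=int, default=7)
    args = parser.parse_args(argv)

    # Imported here so build_call() can be tested without a Spark installation.
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName(f"iceberg-{args.procedure}").getOrCreate()
    spark.sql(build_call(args.procedure, args.table, args.retain_days)).show(truncate=False)
```

In the deployed script, finish with the usual `if __name__ == "__main__": main()` guard so spark-submit runs `main()` with the DAG's arguments.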

Note

For the full Spark configuration reference, see the Iceberg Spark configuration docs. For Python-native workflows without Spark, PyIceberg covers catalog access, schema evolution, snapshot inspection, and data scanning via PyArrow with no JVM required; data-file compaction still needs an engine such as Spark.

Work with us

Building a lakehouse and evaluating Apache Iceberg for production data engineering?

We design and implement Apache Iceberg lakehouses — from catalog setup (REST, Glue, Nessie) and Spark table configuration to schema evolution strategies, partition evolution, row-level DML with copy-on-write vs merge-on-read tuning, time travel audit workflows, and table maintenance automation with Airflow or dbt. Let’s talk.
