Data Reliability · SLA · Data Observability · Data Engineering · Pipelines · SLO

The Real Cost of Data Downtime — Measuring SLA Impact and Building Resilient Pipelines

A practical guide to quantifying and reducing data downtime: calculating the business cost of stale or missing data, defining freshness and completeness SLOs, instrumenting pipelines with Prometheus metrics and Great Expectations, implementing circuit breakers and dead letter queues, building idempotent writes with PostgreSQL UPSERT and Delta Lake MERGE, and prioritising reliability with a four-tier pipeline model.

2026-05-08

What Data Downtime Actually Costs

Data downtime is the period during which data is missing, inaccurate, or otherwise unreliable. Unlike infrastructure downtime — where a service is clearly unavailable — data downtime is often silent. A dashboard shows stale numbers. An ML model scores on yesterday's features. A finance report misses three hours of transactions. Nobody knows until a business analyst calls at 8 AM, or worse, a customer notices an anomaly.

The costs are real and compound quickly. According to Monte Carlo Data's 2022 State of Data Quality survey, data engineers spend an average of 40% of their time on data quality issues. Gartner estimates poor data quality costs organisations an average of $12.9 million per year. But the direct costs — engineering hours debugging pipelines — are only the surface. The deeper costs hit decision quality, customer trust, and regulatory exposure.

Direct Costs

Engineering time to detect, diagnose, and remediate pipeline failures. SLA breach penalties if contractual data delivery commitments are violated. Reprocessing compute costs when bad data must be corrected and replayed through downstream systems.

Decision Quality Costs

Business decisions made on stale or incorrect data. Marketing campaigns targeting wrong segments. Inventory systems buying excess stock. Fraud models missing new attack patterns. These costs are invisible on engineering dashboards but very visible in quarterly results.

Trust and Compliance Costs

Once business stakeholders lose confidence in data, they stop using it — reverting to spreadsheets and instinct. Rebuilding data trust takes months. Regulatory exposure from inaccurate reporting (GDPR, SOX, HIPAA) can exceed all other costs combined.

Measuring SLA Impact: Data Freshness and Completeness SLOs

The first step toward reliability is measurement. Most data teams lack formal SLOs for their pipelines. Without them, "the pipeline is down" is as specific as the incident report gets. Define SLOs in terms users care about: freshness (how recent is the latest data?), completeness (is any data missing?), schema validity (does the data match expected shape?), and distribution consistency (are values within expected ranges?).

The Gartner Data Quality framework identifies six dimensions: accuracy, completeness, consistency, timeliness, uniqueness, and validity. For operational data pipelines, the four most tractable dimensions to instrument are freshness, completeness, schema validity, and row-count anomaly detection.

# data_slo_tracker.py
# Tracks pipeline SLOs and exposes Prometheus metrics
# pip install prometheus-client sqlalchemy psycopg2-binary

import time
import datetime
from prometheus_client import start_http_server, Gauge, Counter
from sqlalchemy import create_engine, text
import os

# --- Prometheus metrics ---
FRESHNESS_SECONDS = Gauge(
    "data_pipeline_freshness_seconds",
    "Seconds since the latest record was ingested",
    ["pipeline", "table"],
)
COMPLETENESS_RATIO = Gauge(
    "data_pipeline_completeness_ratio",
    "Ratio of expected rows received vs expected (1.0 = complete)",
    ["pipeline", "table", "window"],
)
SLO_BREACH_TOTAL = Counter(
    "data_pipeline_slo_breach_total",
    "Number of SLO breaches detected",
    ["pipeline", "table", "slo_type"],
)
ROW_COUNT_ANOMALY = Gauge(
    "data_pipeline_row_count_anomaly_score",
    "Z-score of current row count vs 30-day average (|score| > 3 = anomaly)",
    ["pipeline", "table"],
)

# --- SLO definitions: pipeline -> table -> thresholds ---
SLO_CONFIG = {
    "orders": {
        "orders_fact": {
            "max_freshness_seconds": 300,       # 5-minute SLO
            "completeness_window_minutes": 60,
            "expected_rows_per_hour": 2000,
            "completeness_threshold": 0.95,     # 95% minimum
        },
    },
    "analytics": {
        "sessions_daily": {
            "max_freshness_seconds": 3600,      # 1-hour SLO
            "completeness_window_minutes": 1440,
            "expected_rows_per_hour": 500,
            "completeness_threshold": 0.99,
        },
    },
}

def check_freshness(engine, pipeline: str, table: str, config: dict):
    """Query max(updated_at) and compute lag in seconds."""
    with engine.connect() as conn:
        row = conn.execute(
            text(f"SELECT MAX(updated_at) AS latest FROM {table}")
        ).fetchone()

    latest = row.latest if row and row.latest else None
    if latest is None:
        lag = float("inf")
    else:
        # updated_at is TIMESTAMPTZ, so the driver returns tz-aware datetimes;
        # compare against an aware "now" to avoid naive/aware TypeErrors
        lag = (datetime.datetime.now(datetime.timezone.utc) - latest).total_seconds()

    FRESHNESS_SECONDS.labels(pipeline=pipeline, table=table).set(lag)

    slo_max = config["max_freshness_seconds"]
    if lag > slo_max:
        SLO_BREACH_TOTAL.labels(
            pipeline=pipeline, table=table, slo_type="freshness"
        ).inc()
        print(f"[BREACH] {pipeline}.{table}: freshness {lag:.0f}s > SLO {slo_max}s")

    return lag

def check_completeness(engine, pipeline: str, table: str, config: dict):
    """Compare actual row count in the last window vs expected."""
    window = config["completeness_window_minutes"]
    expected_rate = config["expected_rows_per_hour"]
    threshold = config["completeness_threshold"]

    expected = expected_rate * (window / 60)

    with engine.connect() as conn:
        row = conn.execute(
            text(
                f"""
                SELECT COUNT(*) AS actual
                FROM {table}
                WHERE updated_at >= NOW() - INTERVAL '{window} minutes'
                """
            )
        ).fetchone()

    actual = row.actual if row else 0
    ratio = actual / max(expected, 1)

    COMPLETENESS_RATIO.labels(
        pipeline=pipeline, table=table, window=f"{window}m"
    ).set(ratio)

    if ratio < threshold:
        SLO_BREACH_TOTAL.labels(
            pipeline=pipeline, table=table, slo_type="completeness"
        ).inc()
        print(
            f"[BREACH] {pipeline}.{table}: completeness {ratio:.2%} < SLO {threshold:.0%} "
            f"(got {actual}, expected ~{expected:.0f} in last {window}m)"
        )

    return ratio

def check_row_count_anomaly(engine, pipeline: str, table: str):
    """Compute Z-score of today's row count vs 30-day rolling average."""
    with engine.connect() as conn:
        stats = conn.execute(
            text(
                f"""
                WITH daily AS (
                    SELECT DATE(updated_at) AS day, COUNT(*) AS cnt
                    FROM {table}
                    WHERE updated_at >= NOW() - INTERVAL '31 days'
                    GROUP BY 1
                )
                SELECT
                    -- baseline excludes today's (still partial) count
                    AVG(cnt)    FILTER (WHERE day < CURRENT_DATE) AS mean,
                    STDDEV(cnt) FILTER (WHERE day < CURRENT_DATE) AS stddev,
                    MAX(cnt)    FILTER (WHERE day = CURRENT_DATE) AS today
                FROM daily
                """
            )
        ).fetchone()

    if not stats or stats.stddev is None or float(stats.stddev) == 0:
        return 0.0

    z = (float(stats.today or 0) - float(stats.mean)) / float(stats.stddev)
    ROW_COUNT_ANOMALY.labels(pipeline=pipeline, table=table).set(abs(z))
    if abs(z) > 3:
        print(f"[ANOMALY] {pipeline}.{table}: row count Z-score {z:.2f}")
    return z

def main():
    db_url = os.environ["DATABASE_URL"]   # postgresql://user:pass@host/db
    engine = create_engine(db_url)

    start_http_server(8081)
    print("Data SLO tracker running on :8081/metrics")

    while True:
        for pipeline, tables in SLO_CONFIG.items():
            for table, config in tables.items():
                try:
                    check_freshness(engine, pipeline, table, config)
                    check_completeness(engine, pipeline, table, config)
                    check_row_count_anomaly(engine, pipeline, table)
                except Exception as exc:
                    print(f"[ERROR] {pipeline}.{table}: {exc}")
        time.sleep(60)

if __name__ == "__main__":
    main()

Note

Define SLOs before you build monitoring, not after an incident. The most common mistake is adding freshness checks in response to a specific outage — which means you only monitor the one failure mode you already experienced. Survey your data consumers: what would they notice first if something broke? That answer defines your SLO.

Data Observability: Monte Carlo, Elementary, and Great Expectations

Data observability platforms apply the same principles as infrastructure observability — automatic anomaly detection, lineage tracking, incident alerting — to data assets. The three dominant approaches are: Monte Carlo (managed, ML-based anomaly detection), Elementary (open-source dbt-native observability), and Great Expectations (code-first, assertion-based quality checks). Each sits at a different point on the automation-vs-control spectrum.

# Great Expectations — code-first data quality assertions
# pip install great-expectations sqlalchemy psycopg2-binary

import great_expectations as gx
from great_expectations.checkpoint import SimpleCheckpoint

# --- Bootstrap a GX project (run once) ---
context = gx.get_context()

# --- Define a datasource (PostgreSQL warehouse) ---
datasource = context.sources.add_or_update_sql(
    name="warehouse",
    connection_string="postgresql+psycopg2://${DB_USER}:${DB_PASSWORD}@${DB_HOST}/${DB_NAME}",
)

# --- Add a data asset (the table to test) ---
asset = datasource.add_table_asset(
    name="orders_fact",
    table_name="orders_fact",
)
batch_request = asset.build_batch_request()

# --- Create or load an Expectation Suite ---
suite_name = "orders_fact.critical"
try:
    suite = context.get_expectation_suite(suite_name)
except Exception:
    suite = context.create_expectation_suite(suite_name)

validator = context.get_validator(
    batch_request=batch_request,
    expectation_suite_name=suite_name,
)

# --- Define expectations ---

# Freshness: max(updated_at) within 10 minutes
validator.expect_column_max_to_be_between(
    column="updated_at",
    # GX evaluation parameters accept now() +/- timedelta expressions
    min_value={"$PARAMETER": "now() - timedelta(minutes=10)"},
    max_value={"$PARAMETER": "now()"},
    meta={"notes": "SLO: data must be no older than 10 minutes"},
)

# Completeness: no nulls on required fields
for col in ["order_id", "customer_id", "created_at", "total_amount"]:
    validator.expect_column_values_to_not_be_null(column=col)

# Uniqueness: order_id must be unique
validator.expect_column_values_to_be_unique(column="order_id")

# Schema: total_amount must be non-negative
validator.expect_column_values_to_be_between(
    column="total_amount",
    min_value=0,
    mostly=0.999,   # allow 0.1% negative for refund adjustments
)

# Distribution: order count should not deviate more than 30% from the 7-day
# average (daily_avg_7d is supplied as an evaluation parameter at run time)
validator.expect_table_row_count_to_be_between(
    min_value={"$PARAMETER": "daily_avg_7d * 0.70"},
    max_value={"$PARAMETER": "daily_avg_7d * 1.30"},
    meta={"notes": "Anomaly detection: 30% deviation threshold"},
)

# Status: only known order statuses
validator.expect_column_values_to_be_in_set(
    column="status",
    value_set=["pending", "confirmed", "shipped", "delivered", "cancelled", "refunded"],
)

validator.save_expectation_suite()

# --- Create a Checkpoint (runs the suite and routes results) ---
checkpoint = SimpleCheckpoint(
    name="orders_fact_checkpoint",
    data_context=context,
    validations=[
        {
            "batch_request": batch_request,
            "expectation_suite_name": suite_name,
        }
    ],
    action_list=[
        # Persist results to the Data Docs site
        {
            "name": "store_validation_result",
            "action": {"class_name": "StoreValidationResultAction"},
        },
        # Send Slack alert on failure
        {
            "name": "notify_slack",
            "action": {
                "class_name": "SlackNotificationAction",
                "slack_webhook": "${SLACK_WEBHOOK_URL}",
                "notify_on": "failure",
                "renderer": {
                    "module_name": "great_expectations.render.renderer.slack_renderer",
                    "class_name": "SlackRenderer",
                },
            },
        },
        # Rebuild the Data Docs site so results are browsable
        {
            "name": "update_data_docs",
            "action": {"class_name": "UpdateDataDocsAction"},
        },
    ],
)
context.add_or_update_checkpoint(checkpoint=checkpoint)

# --- Run the checkpoint ---
result = context.run_checkpoint(checkpoint_name="orders_fact_checkpoint")
if not result["success"]:
    print("DATA QUALITY FAILURE — see Data Docs for details")
    exit(1)
print("All expectations passed.")

# Elementary (dbt-native observability) — add to your dbt project
# pip install elementary-data
# dbt deps  (after adding to packages.yml)

# packages.yml — add Elementary as a dbt package
packages:
  - package: elementary-data/elementary
    version: ">=0.14.0"

# dbt_project.yml — enable Elementary models
models:
  elementary:
    +schema: elementary      # Elementary creates its own schema

# --- After running: dbt run --select elementary ---
# Elementary auto-instruments every dbt model with:
#   - Row count monitoring (anomaly detection)
#   - Freshness checks (based on model run timestamps)
#   - Schema change detection (column add/remove/type change)
#   - Source freshness from dbt sources.yml

# models/orders/schema.yml — add Elementary tests alongside dbt tests
version: 2
models:
  - name: orders_fact
    columns:
      - name: order_id
        tests:
          - not_null
          - unique
          - elementary.column_anomalies:
              column_anomalies:
                - null_count
                - null_percent
      - name: total_amount
        tests:
          - not_null
          - elementary.column_anomalies:
              column_anomalies:
                - average
                - min_value
                - max_value
                - zero_count
    tests:
      # Table-level anomaly detection
      - elementary.volume_anomalies:
          timestamp_column: created_at
          time_bucket:
            period: hour
            count: 1
      - elementary.freshness_anomalies:
          timestamp_column: updated_at
          max_allowed_delay:
            period: minute
            count: 15
      - elementary.schema_changes

# Run Elementary report locally
# edr report --profiles-dir ~/.dbt --project-dir .

# Send Elementary alerts to Slack on schedule
# edr monitor --slack-webhook ${SLACK_WEBHOOK} --profiles-dir ~/.dbt

Note

Elementary is the lowest-friction path to data observability for teams already using dbt. It instruments every model automatically with row count and freshness monitoring, and the reporting UI integrates with dbt Cloud. For non-dbt pipelines, combine it with Great Expectations checkpoints or build custom Prometheus-based freshness checks like the one shown earlier.

Pipeline Resilience Patterns: Circuit Breakers, Dead Letter Queues, and Idempotency

Monitoring tells you something broke. Resilience patterns are what prevent a single upstream failure from cascading through your entire data platform. The three patterns with the highest reliability ROI are: circuit breakers (stop processing when upstream quality degrades), dead letter queues (capture failed records for reprocessing without blocking the main pipeline), and idempotent writes (make every pipeline step safe to replay).

# pipeline_circuit_breaker.py
# Circuit breaker for data pipelines — stops processing when error rate exceeds threshold
# Implements half-open recovery: attempts a probe batch after cooldown

import time
import datetime
import enum
from dataclasses import dataclass, field
from typing import Callable, Any

class CircuitState(enum.Enum):
    CLOSED = "closed"       # Normal operation
    OPEN = "open"           # Tripped — rejecting calls
    HALF_OPEN = "half_open" # Probing — one attempt allowed

@dataclass
class CircuitBreaker:
    name: str
    failure_threshold: int = 5          # trips after N consecutive failures
    error_rate_threshold: float = 0.5   # or when error rate > 50% in window
    window_seconds: int = 300           # 5-minute sliding window
    recovery_timeout_seconds: int = 60  # wait before half-open probe

    _state: CircuitState = field(default=CircuitState.CLOSED, init=False)
    _failure_count: int = field(default=0, init=False)
    _success_count: int = field(default=0, init=False)
    _last_failure_time: float = field(default=0.0, init=False)
    _events: list = field(default_factory=list, init=False)

    @property
    def state(self) -> CircuitState:
        if self._state == CircuitState.OPEN:
            # Check if recovery timeout has elapsed
            elapsed = time.time() - self._last_failure_time
            if elapsed >= self.recovery_timeout_seconds:
                self._state = CircuitState.HALF_OPEN
                print(f"[{self.name}] Circuit HALF-OPEN — attempting probe")
        return self._state

    def _record_event(self, success: bool):
        now = time.time()
        self._events.append((now, success))
        # Prune events outside the sliding window
        cutoff = now - self.window_seconds
        self._events = [(t, s) for t, s in self._events if t >= cutoff]

    def _window_error_rate(self) -> float:
        if not self._events:
            return 0.0
        failures = sum(1 for _, s in self._events if not s)
        return failures / len(self._events)

    def call(self, func: Callable, *args, **kwargs) -> Any:
        state = self.state

        if state == CircuitState.OPEN:
            raise RuntimeError(
                f"[{self.name}] Circuit OPEN — upstream quality degraded. "
                f"Retry after {self.recovery_timeout_seconds}s cooldown."
            )

        try:
            result = func(*args, **kwargs)
            self._record_event(True)
            self._failure_count = 0
            self._success_count += 1

            if state == CircuitState.HALF_OPEN:
                self._state = CircuitState.CLOSED
                print(f"[{self.name}] Circuit CLOSED — upstream recovered")

            return result

        except Exception as exc:
            self._record_event(False)
            self._failure_count += 1
            self._last_failure_time = time.time()

            consecutive_tripped = self._failure_count >= self.failure_threshold
            rate_tripped = self._window_error_rate() >= self.error_rate_threshold

            if consecutive_tripped or rate_tripped:
                self._state = CircuitState.OPEN
                print(
                    f"[{self.name}] Circuit OPEN — "
                    f"consecutive={self._failure_count}, "
                    f"error_rate={self._window_error_rate():.0%}"
                )

            raise


# --- Usage in a Kafka consumer pipeline ---

import json
from kafka import KafkaConsumer, KafkaProducer

def enrich_order(record: dict) -> dict:
    """Calls an upstream enrichment service — may fail."""
    # ... HTTP call to enrichment API
    return {**record, "enriched": True}

def run_pipeline():
    consumer = KafkaConsumer(
        "orders.raw",
        bootstrap_servers=["kafka:9092"],
        group_id="order-enrichment",
        value_deserializer=lambda v: json.loads(v.decode()),
        auto_offset_reset="earliest",
        enable_auto_commit=False,
    )
    producer = KafkaProducer(
        bootstrap_servers=["kafka:9092"],
        value_serializer=lambda v: json.dumps(v).encode(),
    )
    dlq_producer = KafkaProducer(  # Dead Letter Queue producer
        bootstrap_servers=["kafka:9092"],
        value_serializer=lambda v: json.dumps(v).encode(),
    )

    breaker = CircuitBreaker(
        name="enrichment-service",
        failure_threshold=5,
        error_rate_threshold=0.40,
        window_seconds=120,
        recovery_timeout_seconds=30,
    )

    for message in consumer:
        record = message.value
        try:
            enriched = breaker.call(enrich_order, record)
            producer.send("orders.enriched", value=enriched)
            consumer.commit()

        except RuntimeError as circuit_open:
            # Circuit is open: pause, then rewind so this record is re-read.
            # The consumer iterator advances in memory even without a commit,
            # so an explicit seek back to this offset is required.
            from kafka import TopicPartition
            print(f"Circuit open, sleeping 30s: {circuit_open}")
            time.sleep(30)
            consumer.seek(TopicPartition(message.topic, message.partition), message.offset)

        except Exception as exc:
            # Individual record failure — send to DLQ
            dlq_record = {
                "original_record": record,
                "error": str(exc),
                "error_type": type(exc).__name__,
                "timestamp": datetime.datetime.utcnow().isoformat(),
                "topic": message.topic,
                "partition": message.partition,
                "offset": message.offset,
                "retry_count": record.get("_retry_count", 0),
            }
            dlq_producer.send("orders.enriched.dlq", value=dlq_record)
            consumer.commit()  # Commit — don't block the pipeline for this record
            print(f"[DLQ] Sent failed record offset={message.offset}: {exc}")

# dlq_reprocessor.py
# Dead letter queue reprocessor — retries failed records with exponential backoff
# Designed for idempotent re-ingestion

import json
import time
import datetime
from kafka import KafkaConsumer, KafkaProducer

MAX_RETRIES = 3
BASE_BACKOFF_SECONDS = 60   # 1m, 2m, 4m backoff schedule

def should_retry(dlq_record: dict) -> bool:
    retry_count = dlq_record.get("retry_count", 0)
    error_type = dlq_record.get("error_type", "")

    # Never retry these — they indicate data problems, not transient failures
    terminal_errors = {
        "ValidationError",
        "SchemaError",
        "UnicodeDecodeError",
        "KeyError",         # missing required field
    }
    if error_type in terminal_errors:
        return False

    return retry_count < MAX_RETRIES

def reprocess():
    consumer = KafkaConsumer(
        "orders.enriched.dlq",
        bootstrap_servers=["kafka:9092"],
        group_id="dlq-reprocessor",
        value_deserializer=lambda v: json.loads(v.decode()),
        auto_offset_reset="earliest",
    )
    producer = KafkaProducer(
        bootstrap_servers=["kafka:9092"],
        value_serializer=lambda v: json.dumps(v).encode(),
    )
    dead_letter_final = KafkaProducer(
        bootstrap_servers=["kafka:9092"],
        value_serializer=lambda v: json.dumps(v).encode(),
    )

    for message in consumer:
        dlq_record = message.value
        retry_count = dlq_record.get("retry_count", 0)
        original = dlq_record.get("original_record", {})

        if not should_retry(dlq_record):
            # Send to final dead letter — needs manual inspection
            dead_letter_final.send("orders.enriched.dead", value={
                **dlq_record,
                "final_dead_at": datetime.datetime.utcnow().isoformat(),
                "reason": "max_retries_exceeded or terminal_error",
            })
            print(f"[DEAD] Moved to final DLQ: {dlq_record.get('error_type')}")
            continue

        # Exponential backoff: sleep before retrying
        backoff = BASE_BACKOFF_SECONDS * (2 ** retry_count)
        print(f"[RETRY {retry_count+1}/{MAX_RETRIES}] Backoff {backoff}s for: {original.get('order_id')}")
        time.sleep(min(backoff, 3600))

        # Re-publish to the main topic with incremented retry counter
        retry_record = {
            **original,
            "_retry_count": retry_count + 1,
            "_dlq_error": dlq_record.get("error"),
            "_first_failed_at": dlq_record.get("timestamp"),
        }
        producer.send("orders.raw", value=retry_record)
        print(f"[RETRY] Re-queued order_id={original.get('order_id')}")

Idempotent Writes: Making Every Pipeline Step Safe to Replay

The most powerful resilience property is idempotency: running the same pipeline step twice produces the same result as running it once. Idempotent pipelines can be safely retried, replayed from checkpoints, and re-run after failures without producing duplicates or inconsistencies. In practice, this means using MERGE/UPSERT semantics instead of INSERT, partition-based overwrites instead of appends, and deterministic IDs derived from content rather than auto-increment sequences.
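The last ingredient, content-derived IDs, can be sketched with the standard library's `uuid5`. The namespace UUID, helper name, and source naming scheme below are illustrative choices, not a fixed convention:

```python
import uuid

# Fixed namespace for this pipeline (illustrative; pick one per pipeline
# and never change it, or derived IDs stop being stable)
ORDERS_NAMESPACE = uuid.UUID("6ba7b810-9dad-11d1-80b4-00c04fd430c8")

def deterministic_order_id(source: str, external_ref: str) -> uuid.UUID:
    """Derive a stable ID from record content instead of a sequence.

    Replaying the same source record always yields the same ID, so an
    UPSERT keyed on it can never create duplicates.
    """
    return uuid.uuid5(ORDERS_NAMESPACE, f"{source}:{external_ref}")

# Same input always maps to the same ID, on every replay
first = deterministic_order_id("shopify", "order-10023")
replay = deterministic_order_id("shopify", "order-10023")
assert first == replay
```

Because the ID is a pure function of the record's content, a replayed batch hits the `ON CONFLICT (order_id)` path instead of inserting a second row.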

-- PostgreSQL: idempotent upsert pattern
-- Using INSERT ... ON CONFLICT for safe-to-replay ingestion

-- Table with a natural key constraint
CREATE TABLE orders_fact (
    order_id        UUID        PRIMARY KEY,
    customer_id     UUID        NOT NULL,
    status          TEXT        NOT NULL,
    total_amount    NUMERIC(12,2) NOT NULL,
    created_at      TIMESTAMPTZ NOT NULL,
    updated_at      TIMESTAMPTZ NOT NULL,
    -- Pipeline metadata — helps with debugging and reprocessing
    _ingested_at    TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    _source_topic   TEXT,
    _source_offset  BIGINT,
    _pipeline_run   UUID        -- identifies which pipeline run wrote this
);

-- Idempotent upsert: safe to run multiple times for the same order_id
INSERT INTO orders_fact (
    order_id, customer_id, status, total_amount,
    created_at, updated_at,
    _ingested_at, _source_topic, _source_offset, _pipeline_run
)
VALUES (
    :order_id, :customer_id, :status, :total_amount,
    :created_at, :updated_at,
    NOW(), :source_topic, :source_offset, :pipeline_run
)
ON CONFLICT (order_id) DO UPDATE SET
    status         = EXCLUDED.status,
    total_amount   = EXCLUDED.total_amount,
    updated_at     = EXCLUDED.updated_at,
    _ingested_at   = EXCLUDED._ingested_at,
    _source_offset = EXCLUDED._source_offset,
    _pipeline_run  = EXCLUDED._pipeline_run
-- Only update if the incoming record is newer (prevents out-of-order overwrites)
WHERE orders_fact.updated_at < EXCLUDED.updated_at;

-- Verify idempotency: re-running the same batch must leave these counts
-- unchanged, and distinct_orders must always equal rows_from_run
SELECT
    COUNT(*)                 AS rows_from_run,
    COUNT(DISTINCT order_id) AS distinct_orders
FROM orders_fact
WHERE _pipeline_run = :pipeline_run;

# Spark: idempotent partition overwrite for batch pipelines
# Delta Lake merge pattern — safe to replay without duplicates

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from delta.tables import DeltaTable
import datetime

spark = (
    SparkSession.builder
    .appName("orders-idempotent-load")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)

# Read incoming batch (e.g. from S3 landing zone)
incoming = spark.read.json("s3://data-landing/orders/2026-05-08/")

# Add pipeline metadata for auditability
pipeline_run_id = datetime.datetime.utcnow().strftime("%Y%m%d_%H%M%S")
incoming = (
    incoming
    .withColumn("_pipeline_run", F.lit(pipeline_run_id))
    .withColumn("_ingested_at", F.current_timestamp())
)

target_path = "s3://data-warehouse/orders_fact"

if DeltaTable.isDeltaTable(spark, target_path):
    target = DeltaTable.forPath(spark, target_path)

    # MERGE: insert new records, update existing only if newer
    target.alias("target").merge(
        incoming.alias("source"),
        "target.order_id = source.order_id"
    ).whenMatchedUpdate(
        condition="source.updated_at > target.updated_at",
        set={
            "status":         "source.status",
            "total_amount":   "source.total_amount",
            "updated_at":     "source.updated_at",
            "_pipeline_run":  "source._pipeline_run",
            "_ingested_at":   "source._ingested_at",
        }
    ).whenNotMatchedInsertAll().execute()
else:
    # First write: derive a date column to partition by (partitionBy takes
    # column names, not SQL expressions)
    incoming = incoming.withColumn("created_date", F.to_date("created_at"))
    incoming.write.format("delta").partitionBy("created_date").save(target_path)

# Compact small files (OPTIMIZE is idempotent and safe to run after every load)
target = DeltaTable.forPath(spark, target_path)
target.optimize().executeCompaction()

# Vacuum old snapshots (keeps 7 days for time travel)
target.vacuum(retentionHours=168)

print(f"Pipeline run {pipeline_run_id} complete")

Note

The most reliable test for idempotency: run your pipeline twice on the same input in a staging environment and compare row counts and checksums. If anything differs, the pipeline is not idempotent. Automate this as a CI test — it takes 5 minutes to write and will catch regressions before they reach production.
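A minimal sketch of that double-run check, assuming a `run_pipeline` callable and a `fetch_rows` function that reads the target table back (both are placeholders to wire into your own pipeline and staging environment):

```python
import hashlib

def rows_fingerprint(rows) -> tuple:
    """Row count plus an order-independent checksum of the rows."""
    materialized = [tuple(r) for r in rows]
    digest = hashlib.md5()
    # Sort serialized rows so physical row order does not affect the hash
    for serialized in sorted(repr(r) for r in materialized):
        digest.update(serialized.encode())
    return len(materialized), digest.hexdigest()

def assert_idempotent(fetch_rows, run_pipeline) -> None:
    """Run the pipeline twice on identical input; the table must not change."""
    run_pipeline()
    first = rows_fingerprint(fetch_rows())
    run_pipeline()  # replay the exact same input
    second = rows_fingerprint(fetch_rows())
    assert first == second, f"NOT idempotent: {first} != {second}"

# Toy demonstration: a dict-backed "table" with UPSERT semantics passes
table = {}
def load_batch():
    for order_id, amount in [("o1", 10.0), ("o2", 25.5)]:
        table[order_id] = amount   # upsert: replaying changes nothing

assert_idempotent(lambda: table.items(), load_batch)
print("idempotency check passed")
```

In CI, point `fetch_rows` at the staging warehouse and `run_pipeline` at your real entry point; an append-only pipeline fails immediately because the second run doubles the row count.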

Quantifying Data Downtime Cost: A Framework

Before investing in reliability tooling, quantify the actual cost of your current downtime. This makes the business case concrete and helps prioritise which pipelines to protect first. The framework has four components: detection time, impact breadth, impact depth, and recovery cost.

# data_downtime_cost_model.py
# Quantify the business cost of data downtime for a given pipeline
# Adapt the cost inputs to your organisation's context

from dataclasses import dataclass
from typing import Optional

@dataclass
class PipelineProfile:
    name: str
    # --- Detection ---
    avg_detection_minutes: float     # How long until someone notices
    # --- Breadth: how many downstream consumers ---
    downstream_dashboards: int       # BI dashboards that go stale
    downstream_ml_models: int        # ML models that score stale features
    downstream_pipelines: int        # Other pipelines that read this one
    downstream_api_consumers: int    # APIs / apps that query this data
    # --- Depth: cost per unit of downtime ---
    revenue_per_hour_usd: Optional[float] = None  # if pipeline drives revenue
    decisions_per_hour: int = 0         # high-stakes decisions made with this data
    decision_error_rate: float = 0.05   # fraction of decisions wrong when data stale
    decision_cost_usd: float = 100.0    # avg cost of a wrong decision
    # --- Engineering cost ---
    engineer_hourly_cost_usd: float = 120.0  # fully loaded cost
    avg_remediation_hours: float = 2.0
    avg_incidents_per_month: int = 3

def calculate_monthly_cost(p: PipelineProfile) -> dict:
    # Detection + triage time
    detection_hours = p.avg_detection_minutes / 60
    triage_cost = detection_hours * p.engineer_hourly_cost_usd

    # Remediation cost per incident
    remediation_cost = p.avg_remediation_hours * p.engineer_hourly_cost_usd

    # Downstream impact: assume 1 hour of stale data per incident on average
    stale_hours_per_incident = 1.0 + detection_hours

    # Revenue impact (if applicable)
    revenue_impact = 0.0
    if p.revenue_per_hour_usd:
        revenue_impact = p.revenue_per_hour_usd * stale_hours_per_incident * 0.10
        # Conservative: assume 10% of revenue affected by data quality

    # Decision quality impact
    decisions_impacted = p.decisions_per_hour * stale_hours_per_incident
    decision_impact = decisions_impacted * p.decision_error_rate * p.decision_cost_usd

    # Stakeholder time: weighted hours per affected consumer type per incident
    stakeholder_cost = (
        p.downstream_dashboards * 0.5 +
        p.downstream_ml_models * 1.0 +
        p.downstream_pipelines * 0.5 +
        p.downstream_api_consumers * 0.25
    ) * p.engineer_hourly_cost_usd

    cost_per_incident = (
        triage_cost + remediation_cost + revenue_impact +
        decision_impact + stakeholder_cost
    )
    monthly_cost = cost_per_incident * p.avg_incidents_per_month

    return {
        "pipeline": p.name,
        "cost_per_incident_usd": round(cost_per_incident, 2),
        "monthly_cost_usd": round(monthly_cost, 2),
        "annual_cost_usd": round(monthly_cost * 12, 2),
        "breakdown": {
            "triage_usd": round(triage_cost, 2),
            "remediation_usd": round(remediation_cost, 2),
            "revenue_impact_usd": round(revenue_impact, 2),
            "decision_impact_usd": round(decision_impact, 2),
            "stakeholder_usd": round(stakeholder_cost, 2),
        },
    }


# Example: orders pipeline in an e-commerce company
orders_pipeline = PipelineProfile(
    name="orders_fact",
    avg_detection_minutes=45,
    downstream_dashboards=8,
    downstream_ml_models=3,
    downstream_pipelines=5,
    downstream_api_consumers=2,
    revenue_per_hour_usd=50_000,    # $50k/hr in checkout revenue
    decisions_per_hour=20,
    decision_error_rate=0.08,
    decision_cost_usd=500,
    engineer_hourly_cost_usd=150,
    avg_remediation_hours=2.5,
    avg_incidents_per_month=4,
)

result = calculate_monthly_cost(orders_pipeline)
print(f"Pipeline: {result['pipeline']}")
print(f"Cost per incident: ${result['cost_per_incident_usd']:,.0f}")
print(f"Monthly cost:      ${result['monthly_cost_usd']:,.0f}")
print(f"Annual cost:       ${result['annual_cost_usd']:,.0f}")
print(f"Breakdown: {result['breakdown']}")
# Output:
# Pipeline: orders_fact
# Cost per incident: $12,138
# Monthly cost:      $48,550
# Annual cost:       $582,600
# Breakdown: {'triage_usd': 112.5, 'remediation_usd': 375.0, 'revenue_impact_usd': 8750.0, 'decision_impact_usd': 1400.0, 'stakeholder_usd': 1500.0}

Pipeline Reliability Tiers: Prioritise What Matters

Not every pipeline needs five-nines reliability. Applying gold-tier monitoring to a weekly analytics refresh wastes engineering effort and creates alert fatigue. Define reliability tiers based on the cost model above, and invest accordingly. The tier model makes tradeoffs explicit and helps data teams say "no" to unrealistic SLA demands for low-priority pipelines.

Tier 1 — Mission Critical

Revenue-generating pipelines, regulatory reporting, customer-facing data products. SLO: 99.9% freshness (<5 min lag), 99.5% completeness. Circuit breakers enabled. DLQ + automatic reprocessing. PagerDuty alerts. Full data observability. Reviewed weekly.

Tier 2 — Business Critical

Core business intelligence, marketing analytics, inventory management. SLO: 99% freshness (<30 min lag), 99% completeness. Slack alerts. DLQ without automatic reprocessing. Elementary or GX checks. Reviewed monthly.

Tier 3 — Operational

Internal dashboards, data science experiments, non-time-sensitive analytics. SLO: 95% freshness (<2 hour lag), 95% completeness. Email alerts. Basic row count monitoring. No circuit breaker. Reviewed quarterly.

Tier 4 — Best Effort

Exploratory data, archived datasets, backfill jobs. No formal SLO. Alert on job failure only. Manual remediation. No formal review cadence.
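Under this model, each pipeline declares a tier and inherits its policy instead of configuring thresholds one by one. A minimal sketch encoding the four tiers above (names such as `TierPolicy` and `TIER_POLICIES` are illustrative, not from any framework):

```python
from dataclasses import dataclass
from typing import Optional

@dataclass(frozen=True)
class TierPolicy:
    freshness_slo: Optional[float]   # target fraction of time data is within lag
    max_lag_minutes: Optional[int]   # freshness threshold
    completeness_slo: Optional[float]
    alert_channel: str               # where breaches are routed
    circuit_breaker: bool
    review_cadence: str

# Encodes the four-tier model described above
TIER_POLICIES = {
    1: TierPolicy(0.999, 5, 0.995, "pagerduty", True, "weekly"),
    2: TierPolicy(0.99, 30, 0.99, "slack", False, "monthly"),
    3: TierPolicy(0.95, 120, 0.95, "email", False, "quarterly"),
    4: TierPolicy(None, None, None, "job-failure-only", False, "none"),
}

def policy_for(tier: int) -> TierPolicy:
    """Look up the reliability policy for a pipeline's declared tier."""
    return TIER_POLICIES[tier]
```

Keeping the policy table in one place also makes the "say no" conversation concrete: a request for pager alerts on a Tier 3 pipeline is a request to move it to Tier 1, with everything that implies.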

Alert Design and Runbooks: From Detection to Resolution

The worst data incidents are the ones where engineers receive an alert, spend 30 minutes determining what it means, another 30 finding the cause, and another hour figuring out the remediation. Good runbooks collapse that to 15 minutes total. Every data pipeline SLO alert should link directly to a runbook covering: what broke, likely causes (ranked by frequency), investigation steps, and remediation procedures.

# Prometheus alerting rules for data pipeline SLOs
# prometheus/rules/data_pipelines.yml

groups:
  - name: data_pipeline_slos
    interval: 60s
    rules:

      # Freshness SLO breach — Tier 1 pipelines
      - alert: DataFreshnessCritical
        expr: |
          data_pipeline_freshness_seconds{pipeline="orders"} > 300
        for: 2m
        labels:
          severity: critical
          team: data-platform
          tier: "1"
        annotations:
          summary: "Orders pipeline data is stale ({{ $value | humanizeDuration }})"
          description: |
            Pipeline {{ $labels.pipeline }}.{{ $labels.table }} has not
            received new data for {{ $value | humanizeDuration }}.
            SLO: freshness < 5 minutes.
          runbook_url: "https://wiki.internal/runbooks/data/orders-freshness"
          dashboard_url: "https://grafana.internal/d/data-pipelines"

      # Completeness SLO breach
      - alert: DataCompletenessWarning
        expr: |
          data_pipeline_completeness_ratio < 0.95
        for: 5m
        labels:
          severity: warning
          team: data-platform
        annotations:
          summary: "Data completeness below SLO ({{ $value | humanizePercentage }})"
          description: |
            {{ $labels.pipeline }}.{{ $labels.table }} completeness is
            {{ $value | humanizePercentage }} in the last {{ $labels.window }}.
            SLO: >= 95%.
          runbook_url: "https://wiki.internal/runbooks/data/completeness"

      # Row count anomaly
      - alert: DataRowCountAnomaly
        expr: |
          data_pipeline_row_count_anomaly_score > 3
        for: 0m
        labels:
          severity: warning
          team: data-platform
        annotations:
          summary: "Unusual row count in {{ $labels.table }} (Z-score: {{ $value | humanize }})"
          description: |
            Row count in {{ $labels.pipeline }}.{{ $labels.table }} is
            {{ $value | humanize }} standard deviations from the 30-day average.
            Investigate for upstream data loss or volume spike.
          runbook_url: "https://wiki.internal/runbooks/data/row-count-anomaly"

      # SLO breach rate (error-budget burn alerting)
      - alert: DataSLOBurnRateHigh
        expr: |
          rate(data_pipeline_slo_breach_total[1h]) > 0.1
        for: 5m
        labels:
          severity: critical
          team: data-platform
        annotations:
          summary: "High SLO breach rate for {{ $labels.pipeline }}"
          description: |
            Pipeline {{ $labels.pipeline }} is breaching SLO at
            {{ $value | humanize }}/sec. At this rate the monthly
            error budget will be exhausted within hours.

Data Reliability Production Checklist

Every Tier 1 pipeline has a freshness SLO with a Prometheus alert

Defined in terms business stakeholders understand (e.g. 'orders data must be no older than 5 minutes'), not internal engineering metrics like 'Kafka lag < 1000'.

Schema changes trigger alerts before they reach downstream consumers

Column renames, type changes, and column drops are the leading cause of silent data corruption. Elementary or Protobuf/Avro schema registries enforce compatibility rules automatically.
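A schema registry is the robust solution, but even a lightweight diff of column-to-type mappings between runs catches the common breaking changes. A hypothetical helper, a sketch rather than a registry replacement:

```python
from typing import Dict, List

def diff_schema(old: Dict[str, str], new: Dict[str, str]) -> List[str]:
    """Flag changes between two column->type mappings; drops and type
    changes are breaking, additions are usually safe but worth surfacing."""
    issues = []
    for col, typ in old.items():
        if col not in new:
            issues.append(f"column dropped: {col}")
        elif new[col] != typ:
            issues.append(f"type changed: {col} {typ} -> {new[col]}")
    for col in new:
        if col not in old:
            issues.append(f"column added: {col}")
    return issues
```

Run it before loading each batch and route any "dropped" or "type changed" finding to the same alert channel as an SLO breach.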

Every pipeline write is idempotent

Test idempotency by replaying the last 24 hours of data in staging and comparing checksums. Add this to your CI pipeline. Non-idempotent pipelines are a ticking time bomb: any retry or backfill will eventually double-count or drop data.
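The replay-and-compare test can be expressed without any infrastructure: apply the same batch twice against an upsert-style write and assert the state checksum is unchanged. A toy sketch with an in-memory table standing in for the real store:

```python
import hashlib
import json

def apply_batch(table, batch):
    """Idempotent upsert keyed on 'id': replaying a batch leaves state unchanged."""
    for row in batch:
        table[row["id"]] = row

def table_checksum(table):
    """Deterministic checksum of table state, independent of insert order."""
    canonical = json.dumps(sorted(table.values(), key=lambda r: r["id"]),
                           sort_keys=True)
    return hashlib.sha256(canonical.encode()).hexdigest()

# Replay test: applying the same batch twice must produce identical checksums
table = {}
batch = [{"id": 1, "amount": 10}, {"id": 2, "amount": 20}]
apply_batch(table, batch)
first = table_checksum(table)
apply_batch(table, batch)   # replay
assert table_checksum(table) == first
```

Against a real warehouse, the same pattern is a checksum query over the affected partition before and after the replayed load.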

Dead letter queues capture every failed record

Failed records sent to /dev/null are permanent data loss. DLQs preserve them for inspection and reprocessing. Include the original payload, error details, and source offset so reprocessing can resume exactly where it failed.
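The envelope matters as much as the queue. A sketch of a DLQ record carrying the three fields named above; the actual producer or bucket write is elided and the field names are illustrative:

```python
import json
import time
from dataclasses import dataclass, asdict

@dataclass
class DeadLetter:
    """Everything needed to inspect and replay a failed record."""
    source_topic: str
    source_offset: int     # lets reprocessing resume exactly where it failed
    original_payload: str  # the raw record as received, never the parsed form
    error_type: str
    error_message: str
    failed_at: float

def to_dlq_message(record: bytes, topic: str, offset: int, exc: Exception) -> str:
    """Wrap a failed record in a DLQ envelope as JSON."""
    letter = DeadLetter(
        source_topic=topic,
        source_offset=offset,
        original_payload=record.decode("utf-8", errors="replace"),
        error_type=type(exc).__name__,
        error_message=str(exc),
        failed_at=time.time(),
    )
    return json.dumps(asdict(letter))
```

Storing the raw payload (not the half-parsed form) is what makes reprocessing safe after the parser is fixed.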

Circuit breakers protect Tier 1 consumers from upstream degradation

Without circuit breakers, a degraded upstream service causes cascading failures across all consumers. A tripped circuit with a clear error message is better than stale data silently flowing downstream.
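A data-pipeline circuit breaker need not be elaborate: open after N consecutive failures, allow a probe after a cooldown, close again on success. A minimal count-based sketch with placeholder thresholds:

```python
import time
from typing import Optional

class CircuitBreaker:
    """Open after `failure_threshold` consecutive failures; after
    `reset_timeout` seconds, allow one probe call (half-open)."""

    def __init__(self, failure_threshold: int = 5, reset_timeout: float = 60.0):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.failures = 0
        self.opened_at: Optional[float] = None

    def allow(self) -> bool:
        if self.opened_at is None:
            return True
        if time.time() - self.opened_at >= self.reset_timeout:
            return True   # half-open: let a probe call through
        return False

    def record_success(self) -> None:
        self.failures = 0
        self.opened_at = None   # close the circuit

    def record_failure(self) -> None:
        self.failures += 1
        if self.failures >= self.failure_threshold:
            self.opened_at = time.time()   # (re)open the circuit
```

When `allow()` returns False, the consumer should fail loudly with a clear error rather than serve whatever stale data is cached.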

Runbooks exist for every Tier 1 and Tier 2 alert

The runbook must be maintained by the on-call rotation — not just written once and forgotten. Link it directly from the alert annotation. If the alert fires and the runbook is outdated, update it during the incident, not after.

Data downtime cost is calculated and reviewed quarterly

Use the cost model to prioritise reliability investments. If the annual cost of a pipeline's downtime is $50k and the fix costs $20k in engineering time, the ROI is obvious. Make this number visible to stakeholders.

Error budget policy is defined and enforced

When a Tier 1 pipeline exhausts its monthly error budget (e.g. cumulative freshness breaches > 0.1%), stop feature work and focus exclusively on reliability until the budget is restored. This creates the right incentives.
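The budget arithmetic is worth keeping next to the policy so "exhausted" is unambiguous. A sketch using the 0.1% figure from the example above:

```python
def monthly_error_budget_minutes(slo: float, days: int = 30) -> float:
    """Minutes per month a pipeline may be out of SLO before the budget is spent."""
    return (1.0 - slo) * days * 24 * 60

def budget_exhausted(breach_minutes: float, slo: float) -> bool:
    """True when cumulative breach time has consumed the monthly budget."""
    return breach_minutes > monthly_error_budget_minutes(slo)

# A 99.9% freshness SLO leaves ~43.2 minutes of staleness per 30-day month
tier1_budget = monthly_error_budget_minutes(0.999)
```

Making the number concrete (a Tier 1 pipeline gets roughly 43 stale minutes a month) turns "stop feature work" from a judgment call into a rule.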

Work with us

Struggling with unreliable data pipelines and unclear SLA impact?

We design and implement data reliability frameworks — from freshness SLOs and data observability instrumentation to circuit breakers, dead letter queues, and idempotent pipeline architectures that eliminate silent data loss. Let’s talk.

Get in touch

Related Articles