What Data Downtime Actually Costs
Data downtime is the period during which data is missing, inaccurate, or otherwise unreliable. Unlike infrastructure downtime — where a service is clearly unavailable — data downtime is often silent. A dashboard shows stale numbers. An ML model scores on yesterday's features. A finance report misses three hours of transactions. Nobody knows until a business analyst calls at 8 AM, or worse, a customer notices an anomaly.
The costs are real and compound quickly. According to Monte Carlo Data's 2022 State of Data Quality survey, data engineers spend an average of 40% of their time on data quality issues. Gartner estimates poor data quality costs organisations an average of $12.9 million per year. But the direct costs — engineering hours debugging pipelines — are only the surface. The deeper costs hit decision quality, customer trust, and regulatory exposure.
Direct Costs
Engineering time to detect, diagnose, and remediate pipeline failures. SLA breach penalties if contractual data delivery commitments are violated. Reprocessing compute costs when bad data must be corrected and replayed through downstream systems.
Decision Quality Costs
Business decisions made on stale or incorrect data. Marketing campaigns targeting wrong segments. Inventory systems buying excess stock. Fraud models missing new attack patterns. These costs are invisible on engineering dashboards but very visible in quarterly results.
Trust and Compliance Costs
Once business stakeholders lose confidence in data, they stop using it — reverting to spreadsheets and instinct. Rebuilding data trust takes months. Regulatory exposure from inaccurate reporting (GDPR, SOX, HIPAA) can exceed all other costs combined.
Measuring SLA Impact: Data Freshness and Completeness SLOs
The first step toward reliability is measurement. Most data teams lack formal SLOs for their pipelines. Without them, "the pipeline is down" is as specific as the incident report gets. Define SLOs in terms users care about: freshness (how recent is the latest data?), completeness (is any data missing?), schema validity (does the data match expected shape?), and distribution consistency (are values within expected ranges?).
The Gartner Data Quality framework identifies six dimensions: accuracy, completeness, consistency, timeliness, uniqueness, and validity. For operational data pipelines, the most tractable signals to instrument are freshness, completeness, schema validity, and row-count anomalies.
# data_slo_tracker.py
# Tracks pipeline SLOs and exposes Prometheus metrics
# pip install prometheus-client sqlalchemy psycopg2-binary
import time
import datetime
from prometheus_client import start_http_server, Gauge, Counter
from sqlalchemy import create_engine, text
import os
# --- Prometheus metrics ---
FRESHNESS_SECONDS = Gauge(
"data_pipeline_freshness_seconds",
"Seconds since the latest record was ingested",
["pipeline", "table"],
)
COMPLETENESS_RATIO = Gauge(
"data_pipeline_completeness_ratio",
"Ratio of expected rows received vs expected (1.0 = complete)",
["pipeline", "table", "window"],
)
SLO_BREACH_TOTAL = Counter(
"data_pipeline_slo_breach_total",
"Number of SLO breaches detected",
["pipeline", "table", "slo_type"],
)
ROW_COUNT_ANOMALY = Gauge(
"data_pipeline_row_count_anomaly_score",
"Z-score of current row count vs 30-day average (|score| > 3 = anomaly)",
["pipeline", "table"],
)
# --- SLO definitions: pipeline -> table -> thresholds ---
SLO_CONFIG = {
"orders": {
"orders_fact": {
"max_freshness_seconds": 300, # 5-minute SLO
"completeness_window_minutes": 60,
"expected_rows_per_hour": 2000,
"completeness_threshold": 0.95, # 95% minimum
},
},
"analytics": {
"sessions_daily": {
"max_freshness_seconds": 3600, # 1-hour SLO
"completeness_window_minutes": 1440,
"expected_rows_per_hour": 500,
"completeness_threshold": 0.99,
},
},
}
def check_freshness(engine, pipeline: str, table: str, config: dict):
"""Query max(updated_at) and compute lag in seconds."""
with engine.connect() as conn:
row = conn.execute(
text(f"SELECT MAX(updated_at) AS latest FROM {table}")
).fetchone()
latest = row.latest if row and row.latest else None
if latest is None:
lag = float("inf")
else:
# updated_at is TIMESTAMPTZ, so `latest` arrives timezone-aware; a naive utcnow() would raise on subtraction
lag = (datetime.datetime.now(datetime.timezone.utc) - latest).total_seconds()
FRESHNESS_SECONDS.labels(pipeline=pipeline, table=table).set(lag)
slo_max = config["max_freshness_seconds"]
if lag > slo_max:
SLO_BREACH_TOTAL.labels(
pipeline=pipeline, table=table, slo_type="freshness"
).inc()
print(f"[BREACH] {pipeline}.{table}: freshness {lag:.0f}s > SLO {slo_max}s")
return lag
def check_completeness(engine, pipeline: str, table: str, config: dict):
"""Compare actual row count in the last window vs expected."""
window = config["completeness_window_minutes"]
expected_rate = config["expected_rows_per_hour"]
threshold = config["completeness_threshold"]
expected = expected_rate * (window / 60)
with engine.connect() as conn:
row = conn.execute(
text(
f"""
SELECT COUNT(*) AS actual
FROM {table}
WHERE updated_at >= NOW() - INTERVAL '{window} minutes'
"""
)
).fetchone()
actual = row.actual if row else 0
ratio = actual / max(expected, 1)
COMPLETENESS_RATIO.labels(
pipeline=pipeline, table=table, window=f"{window}m"
).set(ratio)
if ratio < threshold:
SLO_BREACH_TOTAL.labels(
pipeline=pipeline, table=table, slo_type="completeness"
).inc()
print(
f"[BREACH] {pipeline}.{table}: completeness {ratio:.2%} < SLO {threshold:.0%} "
f"(got {actual}, expected ~{expected:.0f} in last {window}m)"
)
return ratio
def check_row_count_anomaly(engine, pipeline: str, table: str):
"""Compute Z-score of today's row count vs 30-day rolling average."""
with engine.connect() as conn:
stats = conn.execute(
text(
f"""
WITH daily AS (
SELECT DATE(updated_at) AS day, COUNT(*) AS cnt
FROM {table}
WHERE updated_at >= NOW() - INTERVAL '31 days'
GROUP BY 1
)
SELECT
-- Baseline excludes today so the current day cannot skew its own z-score
AVG(cnt) FILTER (WHERE day < CURRENT_DATE) AS mean,
STDDEV(cnt) FILTER (WHERE day < CURRENT_DATE) AS stddev,
MAX(cnt) FILTER (WHERE day = CURRENT_DATE) AS today
FROM daily
"""
)
).fetchone()
if not stats or stats.stddev is None or float(stats.stddev) == 0:
return 0.0
z = (float(stats.today or 0) - float(stats.mean)) / float(stats.stddev)
ROW_COUNT_ANOMALY.labels(pipeline=pipeline, table=table).set(abs(z))
if abs(z) > 3:
print(f"[ANOMALY] {pipeline}.{table}: row count Z-score {z:.2f}")
return z
def main():
db_url = os.environ["DATABASE_URL"] # postgresql://user:pass@host/db
engine = create_engine(db_url)
start_http_server(8081)
print("Data SLO tracker running on :8081/metrics")
while True:
for pipeline, tables in SLO_CONFIG.items():
for table, config in tables.items():
try:
check_freshness(engine, pipeline, table, config)
check_completeness(engine, pipeline, table, config)
check_row_count_anomaly(engine, pipeline, table)
except Exception as exc:
print(f"[ERROR] {pipeline}.{table}: {exc}")
time.sleep(60)
if __name__ == "__main__":
main()
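The tracker above instruments freshness, completeness, and row counts. Schema validity, the fourth signal, can be probed the same way. A minimal sketch, assuming PostgreSQL's information_schema and a hand-maintained expected-columns map (both the map and the column types shown are illustrative):
# schema_validity_check.py
# Minimal sketch: compare live columns against an expected map (illustrative values)
import os
from sqlalchemy import create_engine, text

EXPECTED_COLUMNS = {  # hypothetical expected schema for orders_fact
    "order_id": "uuid",
    "customer_id": "uuid",
    "status": "text",
    "total_amount": "numeric",
    "updated_at": "timestamp with time zone",
}

def check_schema_validity(engine, table: str) -> list:
    """Return a list of drift findings; an empty list means the schema is valid."""
    with engine.connect() as conn:
        rows = conn.execute(
            text(
                "SELECT column_name, data_type FROM information_schema.columns "
                "WHERE table_name = :table"
            ),
            {"table": table},
        ).fetchall()
    actual = {r.column_name: r.data_type for r in rows}
    findings = []
    for col, expected_type in EXPECTED_COLUMNS.items():
        if col not in actual:
            findings.append(f"missing column: {col}")
        elif actual[col] != expected_type:
            findings.append(f"type drift on {col}: {actual[col]} (expected {expected_type})")
    for col in actual:
        # Pipeline metadata columns (underscore-prefixed) are allowed to appear
        if col not in EXPECTED_COLUMNS and not col.startswith("_"):
            findings.append(f"unexpected new column: {col}")
    return findings

if __name__ == "__main__":
    engine = create_engine(os.environ["DATABASE_URL"])
    for finding in check_schema_validity(engine, "orders_fact"):
        print(f"[SCHEMA] orders_fact: {finding}")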
Data Observability: Monte Carlo, Elementary, and Great Expectations
Data observability platforms apply the same principles as infrastructure observability — automatic anomaly detection, lineage tracking, incident alerting — to data assets. The three dominant approaches are: Monte Carlo (managed, ML-based anomaly detection), Elementary (open-source dbt-native observability), and Great Expectations (code-first, assertion-based quality checks). Each sits at a different point on the automation-vs-control spectrum.
# Great Expectations — code-first data quality assertions
# pip install great-expectations sqlalchemy psycopg2-binary
import great_expectations as gx
from great_expectations.checkpoint import SimpleCheckpoint
# --- Bootstrap a GX project (run once) ---
context = gx.get_context()
# --- Define a datasource (PostgreSQL warehouse) ---
datasource = context.sources.add_or_update_sql(
name="warehouse",
connection_string="postgresql+psycopg2://${DB_USER}:${DB_PASSWORD}@${DB_HOST}/${DB_NAME}",
)
# --- Add a data asset (the table to test) ---
asset = datasource.add_table_asset(
name="orders_fact",
table_name="orders_fact",
)
batch_request = asset.build_batch_request()
# --- Create or load an Expectation Suite ---
suite_name = "orders_fact.critical"
try:
suite = context.get_expectation_suite(suite_name)
except Exception:
suite = context.create_expectation_suite(suite_name)
validator = context.get_validator(
batch_request=batch_request,
expectation_suite_name=suite_name,
)
# --- Define expectations ---
# Freshness: max(updated_at) within 10 minutes
validator.expect_column_max_to_be_between(
column="updated_at",
min_value={"$PARAMETER": "now() - interval '10 minutes'"},
max_value={"$PARAMETER": "now()"},
meta={"notes": "SLO: data must be no older than 10 minutes"},
)
# Completeness: no nulls on required fields
for col in ["order_id", "customer_id", "created_at", "total_amount"]:
validator.expect_column_values_to_not_be_null(column=col)
# Uniqueness: order_id must be unique
validator.expect_column_values_to_be_unique(column="order_id")
# Schema: total_amount must be non-negative
validator.expect_column_values_to_be_between(
column="total_amount",
min_value=0,
mostly=0.999, # allow 0.1% negative for refund adjustments
)
# Distribution: order count should not deviate more than 30% from 7d avg
validator.expect_table_row_count_to_be_between(
min_value={"$PARAMETER": "daily_avg_7d * 0.70"},
max_value={"$PARAMETER": "daily_avg_7d * 1.30"},
meta={"notes": "Anomaly detection: 30% deviation threshold"},
)
# Status: only known order statuses
validator.expect_column_values_to_be_in_set(
column="status",
value_set=["pending", "confirmed", "shipped", "delivered", "cancelled", "refunded"],
)
validator.save_expectation_suite()
# --- Create a Checkpoint (runs the suite and routes results) ---
checkpoint = SimpleCheckpoint(
name="orders_fact_checkpoint",
data_context=context,
validations=[
{
"batch_request": batch_request,
"expectation_suite_name": suite_name,
}
],
action_list=[
# Persist results to the Data Docs site
{
"name": "store_validation_result",
"action": {"class_name": "StoreValidationResultAction"},
},
# Send Slack alert on failure
{
"name": "notify_slack",
"action": {
"class_name": "SlackNotificationAction",
"slack_webhook": "${SLACK_WEBHOOK_URL}",
"notify_on": "failure",
"renderer": {
"module_name": "great_expectations.render.renderer.slack_renderer",
"class_name": "SlackRenderer",
},
},
},
# Rebuild Data Docs so failures are visible in the UI (failing the CI job is handled after the run below)
{
"name": "update_data_docs",
"action": {"class_name": "UpdateDataDocsAction"},
},
],
)
context.add_or_update_checkpoint(checkpoint=checkpoint)
# --- Run the checkpoint ---
result = context.run_checkpoint(checkpoint_name="orders_fact_checkpoint")
if not result["success"]:
print("DATA QUALITY FAILURE — see Data Docs for details")
exit(1)
print("All expectations passed.")# Elementary (dbt-native observability) — add to your dbt project
# pip install elementary-data
# dbt deps (after adding to packages.yml)
# packages.yml — add Elementary as a dbt package
packages:
- package: elementary-data/elementary
version: ">=0.14.0"
# dbt_project.yml — enable Elementary models
models:
elementary:
+schema: elementary # Elementary creates its own schema
# --- After running: dbt run --select elementary ---
# Elementary auto-instruments every dbt model with:
# - Row count monitoring (anomaly detection)
# - Freshness checks (based on model run timestamps)
# - Schema change detection (column add/remove/type change)
# - Source freshness from dbt sources.yml
# models/orders/schema.yml — add Elementary tests alongside dbt tests
version: 2
models:
- name: orders_fact
columns:
- name: order_id
tests:
- not_null
- unique
- elementary.column_anomalies:
column_anomalies:
- null_count
- null_percent
- name: total_amount
tests:
- not_null
- elementary.column_anomalies:
column_anomalies:
- average
- min_value
- max_value
- zero_count
tests:
# Table-level anomaly detection
- elementary.volume_anomalies:
timestamp_column: created_at
time_bucket:
period: hour
count: 1
- elementary.freshness_anomalies:
timestamp_column: updated_at
max_allowed_delay:
period: minute
count: 15
- elementary.schema_changes
# Run Elementary report locally
# edr report --profiles-dir ~/.dbt --project-dir .
# Send Elementary alerts to Slack on schedule
# edr monitor --slack-webhook ${SLACK_WEBHOOK} --profiles-dir ~/.dbt
Pipeline Resilience Patterns: Circuit Breakers, Dead Letter Queues, and Idempotency
Monitoring tells you something broke. Resilience patterns are what prevent a single upstream failure from cascading through your entire data platform. The three patterns with the highest reliability ROI are: circuit breakers (stop processing when upstream quality degrades), dead letter queues (capture failed records for reprocessing without blocking the main pipeline), and idempotent writes (make every pipeline step safe to replay).
# pipeline_circuit_breaker.py
# Circuit breaker for data pipelines — stops processing when error rate exceeds threshold
# Implements half-open recovery: attempts a probe batch after cooldown
import time
import datetime
import enum
from dataclasses import dataclass, field
from typing import Callable, Any
class CircuitState(enum.Enum):
CLOSED = "closed" # Normal operation
OPEN = "open" # Tripped — rejecting calls
HALF_OPEN = "half_open" # Probing — one attempt allowed
class CircuitOpenError(RuntimeError):
    """Raised when the breaker rejects a call because the circuit is open."""

@dataclass
class CircuitBreaker:
name: str
failure_threshold: int = 5 # trips after N consecutive failures
error_rate_threshold: float = 0.5 # or when error rate > 50% in window
window_seconds: int = 300 # 5-minute sliding window
recovery_timeout_seconds: int = 60 # wait before half-open probe
_state: CircuitState = field(default=CircuitState.CLOSED, init=False)
_failure_count: int = field(default=0, init=False)
_success_count: int = field(default=0, init=False)
_last_failure_time: float = field(default=0.0, init=False)
_events: list = field(default_factory=list, init=False)
@property
def state(self) -> CircuitState:
if self._state == CircuitState.OPEN:
# Check if recovery timeout has elapsed
elapsed = time.time() - self._last_failure_time
if elapsed >= self.recovery_timeout_seconds:
self._state = CircuitState.HALF_OPEN
print(f"[{self.name}] Circuit HALF-OPEN — attempting probe")
return self._state
def _record_event(self, success: bool):
now = time.time()
self._events.append((now, success))
# Prune events outside the sliding window
cutoff = now - self.window_seconds
self._events = [(t, s) for t, s in self._events if t >= cutoff]
def _window_error_rate(self) -> float:
if not self._events:
return 0.0
failures = sum(1 for _, s in self._events if not s)
return failures / len(self._events)
def call(self, func: Callable, *args, **kwargs) -> Any:
state = self.state
if state == CircuitState.OPEN:
raise CircuitOpenError(
f"[{self.name}] Circuit OPEN — upstream quality degraded. "
f"Retry after {self.recovery_timeout_seconds}s cooldown."
)
try:
result = func(*args, **kwargs)
self._record_event(True)
self._failure_count = 0
self._success_count += 1
if state == CircuitState.HALF_OPEN:
self._state = CircuitState.CLOSED
print(f"[{self.name}] Circuit CLOSED — upstream recovered")
return result
except Exception as exc:
self._record_event(False)
self._failure_count += 1
self._last_failure_time = time.time()
consecutive_tripped = self._failure_count >= self.failure_threshold
rate_tripped = self._window_error_rate() >= self.error_rate_threshold
# A failed half-open probe re-opens the circuit immediately
if state == CircuitState.HALF_OPEN or consecutive_tripped or rate_tripped:
self._state = CircuitState.OPEN
print(
f"[{self.name}] Circuit OPEN — "
f"consecutive={self._failure_count}, "
f"error_rate={self._window_error_rate():.0%}"
)
raise
# --- Usage in a Kafka consumer pipeline ---
import json
from kafka import KafkaConsumer, KafkaProducer
def enrich_order(record: dict) -> dict:
"""Calls an upstream enrichment service — may fail."""
# ... HTTP call to enrichment API
return {**record, "enriched": True}
def run_pipeline():
consumer = KafkaConsumer(
"orders.raw",
bootstrap_servers=["kafka:9092"],
group_id="order-enrichment",
value_deserializer=lambda v: json.loads(v.decode()),
auto_offset_reset="earliest",
enable_auto_commit=False,
)
producer = KafkaProducer(
bootstrap_servers=["kafka:9092"],
value_serializer=lambda v: json.dumps(v).encode(),
)
dlq_producer = KafkaProducer( # Dead Letter Queue producer
bootstrap_servers=["kafka:9092"],
value_serializer=lambda v: json.dumps(v).encode(),
)
breaker = CircuitBreaker(
name="enrichment-service",
failure_threshold=5,
error_rate_threshold=0.40,
window_seconds=120,
recovery_timeout_seconds=30,
)
for message in consumer:
record = message.value
try:
enriched = breaker.call(enrich_order, record)
producer.send("orders.enriched", value=enriched)
consumer.commit()
except CircuitOpenError as circuit_open:
# Circuit is open — pause and wait for recovery
print(f"Circuit open, sleeping 30s: {circuit_open}")
time.sleep(30)
# Do NOT commit — will re-process from last committed offset
except Exception as exc:
# Individual record failure — send to DLQ
dlq_record = {
"original_record": record,
"error": str(exc),
"error_type": type(exc).__name__,
"timestamp": datetime.datetime.utcnow().isoformat(),
"topic": message.topic,
"partition": message.partition,
"offset": message.offset,
"retry_count": record.get("_retry_count", 0),
}
dlq_producer.send("orders.enriched.dlq", value=dlq_record)
consumer.commit() # Commit — don't block the pipeline for this record
print(f"[DLQ] Sent failed record offset={message.offset}: {exc}")# dlq_reprocessor.py
# Dead letter queue reprocessor — retries failed records with exponential backoff
# Designed for idempotent re-ingestion
import json
import time
import datetime
from kafka import KafkaConsumer, KafkaProducer
MAX_RETRIES = 3
BASE_BACKOFF_SECONDS = 60 # 1m, 2m, 4m backoff schedule
def should_retry(dlq_record: dict) -> bool:
retry_count = dlq_record.get("retry_count", 0)
error_type = dlq_record.get("error_type", "")
# Never retry these — they indicate data problems, not transient failures
terminal_errors = {
"ValidationError",
"SchemaError",
"UnicodeDecodeError",
"KeyError", # missing required field
}
if error_type in terminal_errors:
return False
return retry_count < MAX_RETRIES
def reprocess():
consumer = KafkaConsumer(
"orders.enriched.dlq",
bootstrap_servers=["kafka:9092"],
group_id="dlq-reprocessor",
value_deserializer=lambda v: json.loads(v.decode()),
auto_offset_reset="earliest",
)
producer = KafkaProducer(
bootstrap_servers=["kafka:9092"],
value_serializer=lambda v: json.dumps(v).encode(),
)
dead_letter_final = KafkaProducer(
bootstrap_servers=["kafka:9092"],
value_serializer=lambda v: json.dumps(v).encode(),
)
for message in consumer:
dlq_record = message.value
retry_count = dlq_record.get("retry_count", 0)
original = dlq_record.get("original_record", {})
if not should_retry(dlq_record):
# Send to final dead letter — needs manual inspection
dead_letter_final.send("orders.enriched.dead", value={
**dlq_record,
"final_dead_at": datetime.datetime.utcnow().isoformat(),
"reason": "max_retries_exceeded or terminal_error",
})
print(f"[DEAD] Moved to final DLQ: {dlq_record.get('error_type')}")
continue
# Exponential backoff: sleep before retrying
backoff = BASE_BACKOFF_SECONDS * (2 ** retry_count)
print(f"[RETRY {retry_count+1}/{MAX_RETRIES}] Backoff {backoff}s for: {original.get('order_id')}")
time.sleep(min(backoff, 3600))
# Re-publish to the main topic with incremented retry counter
retry_record = {
**original,
"_retry_count": retry_count + 1,
"_dlq_error": dlq_record.get("error"),
"_first_failed_at": dlq_record.get("timestamp"),
}
producer.send("orders.raw", value=retry_record)
print(f"[RETRY] Re-queued order_id={original.get('order_id')}")Idempotent Writes: Making Every Pipeline Step Safe to Replay
The most powerful resilience property is idempotency: running the same pipeline step twice produces the same result as running it once. Idempotent pipelines can be safely retried, replayed from checkpoints, and re-run after failures without producing duplicates or inconsistencies. In practice, this means using MERGE/UPSERT semantics instead of INSERT, partition-based overwrites instead of appends, and deterministic IDs derived from content rather than auto-increment sequences.
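The SQL and Spark patterns below cover the first two techniques. For the third, deterministic IDs, a content-derived key makes a replayed batch collide harmlessly with the rows it already wrote. A minimal sketch using UUIDv5 (the namespace string and field choices are illustrative assumptions):
# deterministic_ids.py: content-derived primary keys for replay safety
import uuid

# Any stable namespace works; derived here from a hypothetical internal domain
PIPELINE_NAMESPACE = uuid.uuid5(uuid.NAMESPACE_DNS, "pipelines.example.internal")

def deterministic_order_id(source_system: str, source_order_ref: str) -> uuid.UUID:
    """The same source record always yields the same UUID, so a replayed batch
    upserts onto its own earlier rows instead of minting duplicates."""
    return uuid.uuid5(PIPELINE_NAMESPACE, f"{source_system}:{source_order_ref}")

# Replaying the same record produces the identical primary key:
assert deterministic_order_id("webshop", "ORD-1001") == deterministic_order_id("webshop", "ORD-1001")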
-- PostgreSQL: idempotent upsert pattern
-- Using INSERT ... ON CONFLICT for safe-to-replay ingestion
-- Table with a natural key constraint
CREATE TABLE orders_fact (
order_id UUID PRIMARY KEY,
customer_id UUID NOT NULL,
status TEXT NOT NULL,
total_amount NUMERIC(12,2) NOT NULL,
created_at TIMESTAMPTZ NOT NULL,
updated_at TIMESTAMPTZ NOT NULL,
-- Pipeline metadata — helps with debugging and reprocessing
_ingested_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
_source_topic TEXT,
_source_offset BIGINT,
_pipeline_run UUID -- identifies which pipeline run wrote this
);
-- Idempotent upsert: safe to run multiple times for the same order_id
INSERT INTO orders_fact (
order_id, customer_id, status, total_amount,
created_at, updated_at,
_ingested_at, _source_topic, _source_offset, _pipeline_run
)
VALUES (
:order_id, :customer_id, :status, :total_amount,
:created_at, :updated_at,
NOW(), :source_topic, :source_offset, :pipeline_run
)
ON CONFLICT (order_id) DO UPDATE SET
status = EXCLUDED.status,
total_amount = EXCLUDED.total_amount,
updated_at = EXCLUDED.updated_at,
_ingested_at = EXCLUDED._ingested_at,
_source_offset = EXCLUDED._source_offset,
_pipeline_run = EXCLUDED._pipeline_run
-- Only update if the incoming record is newer (prevents out-of-order overwrites)
WHERE orders_fact.updated_at < EXCLUDED.updated_at;
-- Verify idempotency: a replay of the same batch should leave these counts unchanged
SELECT
COUNT(*) FILTER (WHERE _pipeline_run = :pipeline_run) AS rows_written_this_run,
COUNT(*) AS rows_total_last_hour
FROM orders_fact
WHERE _ingested_at >= NOW() - INTERVAL '1 hour';
# Spark: idempotent partition overwrite for batch pipelines
# Delta Lake merge pattern — safe to replay without duplicates
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from delta.tables import DeltaTable
import datetime
spark = (
    SparkSession.builder
    .appName("orders-idempotent-load")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)
# Read incoming batch (e.g. from S3 landing zone)
incoming = spark.read.json("s3://data-landing/orders/2026-05-08/")
# Add pipeline metadata for auditability
pipeline_run_id = datetime.datetime.utcnow().strftime("%Y%m%d_%H%M%S")
incoming = (
    incoming.withColumn("_pipeline_run", F.lit(pipeline_run_id))
    .withColumn("_ingested_at", F.current_timestamp())
)
target_path = "s3://data-warehouse/orders_fact"
if DeltaTable.isDeltaTable(spark, target_path):
target = DeltaTable.forPath(spark, target_path)
# MERGE: insert new records, update existing only if newer
target.alias("target").merge(
incoming.alias("source"),
"target.order_id = source.order_id"
).whenMatchedUpdate(
condition="source.updated_at > target.updated_at",
set={
"status": "source.status",
"total_amount": "source.total_amount",
"updated_at": "source.updated_at",
"_pipeline_run": "source._pipeline_run",
"_ingested_at": "source._ingested_at",
}
).whenNotMatchedInsertAll().execute()
else:
# First write: partition by a real date column (partitionBy takes column names, not expressions)
incoming = incoming.withColumn("created_date", F.to_date("created_at"))
incoming.write.format("delta").partitionBy("created_date").save(target_path)
# Compact small files (OPTIMIZE is idempotent; run it every pass or gate it to every Nth run)
target = DeltaTable.forPath(spark, target_path)
target.optimize().executeCompaction()
# Vacuum old snapshots (keeps 7 days for time travel)
target.vacuum(retentionHours=168)
print(f"Pipeline run {pipeline_run_id} complete")Note
Quantifying Data Downtime Cost: A Framework
Before investing in reliability tooling, quantify the actual cost of your current downtime. This makes the business case concrete and helps prioritise which pipelines to protect first. The framework has four components: detection time, impact breadth, impact depth, and recovery cost.
# data_downtime_cost_model.py
# Quantify the business cost of data downtime for a given pipeline
# Adapt the cost inputs to your organisation's context
from dataclasses import dataclass
from typing import Optional
@dataclass
class PipelineProfile:
name: str
# --- Detection ---
avg_detection_minutes: float # How long until someone notices
# --- Breadth: how many downstream consumers ---
downstream_dashboards: int # BI dashboards that go stale
downstream_ml_models: int # ML models that score stale features
downstream_pipelines: int # Other pipelines that read this one
downstream_api_consumers: int # APIs / apps that query this data
# --- Depth: cost per unit of downtime ---
revenue_per_hour_usd: Optional[float] = None # if pipeline drives revenue
decisions_per_hour: int = 0 # high-stakes decisions made with this data
decision_error_rate: float = 0.05 # fraction of decisions wrong when data stale
decision_cost_usd: float = 100.0 # avg cost of a wrong decision
# --- Engineering cost ---
engineer_hourly_cost_usd: float = 120.0 # fully loaded cost
avg_remediation_hours: float = 2.0
avg_incidents_per_month: int = 3
def calculate_monthly_cost(p: PipelineProfile) -> dict:
# Detection + triage time
detection_hours = p.avg_detection_minutes / 60
triage_cost = detection_hours * p.engineer_hourly_cost_usd
# Remediation cost per incident
remediation_cost = p.avg_remediation_hours * p.engineer_hourly_cost_usd
# Downstream impact: assume 1 hour of stale data per incident on average
stale_hours_per_incident = 1.0 + detection_hours
# Revenue impact (if applicable)
revenue_impact = 0.0
if p.revenue_per_hour_usd:
revenue_impact = p.revenue_per_hour_usd * stale_hours_per_incident * 0.10
# Conservative: assume 10% of revenue affected by data quality
# Decision quality impact
decisions_impacted = p.decisions_per_hour * stale_hours_per_incident
decision_impact = decisions_impacted * p.decision_error_rate * p.decision_cost_usd
# Stakeholder time: assume 30m per affected consumer team per incident
stakeholder_cost = (
p.downstream_dashboards * 0.5 +
p.downstream_ml_models * 1.0 +
p.downstream_pipelines * 0.5 +
p.downstream_api_consumers * 0.25
) * p.engineer_hourly_cost_usd
cost_per_incident = (
triage_cost + remediation_cost + revenue_impact +
decision_impact + stakeholder_cost
)
monthly_cost = cost_per_incident * p.avg_incidents_per_month
return {
"pipeline": p.name,
"cost_per_incident_usd": round(cost_per_incident, 2),
"monthly_cost_usd": round(monthly_cost, 2),
"annual_cost_usd": round(monthly_cost * 12, 2),
"breakdown": {
"triage_usd": round(triage_cost, 2),
"remediation_usd": round(remediation_cost, 2),
"revenue_impact_usd": round(revenue_impact, 2),
"decision_impact_usd": round(decision_impact, 2),
"stakeholder_usd": round(stakeholder_cost, 2),
},
}
# Example: orders pipeline in an e-commerce company
orders_pipeline = PipelineProfile(
name="orders_fact",
avg_detection_minutes=45,
downstream_dashboards=8,
downstream_ml_models=3,
downstream_pipelines=5,
downstream_api_consumers=2,
revenue_per_hour_usd=50_000, # $50k/hr in checkout revenue
decisions_per_hour=20,
decision_error_rate=0.08,
decision_cost_usd=500,
engineer_hourly_cost_usd=150,
avg_remediation_hours=2.5,
avg_incidents_per_month=4,
)
result = calculate_monthly_cost(orders_pipeline)
print(f"Pipeline: {result['pipeline']}")
print(f"Cost per incident: ${result['cost_per_incident_usd']:,.0f}")
print(f"Monthly cost: ${result['monthly_cost_usd']:,.0f}")
print(f"Annual cost: ${result['annual_cost_usd']:,.0f}")
print(f"Breakdown: {result['breakdown']}")
# Output:
# Pipeline: orders_fact
# Cost per incident: $12,138
# Monthly cost: $48,550
# Annual cost: $582,600
# Breakdown: {'triage_usd': 112.5, 'remediation_usd': 375.0, 'revenue_impact_usd': 8750.0, 'decision_impact_usd': 1400.0, 'stakeholder_usd': 1500.0}
Pipeline Reliability Tiers: Prioritise What Matters
Not every pipeline needs five-nines reliability. Applying gold-tier monitoring to a weekly analytics refresh wastes engineering effort and creates alert fatigue. Define reliability tiers based on the cost model above, and invest accordingly. The tier model makes tradeoffs explicit and helps data teams say "no" to unrealistic SLA demands for low-priority pipelines.
Tier 1 — Mission Critical
Revenue-generating pipelines, regulatory reporting, customer-facing data products. SLO: 99.9% freshness (<5 min lag), 99.5% completeness. Circuit breakers enabled. DLQ + automatic reprocessing. PagerDuty alerts. Full data observability. Reviewed weekly.
Tier 2 — Business Critical
Core business intelligence, marketing analytics, inventory management. SLO: 99% freshness (<30 min lag), 99% completeness. Slack alerts. DLQ without automatic reprocessing. Elementary or GX checks. Reviewed monthly.
Tier 3 — Operational
Internal dashboards, data science experiments, non-time-sensitive analytics. SLO: 95% freshness (<2 hour lag), 95% completeness. Email alerts. Basic row count monitoring. No circuit breaker. Reviewed quarterly.
Tier 4 — Best Effort
Exploratory data, archived datasets, backfill jobs. No formal SLO. Alert on job failure only. Manual remediation. No formal review cadence.
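These tiers only get enforced if they live in code rather than a wiki. A minimal sketch of a machine-readable tier policy, mirroring the definitions above, that the SLO tracker and alert router could consume (the field names are illustrative, not a standard):
# reliability_tiers.py: tier policy as config (values mirror the tier definitions above)
TIER_POLICY = {
    1: {  # Mission Critical
        "freshness_slo": 0.999, "max_lag_seconds": 300, "completeness_slo": 0.995,
        "circuit_breaker": True, "dlq_auto_reprocess": True,
        "alert_channel": "pagerduty", "review_cadence_days": 7,
    },
    2: {  # Business Critical
        "freshness_slo": 0.99, "max_lag_seconds": 1800, "completeness_slo": 0.99,
        "circuit_breaker": False, "dlq_auto_reprocess": False,
        "alert_channel": "slack", "review_cadence_days": 30,
    },
    3: {  # Operational
        "freshness_slo": 0.95, "max_lag_seconds": 7200, "completeness_slo": 0.95,
        "circuit_breaker": False, "dlq_auto_reprocess": False,
        "alert_channel": "email", "review_cadence_days": 90,
    },
    4: {  # Best Effort
        "freshness_slo": None, "max_lag_seconds": None, "completeness_slo": None,
        "circuit_breaker": False, "dlq_auto_reprocess": False,
        "alert_channel": "email_on_job_failure", "review_cadence_days": None,
    },
}

# Assigning each pipeline an explicit tier keeps SLA conversations concrete:
PIPELINE_TIERS = {"orders_fact": 1, "sessions_daily": 2}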
Alert Design and Runbooks: From Detection to Resolution
The worst data incidents are the ones where engineers receive an alert, spend 30 minutes determining what it means, another 30 finding the cause, and another hour figuring out the remediation. Good runbooks collapse that to 15 minutes total. Every data pipeline SLO alert should link directly to a runbook covering: what broke, likely causes (ranked by frequency), investigation steps, and remediation procedures.
# Prometheus alerting rules for data pipeline SLOs
# prometheus/rules/data_pipelines.yml
groups:
- name: data_pipeline_slos
interval: 60s
rules:
# Freshness SLO breach — Tier 1 pipelines
- alert: DataFreshnessCritical
expr: |
data_pipeline_freshness_seconds{pipeline="orders"} > 300
for: 2m
labels:
severity: critical
team: data-platform
tier: "1"
annotations:
summary: "Orders pipeline data is stale ({{ $value | humanizeDuration }})"
description: |
Pipeline {{ $labels.pipeline }}.{{ $labels.table }} has not
received new data for {{ $value | humanizeDuration }}.
SLO: freshness < 5 minutes.
runbook_url: "https://wiki.internal/runbooks/data/orders-freshness"
dashboard_url: "https://grafana.internal/d/data-pipelines"
# Completeness SLO breach
- alert: DataCompletenessWarning
expr: |
data_pipeline_completeness_ratio < 0.95
for: 5m
labels:
severity: warning
team: data-platform
annotations:
summary: "Data completeness below SLO ({{ $value | humanizePercentage }})"
description: |
{{ $labels.pipeline }}.{{ $labels.table }} completeness is
{{ $value | humanizePercentage }} in the last {{ $labels.window }}.
SLO: >= 95%.
runbook_url: "https://wiki.internal/runbooks/data/completeness"
# Row count anomaly
- alert: DataRowCountAnomaly
expr: |
data_pipeline_row_count_anomaly_score > 3
for: 0m
labels:
severity: warning
team: data-platform
annotations:
summary: "Unusual row count in {{ $labels.table }} (Z-score: {{ $value | humanize }})"
description: |
Row count in {{ $labels.pipeline }}.{{ $labels.table }} is
{{ $value | humanize }} standard deviations from the 30-day average.
Investigate for upstream data loss or volume spike.
runbook_url: "https://wiki.internal/runbooks/data/row-count-anomaly"
# SLO breach rate (multi-burn-rate alerting)
- alert: DataSLOBurnRateHigh
expr: |
rate(data_pipeline_slo_breach_total[1h]) > 0.1
for: 5m
labels:
severity: critical
team: data-platform
annotations:
summary: "High SLO breach rate for {{ $labels.pipeline }}"
description: |
Pipeline {{ $labels.pipeline }} is breaching SLO at
{{ $value | humanize }}/sec. At this rate the monthly
error budget will be exhausted within hours.
Data Reliability Production Checklist
Every Tier 1 pipeline has a freshness SLO with a Prometheus alert
Defined in terms business stakeholders understand (e.g. 'orders data must be no older than 5 minutes'), not internal engineering metrics like 'Kafka lag < 1000'.
Schema changes trigger alerts before they reach downstream consumers
Column renames, type changes, and column drops are the leading cause of silent data corruption. Elementary or Protobuf/Avro schema registries enforce compatibility rules automatically.
Every pipeline write is idempotent
Test idempotency by replaying the last 24 hours of data in staging and comparing checksums (a minimal sketch follows this checklist). Add this check to your CI pipeline. Non-idempotent pipelines are a ticking time bomb.
Dead letter queues capture every failed record
Failed records sent to /dev/null are data losses. DLQs preserve them for inspection and reprocessing. Include original payload, error details, and source offset so reprocessing can pick up exactly where it failed.
Circuit breakers protect Tier 1 consumers from upstream degradation
Without circuit breakers, a degraded upstream service causes cascading failures across all consumers. A tripped circuit with a clear error message is better than stale data silently flowing downstream.
Runbooks exist for every Tier 1 and Tier 2 alert
The runbook must be maintained by the on-call rotation — not just written once and forgotten. Link it directly from the alert annotation. If the alert fires and the runbook is outdated, update it during the incident, not after.
Data downtime cost is calculated and reviewed quarterly
Use the cost model to prioritise reliability investments. If the annual cost of a pipeline's downtime is $50k and the fix costs $20k in engineering time, the ROI is obvious. Make this number visible to stakeholders.
Error budget policy is defined and enforced
When a Tier 1 pipeline exhausts its monthly error budget (e.g. cumulative freshness breaches > 0.1%), stop feature work and focus exclusively on reliability until the budget is restored. This creates the right incentives.
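As promised in the idempotency item above, here is a minimal sketch of a CI replay check. Assumptions: a staging warehouse reachable via DATABASE_URL_STAGING, a rerun_pipeline() hook into your orchestrator, and a checksum over the business columns (all names hypothetical):
# idempotency_replay_check.py: minimal CI sketch against a staging warehouse
import os
from sqlalchemy import create_engine, text

CHECKSUM_SQL = """
    SELECT MD5(STRING_AGG(order_id::text || status || total_amount::text, ','
               ORDER BY order_id)) AS checksum
    FROM orders_fact
    WHERE updated_at >= NOW() - INTERVAL '24 hours'
"""

def table_checksum(engine) -> str:
    """Checksum the business columns of the last 24h of data."""
    with engine.connect() as conn:
        return conn.execute(text(CHECKSUM_SQL)).scalar_one()

def rerun_pipeline():
    """Hypothetical hook: replay the last 24h of source data through staging."""
    raise NotImplementedError("wire up to your orchestrator, e.g. a parameterised backfill")

if __name__ == "__main__":
    engine = create_engine(os.environ["DATABASE_URL_STAGING"])
    before = table_checksum(engine)
    rerun_pipeline()  # replay the same 24h window a second time
    after = table_checksum(engine)
    assert before == after, "pipeline is NOT idempotent: replay changed the data"
    print("Idempotency check passed: replay produced identical checksums")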
Work with us
Struggling with unreliable data pipelines and unclear SLA impact?
We design and implement data reliability frameworks — from freshness SLOs and data observability instrumentation to circuit breakers, dead letter queues, and idempotent pipeline architectures that eliminate silent data loss. Let’s talk.
Get in touch