The Silent Failure Problem in Data Pipelines
Bad data is worse than no data. When a service is down, your monitoring fires within seconds and engineers are paged. When your data pipelines silently produce stale, incomplete, or incorrect records, the problem may go undetected for hours, days, or weeks — while dashboards, ML models, and business decisions operate on poisoned inputs.
This is the core problem that data quality observability solves: bringing the rigour of service reliability engineering — SLOs, alerting, incident triage — to the correctness and freshness of your data. The discipline has matured rapidly, moving from ad-hoc SQL checks to dedicated platforms like Monte Carlo and open-source frameworks like Soda Core.
The dbt data tests documentation and Great Expectations Expectations Gallery are the authoritative references for the specific tools covered here.
The Five Pillars of Data Quality
Data quality is not a single metric. Before instrumenting anything, you need a shared vocabulary for what "quality" means across your organisation. The five-pillar model, popularised by Monte Carlo, gives you a consistent framework for classifying quality issues and building targeted checks for each dimension.
Freshness
How recently was this data updated? Freshness failures are the most common data quality incident: an Airflow DAG fails silently, a CDC connector falls behind, an API source stops delivering records. Define a maximum acceptable age for every critical table and alert when it is exceeded. Freshness is the first thing to check during any data incident.
Completeness
Are all expected records and fields present? Completeness checks detect partial loads (only 40% of expected rows arrived today), NULL rates that exceed thresholds (email is NULL in 30% of users — was there a schema change upstream?), and missing partitions. Volume anomaly detection on row counts is the simplest high-signal completeness check.
Consistency
Are values consistent across tables, systems, and time? Referential integrity checks (every order.customer_id exists in the customers table) and cross-system reconciliation (Stripe revenue == warehouse revenue ± 0.1%) fall here. Consistency failures often indicate schema drift or logic errors in transformation code.
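A referential-integrity check of this kind can be written as a plain SQL query that returns the orphaned rows, so an empty result set means the check passes. The table and column names below are the fct_orders / dim_customers examples used throughout this article:

```sql
-- Consistency sketch: find orders whose customer_id has no match in dim_customers.
-- Any rows returned are referential-integrity violations.
SELECT o.order_id, o.customer_id
FROM fct_orders AS o
LEFT JOIN dim_customers AS c
    ON o.customer_id = c.customer_id
WHERE c.customer_id IS NULL
  AND o.customer_id IS NOT NULL;  -- genuinely missing parents, not NULL foreign keys
```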
Accuracy
Do values reflect reality? Accuracy is the hardest pillar to instrument automatically because it requires ground truth. Range checks (age cannot be 200, revenue cannot be negative), format checks (email addresses must match a regex, phone numbers must be E.164), and domain validation (country codes must be ISO 3166-1 alpha-2) are practical proxies for accuracy.
Uniqueness
Are primary keys and business identifiers truly unique? Duplicate rows in dimension tables cause fan-out joins that silently inflate aggregate metrics. Duplicate event IDs in fact tables cause double-counting. Uniqueness checks on primary keys, natural keys, and composite deduplication keys are non-negotiable for any analytical workload.
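A minimal uniqueness check is a GROUP BY ... HAVING query that surfaces the duplicated keys and how often they repeat, sketched here against the fct_orders example table:

```sql
-- Uniqueness sketch: zero rows returned means order_id is unique.
SELECT order_id, COUNT(*) AS occurrences
FROM fct_orders
GROUP BY order_id
HAVING COUNT(*) > 1
ORDER BY occurrences DESC;
```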
dbt Tests — Schema Tests and Singular Data Tests
dbt ships with a built-in test framework that integrates directly into your transformation workflow. Tests run after models are built and block downstream models from being materialised when they fail. This makes dbt tests the lowest-friction entry point for data quality: your checks live alongside your SQL, run in CI, and produce actionable failures.
Generic (Schema) Tests
Generic tests are configured in YAML schema files and apply to columns across any model. The four built-in generic tests cover the most common quality invariants. Add them to every critical column from day one.
# models/schema.yml — dbt generic tests on a fact table
version: 2
models:
  - name: fct_orders
    description: "One row per order, joined with customer and product dimensions."
    columns:
      - name: order_id
        description: "Surrogate primary key — must be unique and non-null."
        tests:
          - unique
          - not_null
      - name: customer_id
        description: "Foreign key to dim_customers."
        tests:
          - not_null
          - relationships:
              to: ref('dim_customers')
              field: customer_id
      - name: status
        description: "Order lifecycle status."
        tests:
          - accepted_values:
              values: ['pending', 'processing', 'shipped', 'delivered', 'cancelled', 'refunded']
      - name: order_total_usd
        description: "Order value in USD — must be non-negative."
        tests:
          - not_null
          - dbt_utils.expression_is_true:
              expression: ">= 0"
      - name: created_at
        tests:
          - not_null
          - dbt_utils.expression_is_true:
              # No future orders — allow 1 hour clock skew
              expression: "<= DATEADD('hour', 1, CURRENT_TIMESTAMP())"
  - name: dim_customers
    tests:
      # Model-level test: email addresses must be unique within each tenant
      - dbt_utils.unique_combination_of_columns:
          combination_of_columns:
            - email
            - tenant_id
    columns:
      - name: customer_id
        tests:
          - unique
          - not_null
      - name: email
        tests:
          - not_null
Singular Tests and Custom Generic Tests
Singular tests are standalone SQL files in the tests/ directory. They return the rows that fail the assertion. Custom generic tests are macros that accept arguments and can be reused across models — the right abstraction for repeated business rules.
-- tests/assert_revenue_reconciled.sql
-- Singular test: Stripe revenue vs warehouse revenue must agree within 0.1%
-- Returns rows (dates) where the gap exceeds the threshold
WITH stripe_daily AS (
    SELECT
        DATE(created_at) AS date,
        SUM(amount_usd) AS stripe_revenue
    FROM {{ ref('stg_stripe_charges') }}
    WHERE status = 'succeeded'
    GROUP BY 1
),

warehouse_daily AS (
    SELECT
        order_date AS date,
        SUM(order_total_usd) AS warehouse_revenue
    FROM {{ ref('fct_orders') }}
    WHERE status != 'cancelled'
    GROUP BY 1
),

reconciled AS (
    SELECT
        s.date,
        s.stripe_revenue,
        w.warehouse_revenue,
        ABS(s.stripe_revenue - w.warehouse_revenue) / NULLIF(s.stripe_revenue, 0) AS gap_pct
    FROM stripe_daily s
    JOIN warehouse_daily w USING (date)
    WHERE s.date >= DATEADD('day', -7, CURRENT_DATE)
)

SELECT *
FROM reconciled
WHERE gap_pct > 0.001  -- Alert if gap > 0.1%
-- macros/test_not_null_proportion.sql
-- Custom generic test: assert that the NULL rate for a column is below a threshold
{% test not_null_proportion(model, column_name, max_null_rate=0.01) %}
WITH validation AS (
    SELECT
        COUNT(*) AS total_rows,
        COUNT(CASE WHEN {{ column_name }} IS NULL THEN 1 END) AS null_rows
    FROM {{ model }}
),

validation_errors AS (
    SELECT
        total_rows,
        null_rows,
        null_rows::FLOAT / NULLIF(total_rows, 0) AS null_rate
    FROM validation
    WHERE null_rows::FLOAT / NULLIF(total_rows, 0) > {{ max_null_rate }}
)

SELECT *
FROM validation_errors
{% endtest %}
# Apply the custom test in schema.yml
models:
  - name: fct_events
    columns:
      - name: user_id
        tests:
          # Allow up to 5% null user_id (anonymous events are acceptable)
          - not_null_proportion:
              max_null_rate: 0.05
      - name: session_id
        tests:
          # session_id must always be present
          - not_null_proportion:
              max_null_rate: 0.0
Test Severity and store_failures
Not every test failure should block a pipeline run. Use severity: warn for checks where violations are expected during a data migration or where a small number of errors is acceptable. Use store_failures: true to materialise failing rows into a dedicated table for investigation.
# schema.yml — test severity and store_failures
models:
  - name: fct_orders
    columns:
      - name: phone_number
        tests:
          - not_null:
              severity: warn  # warn, not error — phone is optional
          - dbt_utils.expression_is_true:
              # E.164 format check. At column level, dbt_utils prepends the column
              # name to the expression, so use Snowflake's REGEXP operator form
              expression: "REGEXP '^\\+[1-9]\\d{6,14}$'"
              severity: warn
              store_failures: true  # materialise bad rows to the dbt_test__audit schema
              config:
                alias: fct_orders_invalid_phones
      - name: order_total_usd
        tests:
          - dbt_utils.expression_is_true:
              expression: ">= 0"
              severity: error  # hard fail — negative revenue is never acceptable
              store_failures: true
Note
Set store_failures_as: view in your dbt_project.yml to use views instead of tables for failure storage — this avoids materialisation cost for failures that should be empty most of the time. Switch to store_failures_as: table only for tests where you want to retain failure history for trend analysis.
Great Expectations — Checkpoints and Data Docs
Great Expectations (GX) provides a Python-native framework for defining, running, and documenting data quality expectations. Unlike dbt tests (which run against transformed data in your warehouse), GX can validate data at any point in the pipeline — in Spark DataFrames, Pandas, raw S3 files, or Postgres tables — making it ideal for validating data at ingestion time before it reaches your warehouse.
# great_expectations_suite.py — GX Core 1.x API
import great_expectations as gx
import pandas as pd

context = gx.get_context()

# Define the data source (Pandas in-memory for this example)
data_source = context.data_sources.add_pandas("my_datasource")
data_asset = data_source.add_dataframe_asset("orders_asset")
batch_def = data_asset.add_batch_definition_whole_dataframe("orders_batch")

# Load a sample DataFrame (replace with your actual ingestion source)
df = pd.read_parquet("s3://data-lake/raw/orders/date=2026-05-15/orders.parquet")

# Define an Expectation Suite
suite = context.suites.add(gx.ExpectationSuite(name="orders_raw_suite"))

# Completeness: no NULL order_ids
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToNotBeNull(column="order_id")
)

# Uniqueness: order_id is a primary key
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeUnique(column="order_id")
)

# Accuracy: order_total_usd must be non-negative
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeBetween(
        column="order_total_usd",
        min_value=0,
        max_value=None,
    )
)

# Consistency: status must be in allowed set
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeInSet(
        column="status",
        value_set={"pending", "processing", "shipped", "delivered", "cancelled", "refunded"},
    )
)

# Completeness: row count must fall within expected bounds
suite.add_expectation(
    gx.expectations.ExpectTableRowCountToBeBetween(
        min_value=10_000,  # set dynamically from yesterday's count in production
        max_value=500_000,
    )
)

# Bind the suite to the batch definition in a Validation Definition, then wrap it
# in a Checkpoint — the GX way to run all validations and record results
validation_def = context.validation_definitions.add(
    gx.ValidationDefinition(
        name="orders_raw_validation",
        data=batch_def,
        suite=suite,
    )
)
checkpoint = context.checkpoints.add(
    gx.Checkpoint(
        name="orders_daily_checkpoint",
        validation_definitions=[validation_def],
    )
)

result = checkpoint.run(batch_parameters={"dataframe": df})
if not result.success:
    raise RuntimeError(
        f"Data quality validation failed: {result.describe_dict()}"
    )
# custom_expectation.py — Writing a custom column-level expectation in GX
# GX Core 1.x: customise an existing expectation class by subclassing it with
# preset attributes — much simpler than the legacy metric-provider machinery
import great_expectations as gx

class ExpectColumnValuesToMatchE164Format(gx.expectations.ExpectColumnValuesToMatchRegex):
    """Validates that phone number values conform to E.164 international format."""

    regex: str = r"^\+[1-9]\d{6,14}$"

# Usage in a suite — `mostly` tolerates a small fraction of bad rows:
# suite.add_expectation(
#     ExpectColumnValuesToMatchE164Format(column="phone_number", mostly=0.99)
# )
Soda Core — Data Contracts with SodaCL
Soda Core introduces SodaCL (Soda Checks Language), a YAML DSL for expressing data quality checks directly against SQL data sources. SodaCL checks are readable by non-engineers — data owners, analysts, and product managers can review and modify them without writing SQL. This makes Soda Core an excellent tool for implementing formal data contracts between producers and consumers.
# checks/fct_orders.yml — SodaCL checks for the orders fact table
checks for fct_orders:
  # Freshness: table must have been updated in the last 26 hours
  - freshness(updated_at) < 26h
  # Completeness: row count must not drop by more than 30% vs yesterday
  - change for row_count < -30%
  # Uniqueness: no duplicate primary keys
  - duplicate_count(order_id) = 0
  # Accuracy: no negative revenue
  - invalid_count(order_total_usd) = 0:
      valid min: 0
  # Completeness: order_id and customer_id must always be present
  - missing_count(order_id) = 0
  - missing_count(customer_id) = 0
  # Accuracy: status must be in allowed set
  - invalid_count(status) = 0:
      valid values: [pending, processing, shipped, delivered, cancelled, refunded]
  # Completeness: at most 5% null email (anonymous orders allowed)
  - missing_percent(customer_email) < 5%
  # Consistency: cross-table referential integrity
  - values in (customer_id) must exist in dim_customers (customer_id)
# run_soda_checks.py — Running Soda checks programmatically in a pipeline
from soda.scan import Scan

def run_quality_checks(table: str, checks_file: str, data_source: str) -> bool:
    scan = Scan()
    scan.set_scan_definition_name(f"daily_quality_{table}")
    scan.set_data_source_name(data_source)

    # Load SodaCL checks from YAML file
    scan.add_sodacl_yaml_file(checks_file)

    # Add connection configuration. Soda resolves ${VAR} from the environment;
    # the braces are doubled so the f-string emits them literally
    scan.add_configuration_yaml_str(f"""
data_sources:
  {data_source}:
    type: snowflake
    username: ${{SNOWFLAKE_USER}}
    password: ${{SNOWFLAKE_PASSWORD}}
    account: ${{SNOWFLAKE_ACCOUNT}}
    database: analytics
    schema: public
""")

    exit_code = scan.execute()
    if exit_code != 0:
        print(scan.get_logs_text())
        for check_result in scan.get_checks_fail():
            print(f"FAILED: {check_result.check.name} — {check_result.outcomes}")
        return False
    return True

# In your Airflow DAG or CI pipeline:
if __name__ == "__main__":
    passed = run_quality_checks(
        table="fct_orders",
        checks_file="checks/fct_orders.yml",
        data_source="snowflake_prod",
    )
    if not passed:
        raise SystemExit("Data quality checks failed — pipeline blocked")
Freshness SLOs — Measuring and Alerting on Data Staleness
A freshness SLO formalises your commitment about how current your data is. Without an explicit SLO, "the data should be recent" is an opinion — with an SLO, it is a contract. Define the SLO as a maximum acceptable lag: "The fct_orders table must contain records within 2 hours of their creation time, 99.5% of the time."
-- freshness_monitoring.sql — Query for Prometheus / monitoring scraper
-- Expose the age of the most recent record in each critical table
-- Run this query on a schedule and push metrics to Prometheus Pushgateway
SELECT
    'fct_orders' AS table_name,
    DATEDIFF('minute', MAX(created_at), CURRENT_TIMESTAMP()) AS lag_minutes,
    MAX(created_at) AS last_record_at
FROM fct_orders

UNION ALL

SELECT
    'fct_events',
    DATEDIFF('minute', MAX(event_time), CURRENT_TIMESTAMP()),
    MAX(event_time)
FROM fct_events

UNION ALL

SELECT
    'dim_products',
    DATEDIFF('minute', MAX(updated_at), CURRENT_TIMESTAMP()),
    MAX(updated_at)
FROM dim_products;
# freshness_exporter.py — Push data freshness metrics to Prometheus Pushgateway
import os
import snowflake.connector
from prometheus_client import CollectorRegistry, Gauge, push_to_gateway

PUSHGATEWAY_URL = os.environ["PUSHGATEWAY_URL"]
SNOWFLAKE_ACCOUNT = os.environ["SNOWFLAKE_ACCOUNT"]
SNOWFLAKE_USER = os.environ["SNOWFLAKE_USER"]
SNOWFLAKE_PASSWORD = os.environ["SNOWFLAKE_PASSWORD"]

FRESHNESS_QUERY = """
SELECT table_name, lag_minutes
FROM (
    SELECT 'fct_orders' AS table_name,
           DATEDIFF('minute', MAX(created_at), CURRENT_TIMESTAMP()) AS lag_minutes
    FROM analytics.public.fct_orders
    UNION ALL
    SELECT 'fct_events',
           DATEDIFF('minute', MAX(event_time), CURRENT_TIMESTAMP())
    FROM analytics.public.fct_events
    UNION ALL
    SELECT 'dim_customers',
           DATEDIFF('minute', MAX(updated_at), CURRENT_TIMESTAMP())
    FROM analytics.public.dim_customers
)
"""

def push_freshness_metrics() -> None:
    registry = CollectorRegistry()
    gauge = Gauge(
        "data_table_lag_minutes",
        "Minutes since the most recent record was written to the table",
        labelnames=["table_name"],
        registry=registry,
    )

    conn = snowflake.connector.connect(
        account=SNOWFLAKE_ACCOUNT,
        user=SNOWFLAKE_USER,
        password=SNOWFLAKE_PASSWORD,
        database="analytics",
        schema="public",
    )
    try:
        with conn.cursor() as cur:
            cur.execute(FRESHNESS_QUERY)
            for table_name, lag_minutes in cur.fetchall():
                gauge.labels(table_name=table_name).set(lag_minutes)
    finally:
        conn.close()

    push_to_gateway(PUSHGATEWAY_URL, job="data_freshness", registry=registry)

if __name__ == "__main__":
    push_freshness_metrics()
# prometheus_alerts.yml — Alerting rules for data freshness SLOs
groups:
  - name: data_quality
    rules:
      # fct_orders must be refreshed at least every 2 hours
      - alert: DataFreshnessViolation_fct_orders
        expr: data_table_lag_minutes{table_name="fct_orders"} > 120
        for: 5m
        labels:
          severity: critical
          team: data-engineering
        annotations:
          summary: "fct_orders is stale ({{ $value }} minutes lag)"
          description: >
            The fct_orders table has not been updated for {{ $value }} minutes.
            SLO threshold is 120 minutes. Check the Airflow DAG
            'fct_orders_daily' for failures.
          runbook_url: "https://wiki.internal/data/runbooks/fct_orders_freshness"

      # fct_events allows up to 30 minutes lag (near-real-time)
      - alert: DataFreshnessViolation_fct_events
        expr: data_table_lag_minutes{table_name="fct_events"} > 30
        for: 2m
        labels:
          severity: warning
          team: data-engineering
        annotations:
          summary: "fct_events is stale ({{ $value }} minutes lag)"
          description: >
            The fct_events table lag exceeds 30 minutes (current: {{ $value }}).
            Check the Kafka consumer group 'events-to-snowflake' for lag buildup.
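The 99.5% target from the SLO definition above can be tracked with Prometheus recording rules built on the same gauge. A sketch, assuming the data_table_lag_minutes metric from the exporter and the 120-minute threshold; the rule names are illustrative:

```yaml
# Sketch: record a 1/0 indicator for "fct_orders is currently within its SLO",
# then average the indicator over 30 days to get compliance as a ratio
groups:
  - name: data_freshness_slo
    rules:
      - record: data_freshness:fct_orders:within_slo
        expr: data_table_lag_minutes{table_name="fct_orders"} <= bool 120
      - record: data_freshness:fct_orders:slo_compliance_30d
        # Page or report when this drops below 0.995
        expr: avg_over_time(data_freshness:fct_orders:within_slo[30d])
```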
Monte Carlo — ML-Based Anomaly Detection
Monte Carlo is the leading commercial data observability platform. Rather than requiring you to hand-craft every quality check, it uses ML models trained on your data's historical patterns to detect anomalies automatically — row count spikes, volume drops, schema changes, field distribution shifts, and freshness violations — without you needing to predict every possible failure mode upfront.
Automatic Monitors
Monte Carlo automatically trains monitors for volume, freshness, schema changes, and field distribution for every table it discovers in your warehouse. No configuration required. The ML models establish a baseline from 14-30 days of history and alert on statistically significant deviations. This gives you coverage across your entire warehouse within hours of connecting Monte Carlo — including tables you didn't know needed monitoring.
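Monte Carlo's models are proprietary, but the underlying idea can be illustrated with a simplified statistical analogue: flag any day whose row count deviates more than a few standard deviations from a trailing baseline. This is a sketch of the concept, not Monte Carlo's actual algorithm:

```python
# Simplified analogue of automatic volume monitoring (illustrative only):
# flag days whose row count deviates > n_sigma from a trailing baseline window.
from statistics import mean, stdev

def volume_anomalies(daily_counts: list[int], window: int = 14, n_sigma: float = 3.0) -> list[int]:
    """Return indices of days whose count deviates > n_sigma from the trailing window."""
    anomalies = []
    for i in range(window, len(daily_counts)):
        baseline = daily_counts[i - window:i]
        mu, sigma = mean(baseline), stdev(baseline)
        if sigma == 0:
            continue  # flat baseline, no meaningful deviation scale
        if abs(daily_counts[i] - mu) > n_sigma * sigma:
            anomalies.append(i)
    return anomalies
```

A real system would also model seasonality (weekday vs weekend volume) and trend, which is precisely the part the ML-based platforms automate.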
Circuit Breakers
Monte Carlo can block downstream pipeline runs when upstream data is anomalous. Configured as API calls within Airflow, dbt, or custom orchestrators, circuit breakers prevent bad data from propagating through your transformation layers. A circuit breaker on fct_orders that detects a 50% volume drop will pause the downstream revenue reporting DAG before it produces incorrect numbers.
Lineage-Aware Incident Routing
Monte Carlo builds an automated data lineage graph by parsing SQL query history. When an incident fires on a table, Monte Carlo shows you every upstream source that feeds it and every downstream consumer affected — dashboards, ML models, reports — so you can immediately assess blast radius and notify the right teams. This transforms triage from hours of manual investigation to minutes of lineage-guided root cause analysis.
# monte_carlo_circuit_breaker.py — Block Airflow DAG on data quality incident
# Uses the Monte Carlo SDK: pip install pycarlo
import os

from pycarlo.core import Client, Session
from pycarlo.features.circuit_breakers import CircuitBreaker

MCD_ID = os.environ["MONTE_CARLO_ID"]
MCD_TOKEN = os.environ["MONTE_CARLO_TOKEN"]

session = Session(mcd_id=MCD_ID, mcd_token=MCD_TOKEN)
client = Client(session=session)

def check_circuit_breaker(table_mcon: str, breach_count_threshold: int = 0) -> bool:
    """
    Returns True if the circuit is OPEN (data is healthy).
    Returns False if the circuit is BROKEN (anomalies detected — block pipeline).
    """
    cb = CircuitBreaker(client=client)
    result = cb.trigger(
        operator="GT",
        threshold=breach_count_threshold,
        table_mcon=table_mcon,
    )
    return result.is_circuit_open

# In an Airflow DAG task:
from airflow.decorators import task

@task
def quality_gate_fct_orders():
    TABLE_MCON = "MCON++your_account++fct_orders++"  # from Monte Carlo UI
    if not check_circuit_breaker(TABLE_MCON):
        raise ValueError(
            "Circuit breaker BROKEN: fct_orders has active quality incidents. "
            "Downstream DAG execution blocked."
        )
Data Incident Triage — Runbooks and Root Cause Analysis
Data quality incidents are different from service incidents: they are often invisible to end-users initially, the blast radius expands over time (more downstream consumers get poisoned data), and the root cause is frequently multiple steps removed from where the alert fires. Structured runbooks are essential.
Step 1: Assess the blast radius immediately
When a freshness or quality alert fires, the first question is not 'what failed?' but 'who is affected?'. Check your lineage tool (Monte Carlo, DataHub, or dbt's built-in lineage) for every downstream consumer of the affected table. Notify the owners of affected dashboards and ML models before they discover stale data themselves.
Step 2: Classify the failure type
Pipeline failure (Airflow DAG error, dbt run error) vs. silent data corruption (pipeline succeeded but produced wrong data) vs. source system failure (upstream API stopped delivering records). Each type has a different investigation path. Check Airflow logs first — most incidents are pipeline failures with obvious error messages. If the DAG succeeded, the problem is upstream or in the transformation logic.
Step 3: Establish the bad data window
Determine exactly when the data quality started degrading. Use your freshness metrics timeseries to find when the lag began growing. Query the table's MAX(updated_at) against your monitoring data to bound the window. This determines how far back a backfill must go and which downstream reports need to be invalidated and re-run.
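One way to bound the window is to compare each day's loaded volume against a trailing baseline and find the first day that deviates. A sketch against the fct_orders example table; the 0.7 threshold is illustrative:

```sql
-- Sketch: find days whose loaded volume dropped well below the trailing
-- 7-day average, to bound the bad-data window for backfill
WITH daily AS (
    SELECT DATE(created_at) AS load_date, COUNT(*) AS rows_loaded
    FROM fct_orders
    GROUP BY 1
),
with_baseline AS (
    SELECT
        load_date,
        rows_loaded,
        AVG(rows_loaded) OVER (
            ORDER BY load_date
            ROWS BETWEEN 7 PRECEDING AND 1 PRECEDING
        ) AS trailing_avg
    FROM daily
)
SELECT load_date, rows_loaded, trailing_avg
FROM with_baseline
WHERE rows_loaded < 0.7 * trailing_avg
ORDER BY load_date;
```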
Step 4: Fix forward vs. rollback
For transformation errors: fix the dbt model, run a targeted backfill for the affected date range, trigger downstream models. For source system failures: coordinate with the upstream team, re-request missing data if available, mark affected partitions as invalid in your data catalog. Always prefer idempotent backfills over corrections — your transformation code must produce the same output when run twice on the same input.
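In dbt, backfill idempotency typically comes from an incremental model with a unique_key and the merge strategy: re-running the same date range overwrites matching rows instead of duplicating them. A sketch using the model and column names from the examples above:

```sql
-- models/fct_orders.sql — idempotent incremental model (sketch)
{{ config(
    materialized='incremental',
    unique_key='order_id',
    incremental_strategy='merge'
) }}

SELECT
    order_id,
    customer_id,
    status,
    order_total_usd,
    created_at
FROM {{ ref('stg_orders') }}
{% if is_incremental() %}
  -- Reprocess a fixed lookback window; merging on order_id keeps re-runs idempotent
  WHERE created_at >= DATEADD('day', -3, CURRENT_DATE)
{% endif %}
```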
Step 5: Blameless post-mortem and check authoring
Every data quality incident that was not caught by an existing check is an opportunity to add one. Within 48 hours of resolution, write a dbt test, Soda check, or GX expectation that would have caught this failure earlier. Track mean time to detection (MTTD) and mean time to recovery (MTTR) for data incidents alongside your service SRE metrics.
Further Reading
- dbt — Data Tests Documentation — comprehensive guide to generic and singular tests, severity, and store_failures
- Great Expectations — Official Documentation — Checkpoints, Expectation Suites, custom expectations, and Data Docs
- Soda Core — SodaCL Overview — the complete SodaCL language reference for data contracts and quality checks
- Monte Carlo — Data Quality Fundamentals — the five pillars framework and ML-based anomaly detection explained
- dbt-utils — Generic Test Library — extended test macros including expression_is_true, unique_combination_of_columns, and recency checks
Work with us
Struggling with silent data quality failures or building a data observability layer for your platform?
We design and implement data quality observability frameworks — from dbt test suites and Great Expectations Checkpoints to Soda Core data contracts, freshness SLO instrumentation, Monte Carlo circuit breaker integration, and incident triage runbooks. Let’s talk.
Get in touch