
Data Quality Observability — Monte Carlo, dbt Tests, and Freshness SLAs

A practical guide to data quality observability in production: the five pillars of data quality (freshness, completeness, consistency, accuracy, uniqueness), dbt generic and singular tests with severity levels and store_failures, custom generic test macros, Great Expectations Checkpoints and custom expectations, SodaCL data contracts with Soda Core, freshness SLOs instrumented with Prometheus and Alertmanager, Monte Carlo ML-based anomaly detection and circuit breakers, and a structured data incident triage runbook.

2026-05-15

The Silent Failure Problem in Data Pipelines

Bad data is worse than no data. When a service is down, your monitoring fires within seconds and engineers are paged. When your data pipelines silently produce stale, incomplete, or incorrect records, the problem may go undetected for hours, days, or weeks — while dashboards, ML models, and business decisions operate on poisoned inputs.

This is the core problem that data quality observability solves: bringing the rigor of service reliability — SLOs, alerting, incident triage — to the correctness and freshness of your data. The discipline has matured rapidly, moving from ad-hoc SQL checks to dedicated platforms like Monte Carlo and open-source frameworks like Soda Core.

The dbt data tests documentation and Great Expectations Expectations Gallery are the authoritative references for the specific tools covered here.

The Five Pillars of Data Quality

Data quality is not a single metric. Before instrumenting anything, you need a shared vocabulary for what "quality" means across your organisation. The five-pillar model, popularised by Monte Carlo, gives you a consistent framework for classifying quality issues and building targeted checks for each dimension.

Freshness

How recently was this data updated? Freshness failures are the most common data quality incident: an Airflow DAG fails silently, a CDC connector falls behind, an API source stops delivering records. Define a maximum acceptable age for every critical table and alert when it is exceeded. Freshness is the first thing to check during any data incident.

Completeness

Are all expected records and fields present? Completeness checks detect partial loads (only 40% of expected rows arrived today), NULL rates that exceed thresholds (email is NULL in 30% of users — was there a schema change upstream?), and missing partitions. Volume anomaly detection on row counts is the simplest high-signal completeness check.
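That volume check can be sketched as a z-score test against recent daily counts — a minimal Python example, where the 3-standard-deviation threshold and 7-day minimum history are illustrative choices, not from any particular tool:

```python
from statistics import mean, stdev

def row_count_is_anomalous(history: list[int], today: int, z_threshold: float = 3.0) -> bool:
    """Flag today's row count if it deviates more than z_threshold
    standard deviations from the recent daily baseline."""
    if len(history) < 7:
        return False  # not enough history to establish a baseline
    mu, sigma = mean(history), stdev(history)
    if sigma == 0:
        return today != mu
    return abs(today - mu) / sigma > z_threshold

# A 60% drop against a stable baseline should fire:
baseline = [100_000, 98_500, 101_200, 99_800, 100_400, 97_900, 102_100]
print(row_count_is_anomalous(baseline, 40_000))   # True
print(row_count_is_anomalous(baseline, 99_000))   # False
```

Tools like Monte Carlo replace the hand-picked threshold with a model trained on seasonality, but the underlying idea — compare today against a learned baseline — is the same.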

Consistency

Are values consistent across tables, systems, and time? Referential integrity checks (every order.customer_id exists in the customers table) and cross-system reconciliation (Stripe revenue == warehouse revenue ± 0.1%) fall here. Consistency failures often indicate schema drift or logic errors in transformation code.

Accuracy

Do values reflect reality? Accuracy is the hardest pillar to instrument automatically because it requires ground truth. Range checks (age cannot be 200, revenue cannot be negative), format checks (email addresses must match a regex, phone numbers must be E.164), and domain validation (country codes must be ISO 3166-1 alpha-2) are practical proxies for accuracy.
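These proxy checks are straightforward to express in code. A hedged Python sketch — the field names, range bounds, and the tiny country subset are illustrative, not a standard:

```python
import re

E164_PHONE = re.compile(r"^\+[1-9]\d{6,14}$")
ISO_COUNTRY = {"US", "GB", "DE", "FR", "NL"}   # illustrative subset of ISO 3166-1 alpha-2

def accuracy_violations(record: dict) -> list[str]:
    """Return human-readable accuracy violations for a single record."""
    problems = []
    if not (0 <= record.get("age", 0) <= 130):                     # range check
        problems.append(f"age out of range: {record.get('age')}")
    if record.get("revenue_usd", 0) < 0:                           # range check
        problems.append(f"negative revenue: {record.get('revenue_usd')}")
    if record.get("phone") and not E164_PHONE.match(record["phone"]):  # format check
        problems.append(f"phone not E.164: {record['phone']}")
    if record.get("country") and record["country"] not in ISO_COUNTRY:  # domain check
        problems.append(f"unknown country code: {record['country']}")
    return problems

print(accuracy_violations(
    {"age": 200, "revenue_usd": -5, "phone": "+14155550123", "country": "US"}
))   # two violations: age and revenue
```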

Uniqueness

Are primary keys and business identifiers truly unique? Duplicate rows in dimension tables cause fan-out joins that silently inflate aggregate metrics. Duplicate event IDs in fact tables cause double-counting. Uniqueness checks on primary keys, natural keys, and composite deduplication keys are non-negotiable for any analytical workload.
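The fan-out effect is easy to demonstrate. A small pandas sketch with hypothetical tables, showing a single duplicate dimension row silently inflating a revenue sum:

```python
import pandas as pd

orders = pd.DataFrame({"order_id": [1, 2, 3],
                       "customer_id": ["c1", "c1", "c2"],
                       "total_usd": [100.0, 50.0, 25.0]})

# dim_customers with an accidental duplicate row for c1
dim = pd.DataFrame({"customer_id": ["c1", "c1", "c2"],
                    "segment": ["pro", "pro", "free"]})

joined = orders.merge(dim, on="customer_id")   # fan-out: c1 orders appear twice
print(joined["total_usd"].sum())               # 325.0 — true revenue is 175.0

# The uniqueness check that would have caught it:
assert dim["customer_id"].is_unique is False
```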

dbt Tests — Schema Tests and Singular Data Tests

dbt ships with a built-in test framework that integrates directly into your transformation workflow. Tests run after models are built and block downstream models from being materialised when they fail. This makes dbt tests the lowest-friction entry point for data quality: your checks live alongside your SQL, run in CI, and produce actionable failures.

Generic (Schema) Tests

Generic tests are configured in YAML schema files and apply to columns across any model. The four built-in generic tests cover the most common quality invariants. Add them to every critical column from day one.

# models/schema.yml — dbt generic tests on a fact table

version: 2

models:
  - name: fct_orders
    description: "One row per order, joined with customer and product dimensions."
    columns:
      - name: order_id
        description: "Surrogate primary key — must be unique and non-null."
        tests:
          - unique
          - not_null

      - name: customer_id
        description: "Foreign key to dim_customers."
        tests:
          - not_null
          - relationships:
              to: ref('dim_customers')
              field: customer_id

      - name: status
        description: "Order lifecycle status."
        tests:
          - accepted_values:
              values: ['pending', 'processing', 'shipped', 'delivered', 'cancelled', 'refunded']

      - name: order_total_usd
        description: "Order value in USD — must be non-negative."
        tests:
          - not_null
          - dbt_utils.expression_is_true:
              expression: ">= 0"

      - name: created_at
        tests:
          - not_null
          - dbt_utils.expression_is_true:
              # No future orders — allow 1 hour clock skew
              expression: "<= DATEADD('hour', 1, CURRENT_TIMESTAMP())"

  - name: dim_customers
    tests:
      # Table-level test: no duplicate email addresses across all rows
      - dbt_utils.unique_combination_of_columns:
          combination_of_columns:
            - email
            - tenant_id
    columns:
      - name: customer_id
        tests:
          - unique
          - not_null
      - name: email
        tests:
          - not_null

Singular Tests and Custom Generic Tests

Singular tests are standalone SQL files in the tests/ directory: the query returns the rows that violate the assertion, and the test passes when zero rows come back. Custom generic tests are macros that accept arguments and can be reused across models — the right abstraction for repeated business rules.

-- tests/assert_revenue_reconciled.sql
-- Singular test: Stripe revenue vs warehouse revenue must agree within 0.1%
-- Returns rows (dates) where the gap exceeds the threshold

WITH stripe_daily AS (
    SELECT
        DATE(created_at) AS date,
        SUM(amount_usd)  AS stripe_revenue
    FROM {{ ref('stg_stripe_charges') }}
    WHERE status = 'succeeded'
    GROUP BY 1
),
warehouse_daily AS (
    SELECT
        order_date             AS date,
        SUM(order_total_usd)   AS warehouse_revenue
    FROM {{ ref('fct_orders') }}
    WHERE status != 'cancelled'
    GROUP BY 1
),
reconciled AS (
    SELECT
        s.date,
        s.stripe_revenue,
        w.warehouse_revenue,
        ABS(s.stripe_revenue - w.warehouse_revenue) / NULLIF(s.stripe_revenue, 0) AS gap_pct
    FROM stripe_daily s
    JOIN warehouse_daily w USING (date)
    WHERE s.date >= DATEADD('day', -7, CURRENT_DATE)
)
SELECT *
FROM reconciled
WHERE gap_pct > 0.001   -- Alert if gap > 0.1%

-- macros/test_not_null_proportion.sql
-- Custom generic test: assert that the NULL rate for a column is below a threshold

{% test not_null_proportion(model, column_name, max_null_rate=0.01) %}

WITH validation AS (
    SELECT
        COUNT(*)                                                      AS total_rows,
        COUNT(CASE WHEN {{ column_name }} IS NULL THEN 1 END)        AS null_rows
    FROM {{ model }}
),
validation_errors AS (
    SELECT
        total_rows,
        null_rows,
        null_rows::FLOAT / NULLIF(total_rows, 0) AS null_rate
    FROM validation
    WHERE null_rows::FLOAT / NULLIF(total_rows, 0) > {{ max_null_rate }}
)
SELECT *
FROM validation_errors

{% endtest %}

# Apply the custom test in schema.yml

models:
  - name: fct_events
    columns:
      - name: user_id
        tests:
          # Allow up to 5% null user_id (anonymous events are acceptable)
          - not_null_proportion:
              max_null_rate: 0.05
      - name: session_id
        tests:
          # session_id must always be present
          - not_null_proportion:
              max_null_rate: 0.0

Test Severity and store_failures

Not every test failure should block a pipeline run. Use severity: warn for checks where violations are expected during a data migration or where a small number of errors is acceptable. Use store_failures: true to materialise failing rows into a dedicated table for investigation.

# schema.yml — test severity and store_failures

models:
  - name: fct_orders
    columns:
      - name: phone_number
        tests:
          - not_null:
              severity: warn         # warn, not error — phone is optional
          - dbt_utils.expression_is_true:
              # E.164 format check
              expression: "REGEXP_LIKE(phone_number, '^\\+[1-9]\\d{6,14}$')"
              severity: warn
              store_failures: true   # materialise failing rows for investigation
              config:
                alias: fct_orders_invalid_phones   # stored in the audit schema under this name

      - name: order_total_usd
        tests:
          - dbt_utils.expression_is_true:
              expression: ">= 0"
              severity: error        # hard fail — negative revenue is never acceptable
              store_failures: true

Note

Enable store_failures_as: view in your dbt_project.yml to use views instead of tables for failure storage — this avoids materialisation cost for failures that should be empty most of the time. Switch to store_failures_as: table only for tests where you want to retain failure history for trend analysis.

Great Expectations — Checkpoints and Data Docs

Great Expectations (GX) provides a Python-native framework for defining, running, and documenting data quality expectations. Unlike dbt tests (which run against transformed data in your warehouse), GX can validate data at any point in the pipeline — in Spark DataFrames, Pandas, raw S3 files, or Postgres tables — making it ideal for validating data at ingestion time before it reaches your warehouse.

# great_expectations_suite.py — GX Core 1.x API

import great_expectations as gx
import pandas as pd

context = gx.get_context()

# Define the datasource (Pandas in-memory for this example)
datasource = context.data_sources.add_pandas("my_datasource")
data_asset = datasource.add_dataframe_asset("orders_asset")
batch_def = data_asset.add_batch_definition_whole_dataframe("orders_batch")

# Load a sample DataFrame (replace with your actual ingestion source)
df = pd.read_parquet("s3://data-lake/raw/orders/date=2026-05-15/orders.parquet")

# Define an Expectation Suite
suite = context.suites.add(
    gx.ExpectationSuite(name="orders_raw_suite")
)

# Completeness: no NULL order_ids
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToNotBeNull(column="order_id")
)

# Uniqueness: order_id is a primary key
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeUnique(column="order_id")
)

# Accuracy: order_total_usd must be non-negative
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeBetween(
        column="order_total_usd",
        min_value=0,
        max_value=None,
    )
)

# Consistency: status must be in allowed set
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeInSet(
        column="status",
        value_set={"pending", "processing", "shipped", "delivered", "cancelled", "refunded"},
    )
)

# Completeness: row count must be within 20% of yesterday's count
suite.add_expectation(
    gx.expectations.ExpectTableRowCountToBeBetween(
        min_value=10_000,   # set dynamically from yesterday's count in production
        max_value=500_000,
    )
)

# Bind the suite to the batch definition, then run it via a Checkpoint —
# in GX Core 1.x, results are recorded through a ValidationDefinition
validation_def = context.validation_definitions.add(
    gx.ValidationDefinition(
        name="orders_daily_validation",
        data=batch_def,
        suite=suite,
    )
)

checkpoint = context.checkpoints.add(
    gx.Checkpoint(
        name="orders_daily_checkpoint",
        validation_definitions=[validation_def],
    )
)

result = checkpoint.run(batch_parameters={"dataframe": df})

if not result.success:
    raise RuntimeError(
        f"Data quality validation failed: {result.describe()}"
    )

# custom_expectation.py — Writing a custom column map expectation in GX
# (classic ColumnMapExpectation pattern; exact module paths vary by GX version)

import re

from great_expectations.execution_engine import PandasExecutionEngine
from great_expectations.expectations.expectation import ColumnMapExpectation
from great_expectations.expectations.metrics import (
    ColumnMapMetricProvider,
    column_condition_partial,
)

E164_REGEX = re.compile(r"^\+[1-9]\d{6,14}$")


class ColumnValuesMatchE164(ColumnMapMetricProvider):
    """Row-level metric: does each value match E.164 international format?"""

    condition_metric_name = "column_values.match_e164"

    @column_condition_partial(engine=PandasExecutionEngine)
    def _pandas(cls, column, **kwargs):
        return column.astype(str).str.fullmatch(E164_REGEX.pattern)


class ExpectColumnValuesToMatchE164Format(ColumnMapExpectation):
    """Expect phone number values to conform to E.164 international format."""

    map_metric = "column_values.match_e164"
    success_keys = ("mostly",)   # e.g. mostly=0.99 tolerates 1% invalid numbers

Soda Core — Data Contracts with SodaCL

Soda Core introduces SodaCL (Soda Checks Language), a YAML DSL for expressing data quality checks directly against SQL data sources. SodaCL checks are readable by non-engineers — data owners, analysts, and product managers can review and modify them without writing SQL. This makes Soda Core an excellent tool for implementing formal data contracts between producers and consumers.

# checks/fct_orders.yml — SodaCL checks for the orders fact table

checks for fct_orders:

  # Freshness: table must have been updated in the last 26 hours
  - freshness(updated_at) < 26h

  # Completeness: row count must not drop by more than 30% vs yesterday
  - change for row_count < -30%

  # Uniqueness: no duplicate primary keys
  - duplicate_count(order_id) = 0

  # Accuracy: no negative revenue
  - invalid_count(order_total_usd) = 0:
      valid min: 0

  # Completeness: order_id and customer_id must always be present
  - missing_count(order_id) = 0
  - missing_count(customer_id) = 0

  # Accuracy: status must be in allowed set
  - invalid_count(status) = 0:
      valid values: [pending, processing, shipped, delivered, cancelled, refunded]

  # Completeness: at most 5% null email (anonymous orders allowed)
  - missing_percent(customer_email) < 5%

  # Consistency: cross-table referential integrity
  - values in (customer_id) must exist in dim_customers (customer_id)

# run_soda_checks.py — Running Soda checks programmatically in a pipeline

from soda.scan import Scan

def run_quality_checks(table: str, checks_file: str, data_source: str) -> bool:
    scan = Scan()
    scan.set_scan_definition_name(f"daily_quality_{table}")
    scan.set_data_source_name(data_source)

    # Load SodaCL checks from YAML file
    scan.add_sodacl_yaml_file(checks_file)

    # Add connection configuration — note the doubled braces, so the f-string
    # emits literal ${VAR} placeholders that Soda resolves from the environment
    scan.add_configuration_yaml_str(f"""
data_sources:
  {data_source}:
    type: snowflake
    username: ${{SNOWFLAKE_USER}}
    password: ${{SNOWFLAKE_PASSWORD}}
    account: ${{SNOWFLAKE_ACCOUNT}}
    database: analytics
    schema: public
""")

    exit_code = scan.execute()

    if exit_code != 0:
        print(scan.get_logs_text())
        print(scan.get_checks_fail_text())
        return False

    return True


# In your Airflow DAG or CI pipeline:
if __name__ == "__main__":
    passed = run_quality_checks(
        table="fct_orders",
        checks_file="checks/fct_orders.yml",
        data_source="snowflake_prod",
    )
    if not passed:
        raise SystemExit("Data quality checks failed — pipeline blocked")

Freshness SLOs — Measuring and Alerting on Data Staleness

A freshness SLO formalises how current your data must be. Without an explicit SLO, "the data should be recent" is an opinion — with an SLO, it is a contract. Define the SLO as a maximum acceptable lag: "The fct_orders table must contain records within 2 hours of their creation time, 99.5% of the time."
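Compliance against such an SLO is simply the fraction of measurement intervals that stayed within the lag budget. A minimal Python sketch, assuming one lag sample every 10 minutes (in production you would evaluate this over your Prometheus timeseries):

```python
def freshness_slo_compliance(lag_minutes: list[float], slo_lag_minutes: float = 120) -> float:
    """Fraction of measurement intervals in which the table met its freshness SLO."""
    within = sum(1 for lag in lag_minutes if lag <= slo_lag_minutes)
    return within / len(lag_minutes)

# 144 samples over a day (one per 10 minutes); two breaches of the 2 h budget
samples = [35.0] * 142 + [150.0, 180.0]
compliance = freshness_slo_compliance(samples)
print(f"{compliance:.4f}")   # 0.9861 — below the 99.5% objective
```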

-- freshness_monitoring.sql — Query for Prometheus / monitoring scraper

-- Expose the age of the most recent record in each critical table
-- Run this query on a schedule and push metrics to Prometheus Pushgateway

SELECT
    'fct_orders'                            AS table_name,
    DATEDIFF('minute', MAX(created_at), CURRENT_TIMESTAMP()) AS lag_minutes,
    MAX(created_at)                         AS last_record_at
FROM fct_orders

UNION ALL

SELECT
    'fct_events',
    DATEDIFF('minute', MAX(event_time), CURRENT_TIMESTAMP()),
    MAX(event_time)
FROM fct_events

UNION ALL

SELECT
    'dim_products',
    DATEDIFF('minute', MAX(updated_at), CURRENT_TIMESTAMP()),
    MAX(updated_at)
FROM dim_products;

# freshness_exporter.py — Push data freshness metrics to Prometheus Pushgateway

import os
import time
import snowflake.connector
from prometheus_client import CollectorRegistry, Gauge, push_to_gateway

PUSHGATEWAY_URL = os.environ["PUSHGATEWAY_URL"]
SNOWFLAKE_ACCOUNT = os.environ["SNOWFLAKE_ACCOUNT"]
SNOWFLAKE_USER = os.environ["SNOWFLAKE_USER"]
SNOWFLAKE_PASSWORD = os.environ["SNOWFLAKE_PASSWORD"]

FRESHNESS_QUERY = """
SELECT table_name, lag_minutes
FROM (
    SELECT 'fct_orders' AS table_name,
           DATEDIFF('minute', MAX(created_at), CURRENT_TIMESTAMP()) AS lag_minutes
    FROM analytics.public.fct_orders
    UNION ALL
    SELECT 'fct_events',
           DATEDIFF('minute', MAX(event_time), CURRENT_TIMESTAMP())
    FROM analytics.public.fct_events
    UNION ALL
    SELECT 'dim_customers',
           DATEDIFF('minute', MAX(updated_at), CURRENT_TIMESTAMP())
    FROM analytics.public.dim_customers
)
"""

def push_freshness_metrics() -> None:
    registry = CollectorRegistry()
    gauge = Gauge(
        "data_table_lag_minutes",
        "Minutes since the most recent record was written to the table",
        labelnames=["table_name"],
        registry=registry,
    )

    conn = snowflake.connector.connect(
        account=SNOWFLAKE_ACCOUNT,
        user=SNOWFLAKE_USER,
        password=SNOWFLAKE_PASSWORD,
        database="analytics",
        schema="public",
    )

    try:
        with conn.cursor() as cur:
            cur.execute(FRESHNESS_QUERY)
            for table_name, lag_minutes in cur.fetchall():
                gauge.labels(table_name=table_name).set(lag_minutes)
    finally:
        conn.close()

    push_to_gateway(PUSHGATEWAY_URL, job="data_freshness", registry=registry)


if __name__ == "__main__":
    push_freshness_metrics()

# prometheus_alerts.yml — Alerting rules for data freshness SLOs

groups:
  - name: data_quality
    rules:
      # fct_orders must be refreshed at least every 2 hours
      - alert: DataFreshnessViolation_fct_orders
        expr: data_table_lag_minutes{table_name="fct_orders"} > 120
        for: 5m
        labels:
          severity: critical
          team: data-engineering
        annotations:
          summary: "fct_orders is stale ({{ $value }} minutes lag)"
          description: >
            The fct_orders table has not been updated for {{ $value }} minutes.
            SLO threshold is 120 minutes. Check the Airflow DAG
            'fct_orders_daily' for failures.
          runbook_url: "https://wiki.internal/data/runbooks/fct_orders_freshness"

      # fct_events allows up to 30 minutes lag (near-real-time)
      - alert: DataFreshnessViolation_fct_events
        expr: data_table_lag_minutes{table_name="fct_events"} > 30
        for: 2m
        labels:
          severity: warning
          team: data-engineering
        annotations:
          summary: "fct_events is stale ({{ $value }} minutes lag)"
          description: >
            The fct_events table lag exceeds 30 minutes (current: {{ $value }}).
            Check the Kafka consumer group 'events-to-snowflake' for lag buildup.

Note

Define freshness SLO thresholds conservatively at first, then tighten them as you understand your pipeline's actual performance envelope. An alert that fires during every maintenance window trains people to ignore it. Start with a threshold 3× your typical refresh interval and reduce it as your pipeline reliability improves.

Monte Carlo — ML-Based Anomaly Detection

Monte Carlo is the leading commercial data observability platform. Rather than requiring you to hand-craft every quality check, it uses ML models trained on your data's historical patterns to detect anomalies automatically — row count spikes, volume drops, schema changes, field distribution shifts, and freshness violations — without you needing to predict every possible failure mode upfront.

Automatic Monitors

Monte Carlo automatically trains monitors for volume, freshness, schema changes, and field distribution for every table it discovers in your warehouse. No configuration required. The ML models establish a baseline from 14-30 days of history and alert on statistically significant deviations. This gives you coverage across your entire warehouse within hours of connecting Monte Carlo — including tables you didn't know needed monitoring.

Circuit Breakers

Monte Carlo can block downstream pipeline runs when upstream data is anomalous. Configured as API calls within Airflow, dbt, or custom orchestrators, circuit breakers prevent bad data from propagating through your transformation layers. A circuit breaker on fct_orders that detects a 50% volume drop will pause the downstream revenue reporting DAG before it produces incorrect numbers.

Lineage-Aware Incident Routing

Monte Carlo builds an automated data lineage graph by parsing SQL query history. When an incident fires on a table, Monte Carlo shows you every upstream source that feeds it and every downstream consumer affected — dashboards, ML models, reports — so you can immediately assess blast radius and notify the right teams. This transforms triage from hours of manual investigation to minutes of lineage-guided root cause analysis.

# monte_carlo_circuit_breaker.py — Block Airflow DAG on data quality incident
# Uses the Monte Carlo SDK: pip install pycarlo

import os

from pycarlo.core import Client, Session
from pycarlo.features.circuit_breakers import CircuitBreaker

# Credentials from the environment (Monte Carlo API key ID and token)
session = Session(
    mcd_id=os.environ["MONTE_CARLO_ID"],
    mcd_token=os.environ["MONTE_CARLO_TOKEN"],
)
client = Client(session=session)

def check_circuit_breaker(table_mcon: str, breach_count_threshold: int = 0) -> bool:
    """
    Returns True if the circuit is OPEN (data is healthy).
    Returns False if the circuit is BROKEN (anomalies detected — block pipeline).
    """
    cb = CircuitBreaker(client=client)
    result = cb.trigger(
        operator="GT",
        threshold=breach_count_threshold,
        table_mcon=table_mcon,
    )
    return result.is_circuit_open


# In an Airflow DAG task:
from airflow.decorators import task

@task
def quality_gate_fct_orders():
    TABLE_MCON = "MCON++your_account++fct_orders++"  # from Monte Carlo UI

    if not check_circuit_breaker(TABLE_MCON):
        raise ValueError(
            "Circuit breaker tripped: fct_orders has active quality incidents. "
            "Downstream DAG execution blocked."
        )

Data Incident Triage — Runbooks and Root Cause Analysis

Data quality incidents are different from service incidents: they are often invisible to end-users initially, the blast radius expands over time (more downstream consumers get poisoned data), and the root cause is frequently multiple steps removed from where the alert fires. Structured runbooks are essential.

Step 1: Assess the blast radius immediately

When a freshness or quality alert fires, the first question is not 'what failed?' but 'who is affected?'. Check your lineage tool (Monte Carlo, DataHub, or dbt's built-in lineage) for every downstream consumer of the affected table. Notify the owners of affected dashboards and ML models before they discover stale data themselves.
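If your lineage is available as a graph, blast radius is a simple traversal. A sketch with a hypothetical adjacency map — in practice you would query Monte Carlo, DataHub, or dbt's manifest for these edges:

```python
from collections import deque

# Hypothetical lineage edges: table -> direct downstream consumers
LINEAGE = {
    "fct_orders": ["revenue_dashboard", "agg_daily_revenue"],
    "agg_daily_revenue": ["exec_kpi_report", "churn_model_features"],
    "churn_model_features": ["churn_model"],
}

def blast_radius(table: str) -> set[str]:
    """All transitive downstream consumers of a table (BFS over lineage edges)."""
    seen, queue = set(), deque([table])
    while queue:
        for consumer in LINEAGE.get(queue.popleft(), []):
            if consumer not in seen:
                seen.add(consumer)
                queue.append(consumer)
    return seen

print(sorted(blast_radius("fct_orders")))
# Every name printed is an owner to notify before they discover the problem themselves
```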

Step 2: Classify the failure type

Pipeline failure (Airflow DAG error, dbt run error) vs. silent data corruption (pipeline succeeded but produced wrong data) vs. source system failure (upstream API stopped delivering records). Each type has a different investigation path. Check Airflow logs first — most incidents are pipeline failures with obvious error messages. If the DAG succeeded, the problem is upstream or in the transformation logic.

Step 3: Establish the bad data window

Determine exactly when the data quality started degrading. Use your freshness metrics timeseries to find when the lag began growing. Query the table's MAX(updated_at) against your monitoring data to bound the window. This determines how far back a backfill must go and which downstream reports need to be invalidated and re-run.

Step 4: Fix forward vs. rollback

For transformation errors: fix the dbt model, run a targeted backfill for the affected date range, trigger downstream models. For source system failures: coordinate with the upstream team, re-request missing data if available, mark affected partitions as invalid in your data catalog. Always prefer idempotent backfills over corrections — your transformation code must produce the same output when run twice on the same input.
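The idempotency requirement can be illustrated with delete-and-insert (full partition replacement) semantics — a toy in-memory sketch, not actual warehouse code:

```python
def backfill_partition(table: dict[str, list[dict]], partition: str, rows: list[dict]) -> None:
    """Idempotent backfill: replace the whole partition rather than appending.
    Running it twice with the same input leaves the table unchanged."""
    table[partition] = list(rows)   # delete-and-insert semantics

warehouse: dict[str, list[dict]] = {"2026-05-14": [{"order_id": 1, "total": 99.0}]}
fixed_rows = [{"order_id": 1, "total": 100.0}, {"order_id": 2, "total": 50.0}]

backfill_partition(warehouse, "2026-05-14", fixed_rows)
first = {k: list(v) for k, v in warehouse.items()}
backfill_partition(warehouse, "2026-05-14", fixed_rows)   # re-running is a no-op
assert warehouse == first
```

An append-based backfill would double the partition on the second run — exactly the failure mode the delete-and-insert (or MERGE) pattern prevents.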

Step 5: Blameless post-mortem and check authoring

Every data quality incident that was not caught by an existing check is an opportunity to add one. Within 48 hours of resolution, write a dbt test, Soda check, or GX expectation that would have caught this failure earlier. Track mean time to detection (MTTD) and mean time to recovery (MTTR) for data incidents alongside your service SRE metrics.
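MTTD and MTTR for data incidents are computed exactly as for service incidents. A small sketch with hypothetical incident records:

```python
from datetime import datetime, timedelta

def incident_metrics(incidents: list[dict]) -> dict[str, timedelta]:
    """Mean time to detection (start -> detection) and mean time to
    recovery (detection -> resolution) across data incidents."""
    n = len(incidents)
    mttd = sum(((i["detected_at"] - i["started_at"]) for i in incidents), timedelta()) / n
    mttr = sum(((i["resolved_at"] - i["detected_at"]) for i in incidents), timedelta()) / n
    return {"mttd": mttd, "mttr": mttr}

incidents = [
    {"started_at": datetime(2026, 5, 1, 2, 0),
     "detected_at": datetime(2026, 5, 1, 9, 0),    # 7 h undetected
     "resolved_at": datetime(2026, 5, 1, 11, 0)},  # 2 h to fix
    {"started_at": datetime(2026, 5, 8, 0, 0),
     "detected_at": datetime(2026, 5, 8, 1, 0),    # 1 h undetected
     "resolved_at": datetime(2026, 5, 8, 5, 0)},   # 4 h to fix
]
m = incident_metrics(incidents)
print(m["mttd"], m["mttr"])   # 4:00:00 3:00:00
```

A long MTTD relative to MTTR is the signature of silent data failures — and the strongest argument for investing in detection rather than just faster fixes.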


Work with us

Struggling with silent data quality failures or building a data observability layer for your platform?

We design and implement data quality observability frameworks — from dbt test suites and Great Expectations Checkpoints to Soda Core data contracts, freshness SLO instrumentation, Monte Carlo circuit breaker integration, and incident triage runbooks. Let’s talk.

Get in touch
