Data Engineering · Testing · Great Expectations · Schema Validation · Data Quality · Python

Data Pipeline Testing — Contract Tests, Great Expectations, and Schema Validation

A practical guide to testing data pipelines in production: contract tests between producers and consumers, schema validation with Pydantic, Pandera, and Avro, Great Expectations suites with custom expectations and checkpoint runs, dbt schema tests, and CI/CD data quality gates that block bad data before it reaches downstream consumers.

2026-04-27

Why Data Pipeline Testing Is Different

Software testing frameworks have evolved over decades — unit tests, integration tests, end-to-end tests, contract tests. Data pipelines share these concerns but add a dimension that application code does not: the data itself is the primary artifact under test. A pipeline that runs without raising exceptions can still be catastrophically wrong if the data it produces has unexpected nulls, drift in value distributions, broken foreign keys, or a schema that downstream consumers can no longer parse.

The consequences are asymmetric. A broken API endpoint fails immediately and visibly. A pipeline producing subtly corrupted data may pass silently for days or weeks, poisoning dashboards, ML training sets, and business reports — with the blast radius proportional to how long the issue went undetected. This makes data quality assurance less about catching crashes and more about enforcing invariants: shape, completeness, freshness, referential integrity, and statistical plausibility.

This article covers the tools and patterns that work in production: Great Expectations for declarative data suites, schema enforcement with Pandera and Pydantic, contract testing for producer/consumer boundaries, dbt schema and freshness tests, and CI/CD gates that stop bad data from reaching downstream systems.

Layer                     | What you test                         | Tool                         | Runs when
Schema validation         | Column types, nullability, enums      | Pandera, Pydantic, Avro      | On every batch / message
Data quality expectations | Ranges, uniqueness, distributions     | Great Expectations           | Post-load checkpoint
Contract tests            | Producer/consumer schema agreement    | Pact, Schema Registry        | CI before merge
dbt tests                 | Uniqueness, not-null, relationships   | dbt test / dbt-expectations  | After model run
Integration / DAG tests   | Full pipeline end-to-end correctness  | pytest, Airflow TestMode     | CI on PR / nightly

Schema Validation with Pandera and Pydantic

Schema validation is the cheapest form of data testing — it catches type mismatches, unexpected nulls, and out-of-range values at the point of ingestion before bad data propagates downstream. Two tools cover most use cases: Pydantic for row-level validation of JSON/dict records (perfect for API payloads, Kafka messages, and streaming ingestion), and Pandera for DataFrame-level validation in batch Pandas or Spark pipelines.

Pydantic for Row-Level Validation

Pydantic v2 provides fast, type-annotated validation with detailed error messages. Define a model once and reuse it at ingestion, in API handlers, and in tests. The @field_validator decorator adds custom business logic without leaving the schema definition.

# Schema definition — validates incoming event records from Kafka or API
# pydantic v2 — strict mode rejects coercions (e.g. "123" → 123 would fail)

from datetime import datetime
from decimal import Decimal
from enum import Enum
from pydantic import BaseModel, Field, field_validator, model_validator
from typing import Annotated

class OrderStatus(str, Enum):
    PENDING   = "pending"
    CONFIRMED = "confirmed"
    SHIPPED   = "shipped"
    DELIVERED = "delivered"
    CANCELLED = "cancelled"

class OrderLineItem(BaseModel):
    product_id: str = Field(min_length=1, max_length=64, pattern=r"^[A-Z0-9-]+$")
    quantity:   int = Field(gt=0, le=10_000)
    unit_price: Decimal = Field(gt=Decimal("0"), decimal_places=2)

    @property
    def subtotal(self) -> Decimal:
        return self.unit_price * self.quantity

class OrderEvent(BaseModel):
    model_config = {"strict": True}  # no implicit type coercions

    order_id:   str          = Field(min_length=10, max_length=40)
    customer_id: str         = Field(min_length=1)
    status:     OrderStatus
    created_at: datetime
    items:      list[OrderLineItem] = Field(min_length=1, max_length=500)
    total_amount: Decimal    = Field(gt=Decimal("0"), decimal_places=2)

    @field_validator("order_id")
    @classmethod
    def order_id_must_have_prefix(cls, v: str) -> str:
        if not v.startswith("ORD-"):
            raise ValueError(f"order_id must start with 'ORD-', got: {v!r}")
        return v

    @model_validator(mode="after")
    def total_matches_line_items(self) -> "OrderEvent":
        expected = sum(item.subtotal for item in self.items)
        if abs(self.total_amount - expected) > Decimal("0.01"):
            raise ValueError(
                f"total_amount {self.total_amount} does not match "
                f"sum of line items {expected}"
            )
        return self


# Usage in a Kafka consumer
import json
from pydantic import ValidationError

def process_order_message(raw_bytes: bytes) -> None:
    payload = None
    try:
        payload = json.loads(raw_bytes)
        event   = OrderEvent.model_validate(payload)
        # event is fully validated — downstream code can trust all fields
        handle_order(event)
    except (json.JSONDecodeError, ValidationError) as exc:
        # Malformed JSON or failed validation — log and route to dead-letter topic
        errors = (
            exc.errors(include_url=False)
            if isinstance(exc, ValidationError)
            else [{"msg": str(exc), "type": "json_decode_error"}]
        )
        dead_letter_producer.send("orders.dlq", {
            "raw":    payload if payload is not None else raw_bytes.decode("utf-8", errors="replace"),
            "errors": errors,
        })
        metrics.increment("orders.validation_failures")

Pandera for DataFrame Validation

Pandera applies schema validation to Pandas DataFrames (and Polars, Spark DataFrames via extensions). Schemas declare column types, constraints, and checks. The @pa.check_types decorator validates function arguments at call time, making validation a natural part of the pipeline function signature.

# Pandera schema for a batch pipeline stage
# Validates DataFrames as they flow between pipeline steps

import pandera as pa
import pandera.typing as pat
import pandas as pd
from datetime import date

# Class-based schema — reusable, type-checkable
class RawOrdersSchema(pa.DataFrameModel):
    order_id:    pat.Series[str]   = pa.Field(str_startswith="ORD-", unique=True)
    customer_id: pat.Series[str]   = pa.Field(nullable=False)
    status:      pat.Series[str]   = pa.Field(isin=["pending","confirmed","shipped","delivered","cancelled"])
    order_date:  pat.Series[date]  = pa.Field(nullable=False)
    total_amount: pat.Series[float] = pa.Field(gt=0.0)
    item_count:  pat.Series[int]   = pa.Field(ge=1, le=500)

    class Config:
        coerce = True          # attempt type coercion before validation
        strict = "filter"      # drop extra columns, don't raise on them

class EnrichedOrdersSchema(RawOrdersSchema):
    customer_tier: pat.Series[str] = pa.Field(isin=["bronze","silver","gold","platinum"])
    revenue_bucket: pat.Series[str] = pa.Field(nullable=True)

    @pa.check("total_amount", name="revenue_in_expected_range")
    def total_amount_range(cls, series: pat.Series[float]) -> bool:
        # Batch-level anomaly check: p99 of total_amount should not exceed 500k
        return bool(series.quantile(0.99) <= 500_000)


# Decorator-based validation — schema enforced at function boundaries
@pa.check_types
def enrich_orders(
    df: pat.DataFrame[RawOrdersSchema],
) -> pat.DataFrame[EnrichedOrdersSchema]:
    df = df.copy()
    df["customer_tier"] = assign_customer_tier(df["customer_id"])
    df["revenue_bucket"] = pd.cut(
        df["total_amount"],
        bins=[0, 100, 500, 2000, float("inf")],
        labels=["micro", "small", "medium", "large"],
    ).astype(str)
    return df  # Pandera validates the return type automatically


# Running validation explicitly with detailed error reporting
def validate_batch(df: pd.DataFrame) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Returns (valid_rows, invalid_rows) after validation."""
    schema = RawOrdersSchema.to_schema()
    try:
        valid_df = schema.validate(df, lazy=True)  # collect ALL errors, not just first
        return valid_df, pd.DataFrame()
    except pa.errors.SchemaErrors as exc:
        failure_cases = exc.failure_cases  # DataFrame of failing rows with reasons
        bad_idx   = set(failure_cases["index"].dropna().astype(int))
        valid_idx = set(df.index) - bad_idx
        return df.loc[sorted(valid_idx)], df.loc[sorted(bad_idx)]

Note

Use lazy=True in Pandera's validate() to collect all validation failures in one pass rather than stopping at the first error. In production, report the full failure list as a batch metric — a sudden spike in validation failures is a leading indicator of upstream schema drift before it becomes a downstream incident.
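
As a concrete example, the failure list from validate_batch above can be turned into a per-batch metric. This is a minimal sketch assuming a statsd/Datadog-style metrics client; the client and metric names are illustrative.

import pandas as pd

def report_validation_metrics(df: pd.DataFrame, batch_id: str) -> None:
    # Split the batch into valid/invalid rows, then emit counts so dashboards
    # can trend the failure rate over time and alert on sudden spikes
    valid_df, invalid_df = validate_batch(df)
    failure_rate = len(invalid_df) / max(len(df), 1)
    metrics.gauge("orders.batch.validation_failure_rate", failure_rate)
    metrics.increment("orders.batch.invalid_rows", len(invalid_df))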

Great Expectations — Declarative Data Quality Suites

Schema validation catches structural problems. Great Expectations catches semantic problems — value distributions drifting, expected relationships between columns breaking, row counts falling outside historical norms. A GE Expectation Suite is a versioned, declarative specification of what “good data” looks like for a given dataset. It can be generated from a profiler run against sample data, then edited and extended with domain-specific expectations.

Setting Up a Context and Expectation Suite

# Great Expectations — file-based context setup using the fluent datasource API
# (method names below match the 0.17/0.18 releases; the 1.x API renames some of
#  them, e.g. context.sources → context.data_sources)
# Installation: pip install great-expectations

import great_expectations as gx
import pandas as pd

# Initialize a file-based Data Context (stores config in ./gx/)
context = gx.get_context(mode="file")

# Connect to a Pandas datasource
datasource = context.sources.add_pandas("orders_datasource")
asset = datasource.add_dataframe_asset("orders_batch")

# Load a sample batch to profile
df_sample = pd.read_parquet("s3://data-lake/orders/2026-04/sample.parquet")

batch_request = asset.build_batch_request(dataframe=df_sample)
batch_list = context.get_batch_list(batch_request=batch_request)
batch = batch_list[0]

# Auto-profile the batch to generate a starting suite
profiler = context.assistants.onboarding.run(
    batch_request=batch_request,
    exclude_column_names=["_ingested_at", "_pipeline_run_id"],
)

suite = profiler.get_expectation_suite(expectation_suite_name="orders.raw.v1")
context.add_expectation_suite(expectation_suite=suite)

print(f"Generated {len(suite.expectations)} expectations")
# Typical profiler output: 40-80 expectations covering type, range, uniqueness

Writing Custom Expectations

Profiler-generated suites are a starting point. You'll always need domain-specific expectations that the profiler cannot infer: cross-column invariants, business rules, referential integrity against lookup tables, and trend checks. Add these directly to the suite or define a custom Expectation class for reuse across suites.

# Adding domain-specific expectations to a suite
# and defining a custom Expectation class

import great_expectations as gx
from great_expectations.core import ExpectationConfiguration
import pandas as pd

# Load existing suite
context = gx.get_context(mode="file")
suite   = context.get_expectation_suite("orders.raw.v1")

# 1. Basic column expectations
suite.add_expectation(ExpectationConfiguration(
    expectation_type="expect_column_values_to_not_be_null",
    kwargs={"column": "order_id"},
))

suite.add_expectation(ExpectationConfiguration(
    expectation_type="expect_column_values_to_be_unique",
    kwargs={"column": "order_id"},
))

suite.add_expectation(ExpectationConfiguration(
    expectation_type="expect_column_values_to_be_in_set",
    kwargs={
        "column": "status",
        "value_set": ["pending", "confirmed", "shipped", "delivered", "cancelled"],
    },
))

# 2. Statistical expectations — derived from historical data
suite.add_expectation(ExpectationConfiguration(
    expectation_type="expect_column_mean_to_be_between",
    kwargs={
        "column": "total_amount",
        "min_value": 50.0,    # p5 of historical mean
        "max_value": 8000.0,  # p95 of historical mean — alert on extreme drift
    },
))

suite.add_expectation(ExpectationConfiguration(
    expectation_type="expect_table_row_count_to_be_between",
    kwargs={
        "min_value": 1000,    # minimum expected daily order count
        "max_value": 500_000, # maximum expected — spike detector
    },
))

# 3. Conditional expectation — shipped/delivered orders must have a ship_date
suite.add_expectation(ExpectationConfiguration(
    expectation_type="expect_column_values_to_not_be_null",
    kwargs={
        "column": "ship_date",
        "row_condition": 'status in ["shipped", "delivered"]',
        "condition_parser": "pandas",
    },
))

context.save_expectation_suite(suite)


# Custom Expectation skeleton: shipped orders must have a non-null ship_date.
# (Heavily simplified — a production custom Expectation also needs metric logic;
#  for one-off cross-column rules, the pytest check below is usually simpler.)
from great_expectations.expectations.expectation import TableExpectation

class ExpectShippedOrdersHaveShipDate(TableExpectation):
    """Shipped and delivered orders must have a non-null ship_date."""

    metric_dependencies = ("table.columns",)
    success_keys = ()

    def _validate(self, configuration, metrics, runtime_configuration=None, execution_engine=None):
        # Simplified placeholder — override with real logic against the batch
        return {"success": True, "result": {}}


# Better: use a plain pytest + pandas approach for complex cross-column checks
def test_shipped_orders_have_ship_date(df: pd.DataFrame) -> None:
    shipped = df[df["status"].isin(["shipped", "delivered"])]
    null_ship_date = shipped["ship_date"].isna().sum()
    assert null_ship_date == 0, (
        f"{null_ship_date} shipped/delivered orders have null ship_date"
    )

Checkpoint Runs in CI/CD

A Great Expectations Checkpoint ties together a batch request, an expectation suite, and an action list (what to do when validation passes or fails). In CI, a checkpoint run acts as a data quality gate — failing the pipeline if expectations are violated.

# gx/checkpoints/orders_raw_checkpoint.yml
# Runs after the raw orders table is loaded, before any downstream transforms

name: orders_raw_checkpoint
config_version: 1

validations:
  - batch_request:
      datasource_name:   orders_warehouse
      data_connector_name: default_inferred_data_connector_name
      data_asset_name:   orders_raw
      data_connector_query:
        index: -1               # most recent partition
    expectation_suite_name: orders.raw.v1

action_list:
  # Always: update DataDocs HTML report
  - name: store_validation_result
    action:
      class_name: StoreValidationResultAction

  - name: update_data_docs
    action:
      class_name: UpdateDataDocsAction

  # On failure: send Slack alert (the run is failed by the caller — see the Python gate below)
  - name: send_slack_notification_on_failure
    action:
      class_name: SlackNotificationAction
      slack_webhook: ${GE_SLACK_WEBHOOK}
      notify_on: failure
      renderer:
        module_name: great_expectations.render.renderer.slack_renderer
        class_name: SlackRenderer

  # Note: the checkpoint result does not raise on its own — the caller inspects
  # result.success and fails the run, as in the Python gate below.


# Python: run checkpoint programmatically from Airflow or CI

import great_expectations as gx
import sys

def run_data_quality_gate(
    checkpoint_name: str,
    batch_kwargs: dict | None = None,
) -> bool:
    context = gx.get_context(mode="file")

    result = context.run_checkpoint(
        checkpoint_name=checkpoint_name,
        batch_request=batch_kwargs,
    )

    if not result.success:
        # Print failed expectations summary
        for validation_result in result.run_results.values():
            stats = validation_result["validation_result"]["statistics"]
            print(
                f"Checkpoint FAILED: "
                f"{stats['unsuccessful_expectations']} / "
                f"{stats['evaluated_expectations']} expectations failed"
            )
            # Print individual failures
            for r in validation_result["validation_result"]["results"]:
                if not r["success"]:
                    print(f"  FAIL: {r['expectation_config']['expectation_type']} "
                          f"— {r['expectation_config']['kwargs']}")

    return result.success


# Airflow operator usage
from airflow.decorators import task

@task
def validate_orders_raw() -> None:
    success = run_data_quality_gate("orders_raw_checkpoint")
    if not success:
        raise ValueError("Data quality gate failed — see GE DataDocs for details")

# CI usage (exit code 1 on failure blocks the pipeline)
if __name__ == "__main__":
    ok = run_data_quality_gate(sys.argv[1])
    sys.exit(0 if ok else 1)

Note

Generate DataDocs and publish them to S3 (or another static hosting) on every checkpoint run. DataDocs produce a human-readable HTML report of every expectation result, including failure reasons and sample rows. This is the fastest way for data engineers and analysts to debug quality failures without reading raw JSON results.
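
A small sketch of that publish step, assuming a file-based GX context, the AWS CLI on the runner, and an existing bucket (the bucket name and local Data Docs path are illustrative); GX can also be configured with an S3-backed Data Docs site directly in its project config.

import subprocess
import great_expectations as gx

def publish_data_docs(bucket_uri: str = "s3://my-ge-datadocs") -> None:
    context = gx.get_context(mode="file")
    context.build_data_docs()  # renders the HTML report into gx/uncommitted/data_docs/
    # Sync the rendered static site to S3 for team-wide access
    subprocess.run(
        ["aws", "s3", "sync", "gx/uncommitted/data_docs/local_site", bucket_uri],
        check=True,
    )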

Contract Testing — Enforcing Producer/Consumer Boundaries

In microservices, a contract test verifies that a service producing data and a service consuming it agree on the schema, without requiring both to run simultaneously. The same pattern applies to data pipelines: the team that owns the orders table (producer) and the teams that build reports or ML features on top of it (consumers) should have an explicit, versioned contract about what columns exist, what types they have, and what values are valid.

Two complementary approaches: Schema Registry contracts (for event streaming via Kafka/Avro) and dbt contract enforcement (for warehouse tables). Both give producers a clear understanding of what downstream teams depend on, making breaking changes visible before they ship.

Avro Schema Contracts with Schema Registry

# Avro schema for order events — registered in Confluent Schema Registry
# File: schemas/orders/order_created/v2.avsc

{
  "type": "record",
  "name": "OrderCreated",
  "namespace": "com.example.orders",
  "doc": "Fired when a new order is confirmed. v2 adds customer_tier field.",
  "fields": [
    {"name": "order_id",     "type": "string",            "doc": "ORD- prefixed UUID"},
    {"name": "customer_id",  "type": "string"},
    {"name": "status",       "type": {
      "type": "enum",
      "name": "OrderStatus",
      "symbols": ["PENDING", "CONFIRMED", "SHIPPED", "DELIVERED", "CANCELLED"]
    }},
    {"name": "total_amount", "type": {"type": "bytes", "logicalType": "decimal", "precision": 12, "scale": 2}},
    {"name": "customer_tier","type": ["null", "string"], "default": null,
     "doc": "Added in v2 — null for orders before 2026-01-01"}
  ]
}

# Python: register schema and check compatibility against existing consumers
# pip install confluent-kafka[avro]

from confluent_kafka.schema_registry import Schema, SchemaRegistryClient

SCHEMA_REGISTRY_URL = "http://schema-registry:8081"

sr_client = SchemaRegistryClient({"url": SCHEMA_REGISTRY_URL})

def register_schema_with_compatibility_check(
    subject: str,
    schema_str: str,
    compatibility: str = "BACKWARD",  # consumers must handle old + new
) -> int:
    """Register schema and verify it does not break existing consumers."""
    # Set subject-level compatibility mode
    sr_client.set_compatibility(subject_name=subject, level=compatibility)

    # Test compatibility before registering
    schema = Schema(schema_str, schema_type="AVRO")
    is_compatible = sr_client.test_compatibility(
        subject_name=subject,
        schema=schema,
    )

    if not is_compatible:
        raise ValueError(
            f"Schema for subject {subject!r} is NOT backward-compatible. "
            "Consumers using the previous schema will fail to deserialize."
        )

    schema_id = sr_client.register_schema(subject_name=subject, schema=schema)
    print(f"Registered schema ID {schema_id} for subject {subject!r}")
    return schema_id


# CI step: verify schema compatibility before merging
def ci_schema_compatibility_check(schema_file: str, subject: str) -> None:
    with open(schema_file) as f:
        schema_str = f.read()
    try:
        register_schema_with_compatibility_check(subject, schema_str)
        print(f"Schema compatibility check PASSED for {subject}")
    except ValueError as e:
        print(f"Schema compatibility check FAILED: {e}")
        raise SystemExit(1)

dbt Contract Enforcement for Warehouse Tables

dbt v1.5+ introduced model contracts, which enforce that a model's output schema matches the declared schema exactly. If you add a column, rename one, or change a type, the run fails unless you update the contract. This is the warehouse equivalent of a consumer-driven contract — downstream models that ref() your model depend on its contract, and breaking it is a compile-time error.

# models/marts/orders/fct_orders.yml
# dbt contract: downstream consumers can depend on this schema being stable

version: 2

models:
  - name: fct_orders
    config:
      contract:
        enforced: true      # fails compilation if schema drifts from declaration

    constraints:
      - type: not_null
        columns: [order_id, customer_id, order_date, total_amount_usd]
      - type: unique
        columns: [order_id]
      - type: check
        expression: "total_amount_usd > 0"

    columns:
      - name: order_id
        data_type: varchar
        constraints:
          - type: not_null
          - type: unique
        tests:
          - not_null
          - unique

      - name: customer_id
        data_type: varchar
        constraints:
          - type: not_null
        tests:
          - not_null
          - relationships:
              to: ref('dim_customers')
              field: customer_id

      - name: status
        data_type: varchar
        tests:
          - accepted_values:
              values: ['pending', 'confirmed', 'shipped', 'delivered', 'cancelled']

      - name: order_date
        data_type: date
        constraints:
          - type: not_null

      - name: total_amount_usd
        data_type: numeric(12,2)
        constraints:
          - type: not_null

      - name: customer_tier
        data_type: varchar
        tests:
          - accepted_values:
              values: ['bronze', 'silver', 'gold', 'platinum']
              quote: false

# dbt-expectations: richer statistical tests beyond built-in dbt tests
# Installation: add the dbt-expectations package to packages.yml and run `dbt deps`

# models/marts/orders/fct_orders.yml — additional dbt-expectations tests

      - name: total_amount_usd
        tests:
          - dbt_expectations.expect_column_values_to_be_between:
              min_value: 0.01
              max_value: 500000
          - dbt_expectations.expect_column_mean_to_be_between:
              min_value: 50
              max_value: 5000
              group_by: [order_date]     # check mean per day — catches date anomalies

# Table-level freshness and row count tests
    tests:
      - dbt_expectations.expect_table_row_count_to_be_between:
          min_value: 500
          max_value: 1000000

      - dbt_expectations.expect_table_row_count_to_equal_other_table:
          compare_model: ref('stg_orders')   # mart should have same count as staging
          factor: 1.0

# Freshness checks are configured on the *source* definition, not the model —
# e.g. in a sources.yml (source and table names here are illustrative):
sources:
  - name: shop_db
    tables:
      - name: orders_raw
        loaded_at_field: _loaded_at
        freshness:
          warn_after: {count: 12, period: hour}
          error_after: {count: 24, period: hour}

Note

Run dbt test --select state:modified+ in CI to test only models that changed and their downstream dependents. This cuts CI time dramatically for large projects while still catching regressions. Pair with dbt build --fail-fast in production to abort a run if any test fails before downstream models are built on bad data.

Integration Testing — Testing Full DAGs with Isolated Data

Unit and schema tests verify individual functions and column constraints. Integration tests verify that the full pipeline — from source ingestion through all transforms to the final output — produces correct results for a known input. The key challenge is isolation: you need a reproducible input dataset small enough to run quickly, and an output target that does not touch production.

# pytest integration tests for the orders pipeline transforms
# Uses an in-memory SQLite database and a fixture that seeds known test data
# pip install pytest pandas sqlalchemy

import pytest
import pandas as pd
from datetime import date
from sqlalchemy import create_engine

# ── Fixtures ──────────────────────────────────────────────────────────────────

@pytest.fixture(scope="session")
def test_engine():
    """In-memory SQLite engine for pipeline tests."""
    engine = create_engine("sqlite:///:memory:")
    yield engine
    engine.dispose()

@pytest.fixture()
def seed_orders(test_engine):
    """Seed a small, deterministic orders dataset."""
    orders = pd.DataFrame({
        "order_id":     ["ORD-001", "ORD-002", "ORD-003"],
        "customer_id":  ["C1", "C2", "C1"],
        "status":       ["delivered", "confirmed", "shipped"],
        "order_date":   [date(2026, 4, 1), date(2026, 4, 2), date(2026, 4, 2)],
        "total_amount": [150.00, 89.99, 432.50],
    })
    orders.to_sql("orders_raw", test_engine, if_exists="replace", index=False)
    return orders


# ── Unit tests for transform functions ────────────────────────────────────────

from myproject.transforms.orders import (
    assign_customer_tier,
    calculate_revenue_bucket,
    build_fct_orders,
)

def test_assign_customer_tier_gold(seed_orders, test_engine):
    """Customers with >= 2 orders in the period should be silver or above."""
    df = pd.read_sql("SELECT * FROM orders_raw", test_engine)
    result = assign_customer_tier(df)
    c1_tier = result[result["customer_id"] == "C1"]["customer_tier"].iloc[0]
    assert c1_tier in ("silver", "gold", "platinum")

def test_revenue_bucket_small():
    row = pd.Series({"total_amount": 89.99})
    assert calculate_revenue_bucket(row) == "small"

def test_revenue_bucket_medium():
    row = pd.Series({"total_amount": 432.50})
    assert calculate_revenue_bucket(row) == "medium"


# ── End-to-end transform tests ───────────────────────────────────────────────

def test_fct_orders_output_schema(seed_orders, test_engine):
    """Run the full transform and validate the output schema."""
    raw = pd.read_sql("SELECT * FROM orders_raw", test_engine)
    result = build_fct_orders(raw)

    required_columns = {
        "order_id", "customer_id", "status", "order_date",
        "total_amount_usd", "customer_tier", "revenue_bucket",
    }
    assert required_columns.issubset(set(result.columns)), (
        f"Missing columns: {required_columns - set(result.columns)}"
    )

def test_fct_orders_no_nulls_in_required_columns(seed_orders, test_engine):
    raw = pd.read_sql("SELECT * FROM orders_raw", test_engine)
    result = build_fct_orders(raw)

    for col in ["order_id", "customer_id", "status", "total_amount_usd"]:
        null_count = result[col].isna().sum()
        assert null_count == 0, f"Column {col!r} has {null_count} nulls"

def test_fct_orders_total_amount_usd_positive(seed_orders, test_engine):
    raw = pd.read_sql("SELECT * FROM orders_raw", test_engine)
    result = build_fct_orders(raw)
    assert (result["total_amount_usd"] > 0).all()

def test_fct_orders_row_count_matches_source(seed_orders, test_engine):
    raw = pd.read_sql("SELECT * FROM orders_raw", test_engine)
    result = build_fct_orders(raw)
    assert len(result) == len(raw), (
        f"Row count mismatch: input={len(raw)}, output={len(result)}"
    )

CI/CD Data Quality Gates

Data quality checks are most valuable when they run automatically on every commit and block merges or deployments when they fail. A CI data quality pipeline typically has three stages: static analysis (schema compatibility checks, dbt compile), unit/integration tests (pytest with fixtures), and data quality validation (Great Expectations checkpoint against a sample dataset or a staging environment snapshot).

# .github/workflows/data-quality.yml
# Runs on every PR targeting main — blocks merge on any failure

name: Data Quality

on:
  pull_request:
    branches: [main]
    paths:
      - 'pipelines/**'
      - 'models/**'
      - 'schemas/**'
      - 'tests/**'

jobs:
  schema-compatibility:
    name: Schema Compatibility
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - uses: actions/setup-python@v5
        with:
          python-version: "3.12"

      - name: Install dependencies
        run: pip install confluent-kafka[avro] great-expectations pandera dbt-core dbt-snowflake

      - name: Check Avro schema compatibility
        env:
          SCHEMA_REGISTRY_URL: ${{ secrets.SCHEMA_REGISTRY_URL }}
        run: |
          python scripts/ci_schema_check.py schemas/ --registry $SCHEMA_REGISTRY_URL

      - name: Compile dbt project
        env:
          DBT_PROFILES_DIR: .dbt
          SNOWFLAKE_ACCOUNT:  ${{ secrets.SNOWFLAKE_ACCOUNT }}
          SNOWFLAKE_USER:     ${{ secrets.SNOWFLAKE_USER }}
          SNOWFLAKE_PASSWORD: ${{ secrets.SNOWFLAKE_PASSWORD }}
        run: |
          dbt compile --target ci

  unit-tests:
    name: Unit & Integration Tests
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - uses: actions/setup-python@v5
        with:
          python-version: "3.12"

      - name: Install dependencies
        run: pip install -r requirements-dev.txt

      - name: Run pytest
        run: |
          pytest tests/ -v --tb=short \
            --junit-xml=test-results/pytest.xml \
            --cov=pipelines --cov-report=xml

      - name: Upload test results
        uses: actions/upload-artifact@v4
        if: always()
        with:
          name: test-results
          path: test-results/

  data-quality-gate:
    name: Data Quality Gate
    runs-on: ubuntu-latest
    needs: [schema-compatibility, unit-tests]
    steps:
      - uses: actions/checkout@v4

      - uses: actions/setup-python@v5
        with:
          python-version: "3.12"

      - name: Install Great Expectations
        run: pip install great-expectations

      - name: Run GE checkpoints against staging data
        env:
          GE_SLACK_WEBHOOK: ${{ secrets.GE_SLACK_WEBHOOK }}
        run: |
          python scripts/run_checkpoints.py \
            --checkpoint orders_raw_checkpoint \
            --checkpoint fct_orders_checkpoint \
            --environment staging

      - name: Upload DataDocs
        uses: actions/upload-artifact@v4
        if: always()
        with:
          name: ge-datadocs
          path: gx/uncommitted/data_docs/

Note

Do not run Great Expectations checkpoints against full production datasets in CI — they are slow and expensive. Instead, maintain a CI sample dataset: a recent, representative 1–5% sample of production data stored in a versioned location (e.g., S3 or a staging database). Refresh this sample weekly. Your expectations must hold on the sample; if they do not, the pipeline is broken before it merges.

Production Data Observability — Beyond CI Tests

CI tests catch problems in code before deployment. Production monitoring catches problems in data after deployment — schema drift caused by upstream API changes, sudden drops in row count caused by a failed upstream sync, or distribution shift in a feature column used by an ML model. This is where dedicated data observability tools complement GE checkpoints.

Freshness monitoring

Track the MAX(_loaded_at) of every critical table and alert when it exceeds the expected update interval. A table that should refresh hourly and has not been updated in 3 hours is a pipeline failure — often caught before anyone notices stale dashboards. Use dbt's source freshness command or a Prometheus exporter that queries table metadata.
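
A minimal sketch of such a check, assuming a SQLAlchemy engine pointed at the warehouse; the table names, SLAs, and _loaded_at column are illustrative, and timestamps are assumed to be timezone-aware UTC.

from datetime import datetime, timedelta, timezone
from sqlalchemy import text

FRESHNESS_SLAS = {
    "analytics.fct_orders":    timedelta(hours=1),
    "analytics.dim_customers": timedelta(hours=24),
}

def stale_tables(engine) -> list[str]:
    """Return tables whose MAX(_loaded_at) is older than the allowed lag."""
    stale = []
    with engine.connect() as conn:
        for table, max_lag in FRESHNESS_SLAS.items():
            loaded_at = conn.execute(text(f"SELECT MAX(_loaded_at) FROM {table}")).scalar()
            if loaded_at is None or datetime.now(timezone.utc) - loaded_at > max_lag:
                stale.append(table)
    return stale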

Row count anomaly detection

A sudden 50% drop in daily order count could be a business event (outage) or a pipeline failure (incorrect filter applied to the staging table). Track row counts per pipeline run as a time series and alert on Z-score > 3 from the 30-day rolling mean. Most data warehouses expose row counts in their information schema — a simple query is enough.
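
As a sketch of the z-score approach, assuming daily row counts per table are already collected into a pandas Series indexed by date (the threshold matches the one above):

import pandas as pd

def row_count_anomaly(daily_counts: pd.Series, z_threshold: float = 3.0) -> bool:
    """True if the latest day's row count deviates more than z_threshold sigma
    from the 30-day rolling mean (excluding the latest day itself)."""
    history = daily_counts.iloc[-31:-1]   # previous 30 days
    latest  = daily_counts.iloc[-1]
    mean, std = history.mean(), history.std()
    if std == 0:
        return latest != mean
    return abs(latest - mean) / std > z_threshold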

Null rate drift

Upstream API changes frequently introduce new nullable fields or stop populating previously required fields. Monitor the null rate per column per day for critical tables. A column that was 0% null for three months suddenly hitting 30% null is a contract violation from upstream — worth detecting before the ML team's feature pipeline breaks.
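
A sketch of that monitor: one aggregate query per table computes the null rate per column, and today's rates are compared against yesterday's (the SQLAlchemy engine, table/column names, and 10-point threshold are illustrative).

from sqlalchemy import text

def null_rates(engine, table: str, columns: list[str]) -> dict[str, float]:
    """Fraction of NULLs per column for a single table."""
    exprs = ", ".join(
        f"AVG(CASE WHEN {c} IS NULL THEN 1.0 ELSE 0.0 END) AS {c}" for c in columns
    )
    with engine.connect() as conn:
        row = conn.execute(text(f"SELECT {exprs} FROM {table}")).mappings().one()
    return {col: float(rate) for col, rate in row.items()}

def null_rate_drift(today: dict[str, float], yesterday: dict[str, float]) -> list[str]:
    # Alert when a column's null rate jumps by more than 10 percentage points day-over-day
    return [c for c in today if today[c] - yesterday.get(c, 0.0) > 0.10]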

Schema change detection

Snapshot the column list and data types of every critical table daily and diff against the previous snapshot. A column rename or type change from an upstream team can break dozens of downstream models silently. Tools like Monte Carlo and Elementary automate this; a simple Python script querying information_schema.columns stored in a version-controlled YAML file works too.
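
A minimal version of that script, assuming a SQLAlchemy engine, PyYAML, and a checked-in snapshot file (the schema name and path are illustrative):

import yaml
from sqlalchemy import text

def current_columns(engine, schema: str = "analytics") -> dict[str, dict[str, str]]:
    """Return {table: {column: data_type}} from information_schema.columns."""
    query = text("""
        SELECT table_name, column_name, data_type
        FROM information_schema.columns
        WHERE table_schema = :schema
    """)
    snapshot: dict[str, dict[str, str]] = {}
    with engine.connect() as conn:
        for table, column, dtype in conn.execute(query, {"schema": schema}):
            snapshot.setdefault(table, {})[column] = dtype
    return snapshot

def diff_against_snapshot(engine, snapshot_path: str = "schema_snapshot.yml") -> list[str]:
    """Compare the live schema with the version-controlled YAML snapshot."""
    with open(snapshot_path) as f:
        previous = yaml.safe_load(f)
    current = current_columns(engine)
    changes = []
    for table, cols in current.items():
        for col, dtype in cols.items():
            if previous.get(table, {}).get(col) not in (None, dtype):
                changes.append(f"{table}.{col}: type changed {previous[table][col]} -> {dtype}")
        for col in set(previous.get(table, {})) - set(cols):
            changes.append(f"{table}.{col}: column dropped")
    return changes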

# Elementary dbt package — data observability from within dbt
# Adds automatic anomaly detection, schema change monitoring, and DataDocs
# Installation: add to packages.yml

# packages.yml
packages:
  - package: elementary-data/elementary
    version: 0.14.1

# models/marts/orders/fct_orders.yml — add Elementary monitors

models:
  - name: fct_orders
    config:
      elementary:
        timestamp_column: order_date     # partition column for freshness checks

    columns:
      - name: total_amount_usd
        tests:
          - elementary.column_anomalies:
              # Default monitors cover null rate, zero rate, and min/max/avg drift;
              # alerts on deviations beyond 3 sigma from the 14-day rolling baseline
              timestamp_column: order_date

      - name: status
        tests:
          - elementary.column_anomalies:
              column_anomalies:
                - null_count
                - null_percent

    tests:
      - elementary.table_anomalies:
          table_anomalies:
            - row_count       # alert on unusual row count changes
            - freshness       # alert if table not updated on schedule


# After running: dbt test --select elementary
# View report: edr report  (Elementary CLI)

Data Pipeline Testing Checklist

Define contracts before writing tests

Start with a written contract: what columns exist, what types they have, what values are valid, and what row counts are expected. This is the source of truth for your Pandera schemas, GE suites, and dbt contracts. Without a contract, tests are arbitrary and brittle.

Validate at every pipeline boundary

Run schema validation at ingestion (before bad data enters the lake), after each major transform (before building downstream artifacts), and at the final output (before consumers read it). Each boundary is an opportunity to catch a different class of failure.

Route validation failures to a dead-letter queue

Invalid records should not be silently dropped or cause the entire batch to fail. Route them to a dead-letter partition or table, with the original record and the validation error attached. This enables manual review, reprocessing after a fix, and accurate bad-data rate metrics.

Keep your CI sample dataset current

Refresh the representative sample used in CI at least weekly. Sample datasets go stale — a schema addition in production that the sample does not include will cause your CI tests to pass while production fails. Automate the refresh as a scheduled pipeline run.

Version your expectation suites alongside your models

GE suites, Pandera schemas, and dbt YAML files are code — commit them in the same PR as the model changes they describe. A model refactor that changes output column names must update its expectation suite in the same commit. This prevents the common situation where tests become stale and eventually useless.

Distinguish blocking vs. warning failures

Not all expectation failures are equally critical. A primary key uniqueness violation should halt the pipeline — duplicates corrupt downstream joins. A statistical expectation for mean order value being slightly out of range might warrant a Slack alert but not a pipeline stop. Use GE's mostly parameter and dbt's warn_if / error_if thresholds to express this distinction.
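
For example, Great Expectations' mostly parameter relaxes an expectation so it still passes when a small fraction of rows violate it (the threshold below is illustrative, reusing the ExpectationConfiguration pattern from earlier); the dbt equivalents are the severity, warn_if, and error_if test configs.

# Soft expectation: passes as long as at least 99.5% of rows are within range —
# pair with an alert-only notification rather than a pipeline stop
suite.add_expectation(ExpectationConfiguration(
    expectation_type="expect_column_values_to_be_between",
    kwargs={
        "column": "total_amount",
        "min_value": 0.01,
        "max_value": 500_000,
        "mostly": 0.995,
    },
))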

Building or testing data pipelines and struggling with silent failures?

We help engineering teams implement robust data quality frameworks — from contract tests and Great Expectations suites to dbt schema tests, CI/CD data gates, and production monitoring. Let’s talk.

Get in Touch
