Why Data Pipeline Testing Is Different
Software testing frameworks have evolved over decades — unit tests, integration tests, end-to-end tests, contract tests. Data pipelines share these concerns but add a dimension that application code does not: the data itself is the primary artifact under test. A pipeline that runs without raising exceptions can still be catastrophically wrong if the data it produces has unexpected nulls, drift in value distributions, broken foreign keys, or a schema that downstream consumers can no longer parse.
The consequences are asymmetric. A broken API endpoint fails immediately and visibly. A pipeline producing subtly corrupted data may pass silently for days or weeks, poisoning dashboards, ML training sets, and business reports — with the blast radius proportional to how long the issue went undetected. This makes data quality assurance less about catching crashes and more about enforcing invariants: shape, completeness, freshness, referential integrity, and statistical plausibility.
This article covers the tools and patterns that work in production: Great Expectations for declarative data suites, schema enforcement with Pandera and Pydantic, contract testing for producer/consumer boundaries, dbt schema and freshness tests, and CI/CD gates that stop bad data from reaching downstream systems.
| Layer | What you test | Tool | Runs when |
|---|---|---|---|
| Schema validation | Column types, nullability, enums | Pandera, Pydantic, Avro | On every batch / message |
| Data quality expectations | Ranges, uniqueness, distributions | Great Expectations | Post-load checkpoint |
| Contract tests | Producer/consumer schema agreement | Pact, Schema Registry | CI before merge |
| dbt tests | Uniqueness, not-null, relationships | dbt test / dbt-expectations | After model run |
| Integration / DAG tests | Full pipeline end-to-end correctness | pytest, Airflow TestMode | CI on PR / nightly |
Schema Validation with Pandera and Pydantic
Schema validation is the cheapest form of data testing — it catches type mismatches, unexpected nulls, and out-of-range values at the point of ingestion before bad data propagates downstream. Two tools cover most use cases: Pydantic for row-level validation of JSON/dict records (perfect for API payloads, Kafka messages, and streaming ingestion), and Pandera for DataFrame-level validation in batch Pandas or Spark pipelines.
Pydantic for Row-Level Validation
Pydantic v2 provides fast, type-annotated validation with detailed error messages. Define a model once and reuse it at ingestion, in API handlers, and in tests. The @field_validator decorator adds custom business logic without leaving the schema definition.
# Schema definition — validates incoming event records from Kafka or API
# pydantic v2 — strict mode rejects coercions (e.g. "123" → 123 would fail)
from datetime import datetime
from decimal import Decimal
from enum import Enum
from pydantic import BaseModel, Field, field_validator, model_validator
class OrderStatus(str, Enum):
PENDING = "pending"
CONFIRMED = "confirmed"
SHIPPED = "shipped"
DELIVERED = "delivered"
CANCELLED = "cancelled"
class OrderLineItem(BaseModel):
product_id: str = Field(min_length=1, max_length=64, pattern=r"^[A-Z0-9-]+$")
quantity: int = Field(gt=0, le=10_000)
unit_price: Decimal = Field(gt=Decimal("0"), decimal_places=2)
@property
def subtotal(self) -> Decimal:
return self.unit_price * self.quantity
class OrderEvent(BaseModel):
model_config = {"strict": True} # no implicit type coercions
order_id: str = Field(min_length=10, max_length=40)
customer_id: str = Field(min_length=1)
status: OrderStatus
created_at: datetime
items: list[OrderLineItem] = Field(min_length=1, max_length=500)
total_amount: Decimal = Field(gt=Decimal("0"), decimal_places=2)
@field_validator("order_id")
@classmethod
def order_id_must_have_prefix(cls, v: str) -> str:
if not v.startswith("ORD-"):
raise ValueError(f"order_id must start with 'ORD-', got: {v!r}")
return v
@model_validator(mode="after")
def total_matches_line_items(self) -> "OrderEvent":
expected = sum(item.subtotal for item in self.items)
if abs(self.total_amount - expected) > Decimal("0.01"):
raise ValueError(
f"total_amount {self.total_amount} does not match "
f"sum of line items {expected}"
)
return self
# Usage in a Kafka consumer
import json
from pydantic import ValidationError
def process_order_message(raw_bytes: bytes) -> None:
try:
payload = json.loads(raw_bytes)
event = OrderEvent.model_validate(payload)
# event is fully validated — downstream code can trust all fields
handle_order(event)
except ValidationError as exc:
# Structured error list — log and route to dead-letter topic
errors = exc.errors(include_url=False)
dead_letter_producer.send("orders.dlq", {
"raw": payload,
"errors": errors,
})
        metrics.increment("orders.validation_failures")

Pandera for DataFrame Validation
Pandera applies schema validation to Pandas DataFrames (and Polars, Spark DataFrames via extensions). Schemas declare column types, constraints, and checks. The @pa.check_types decorator validates function arguments at call time, making validation a natural part of the pipeline function signature.
# Pandera schema for a batch pipeline stage
# Validates DataFrames as they flow between pipeline steps
import pandera as pa
import pandera.typing as pat
from pandera import DataFrameSchema, Column, Check
import pandas as pd
from datetime import date
# Class-based schema — reusable, type-checkable
class RawOrdersSchema(pa.DataFrameModel):
order_id: pat.Series[str] = pa.Field(str_startswith="ORD-", unique=True)
customer_id: pat.Series[str] = pa.Field(nullable=False)
status: pat.Series[str] = pa.Field(isin=["pending","confirmed","shipped","delivered","cancelled"])
order_date: pat.Series[date] = pa.Field(nullable=False)
total_amount: pat.Series[float] = pa.Field(gt=0.0)
item_count: pat.Series[int] = pa.Field(ge=1, le=500)
class Config:
coerce = True # attempt type coercion before validation
strict = "filter" # drop extra columns, don't raise on them
class EnrichedOrdersSchema(RawOrdersSchema):
customer_tier: pat.Series[str] = pa.Field(isin=["bronze","silver","gold","platinum"])
revenue_bucket: pat.Series[str] = pa.Field(nullable=True)
    @pa.check("total_amount", name="revenue_in_expected_range")
    @classmethod
    def total_amount_range(cls, series: pat.Series[float]) -> bool:
        # p99 of total_amount should not exceed 500k — flags batch-level anomalies
        return series.quantile(0.99) <= 500_000
# Decorator-based validation — schema enforced at function boundaries
@pa.check_types
def enrich_orders(
df: pat.DataFrame[RawOrdersSchema],
) -> pat.DataFrame[EnrichedOrdersSchema]:
df = df.copy()
df["customer_tier"] = assign_customer_tier(df["customer_id"])
df["revenue_bucket"] = pd.cut(
df["total_amount"],
bins=[0, 100, 500, 2000, float("inf")],
labels=["micro", "small", "medium", "large"],
).astype(str)
return df # Pandera validates the return type automatically
# Running validation explicitly with detailed error reporting
def validate_batch(df: pd.DataFrame) -> tuple[pd.DataFrame, pd.DataFrame]:
"""Returns (valid_rows, invalid_rows) after validation."""
schema = RawOrdersSchema.to_schema()
try:
valid_df = schema.validate(df, lazy=True) # collect ALL errors, not just first
return valid_df, pd.DataFrame()
    except pa.errors.SchemaErrors as exc:
        failure_cases = exc.failure_cases  # DataFrame of failing rows with reasons
        bad_idx = set(failure_cases["index"].dropna())
        valid_idx = set(df.index) - bad_idx
        return df.loc[sorted(valid_idx)], df.loc[sorted(bad_idx)]

Note
Use lazy=True in Pandera's validate() to collect all validation failures in one pass rather than stopping at the first error. In production, report the full failure list as a batch metric — a sudden spike in validation failures is a leading indicator of upstream schema drift before it becomes a downstream incident.

Great Expectations — Declarative Data Quality Suites
Schema validation catches structural problems. Great Expectations catches semantic problems — value distributions drifting, expected relationships between columns breaking, row counts falling outside historical norms. A GE Expectation Suite is a versioned, declarative specification of what “good data” looks like for a given dataset. It can be generated from a profiler run against sample data, then edited and extended with domain-specific expectations.
Setting Up a Context and Expectation Suite
# Great Expectations 0.18.x (fluent datasource API) — file-based context setup
# Installation: pip install great-expectations
import great_expectations as gx
import pandas as pd
# Initialize a file-based Data Context (stores config in ./gx/)
context = gx.get_context(mode="file")
# Connect to a Pandas datasource
datasource = context.sources.add_pandas("orders_datasource")
asset = datasource.add_dataframe_asset("orders_batch")
# Load a sample batch to profile
df_sample = pd.read_parquet("s3://data-lake/orders/2026-04/sample.parquet")
batch_request = asset.build_batch_request(dataframe=df_sample)
batch_list = context.get_batch_list(batch_request=batch_request)
batch = batch_list[0]
# Auto-profile the batch to generate a starting suite
profiler = context.assistants.onboarding.run(
batch_request=batch_request,
exclude_column_names=["_ingested_at", "_pipeline_run_id"],
)
suite = profiler.get_expectation_suite(expectation_suite_name="orders.raw.v1")
context.add_expectation_suite(expectation_suite=suite)
print(f"Generated {len(suite.expectations)} expectations")
# Typical profiler output: 40-80 expectations covering type, range, uniqueness

Writing Custom Expectations
Profiler-generated suites are a starting point. You'll always need domain-specific expectations that the profiler cannot infer: cross-column invariants, business rules, referential integrity against lookup tables, and trend checks. Add these directly to the suite or define a custom Expectation class for reuse across suites.
# Adding domain-specific expectations to a suite
# and defining a custom Expectation class
import great_expectations as gx
from great_expectations.core.expectation_configuration import ExpectationConfiguration
import pandas as pd
# Load existing suite
context = gx.get_context(mode="file")
suite = context.get_expectation_suite("orders.raw.v1")
# 1. Basic column expectations
suite.add_expectation(ExpectationConfiguration(
expectation_type="expect_column_values_to_not_be_null",
kwargs={"column": "order_id"},
))
suite.add_expectation(ExpectationConfiguration(
expectation_type="expect_column_values_to_be_unique",
kwargs={"column": "order_id"},
))
suite.add_expectation(ExpectationConfiguration(
expectation_type="expect_column_values_to_be_in_set",
kwargs={
"column": "status",
"value_set": ["pending", "confirmed", "shipped", "delivered", "cancelled"],
},
))
# 2. Statistical expectations — derived from historical data
suite.add_expectation(ExpectationConfiguration(
expectation_type="expect_column_mean_to_be_between",
kwargs={
"column": "total_amount",
"min_value": 50.0, # p5 of historical mean
"max_value": 8000.0, # p95 of historical mean — alert on extreme drift
},
))
suite.add_expectation(ExpectationConfiguration(
expectation_type="expect_table_row_count_to_be_between",
kwargs={
"min_value": 1000, # minimum expected daily order count
"max_value": 500_000, # maximum expected — spike detector
},
))
# 3. Conditional expectation — shipped/delivered orders must have a ship_date
suite.add_expectation(ExpectationConfiguration(
    expectation_type="expect_column_values_to_not_be_null",
    kwargs={
        "column": "ship_date",
        "row_condition": 'status in ["shipped", "delivered"]',
        "condition_parser": "pandas",
    },
))
context.save_expectation_suite(suite)
# Custom Expectation skeleton: expect shipped orders to have a non-null ship_date
from great_expectations.expectations.expectation import TableExpectation

class ExpectShippedOrdersHaveShipDate(TableExpectation):
    """Shipped and delivered orders must have a non-null ship_date."""
    metric_dependencies = ("table.columns",)
    success_keys = ()

    def _validate(self, configuration, metrics, runtime_configuration=None, execution_engine=None):
        # Skeleton only — compute the real cross-column check from `metrics` here
        return {"success": True, "result": {}}
# Better: use a plain pytest + pandas approach for complex cross-column checks
def test_shipped_orders_have_ship_date(df: pd.DataFrame) -> None:
shipped = df[df["status"].isin(["shipped", "delivered"])]
null_ship_date = shipped["ship_date"].isna().sum()
assert null_ship_date == 0, (
f"{null_ship_date} shipped/delivered orders have null ship_date"
    )

Checkpoint Runs in CI/CD
A Great Expectations Checkpoint ties together a batch request, an expectation suite, and an action list (what to do when validation passes or fails). In CI, a checkpoint run acts as a data quality gate — failing the pipeline if expectations are violated.
# gx/checkpoints/orders_raw_checkpoint.yml
# Runs after the raw orders table is loaded, before any downstream transforms
name: orders_raw_checkpoint
config_version: 1
validations:
- batch_request:
datasource_name: orders_warehouse
data_connector_name: default_inferred_data_connector_name
data_asset_name: orders_raw
data_connector_query:
partition_request:
partition_index: -1 # most recent partition
expectation_suite_name: orders.raw.v1
action_list:
# Always: update DataDocs HTML report
- name: store_validation_result
action:
class_name: StoreValidationResultAction
- name: update_data_docs
action:
class_name: UpdateDataDocsAction
# On failure: send Slack alert (failing the run is handled by the caller)
- name: send_slack_notification_on_failure
action:
class_name: SlackNotificationAction
slack_webhook: ${GE_SLACK_WEBHOOK}
notify_on: failure
renderer:
module_name: great_expectations.render.renderer.slack_renderer
class_name: SlackRenderer
# Failing the run on validation errors is the caller's job: check
# result.success after the checkpoint run, as the Python runner below does

# Python: run checkpoint programmatically from Airflow or CI
import great_expectations as gx
import sys
def run_data_quality_gate(
checkpoint_name: str,
batch_kwargs: dict | None = None,
) -> bool:
context = gx.get_context(mode="file")
result = context.run_checkpoint(
checkpoint_name=checkpoint_name,
batch_request=batch_kwargs,
)
if not result.success:
# Print failed expectations summary
for validation_result in result.run_results.values():
stats = validation_result["validation_result"]["statistics"]
print(
f"Checkpoint FAILED: "
f"{stats['unsuccessful_expectations']} / "
f"{stats['evaluated_expectations']} expectations failed"
)
# Print individual failures
for r in validation_result["validation_result"]["results"]:
if not r["success"]:
print(f" FAIL: {r['expectation_config']['expectation_type']} "
f"— {r['expectation_config']['kwargs']}")
return result.success
# Airflow operator usage
from airflow.decorators import task
@task
def validate_orders_raw() -> None:
success = run_data_quality_gate("orders_raw_checkpoint")
if not success:
raise ValueError("Data quality gate failed — see GE DataDocs for details")
# CI usage (exit code 1 on failure blocks the pipeline)
if __name__ == "__main__":
ok = run_data_quality_gate(sys.argv[1])
    sys.exit(0 if ok else 1)
Contract Testing — Enforcing Producer/Consumer Boundaries
In microservices, a contract test verifies that a service producing data and a service consuming it agree on the schema, without requiring both to run simultaneously. The same pattern applies to data pipelines: the team that owns the orders table (producer) and the teams that build reports or ML features on top of it (consumers) should have an explicit, versioned contract about what columns exist, what types they have, and what values are valid.
Two complementary approaches: Schema Registry contracts (for event streaming via Kafka/Avro) and dbt contract enforcement (for warehouse tables). Both give producers a clear understanding of what downstream teams depend on, making breaking changes visible before they ship.
Avro Schema Contracts with Schema Registry
# Avro schema for order events — registered in Confluent Schema Registry
# File: schemas/orders/order_created/v2.avsc
{
"type": "record",
"name": "OrderCreated",
"namespace": "com.example.orders",
"doc": "Fired when a new order is confirmed. v2 adds customer_tier field.",
"fields": [
{"name": "order_id", "type": "string", "doc": "ORD- prefixed UUID"},
{"name": "customer_id", "type": "string"},
{"name": "status", "type": {
"type": "enum",
"name": "OrderStatus",
"symbols": ["PENDING", "CONFIRMED", "SHIPPED", "DELIVERED", "CANCELLED"]
}},
{"name": "total_amount", "type": {"type": "bytes", "logicalType": "decimal", "precision": 12, "scale": 2}},
{"name": "customer_tier","type": ["null", "string"], "default": null,
"doc": "Added in v2 — null for orders before 2026-01-01"}
]
}

# Python: register a schema and verify compatibility before producing
# pip install confluent-kafka
from confluent_kafka.schema_registry import Schema, SchemaRegistryClient
SCHEMA_REGISTRY_URL = "http://schema-registry:8081"
sr_client = SchemaRegistryClient({"url": SCHEMA_REGISTRY_URL})
def register_schema_with_compatibility_check(
subject: str,
schema_str: str,
    compatibility: str = "BACKWARD",  # new schema can still read data written under the old one
) -> int:
"""Register schema and verify it does not break existing consumers."""
# Set subject-level compatibility mode
sr_client.set_compatibility(subject_name=subject, level=compatibility)
# Test compatibility before registering
    from confluent_kafka.schema_registry import Schema  # plain value object, not a client method
    schema = Schema(schema_str, schema_type="AVRO")
is_compatible = sr_client.test_compatibility(
subject_name=subject,
schema=schema,
)
if not is_compatible:
raise ValueError(
f"Schema for subject {subject!r} is NOT backward-compatible. "
"Consumers using the previous schema will fail to deserialize."
)
schema_id = sr_client.register_schema(subject_name=subject, schema=schema)
print(f"Registered schema ID {schema_id} for subject {subject!r}")
return schema_id
# CI step: verify schema compatibility before merging
def ci_schema_compatibility_check(schema_file: str, subject: str) -> None:
with open(schema_file) as f:
schema_str = f.read()
try:
register_schema_with_compatibility_check(subject, schema_str)
print(f"Schema compatibility check PASSED for {subject}")
except ValueError as e:
print(f"Schema compatibility check FAILED: {e}")
        raise SystemExit(1)

dbt Contract Enforcement for Warehouse Tables
dbt v1.5+ introduced model contracts, which enforce that a model's output schema matches the declared schema exactly. If you add a column, rename one, or change a type, the run fails unless you update the contract. This is the warehouse equivalent of a consumer-driven contract — downstream models that ref() your model depend on its contract, and breaking it is a compile-time error.
# models/marts/orders/fct_orders.yml
# dbt contract: downstream consumers can depend on this schema being stable
version: 2
models:
- name: fct_orders
config:
contract:
enforced: true # fails compilation if schema drifts from declaration
constraints:
- type: not_null
columns: [order_id, customer_id, order_date, total_amount_usd]
- type: unique
columns: [order_id]
- type: check
expression: "total_amount_usd > 0"
columns:
- name: order_id
data_type: varchar
constraints:
- type: not_null
- type: unique
tests:
- not_null
- unique
- name: customer_id
data_type: varchar
constraints:
- type: not_null
tests:
- not_null
- relationships:
to: ref('dim_customers')
field: customer_id
- name: status
data_type: varchar
tests:
- accepted_values:
values: ['pending', 'confirmed', 'shipped', 'delivered', 'cancelled']
- name: order_date
data_type: date
constraints:
- type: not_null
- name: total_amount_usd
data_type: numeric(12,2)
constraints:
- type: not_null
- name: customer_tier
data_type: varchar
tests:
- accepted_values:
values: ['bronze', 'silver', 'gold', 'platinum']
quote: false

# dbt-expectations: richer statistical tests beyond built-in dbt tests
# Installation: add the dbt-expectations package to packages.yml, then run dbt deps
# models/marts/orders/fct_orders.yml — additional dbt-expectations tests
- name: total_amount_usd
tests:
- dbt_expectations.expect_column_values_to_be_between:
min_value: 0.01
max_value: 500000
- dbt_expectations.expect_column_mean_to_be_between:
min_value: 50
max_value: 5000
group_by: [order_date] # check mean per day — catches date anomalies
# Table-level freshness and row count tests
tests:
- dbt_expectations.expect_table_row_count_to_be_between:
min_value: 500
max_value: 1000000
- dbt_expectations.expect_table_row_count_to_equal_other_table:
compare_model: ref('stg_orders') # mart should have same count as staging
factor: 1.0
# Freshness check — defined on the dbt SOURCE (sources.yml), not the model:
# fail if the upstream table hasn't been loaded in 24 hours
freshness:
  warn_after: {count: 12, period: hour}
  error_after: {count: 24, period: hour}
loaded_at_field: _loaded_at

Note
Run dbt test --select state:modified+ in CI to test only models that changed and their downstream dependents. This cuts CI time dramatically for large projects while still catching regressions. Pair with dbt build --fail-fast in production to abort a run if any test fails, before downstream models are built on bad data.

Integration Testing — Testing Full DAGs with Isolated Data
Unit and schema tests verify individual functions and column constraints. Integration tests verify that the full pipeline — from source ingestion through all transforms to the final output — produces correct results for a known input. The key challenge is isolation: you need a reproducible input dataset small enough to run quickly, and an output target that does not touch production.
# pytest integration tests for an Airflow pipeline's transform and output steps
# Uses an in-memory SQLite database and a fixture that seeds known test data
# pip install pytest pandas sqlalchemy apache-airflow
import pytest
import pandas as pd
from datetime import date
from sqlalchemy import create_engine
# ── Fixtures ──────────────────────────────────────────────────────────────────
@pytest.fixture(scope="session")
def test_engine():
"""In-memory SQLite engine for pipeline tests."""
engine = create_engine("sqlite:///:memory:")
yield engine
engine.dispose()
@pytest.fixture()
def seed_orders(test_engine):
"""Seed a small, deterministic orders dataset."""
orders = pd.DataFrame({
"order_id": ["ORD-001", "ORD-002", "ORD-003"],
"customer_id": ["C1", "C2", "C1"],
"status": ["delivered", "confirmed", "shipped"],
"order_date": [date(2026, 4, 1), date(2026, 4, 2), date(2026, 4, 2)],
"total_amount": [150.00, 89.99, 432.50],
})
orders.to_sql("orders_raw", test_engine, if_exists="replace", index=False)
return orders
# ── Unit tests for transform functions ────────────────────────────────────────
from myproject.transforms.orders import (
assign_customer_tier,
calculate_revenue_bucket,
build_fct_orders,
)
def test_assign_customer_tier_gold(seed_orders, test_engine):
"""Customers with >= 2 orders in the period should be silver or above."""
df = pd.read_sql("SELECT * FROM orders_raw", test_engine)
result = assign_customer_tier(df)
c1_tier = result[result["customer_id"] == "C1"]["customer_tier"].iloc[0]
assert c1_tier in ("silver", "gold", "platinum")
def test_revenue_bucket_small():
row = pd.Series({"total_amount": 89.99})
assert calculate_revenue_bucket(row) == "small"
def test_revenue_bucket_medium():
row = pd.Series({"total_amount": 432.50})
assert calculate_revenue_bucket(row) == "medium"
# ── End-to-end DAG test ───────────────────────────────────────────────────────
def test_fct_orders_output_schema(seed_orders, test_engine):
"""Run the full transform and validate the output schema."""
raw = pd.read_sql("SELECT * FROM orders_raw", test_engine)
result = build_fct_orders(raw)
required_columns = {
"order_id", "customer_id", "status", "order_date",
"total_amount_usd", "customer_tier", "revenue_bucket",
}
assert required_columns.issubset(set(result.columns)), (
f"Missing columns: {required_columns - set(result.columns)}"
)
def test_fct_orders_no_nulls_in_required_columns(seed_orders, test_engine):
raw = pd.read_sql("SELECT * FROM orders_raw", test_engine)
result = build_fct_orders(raw)
for col in ["order_id", "customer_id", "status", "total_amount_usd"]:
null_count = result[col].isna().sum()
assert null_count == 0, f"Column {col!r} has {null_count} nulls"
def test_fct_orders_total_amount_usd_positive(seed_orders, test_engine):
raw = pd.read_sql("SELECT * FROM orders_raw", test_engine)
result = build_fct_orders(raw)
assert (result["total_amount_usd"] > 0).all()
def test_fct_orders_row_count_matches_source(seed_orders, test_engine):
raw = pd.read_sql("SELECT * FROM orders_raw", test_engine)
result = build_fct_orders(raw)
assert len(result) == len(raw), (
f"Row count mismatch: input={len(raw)}, output={len(result)}"
    )

CI/CD Data Quality Gates
Data quality checks are most valuable when they run automatically on every commit and block merges or deployments when they fail. A CI data quality pipeline typically has three stages: static analysis (schema compatibility checks, dbt compile), unit/integration tests (pytest with fixtures), and data quality validation (Great Expectations checkpoint against a sample dataset or a staging environment snapshot).
# .github/workflows/data-quality.yml
# Runs on every PR targeting main — blocks merge on any failure
name: Data Quality
on:
pull_request:
branches: [main]
paths:
- 'pipelines/**'
- 'models/**'
- 'schemas/**'
- 'tests/**'
jobs:
schema-compatibility:
name: Schema Compatibility
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: "3.12"
- name: Install dependencies
run: pip install confluent-kafka[avro] great-expectations pandera dbt-core dbt-snowflake
- name: Check Avro schema compatibility
env:
SCHEMA_REGISTRY_URL: ${{ secrets.SCHEMA_REGISTRY_URL }}
run: |
python scripts/ci_schema_check.py schemas/ --registry $SCHEMA_REGISTRY_URL
- name: Compile dbt project
env:
DBT_PROFILES_DIR: .dbt
SNOWFLAKE_ACCOUNT: ${{ secrets.SNOWFLAKE_ACCOUNT }}
SNOWFLAKE_USER: ${{ secrets.SNOWFLAKE_USER }}
SNOWFLAKE_PASSWORD: ${{ secrets.SNOWFLAKE_PASSWORD }}
run: |
dbt compile --target ci
unit-tests:
name: Unit & Integration Tests
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: "3.12"
- name: Install dependencies
run: pip install -r requirements-dev.txt
- name: Run pytest
run: |
pytest tests/ -v --tb=short --junit-xml=test-results/pytest.xml --cov=pipelines --cov-report=xml
- name: Upload test results
uses: actions/upload-artifact@v4
if: always()
with:
name: test-results
path: test-results/
data-quality-gate:
name: Data Quality Gate
runs-on: ubuntu-latest
needs: [schema-compatibility, unit-tests]
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: "3.12"
- name: Install Great Expectations
run: pip install great-expectations
- name: Run GE checkpoints against staging data
env:
GE_SLACK_WEBHOOK: ${{ secrets.GE_SLACK_WEBHOOK }}
run: |
python scripts/run_checkpoints.py --checkpoint orders_raw_checkpoint --checkpoint fct_orders_checkpoint --environment staging
- name: Upload DataDocs
uses: actions/upload-artifact@v4
if: always()
with:
name: ge-datadocs
path: gx/uncommitted/data_docs/

Production Data Observability — Beyond CI Tests
CI tests catch problems in code before deployment. Production monitoring catches problems in data after deployment — schema drift caused by upstream API changes, sudden drops in row count caused by a failed upstream sync, or distribution shift in a feature column used by an ML model. This is where dedicated data observability tools complement GE checkpoints.
Freshness monitoring
Track the MAX(_loaded_at) of every critical table and alert when it exceeds the expected update interval. A table that should refresh hourly and has not been updated in 3 hours is a pipeline failure — often caught before anyone notices stale dashboards. Use dbt's source freshness command or a Prometheus exporter that queries table metadata.
Row count anomaly detection
A sudden 50% drop in daily order count could be a business event (outage) or a pipeline failure (incorrect filter applied to the staging table). Track row counts per pipeline run as a time series and alert on Z-score > 3 from the 30-day rolling mean. Most data warehouses expose row counts in their information schema — a simple query is enough.
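The Z-score rule is simple enough to sketch in plain Python; the daily counts below are fabricated for illustration:

```python
# Row-count anomaly sketch — Z-score of today's count vs. a rolling baseline
from statistics import mean, stdev

def row_count_zscore(history: list[int], today: int) -> float:
    """Z-score of today's row count against the historical window."""
    mu, sigma = mean(history), stdev(history)
    if sigma == 0:
        return 0.0
    return (today - mu) / sigma

history = [10_000, 10_200, 9_900, 10_100, 10_050] * 6  # ~30 days of daily counts
z = row_count_zscore(history, today=4_800)  # roughly a 50% drop
if abs(z) > 3:
    print(f"ALERT: row count anomaly, z={z:.1f}")
```

A drop of this size lands far past the 3-sigma threshold and would fire the alert; a warehouse's information schema supplies the counts with a single metadata query.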
Null rate drift
Upstream API changes frequently introduce new nullable fields or stop populating previously required fields. Monitor the null rate per column per day for critical tables. A column that was 0% null for three months suddenly hitting 30% null is a contract violation from upstream — worth detecting before the ML team's feature pipeline breaks.
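A minimal sketch of this monitor, comparing per-column null rates against a stored baseline — the column names and rates are hypothetical, and 10 percentage points is an illustrative threshold:

```python
# Null-rate drift sketch — flag columns whose null rate jumped over baseline
def drifted_columns(baseline: dict[str, float],
                    today: dict[str, float],
                    threshold: float = 0.10) -> list[str]:
    """Columns whose null rate rose by more than `threshold` over baseline."""
    return [col for col, rate in today.items()
            if rate - baseline.get(col, 0.0) > threshold]

baseline = {"customer_tier": 0.0, "promo_code": 0.40}
today = {"customer_tier": 0.30, "promo_code": 0.42}  # tier went 0% → 30% null
flagged = drifted_columns(baseline, today)
# flagged contains "customer_tier" but not "promo_code"
```

The baseline itself would be refreshed daily from the same per-column aggregation query that produces `today`.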
Schema change detection
Snapshot the column list and data types of every critical table daily and diff against the previous snapshot. A column rename or type change from an upstream team can break dozens of downstream models silently. Tools like Monte Carlo and Elementary automate this; a simple Python script querying information_schema.columns stored in a version-controlled YAML file works too.
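A snapshot diff needs nothing more than two column → type mappings, e.g. built from information_schema.columns; the columns below are illustrative:

```python
# Schema snapshot diff sketch — compare today's columns/types to yesterday's
def diff_schema(previous: dict[str, str], current: dict[str, str]) -> dict[str, list]:
    return {
        "added":   sorted(set(current) - set(previous)),
        "removed": sorted(set(previous) - set(current)),
        "retyped": sorted(c for c in set(previous) & set(current)
                          if previous[c] != current[c]),
    }

prev = {"order_id": "varchar", "total_amount": "numeric", "status": "varchar"}
curr = {"order_id": "varchar", "total_amount": "float", "order_status": "varchar"}
changes = diff_schema(prev, curr)
# → {'added': ['order_status'], 'removed': ['status'], 'retyped': ['total_amount']}
```

Any non-empty bucket is worth an alert; storing the previous snapshot as version-controlled YAML makes the diff reviewable in PRs.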
# Elementary dbt package — data observability from within dbt
# Adds automatic anomaly detection, schema change monitoring, and DataDocs
# Installation: add to packages.yml
# packages.yml
packages:
- package: elementary-data/elementary
version: 0.14.1
# models/marts/orders/fct_orders.yml — add Elementary monitors
models:
- name: fct_orders
config:
elementary:
timestamp_column: order_date # partition column for freshness checks
columns:
- name: total_amount_usd
tests:
- elementary.column_anomalies:
column_anomalies:
# Automatically detects: null rate, zero rate, min/max/avg drift
# Alerts on deviations beyond 3 sigma from 14-day rolling baseline
timestamp_column: order_date
- name: status
tests:
- elementary.all_columns_anomalies:
column_anomalies:
- null_count
- null_percent
tests:
- elementary.table_anomalies:
table_anomalies:
- row_count # alert on unusual row count changes
- freshness # alert if table not updated on schedule
# After running: dbt test --select elementary
# View report: edr report (Elementary CLI)

Data Pipeline Testing Checklist
Define contracts before writing tests
Start with a written contract: what columns exist, what types they have, what values are valid, and what row counts are expected. This is the source of truth for your Pandera schemas, GE suites, and dbt contracts. Without a contract, tests are arbitrary and brittle.
Validate at every pipeline boundary
Run schema validation at ingestion (before bad data enters the lake), after each major transform (before building downstream artifacts), and at the final output (before consumers read it). Each boundary is an opportunity to catch a different class of failure.
Route validation failures to a dead-letter queue
Invalid records should not be silently dropped or cause the entire batch to fail. Route them to a dead-letter partition or table, with the original record and the validation error attached. This enables manual review, reprocessing after a fix, and accurate bad-data rate metrics.
Keep your CI sample dataset current
Refresh the representative sample used in CI at least weekly. Sample datasets go stale — a schema addition in production that the sample does not include will cause your CI tests to pass while production fails. Automate the refresh as a scheduled pipeline run.
Version your expectation suites alongside your models
GE suites, Pandera schemas, and dbt YAML files are code — commit them in the same PR as the model changes they describe. A model refactor that changes output column names must update its expectation suite in the same commit. This prevents the common situation where tests become stale and eventually useless.
Distinguish blocking vs. warning failures
Not all expectation failures are equally critical. A primary key uniqueness violation should halt the pipeline — duplicates corrupt downstream joins. A statistical expectation for mean order value being slightly out of range might warrant a Slack alert but not a pipeline stop. Use GE's mostly parameter and dbt's warn_if / error_if thresholds to express this distinction.
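The underlying idea — grade a check by its pass fraction instead of a binary pass/fail — can be sketched in a few lines. The thresholds here are illustrative, not GE or dbt defaults:

```python
# Severity sketch — pass-fraction grading behind GE's `mostly` and
# dbt's warn_if/error_if thresholds (thresholds are illustrative)
def classify(pass_fraction: float, warn_below: float = 0.99,
             error_below: float = 0.95) -> str:
    """'ok' | 'warn' (alert only) | 'error' (block the pipeline)."""
    if pass_fraction < error_below:
        return "error"
    if pass_fraction < warn_below:
        return "warn"
    return "ok"

status_ok = classify(0.999)   # within tolerance
status_warn = classify(0.97)  # Slack alert, pipeline continues
status_err = classify(0.90)   # halt before downstream models build on bad data
```

In GE this maps to `"mostly": 0.95` in an expectation's kwargs; in dbt, to `severity`, `warn_if`, and `error_if` in a test's config block.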
Building or testing data pipelines and struggling with silent failures?
We help engineering teams implement robust data quality frameworks — from contract tests and Great Expectations suites to dbt schema tests, CI/CD data gates, and production monitoring. Let’s talk.
Get in Touch