Why dbt Tests Are Your Last Line of Defense
A data warehouse that looks right is more dangerous than one that is obviously broken. Silent data quality issues — duplicate order IDs, nulls in join keys, stale dimension records, out-of-range revenue figures — propagate downstream into dashboards and ML training sets where they erode trust without raising an obvious error. dbt's testing framework gives analytics engineers a structured, version-controlled way to assert data quality at every layer of the transformation graph — from raw sources to mart tables that power executive dashboards.
Unlike application unit tests that run against mocked objects, dbt data tests execute as SQL queries against real data in your warehouse. They catch not just logic errors in your Jinja/SQL transformations, but also upstream data surprises: a vendor sending negative quantities, a CRM emitting duplicate contact IDs, a Kafka consumer backfilling records with wrong timestamps. Tests that run in CI before deployment and on a schedule in production create a continuous quality gate. For teams using the techniques described in dbt advanced patterns, a solid test strategy is what makes refactoring large macro- and package-heavy projects safe.
Built-in Generic Tests — unique, not_null, accepted_values, relationships
dbt ships with four generic tests that cover the most common data quality dimensions. They are declared in YAML schema files alongside your model definitions and compiled to SQL at runtime. Generic tests are the cheapest tests to write and should be applied liberally to every model column that carries a semantic contract.
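
To make "compiled to SQL" concrete, here is a minimal Python sketch that runs queries of roughly the same shape against an in-memory SQLite database. The SQL templates approximate what the four built-in tests check and are not copied from dbt's compiled output; the `orders` and `customers` tables and the `failing_rows` helper are hypothetical.

```python
import sqlite3

# Approximate SQL shapes for the four built-in generic tests (an assumption,
# not dbt's exact compiled output). Each query returns the offending rows,
# so zero rows means the test passes.
GENERIC_TESTS = {
    "unique": (
        "select {column} from {table} where {column} is not null "
        "group by {column} having count(*) > 1"
    ),
    "not_null": "select {column} from {table} where {column} is null",
    "accepted_values": (
        "select {column} from {table} where {column} not in ({values})"
    ),
    "relationships": (
        "select c.{column} from {table} c "
        "left join {to} p on c.{column} = p.{field} "
        "where c.{column} is not null and p.{field} is null"
    ),
}


def failing_rows(conn, test_name, **params):
    """Render one test query and return the rows that violate it."""
    sql = GENERIC_TESTS[test_name].format(**params)
    return conn.execute(sql).fetchall()


conn = sqlite3.connect(":memory:")
# Sample data with one violation per test: a duplicate order_id (10),
# an unknown status ('lost'), an orphaned customer_id (3), a null order_id.
conn.executescript(
    """
    create table customers (customer_id integer);
    create table orders (order_id integer, status text, customer_id integer);
    insert into customers values (1), (2);
    insert into orders values
        (10, 'placed', 1),
        (10, 'shipped', 2),
        (11, 'lost', 3),
        (null, 'placed', 1);
    """
)

report = {
    "unique": failing_rows(conn, "unique", table="orders", column="order_id"),
    "not_null": failing_rows(conn, "not_null", table="orders", column="order_id"),
    "accepted_values": failing_rows(
        conn, "accepted_values", table="orders", column="status",
        values="'placed', 'shipped', 'delivered', 'cancelled', 'returned'",
    ),
    "relationships": failing_rows(
        conn, "relationships", table="orders", column="customer_id",
        to="customers", field="customer_id",
    ),
}
for name, rows in report.items():
    print(name, "FAIL" if rows else "PASS", rows)
```

Every test in this sample fails on purpose, which shows the shared contract: a generic test is a parametrized query whose result set is the list of violations.
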
# models/staging/schema.yml
version: 2

sources:
  - name: raw_orders
    database: analytics
    schema: raw
    loaded_at_field: _loaded_at  # used by source freshness checks
    freshness:
      warn_after: { count: 6, period: hour }
      error_after: { count: 24, period: hour }
    tables:
      - name: orders
        columns:
          - name: order_id
            tests: [unique, not_null]
          - name: status
            tests:
              - accepted_values:
                  values: ['placed', 'shipped', 'delivered', 'cancelled', 'returned']
          - name: customer_id
            tests:
              - not_null
              - relationships:
                  to: source('raw_customers', 'customers')
                  field: customer_id

models:
  - name: stg_orders
    description: "Cleaned and typed orders from raw source"
    columns:
      - name: order_id
        tests: [unique, not_null]
      - name: order_status
        tests:
          - accepted_values:
              values: ['placed', 'shipped', 'delivered', 'cancelled', 'returned']
      - name: customer_id
        tests:
          - not_null
          - relationships:
              to: ref('stg_customers')
              field: customer_id
      - name: order_amount_usd
        tests: [not_null]
      - name: created_at
        tests: [not_null]

  - name: fct_orders
    description: "Order fact table: one row per order"
    columns:
      - name: order_id
        tests:
          - unique:
              severity: error  # block deployment on failure
          - not_null:
              severity: error
      - name: customer_id
        tests:
          - not_null
          - relationships:
              to: ref('dim_customers')
              field: customer_id
              severity: warn  # warn during backfills
      - name: revenue_usd
        tests:
          - not_null
          - dbt_utils.expression_is_true:
              expression: ">= 0"
              severity: error

Note: The severity property controls whether a failing test blocks CI (error) or just emits a warning (warn). Use error on primary keys, join keys, and inputs to critical metrics. Use warn during backfills where upstream data is intentionally incomplete. You can also set threshold-based severity with warn_if: ">10" and error_if: ">100", which are evaluated against the number of failures the test returns.

Singular Tests: Custom SQL Assertions for Business Logic
When generic tests are not expressive enough, singular tests let you write arbitrary SQL that asserts a condition. A singular test is a .sql file inside a tests/ directory. dbt runs the query and expects it to return zero rows — any row returned is a failing assertion. This pattern is ideal for multi-column business rules, cross-model consistency checks, and aggregate invariants that cannot be expressed as column-level constraints.
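
The zero-rows contract can be sketched in a few lines of Python. This is an illustration under stated assumptions rather than how dbt executes tests internally: `run_singular_test` is a hypothetical helper, SQLite stands in for the warehouse, and the tables hold made-up rows.

```python
import sqlite3


def run_singular_test(conn, name, sql):
    """Run an assertion query; every returned row counts as one failure."""
    rows = conn.execute(sql).fetchall()
    return {
        "test": name,
        "status": "pass" if not rows else "fail",
        "failures": len(rows),
        "sample": rows[:5],  # a few offending rows for debugging
    }


conn = sqlite3.connect(":memory:")
# Order 1 matches its line items, order 2 is off by 5.00,
# and order 3 has no line items at all.
conn.executescript(
    """
    create table fct_orders (order_id integer, revenue_usd real);
    create table fct_order_line_items (
        order_id integer, unit_price real, quantity integer
    );
    insert into fct_orders values (1, 30.0), (2, 45.0), (3, 10.0);
    insert into fct_order_line_items values (1, 10.0, 3), (2, 20.0, 2);
    """
)

revenue_check = run_singular_test(
    conn,
    "assert_revenue_matches_line_items",
    """
    select o.order_id, o.revenue_usd, coalesce(li.line_total, 0) as line_total
    from fct_orders o
    left join (
        select order_id, sum(unit_price * quantity) as line_total
        from fct_order_line_items
        group by order_id
    ) li on li.order_id = o.order_id
    where abs(o.revenue_usd - coalesce(li.line_total, 0)) > 0.01
    order by o.order_id
    """,
)
negative_check = run_singular_test(
    conn,
    "assert_order_amounts_positive",
    "select order_id, revenue_usd from fct_orders where revenue_usd < 0",
)
print(revenue_check)
print(negative_check)
```

Because the query returns the violating rows themselves, a failure comes with its own evidence: the sample rows tell you which orders to inspect.
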
-- tests/assert_order_amounts_positive.sql
-- Passes when zero rows are returned (no negative orders)
select
    order_id,
    revenue_usd
from {{ ref('fct_orders') }}
where revenue_usd < 0

-- tests/assert_no_future_orders.sql
-- Orders should not have created_at after current timestamp
select
    order_id,
    created_at
from {{ ref('fct_orders') }}
where created_at > current_timestamp

-- tests/assert_order_status_transitions_valid.sql
-- An order can only transition: placed -> shipped -> delivered
-- It should never skip from placed directly to delivered
select
    o.order_id,
    o.order_status,
    prev.order_status as previous_status
from {{ ref('fct_order_status_history') }} o
left join {{ ref('fct_order_status_history') }} prev
    on o.order_id = prev.order_id
    and prev.changed_at = (
        select max(changed_at)
        from {{ ref('fct_order_status_history') }} h
        where h.order_id = o.order_id
          and h.changed_at < o.changed_at
    )
where o.order_status = 'delivered'
  and prev.order_status = 'placed'

-- tests/assert_revenue_matches_line_items.sql
-- Fact table revenue must match sum of line items (within rounding tolerance)
select
    o.order_id,
    o.revenue_usd as header_revenue,
    coalesce(li.line_total, 0) as line_item_total,
    abs(o.revenue_usd - coalesce(li.line_total, 0)) as discrepancy
from {{ ref('fct_orders') }} o
left join (
    select order_id, sum(unit_price * quantity) as line_total
    from {{ ref('fct_order_line_items') }}
    group by 1
) li using (order_id)
where abs(o.revenue_usd - coalesce(li.line_total, 0)) > 0.01  -- allow $0.01 rounding

dbt Unit Tests: Mocking Input Data Since dbt 1.8
Prior to dbt 1.8, dbt Core had no built-in way to test model logic independently of the data in the warehouse. The unit testing feature introduced in dbt Core 1.8 (May 2024) lets you mock input tables with inline fixture data and assert on the expected output rows. It is a good fit for testing complex Jinja logic, window functions, CASE expressions, and edge cases that are hard to reproduce in production data.
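
The given/expect idea can be sketched in plain Python. This is a conceptual illustration, not dbt's implementation: `build_model` is a hypothetical stand-in for a model's CASE expression, and `diff_expected` is a hypothetical helper that compares only the columns named in the expected rows.

```python
# Hypothetical stand-in for the SQL inside a model. In dbt this logic would be
# a CASE expression, and the unit test would exercise the compiled query.
STATUS_LABELS = {
    "placed": "Pending",
    "shipped": "In Transit",
    "delivered": "Complete",
    "cancelled": "Cancelled",
    "returned": "Returned",
}


def build_model(stg_orders):
    """Map each mocked input row to an output row with a display label."""
    return [
        {
            "order_id": row["order_id"],
            "status_label": STATUS_LABELS.get(row["order_status"], "Unknown"),
            "revenue_usd": row["revenue_usd"],
        }
        for row in stg_orders
    ]


def diff_expected(actual_rows, expected_rows):
    """Compare only the columns named in expected_rows.

    Returns (missing, unexpected): expected rows absent from the output,
    and output rows that no expectation covers.
    """
    columns = sorted({col for row in expected_rows for col in row})

    def project(rows):
        return sorted((tuple(r.get(c) for c in columns) for r in rows), key=repr)

    actual, expected = project(actual_rows), project(expected_rows)
    missing = [row for row in expected if row not in actual]
    unexpected = [row for row in actual if row not in expected]
    return missing, unexpected


given = [
    {"order_id": 1, "order_status": "placed", "revenue_usd": 100.0},
    {"order_id": 4, "order_status": "cancelled", "revenue_usd": 0.0},
]
expect = [
    {"order_id": 1, "status_label": "Pending"},
    {"order_id": 4, "status_label": "Cancelled"},
]

missing, unexpected = diff_expected(build_model(given), expect)
print("PASS" if not (missing or unexpected) else "FAIL")
```

The expectation never mentions revenue_usd, so that column plays no part in the comparison. Keeping expectations this narrow is what lets one test pin down one behavior.
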
# models/marts/schema.yml: unit test definitions (dbt 1.8+)
unit_tests:
  - name: test_order_status_label_mapping
    description: "CASE expression maps raw statuses to display labels correctly"
    model: fct_orders
    given:
      - input: ref('stg_orders')
        rows:
          - { order_id: 1, order_status: 'placed', revenue_usd: 100.00 }
          - { order_id: 2, order_status: 'shipped', revenue_usd: 200.00 }
          - { order_id: 3, order_status: 'delivered', revenue_usd: 300.00 }
          - { order_id: 4, order_status: 'cancelled', revenue_usd: 0.00 }
          - { order_id: 5, order_status: 'returned', revenue_usd: 50.00 }
      - input: ref('dim_customers')
        rows:
          - { customer_id: 99, customer_name: 'Test Corp', tier: 'enterprise' }
    expect:
      rows:
        - { order_id: 1, status_label: 'Pending' }
        - { order_id: 2, status_label: 'In Transit' }
        - { order_id: 3, status_label: 'Complete' }
        - { order_id: 4, status_label: 'Cancelled' }
        - { order_id: 5, status_label: 'Returned' }

  - name: test_revenue_tier_calculation
    description: "Revenue tier bucketing logic assigns correct tier labels"
    model: fct_customer_metrics
    given:
      - input: ref('fct_orders')
        rows:
          - { customer_id: 1, revenue_usd: 50.00 }     # low
          - { customer_id: 2, revenue_usd: 500.00 }    # mid
          - { customer_id: 3, revenue_usd: 5000.00 }   # high
          - { customer_id: 4, revenue_usd: 50000.00 }  # enterprise
    expect:
      rows:
        - { customer_id: 1, revenue_tier: 'low' }
        - { customer_id: 2, revenue_tier: 'mid' }
        - { customer_id: 3, revenue_tier: 'high' }
        - { customer_id: 4, revenue_tier: 'enterprise' }

# Run only unit tests
# dbt test --select "test_type:unit"
# Run all tests for a model including unit tests
# dbt build --select fct_orders

Note: Output columns that are not listed in expect.rows are ignored. This makes unit tests narrowly focused: test one behavior at a time. For incremental models, set is_incremental: true under overrides.macros in the unit test YAML to simulate the incremental code path.

Custom Generic Tests with Macros
When you need a reusable assertion that takes parameters — like "this column must be greater than another column" or "values must match a regex pattern" — custom generic tests are the answer. A custom generic test is a Jinja macro in a macros/tests/ directory that accepts model and column_name as positional arguments and returns SQL that should return zero rows on success.
{# macros/tests/test_not_negative.sql #}
{# Usage: - not_negative (as a column test) #}
{% test not_negative(model, column_name) %}
    select {{ column_name }}
    from {{ model }}
    where {{ column_name }} < 0
{% endtest %}

{# macros/tests/test_is_valid_email.sql #}
{# Coarse shape check only: something@something.something #}
{% test is_valid_email(model, column_name) %}
    select {{ column_name }}
    from {{ model }}
    where {{ column_name }} is not null
      and {{ column_name }} not like '%@%.%'
{% endtest %}

{# macros/tests/test_column_is_greater_than.sql #}
{# Usage: - column_is_greater_than: {compare_column: other_col} #}
{% test column_is_greater_than(model, column_name, compare_column) %}
    select {{ column_name }}, {{ compare_column }}
    from {{ model }}
    where {{ column_name }} <= {{ compare_column }}
{% endtest %}

{# macros/tests/test_row_count_between.sql #}
{# Usage (model-level): - row_count_between: {min_value: 1000, max_value: 10000000} #}
{# Model-level tests receive no column_name, so the signature omits it #}
{% test row_count_between(model, min_value, max_value) %}
    select count(*) as row_count
    from {{ model }}
    having count(*) < {{ min_value }}
        or count(*) > {{ max_value }}
{% endtest %}

# Using custom generic tests in schema.yml
models:
  - name: fct_orders
    columns:
      - name: revenue_usd
        tests:
          - not_negative
          - column_is_greater_than:
              compare_column: discount_usd
      - name: customer_email
        tests:
          - is_valid_email

  - name: dim_customers
    tests:
      - row_count_between:  # model-level test: applies to the whole table
          min_value: 1000
          max_value: 10000000

dbt-utils and dbt-expectations: Production-Ready Test Packages
Writing every custom test from scratch is unnecessary. dbt-utils (maintained by dbt Labs) and dbt-expectations (inspired by the Great Expectations library) provide a rich catalog of battle-tested generic tests. Adding them to your project via packages.yml unlocks dozens of assertions covering cardinality, distribution, cross-column relationships, and string patterns — all applied with the same declarative YAML syntax as built-in tests. This is one of the foundational practices covered in dbt production patterns: always install dbt-utils before writing your first custom test.
# packages.yml
packages:
  - package: dbt-labs/dbt_utils
    version: [">=1.0.0", "<2.0.0"]
  - package: calogica/dbt_expectations
    version: [">=0.10.0", "<1.0.0"]

# Install packages
# dbt deps

# models/marts/schema.yml: using dbt-utils and dbt-expectations
models:
  - name: fct_orders
    tests:
      # dbt-utils: table-level tests
      - dbt_utils.equal_rowcount:
          compare_model: ref('stg_orders')  # staging and fact must have same row count
      - dbt_utils.unique_combination_of_columns:
          combination_of_columns:
            - order_id
            - order_line_number  # composite uniqueness for line items
    columns:
      - name: order_id
        tests:
          - dbt_utils.not_empty_string
      - name: revenue_usd
        tests:
          - dbt_utils.expression_is_true:
              expression: ">= 0"
          # dbt-expectations: statistical bounds
          - dbt_expectations.expect_column_values_to_be_between:
              min_value: 0
              max_value: 99999
          - dbt_expectations.expect_column_mean_to_be_between:
              min_value: 50
              max_value: 500  # flag if average order value goes anomalous
      - name: order_date
        tests:
          - dbt_expectations.expect_column_values_to_be_of_type:
              column_type: date
          - dbt_expectations.expect_column_values_to_be_between:
              min_value: "'2020-01-01'"
              max_value: "'2030-12-31'"
              row_condition: "order_status != 'test'"
      - name: customer_email
        tests:
          - dbt_expectations.expect_column_values_to_match_regex:
              regex: "^[a-zA-Z0-9._%+\\-]+@[a-zA-Z0-9.\\-]+\\.[a-zA-Z]{2,}$"
      - name: payment_method
        tests:
          - dbt_expectations.expect_column_distinct_count_to_be_less_than:
              value: 20  # payment methods should not explode in cardinality

  - name: dim_customers
    tests:
      - dbt_expectations.expect_table_row_count_to_be_between:
          min_value: 1000
          max_value: 10000000
      - dbt_expectations.expect_table_row_count_to_equal_other_table:
          compare_model: ref('stg_customers')
    columns:
      - name: customer_id
        tests:
          - dbt_expectations.expect_column_values_to_not_be_null
          - dbt_expectations.expect_column_values_to_be_unique
      - name: lifetime_value_usd
        tests:
          - dbt_expectations.expect_column_quantile_values_to_be_between:
              quantile: 0.99
              min_value: 0
              max_value: 50000  # 99th percentile LTV sanity check

Source Freshness: Detecting Stale Upstream Data
Data freshness failures are a class of their own: the pipeline ran without errors, models compiled and materialized correctly, but the data being transformed is hours or days old because an upstream ingestion job silently stalled. dbt source freshness checks query a timestamp column (loaded_at_field) in each source table and compare the maximum value against a configured age threshold. This is distinct from model tests — it validates the pipeline inputs, not the transformation logic. Integrating freshness checks with broader data pipeline testing strategies gives you end-to-end quality coverage from source ingestion through mart delivery.
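
The threshold comparison itself is simple enough to sketch in Python. The function below is a hypothetical illustration of the rule described above, not dbt's code; treating an age exactly equal to a threshold as still fresh is an assumption.

```python
from datetime import datetime, timedelta, timezone


def freshness_status(max_loaded_at, now, warn_after, error_after):
    """Classify a source by the age of its newest record.

    Assumption: an age exactly equal to a threshold still counts as fresh.
    """
    age = now - max_loaded_at
    if age > error_after:
        return "error"
    if age > warn_after:
        return "warn"
    return "pass"


# A fixed "now" keeps the example reproducible.
now = datetime(2026, 6, 8, 20, 0, tzinfo=timezone.utc)

# Loaded 11 hours ago, thresholds 12h / 24h.
fresh = freshness_status(
    datetime(2026, 6, 8, 9, 0, tzinfo=timezone.utc), now,
    warn_after=timedelta(hours=12), error_after=timedelta(hours=24),
)
# Loaded 30 hours ago, thresholds 24h / 72h.
aging = freshness_status(
    datetime(2026, 6, 7, 14, 0, tzinfo=timezone.utc), now,
    warn_after=timedelta(hours=24), error_after=timedelta(hours=72),
)
# Loaded 13.5 hours ago, thresholds 2h / 6h.
stale = freshness_status(
    datetime(2026, 6, 8, 6, 30, tzinfo=timezone.utc), now,
    warn_after=timedelta(hours=2), error_after=timedelta(hours=6),
)
print(fresh, aging, stale)
```

The same 13.5-hour age would pass for a source with a 24-hour warning threshold, so thresholds should be set per table according to how often each source is expected to load.
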
# models/staging/sources.yml: source freshness configuration
version: 2

sources:
  - name: salesforce
    database: analytics
    schema: salesforce_raw
    tables:
      - name: opportunities
        loaded_at_field: _fivetran_synced  # common for Fivetran sources
        freshness:
          warn_after: { count: 12, period: hour }
          error_after: { count: 24, period: hour }
      - name: accounts
        loaded_at_field: _fivetran_synced
        freshness:
          warn_after: { count: 24, period: hour }
          error_after: { count: 72, period: hour }

  - name: stripe
    database: analytics
    schema: stripe_raw
    tables:
      - name: charges
        loaded_at_field: created  # use source system timestamp
        freshness:
          warn_after: { count: 2, period: hour }
          error_after: { count: 6, period: hour }
      - name: refunds
        loaded_at_field: created
        freshness:
          warn_after: { count: 6, period: hour }
          error_after: { count: 12, period: hour }

# Run source freshness checks
# dbt source freshness

# Include freshness in CI (fail if sources are stale before building models)
# dbt source freshness --select source:salesforce
# dbt build

# Output example (illustrative; format simplified):
# Found 4 sources. Freshness will be calculated for 4 of them.
# Freshness: salesforce.opportunities SUCCESS [max_loaded_at=2026-06-08 09:00 UTC]
# Freshness: stripe.charges ERROR [max_loaded_at=2026-06-08 06:30 UTC] (13.5 hours, past the 6-hour error_after)
# Freshness: stripe.refunds ERROR [max_loaded_at=2026-06-07 21:00 UTC] (24 hours)

Note: Run dbt source freshness as the first step in your CI pipeline, before dbt build. If a critical source is stale and you build the models anyway, you will materialize outdated data into production tables and overwrite correct data with stale data, a worse outcome than no build at all. Set error_after thresholds on critical sources: when one is breached the command exits non-zero, so CI can gate on it. In Airflow, wrap this as a separate task that must succeed before any downstream dbt tasks run.

Test Selection, Exclusion, and Incremental Testing
Running every test on every run is not always practical. A warehouse with hundreds of models and thousands of tests could take 10-15 minutes to fully test — acceptable for a nightly schedule but unacceptable for a per-PR CI run. dbt's node selection syntax gives you fine-grained control over which tests run when.
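
The graph operators are easy to mix up, so here is a minimal Python sketch of how a leading or trailing `+` resolves against a DAG. It models node selection only (dbt then runs the tests attached to the selected nodes), the DAG is hypothetical, and the `select` function is an illustration rather than dbt's selector implementation.

```python
from collections import deque

# Hypothetical DAG: each key maps to the models that depend on it (children).
CHILDREN = {
    "stg_orders": ["int_orders_enriched"],
    "stg_customers": ["dim_customers"],
    "int_orders_enriched": ["fct_orders"],
    "dim_customers": ["fct_orders"],
    "fct_orders": ["fct_customer_metrics"],
    "fct_customer_metrics": [],
}


def invert(edges):
    """Turn a parent -> children mapping into child -> parents."""
    parents = {node: [] for node in edges}
    for parent, kids in edges.items():
        for kid in kids:
            parents.setdefault(kid, []).append(parent)
    return parents


def walk(start_nodes, edges):
    """Breadth-first traversal returning every node reachable from the start."""
    seen, queue = set(), deque(start_nodes)
    while queue:
        node = queue.popleft()
        if node in seen:
            continue
        seen.add(node)
        queue.extend(edges.get(node, []))
    return seen


def select(selector, modified=()):
    """Resolve 'model', '+model', 'model+', or 'state:modified+'."""
    if selector == "state:modified+":
        # Changed nodes plus everything downstream of them.
        return walk(list(modified), CHILDREN)
    name = selector.strip("+")
    selected = {name}
    if selector.startswith("+"):  # leading plus: include ancestors
        selected |= walk([name], invert(CHILDREN))
    if selector.endswith("+"):  # trailing plus: include descendants
        selected |= walk([name], CHILDREN)
    return selected


upstream = select("+fct_orders")
downstream = select("fct_orders+")
slim = select("state:modified+", modified=["dim_customers"])
print(sorted(upstream))
print(sorted(downstream))
print(sorted(slim))
```

A leading `+` walks toward sources and a trailing `+` walks toward marts. In slim CI the trailing `+` matters because a change to one model can break the tests of everything built on top of it.
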
# ── Selection syntax cheat sheet ─────────────────────────────────────────────
# Run all tests
dbt test
# Run tests for a specific model and all its upstream dependencies
# (a leading + selects ancestors; a trailing + selects descendants)
dbt test --select +fct_orders
# Run tests for models that have changed in the current PR (slim CI)
dbt test --select state:modified+ --defer --state .dbt_state_artifacts/
# Run only generic (schema) tests, skipping singular tests
dbt test --select test_type:generic
# Run only singular tests
dbt test --select test_type:singular
# Run only unit tests (dbt 1.8+)
dbt test --select test_type:unit
# Run tests tagged 'critical' across all models
dbt test --select tag:critical
# Exclude slow or expensive tests from CI
dbt test --exclude tag:slow
# Run tests for an entire DAG layer (all mart models); quoted so the shell does not expand the *
dbt test --select "marts.*"

# ── Tagging tests for selective execution ────────────────────────────────────
# models/marts/schema.yml
models:
  - name: fct_orders
    columns:
      - name: order_id
        tests:
          - unique:
              severity: error
              tags: ['critical', 'ci']  # included in fast CI runs
          - not_null:
              tags: ['critical', 'ci']
      - name: revenue_usd
        tests:
          - dbt_expectations.expect_column_mean_to_be_between:
              min_value: 50
              max_value: 500
              tags: ['statistical', 'slow']  # skip in fast CI, run nightly

# ── Slim CI: only test what changed ──────────────────────────────────────────
# 1. In main branch CI, save state artifacts after successful build
dbt build && cp -r target/ .dbt_state_artifacts/
# 2. In PR CI, compare against saved state
dbt build --select state:modified+ --defer --state .dbt_state_artifacts/

Layered Test Strategy: Staging, Intermediate, and Marts
Not every model needs the same test density. Over-testing early layers wastes warehouse compute and creates alert fatigue; under-testing mart layers lets business logic errors slip into dashboards. The practical approach is a tiered test strategy that matches test type and severity to the model's position in the DAG and the risk of each column.
- Sources: source freshness thresholds, not_null and unique on natural keys, and accepted_values for low-cardinality columns. These tests catch ingestion problems early, before they propagate.
- Staging models: unique and not_null on surrogate or natural keys. Relationship tests to validate foreign keys against other staging models. Keep severity at warn for relationship tests since staging data is often incomplete.
- Intermediate models: dbt unit tests for complex CASE expressions and window functions. Row count checks (equal_rowcount) to confirm joins are not exploding. Expression assertions for derived columns (non-negative amounts, valid date ranges).
- Mart models: strict unique/not_null on all dimension and fact primary keys with severity: error. Statistical tests (mean, quantile bounds) for key metrics. Singular tests for cross-model business invariants. Full referential integrity checks between fact and dimension tables.
# models/staging/schema.yml: lightweight staging tests
models:
  - name: stg_orders
    columns:
      - name: order_id
        tests:
          - unique: { severity: error }
          - not_null: { severity: error }
      - name: customer_id
        tests:
          - relationships:
              to: ref('stg_customers')
              field: customer_id
              severity: warn  # warn only: source may lag
---
# models/intermediate/schema.yml: logic validation
unit_tests:
  - name: test_revenue_net_of_discounts
    model: int_orders_enriched
    given:
      - input: ref('stg_orders')
        rows:
          - { order_id: 1, gross_amount: 100.00, discount_amount: 10.00 }
          - { order_id: 2, gross_amount: 200.00, discount_amount: 0.00 }
    expect:
      rows:
        - { order_id: 1, net_revenue: 90.00 }
        - { order_id: 2, net_revenue: 200.00 }
---
# models/marts/schema.yml: strict mart enforcement
models:
  - name: fct_orders
    tests:
      - dbt_utils.equal_rowcount:
          compare_model: ref('stg_orders')
    columns:
      - name: order_id
        tests:
          - unique: { severity: error, tags: [critical] }
          - not_null: { severity: error, tags: [critical] }
      - name: revenue_usd
        tests:
          - not_null: { severity: error }
          - dbt_utils.expression_is_true:
              expression: ">= 0"
              severity: error
          - dbt_expectations.expect_column_mean_to_be_between:
              min_value: 50
              max_value: 500
              tags: [statistical, slow]

CI/CD Integration: Running dbt Tests in GitHub Actions
dbt tests only block bad code from reaching production if they run in CI. A well-structured GitHub Actions workflow checks source freshness and runs slim CI tests on changed models; it can also post test results as PR annotations, which the example here does not show. On merge to main, a separate deployment job runs the full test suite.
# .github/workflows/dbt-ci.yml
name: dbt CI
on:
  pull_request:
    paths:
      - 'models/**'
      - 'tests/**'
      - 'macros/**'
      - 'packages.yml'
      - 'dbt_project.yml'
  push:
    branches: [main]  # required for the main-only step and job to ever run
  schedule:
    - cron: '0 3 * * *'  # example nightly time; required for the schedule-gated step

jobs:
  dbt-test:
    runs-on: ubuntu-latest
    env:
      DBT_PROFILES_DIR: .
      DBT_TARGET: ci
      DBT_BIGQUERY_KEYFILE: ${{ secrets.GCP_SA_KEYFILE }}
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: "3.11"
      - name: Install dbt and packages
        run: |
          pip install dbt-bigquery==1.8.*
          dbt deps
      - name: Download production state artifacts
        # download-artifact@v4 reads from the current run by default; fetching
        # artifacts saved by an earlier run also needs run-id and github-token.
        uses: actions/download-artifact@v4
        with:
          name: dbt-state
          path: .dbt_state_artifacts/
        continue-on-error: true  # first run has no state yet
      - name: Check source freshness
        run: dbt source freshness
        # Exits non-zero if any error_after threshold is breached
      - name: Build and test changed models (slim CI)
        run: |
          dbt build \
            --select state:modified+ \
            --defer \
            --state .dbt_state_artifacts/ \
            --exclude tag:slow
      - name: Save updated state artifacts
        if: github.ref == 'refs/heads/main'
        uses: actions/upload-artifact@v4
        with:
          name: dbt-state
          path: target/
          retention-days: 7

  dbt-full-test:
    runs-on: ubuntu-latest
    needs: dbt-test
    if: github.ref == 'refs/heads/main'
    # Profile and credential env vars are omitted here; mirror the dbt-test job.
    steps:
      - uses: actions/checkout@v4
      - name: Install dbt
        run: pip install dbt-bigquery==1.8.* && dbt deps
      - name: Full build and test on merge to main
        run: dbt build --target prod
      - name: Run statistical and slow tests on schedule
        if: github.event_name == 'schedule'
        run: dbt test --select tag:statistical tag:slow

Note: The --defer flag tells dbt to resolve refs for unchanged models from the production state artifacts rather than requiring them to be materialized in the CI environment. This means CI only needs to build and test the models that actually changed in the PR, while tests that reference upstream production models (like relationship tests) still pass because they query production data via the deferred ref. Combined with state:modified+, this can cut CI run time from 20-30 minutes to under 5 minutes for typical PRs.

Storing and Querying Test Results for Observability
dbt writes test results to target/run_results.json after every run. Parsing this file and loading results into a dedicated test tracking table gives you a historical record of data quality over time — which tests failed, how many rows were affected, and whether failures are trending up or down. This is the foundation of a dbt observability practice: treat test results as time-series data, not just pass/fail events.
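
Before wiring up a warehouse load, it can help to inspect the file with nothing but the standard library. The sketch below assumes the `results` entries carry `unique_id`, `status`, and `failures` fields; the sample payload and its node names are made up for illustration.

```python
from collections import Counter


def summarize_test_results(run_results):
    """Count test outcomes and list the tests with the most failing rows."""
    tests = [
        r for r in run_results["results"]
        if r["unique_id"].startswith("test.")  # skip models, seeds, snapshots
    ]
    by_status = Counter(r["status"] for r in tests)
    # "failures" can be null (for example when a test errors), so default to 0.
    failing = sorted(
        ((r["unique_id"], r.get("failures") or 0)
         for r in tests if r["status"] != "pass"),
        key=lambda item: -item[1],
    )
    return {"total": len(tests), "by_status": dict(by_status), "worst": failing[:3]}


# Hypothetical, heavily trimmed stand-in for json.load() on run_results.json.
sample = {
    "metadata": {"generated_at": "2026-06-08T20:00:00Z"},
    "results": [
        {"unique_id": "model.shop.fct_orders", "status": "success", "failures": None},
        {"unique_id": "test.shop.unique_fct_orders_order_id", "status": "pass", "failures": 0},
        {"unique_id": "test.shop.not_null_fct_orders_revenue_usd", "status": "fail", "failures": 12},
        {"unique_id": "test.shop.relationships_fct_orders_customer_id", "status": "warn", "failures": 3},
    ],
}

summary = summarize_test_results(sample)
print(summary)
```

A summary like this is enough for a CI log line or a chat alert; the time-series view needs the results persisted run after run.
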
# scripts/load_test_results.py: parse and load dbt test results to BigQuery
import json
from pathlib import Path

from google.cloud import bigquery


def load_dbt_test_results(run_results_path: str, project_id: str, dataset: str):
    results_json = json.loads(Path(run_results_path).read_text())
    metadata = results_json["metadata"]
    rows = []
    for result in results_json["results"]:
        # Keep only test nodes; models, seeds, and snapshots are skipped
        if not result["unique_id"].startswith("test."):
            continue
        rows.append({
            "run_id": metadata["invocation_id"],  # one ID shared by every row of a run
            "run_at": metadata["generated_at"],
            "test_name": result["unique_id"],
            "status": result["status"],  # pass, fail, warn, error
            "failures": result.get("failures") or 0,  # may be null, e.g. on error
            "execution_ms": result.get("execution_time", 0) * 1000,
            "dbt_version": metadata["dbt_version"],  # not dbt_schema_version, which is a schema URL
        })
    if not rows:
        print("No test results found; nothing to load")
        return
    client = bigquery.Client(project=project_id)
    table_ref = f"{project_id}.{dataset}.dbt_test_results"
    errors = client.insert_rows_json(table_ref, rows)
    if errors:
        raise RuntimeError(f"BigQuery insert errors: {errors}")
    print(f"Loaded {len(rows)} test results to {table_ref}")

# ── Query test failure trends in BigQuery ────────────────────────────────────
# SELECT
#   DATE(run_at) as run_date,
#   test_name,
#   countif(status = 'fail') as failures,
#   countif(status = 'pass') as passes,
#   avg(failures) as avg_failing_rows
# FROM analytics.dbt_observability.dbt_test_results
# WHERE run_at >= timestamp_sub(current_timestamp(), interval 30 day)
# GROUP BY 1, 2
# ORDER BY 1 DESC, failures DESC