Apache Airflow, Data Engineering, DAG, Python, Orchestration, Kubernetes

Apache Airflow in Production — DAG Design, Backfills, and Dependency Management

A practical guide to Apache Airflow in production: idempotent DAG design with the TaskFlow API, task dependencies and TaskGroups, dynamic task mapping with .expand(), ExternalTaskSensor for cross-DAG dependencies, safe backfill strategies, config-driven DAG factory patterns, KubernetesPodOperator for isolated task environments, Helm chart deployment, and CI/CD pipelines for DAG parsing validation and unit testing.

2026-05-11

Why cron is not enough for data pipelines

Cron handles simple schedules well: run this script at 2 AM. It breaks down the moment pipelines have dependencies, require retries, need backfills after outages, or must parallelise work across dozens of datasets. A pipeline that extracts data from ten sources, transforms each one, loads them into a warehouse, then runs cross-source quality checks cannot express its dependency graph in crontab. When one extraction fails, you need selective re-runs — not a full restart. When the warehouse was unavailable for six hours, you need to backfill the missed intervals without duplicating data already processed.

Apache Airflow is the industry-standard Python-native workflow orchestrator for data pipelines. A DAG — Directed Acyclic Graph — is a Python file that declares tasks, their dependencies, and the schedule on which they run. Airflow's scheduler reads DAG files, triggers task instances based on schedule and upstream success, and provides a web UI for monitoring, retrying, and backfilling. Airflow 2.x introduced the TaskFlow API, dynamic task mapping, and a fully redesigned scheduler architecture that scales to thousands of concurrent task instances without polling the database on every heartbeat.

Core Concepts — DAGs, Operators, XCom, and Connections

DAG (Directed Acyclic Graph)

A Python file that defines the workflow. Airflow imports DAG files from the DAGs folder (or a Git-synced volume in production). Each DAG has an id, a schedule (cron expression, timedelta, or @daily/@hourly shorthand), default_args (owner, retries, email), and a set of tasks linked by dependency edges.
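
As a quick sketch (the dag_ids here are invented), the schedule argument accepts any of these forms:

# schedule accepts a cron expression, a timedelta, or a preset shorthand
import datetime

import pendulum
from airflow import DAG

common = dict(start_date=pendulum.datetime(2026, 1, 1, tz="UTC"), catchup=False)

nightly = DAG(dag_id="report_nightly", schedule="0 2 * * *", **common)            # cron
rolling = DAG(dag_id="report_rolling", schedule=datetime.timedelta(hours=6), **common)
daily   = DAG(dag_id="report_daily", schedule="@daily", **common)                 # preset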

Operator / Task

An Operator is the class that defines what a task does. PythonOperator executes a Python function. BashOperator runs a shell command. KubernetesPodOperator spawns a Kubernetes pod. SQLExecuteQueryOperator runs SQL on any database connection. In the TaskFlow API (@task decorator), a Python function is automatically wrapped in a PythonOperator.
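
A small side-by-side sketch of the two styles; the dag_id, task logic, and bash command are placeholders:

# Classic operator instantiation vs. the TaskFlow @task decorator
import pendulum
from airflow import DAG
from airflow.decorators import task
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator


def _extract() -> str:
    return "s3://bucket/raw/2026-05-11/"            # illustrative return value


with DAG(
    dag_id="operator_styles",
    schedule=None,
    start_date=pendulum.datetime(2026, 1, 1, tz="UTC"),
    catchup=False,
):
    # Classic: instantiate the operator class and wire dependencies explicitly
    extract = PythonOperator(task_id="extract", python_callable=_extract)
    cleanup = BashOperator(task_id="cleanup", bash_command="echo 'staging cleaned'")
    extract >> cleanup

    # TaskFlow: the decorator wraps the function in a PythonOperator and wires XComs
    @task
    def extract_taskflow() -> str:
        return "s3://bucket/raw/2026-05-11/"

    extract_taskflow()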

XCom

The mechanism for passing small values between tasks. A task pushes an XCom by returning a value; downstream tasks pull it with ti.xcom_pull(). XComs are stored in the Airflow metadata database — keep them small (IDs, counts, status flags). Pass large data via S3 or GCS and exchange only paths or URIs via XCom.
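
A compact sketch of that convention; the bucket and object paths are invented:

# XCom carries the pointer (an S3 URI); object storage carries the data
import pendulum
from airflow.decorators import dag, task


@dag(
    dag_id="xcom_path_passing",
    schedule=None,
    start_date=pendulum.datetime(2026, 1, 1, tz="UTC"),
    catchup=False,
)
def xcom_path_passing():
    @task
    def extract() -> str:
        # the real rows are written to S3 elsewhere; only the URI is returned (pushed as XCom)
        return "s3://data-lake/raw/orders/2026-05-11/part-000.parquet"

    @task
    def transform(path: str) -> str:
        # the downstream task receives just the URI via XCom
        return path.replace("raw/", "transformed/")

    transform(extract())

    # Classic operators pull explicitly inside a callable:
    #   ti.xcom_pull(task_ids="extract", key="return_value")


xcom_path_passing()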

Connection

A named credential stored in Airflow (optionally backed by HashiCorp Vault or AWS Secrets Manager). Operators reference connections by conn_id. Manage connections via the UI, CLI, or environment variables. Never hardcode credentials in DAG files.
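
A short sketch of how connections are consumed; the conn_id, host, and credentials below are placeholders:

# Hooks and operators resolve credentials by conn_id at runtime
from airflow.hooks.base import BaseHook
from airflow.providers.postgres.hooks.postgres import PostgresHook

# The hook looks up "orders_db" in the metadata DB, env vars, or the secrets backend
hook = PostgresHook(postgres_conn_id="orders_db")

# Connections can also be inspected programmatically when needed
conn = BaseHook.get_connection("orders_db")        # exposes .host, .login, .password, .port

# Environment-variable form (no UI entry or DB row required):
#   export AIRFLOW_CONN_ORDERS_DB='postgresql://user:pass@orders-db.internal:5432/orders'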

Variable

A global key-value store for runtime configuration. Variables are fetched at task runtime — call Variable.get('key') inside operator callables, not at module level, or the scheduler will query the database on every DAG parse (every 30–60 seconds, proportional to DAG count).
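
A minimal sketch of the anti-pattern versus the safe pattern (the variable key and default are invented):

# Fetch Variables inside the task, never at module level
from airflow.decorators import task
from airflow.models import Variable

# BAD: runs on every DAG parse and queries the metadata DB every 30–60 seconds
# API_BASE_URL = Variable.get("api_base_url")


@task
def call_api() -> None:
    # GOOD: runs only when the task itself executes
    base_url = Variable.get("api_base_url", default_var="https://api.example.com")
    print(f"calling {base_url}")

# Or defer resolution to template rendering: "{{ var.value.api_base_url }}"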

DAG Design Principles — Idempotency and Atomicity

The single most important property for production DAGs is idempotency: running a task for the same logical date any number of times should produce the same result. This is what enables Airflow's backfill and retry mechanisms to work safely. An idempotent task does not append rows — it upserts them. It does not create files with timestamps — it writes to deterministic paths derived from the execution interval. It does not assume the previous run's state — it re-derives everything from inputs.

Atomicity means each task should do one thing and succeed or fail as a unit. A task that extracts data AND loads it cannot be partially retried. Split extract, transform, and load into separate tasks to make each independently retryable.

# dags/order_summary.py — production-grade DAG skeleton
#
# Conventions:
#  - data_interval_start / data_interval_end (Airflow 2.2+ preferred API)
#  - idempotent writes using INSERT ... ON CONFLICT DO UPDATE
#  - catchup=False for new DAGs
#  - explicit retries, retry_delay, and exponential backoff
#  - tags for UI filtering
#  - upload_to_s3 / read_from_s3 / write_to_s3 are assumed project helper utilities (not shown)

from __future__ import annotations

import pendulum
from airflow.decorators import dag, task
from airflow.providers.postgres.hooks.postgres import PostgresHook


@dag(
    dag_id="order_summary_daily",
    schedule="0 3 * * *",                   # 03:00 UTC every day
    start_date=pendulum.datetime(2026, 1, 1, tz="UTC"),
    catchup=False,                           # do not backfill historical runs on deploy
    max_active_runs=1,                       # one run at a time — avoid concurrent writes
    default_args={
        "owner": "data-engineering",
        "retries": 3,
        "retry_delay": pendulum.duration(minutes=5),
        "retry_exponential_backoff": True,
        "max_retry_delay": pendulum.duration(minutes=60),
        "email_on_failure": True,
        "email": ["data-alerts@company.com"],
    },
    tags=["orders", "daily", "warehouse"],
)
def order_summary_daily():

    @task
    def extract_orders(data_interval_start=None, data_interval_end=None) -> dict:
        """Extract orders for the closed day interval."""
        hook = PostgresHook(postgres_conn_id="orders_db")
        rows = hook.get_records(
            """
            SELECT order_id, customer_id, total_usd, created_at
            FROM orders
            WHERE created_at >= %(start)s AND created_at < %(end)s
            """,
            parameters={"start": data_interval_start, "end": data_interval_end},
        )
        # Write data to S3; return only the path via XCom (keep XComs small)
        s3_path = upload_to_s3(rows, prefix=f"raw/orders/{data_interval_start.date()}")
        return {"s3_path": s3_path, "row_count": len(rows)}

    @task
    def transform_orders(extract_result: dict) -> dict:
        """Aggregate orders by customer for the given day."""
        df = read_from_s3(extract_result["s3_path"])
        summary = (
            df.groupby("customer_id")
            .agg(order_count=("order_id", "count"), total_usd=("total_usd", "sum"))
            .reset_index()
        )
        out_path = extract_result["s3_path"].replace("raw/", "transformed/")
        write_to_s3(summary, out_path)
        return {"s3_path": out_path, "row_count": len(summary)}

    @task
    def load_summary(transform_result: dict, data_interval_start=None) -> None:
        """UPSERT into the warehouse — safe to re-run any number of times."""
        hook = PostgresHook(postgres_conn_id="warehouse_db")
        df = read_from_s3(transform_result["s3_path"])
        hook.insert_rows(
            table="customer_order_summary",
            rows=df.values.tolist(),
            target_fields=["customer_id", "order_count", "total_usd", "summary_date"],
            replace=True,                    # INSERT ... ON CONFLICT DO UPDATE
            replace_index=["customer_id", "summary_date"],
        )

    extracted = extract_orders()
    transformed = transform_orders(extracted)
    load_summary(transformed)


order_summary_daily()

Note

Use data_interval_start and data_interval_end rather than the legacy execution_date in Airflow 2.2+. A daily DAG with data_interval_start=2026-05-10 00:00 explicitly processes May 10th's data regardless of when the run actually executes — making the semantics unambiguous during backfills and manual reruns.
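
To make this concrete, a small sketch of what the run covering May 10th sees for a @daily schedule:

# For a @daily DAG, the run that processes May 10th starts only after the interval closes:
#   data_interval_start = 2026-05-10T00:00:00+00:00
#   data_interval_end   = 2026-05-11T00:00:00+00:00
# With a cron schedule such as "0 3 * * *", the boundaries follow the cron times instead.
from airflow.decorators import task


@task
def show_interval(data_interval_start=None, data_interval_end=None) -> None:
    # both values are injected by the scheduler for the run being executed
    print(f"processing {data_interval_start} -> {data_interval_end}")

# The same values are available in templated operator fields:
#   {{ data_interval_start }}, {{ data_interval_end }}, {{ ds }}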

Task Dependencies — TaskGroups and Fan-Out / Fan-In

Airflow expresses task dependencies with the >> and << operators or the set_upstream / set_downstream methods. For complex pipelines, TaskGroups provide logical grouping that collapses in the UI and enables clean fan-in/fan-out patterns without deeply nested dependency expressions.

# Multi-source ETL with TaskGroups: fan-out extracts, parallel transforms, fan-in quality gate

from __future__ import annotations

import pendulum
from airflow.decorators import dag, task, task_group
from airflow.operators.empty import EmptyOperator


@dag(
    dag_id="multi_source_etl",
    schedule="0 4 * * *",
    start_date=pendulum.datetime(2026, 1, 1, tz="UTC"),
    catchup=False,
    max_active_runs=1,
    tags=["etl", "multi-source"],
)
def multi_source_etl():

    start = EmptyOperator(task_id="start")
    end = EmptyOperator(task_id="end", trigger_rule="all_success")

    @task_group(group_id="extract")
    def extract_group():
        @task
        def extract_crm() -> str:
            return pull_from_crm()

        @task
        def extract_erp() -> str:
            return pull_from_erp()

        @task
        def extract_events() -> str:
            return pull_from_event_store()

        return extract_crm(), extract_erp(), extract_events()

    @task_group(group_id="transform")
    def transform_group(crm_path: str, erp_path: str, events_path: str):
        @task
        def transform_customers(path: str) -> str:
            return run_customer_transform(path)

        @task
        def transform_revenue(crm: str, erp: str) -> str:
            return run_revenue_join(crm, erp)

        @task
        def transform_funnel(events: str, crm: str) -> str:
            return run_funnel_analysis(events, crm)

        return (
            transform_customers(crm_path),
            transform_revenue(crm_path, erp_path),
            transform_funnel(events_path, crm_path),
        )

    @task
    def quality_gate(*transformed_paths: str) -> None:
        """Fan-in: all transforms must succeed before quality checks run."""
        for path in transformed_paths:
            assert_row_counts(path)
            assert_no_nulls(path)

    @task
    def load_warehouse(*paths: str) -> None:
        for path in paths:
            load_to_bigquery(path)

    crm, erp, events = extract_group()
    customers, revenue, funnel = transform_group(crm, erp, events)
    quality = quality_gate(customers, revenue, funnel)
    load = load_warehouse(customers, revenue, funnel)

    start >> [crm, erp, events]
    [quality, load] >> end


multi_source_etl()

Dynamic Task Mapping — Parallelise Over Variable-Length Lists

Airflow 2.3 introduced dynamic task mapping — expanding a single task definition into N parallel instances at runtime based on a list produced by an upstream task. The number of instances is decided at runtime, not at DAG parse time. Each instance gets its own logs, retry counter, and UI state, making failures easy to identify and retry individually.
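
Before the fuller pipeline below, a minimal sketch of the mapping API itself: partial() pins constant keyword arguments, expand() varies the mapped one (the dag_id and file names are invented):

# .partial() fixes constant kwargs; .expand() creates one task instance per list element
import pendulum
from airflow.decorators import dag, task


@dag(
    dag_id="mapping_sketch",
    schedule=None,
    start_date=pendulum.datetime(2026, 1, 1, tz="UTC"),
    catchup=False,
)
def mapping_sketch():
    @task
    def list_files() -> list[str]:
        return ["a.parquet", "b.parquet", "c.parquet"]   # length known only at runtime

    @task
    def convert(path: str, output_format: str) -> str:
        return f"converted {path} to {output_format}"

    # Three mapped instances of convert, each with output_format="orc"
    convert.partial(output_format="orc").expand(path=list_files())


mapping_sketch()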

# Dynamic task mapping: one task instance per S3 partition, count determined at runtime

from __future__ import annotations

import pendulum
from airflow.decorators import dag, task


@dag(
    dag_id="partitioned_ingestion",
    schedule="0 5 * * *",
    start_date=pendulum.datetime(2026, 1, 1, tz="UTC"),
    catchup=False,
    max_active_runs=1,
    tags=["ingestion", "dynamic-mapping"],
)
def partitioned_ingestion():

    @task
    def discover_partitions(data_interval_start=None) -> list[dict]:
        """Return one partition spec per parallel task instance."""
        partitions = list_s3_prefixes(
            bucket="raw-data",
            prefix=f"events/{data_interval_start.date()}/",
        )
        return [
            {"partition_key": p, "date": str(data_interval_start.date())}
            for p in partitions
        ]

    @task(
        retries=2,
        retry_delay=pendulum.duration(minutes=2),
        max_active_tis_per_dag=20,       # cap concurrency — don't slam the target system
    )
    def process_partition(partition: dict) -> dict:
        """Each task instance processes one partition independently."""
        key = partition["partition_key"]
        raw_df = read_s3_partition(key)
        transformed = apply_transformations(raw_df, partition["date"])
        out_key = key.replace("raw-data/", "processed/")
        write_s3(transformed, out_key)
        return {"partition_key": key, "row_count": len(transformed), "status": "ok"}

    @task
    def aggregate_results(results: list[dict]) -> None:
        """Fan-in: collect all per-partition results and write an audit record."""
        total_rows = sum(r["row_count"] for r in results)
        failed = [r for r in results if r["status"] != "ok"]
        if failed:
            raise ValueError(f"{len(failed)} partitions failed: {failed}")
        write_ingestion_audit(total_rows=total_rows, partition_count=len(results))

    partitions = discover_partitions()
    results = process_partition.expand(partition=partitions)
    aggregate_results(results)


partitioned_ingestion()

Note

Set max_active_tis_per_dag on mapped tasks to prevent runaway concurrency. If discover_partitions returns 500 items and there is no cap, the executor will attempt to schedule all 500 simultaneously — overwhelming downstream databases, rate-limited APIs, or the Airflow metadata DB itself.

Sensors and ExternalTaskSensor — Cross-DAG Dependencies

Sensors poll an external condition until it is true, then succeed. The ExternalTaskSensor polls another DAG's task state — the standard pattern for expressing dependencies between separate pipeline DAGs without coupling them into a monolith. Use mode="reschedule" in production: it releases the worker slot between polls so idle sensors don't occupy Celery workers or Kubernetes pods indefinitely.

# Cross-DAG dependency: downstream_analytics waits for multi_source_etl to complete
# and for an external file to arrive in S3 before running

from __future__ import annotations

import pendulum
from airflow.decorators import dag, task
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor


@dag(
    dag_id="downstream_analytics",
    schedule="0 6 * * *",
    start_date=pendulum.datetime(2026, 1, 1, tz="UTC"),
    catchup=False,
    max_active_runs=1,
    tags=["analytics"],
)
def downstream_analytics():

    # Wait for the upstream ETL DAG's terminal task to succeed for the same interval
    wait_for_etl = ExternalTaskSensor(
        task_id="wait_for_etl",
        external_dag_id="multi_source_etl",
        external_task_id="end",             # None = wait for the whole DAG
        execution_delta=pendulum.duration(hours=2),   # upstream runs 2h earlier
        mode="reschedule",                  # release slot between retries
        poke_interval=60,                   # check every 60 seconds
        timeout=3600,                       # fail after 1 h if upstream never completes
        soft_fail=False,
    )

    # Also wait for an external CRM export to land in S3
    wait_for_crm_export = S3KeySensor(
        task_id="wait_for_crm_export",
        bucket_name="external-exports",
        bucket_key="crm/{{ ds }}/customers.parquet",   # Jinja — ds = run date string
        aws_conn_id="aws_default",
        mode="reschedule",
        poke_interval=120,
        timeout=7200,
    )

    @task
    def run_analytics() -> None:
        """Run only after all upstream sources are confirmed ready."""
        compute_cohort_analysis()
        compute_ltv_model()

    [wait_for_etl, wait_for_crm_export] >> run_analytics()


downstream_analytics()

Backfills and Catchup — Safely Replaying Historical Runs

Backfilling — running a DAG for past date intervals — is one of Airflow's most powerful features and one of the most common sources of production incidents. catchup=True on a daily DAG deployed six months after its start_date immediately queues 180 concurrent runs — unless max_active_runs throttles them. The CLI backfill command gives precise control over which DAG, date range, and task subset to replay.

# CLI backfill patterns — run from inside the Airflow scheduler pod

# Full backfill: re-run all intervals in a date range
airflow dags backfill order_summary_daily \
  --start-date 2026-04-01 \
  --end-date 2026-04-30 \
  --reset-dagruns              # clear existing run state to allow re-execution

# Partial backfill: re-run only specific tasks (e.g. after fixing a transform bug)
airflow dags backfill order_summary_daily \
  --start-date 2026-04-15 \
  --end-date 2026-04-30 \
  --task-regex "transform_orders|load_summary" \
  --reset-dagruns

# Dry run — print what would be scheduled without executing anything
airflow dags backfill order_summary_daily \
  --start-date 2026-05-01 \
  --end-date 2026-05-10 \
  --dry-run

# Test a single task instance locally without a running scheduler
airflow tasks test order_summary_daily extract_orders 2026-05-10

# Trigger a single manual run with custom config
airflow dags trigger order_summary_daily --conf '{"force_reprocess": true}'

Note

Always deploy new DAGs with catchup=False unless you have explicitly designed and tested a backfill strategy for them. Set max_active_runs=1 as the default — it prevents concurrent runs from racing on shared write targets and keeps backfills from flooding the executor.

Dynamic DAG Generation — Config-Driven DAG Factories

When you have structurally identical pipelines for many tenants, data sources, or environments, the DAG factory pattern is the standard solution: a Python script reads a configuration file and generates one DAG object per entry, registering each in the global namespace where Airflow's scheduler discovers it. All pipeline logic lives in one place; configuration drives the variation.

# dags/dag_factory.py — one DAG per tenant, driven by config/tenants.yaml
#
# config/tenants.yaml example:
#   tenants:
#     - id: acme_corp
#       source_conn: postgres_acme
#       target_schema: acme
#       schedule: "0 2 * * *"
#       tables: [orders, customers, products]
#     - id: globex
#       source_conn: mysql_globex
#       target_schema: globex
#       schedule: "0 3 * * *"
#       tables: [transactions, users]

from __future__ import annotations

import pendulum
import yaml
from pathlib import Path
from airflow.decorators import dag, task
from airflow.providers.postgres.hooks.postgres import PostgresHook


def make_tenant_dag(tenant: dict):
    tenant_id = tenant["id"]

    @dag(
        dag_id=f"ingest_{tenant_id}",
        schedule=tenant["schedule"],
        start_date=pendulum.datetime(2026, 1, 1, tz="UTC"),
        catchup=False,
        max_active_runs=1,
        default_args={"retries": 2, "retry_delay": pendulum.duration(minutes=5)},
        tags=["ingest", "tenant", tenant_id],
    )
    def tenant_dag():
        for table in tenant["tables"]:

            @task(task_id=f"extract_{table}")
            def extract(t=table, cfg=tenant, data_interval_start=None) -> str:
                hook = PostgresHook(postgres_conn_id=cfg["source_conn"])
                rows = hook.get_records(
                    f"SELECT * FROM {t} WHERE updated_at >= %(since)s",
                    parameters={"since": data_interval_start},
                )
                return write_staging(rows, tenant_id=cfg["id"], table=t,
                                     date=str(data_interval_start.date()))

            @task(task_id=f"load_{table}")
            def load(path: str, t=table, cfg=tenant) -> None:
                df = read_staging(path)
                upsert_to_warehouse(df, schema=cfg["target_schema"], table=t)

            load(extract())

    return tenant_dag()


# Load config and register all generated DAGs in the global namespace
# The Airflow scheduler discovers objects in globals() whose type is DAG
_config_path = Path(__file__).parent.parent / "config" / "tenants.yaml"
_config = yaml.safe_load(_config_path.read_text())

for _tenant in _config["tenants"]:
    globals()[f"dag_ingest_{_tenant['id']}"] = make_tenant_dag(_tenant)

KubernetesPodOperator — Isolated Task Environments

The KubernetesExecutor runs each Airflow task as a separate Kubernetes pod, launched on-demand and cleaned up after completion. The KubernetesPodOperator lets individual tasks — even in a CeleryExecutor setup — run in arbitrary container images. This is the standard approach for tasks requiring heavy or conflicting dependencies (ML libraries, dbt, Spark) that don't belong in the main Airflow image.

# KubernetesPodOperator: run heavy ML training in a dedicated container
# Each task gets its own pod, image, resources, and secrets

from __future__ import annotations

import pendulum
from airflow.decorators import dag
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from kubernetes.client import models as k8s


@dag(
    dag_id="ml_training_pipeline",
    schedule="0 1 * * 0",           # weekly, Sunday 01:00 UTC
    start_date=pendulum.datetime(2026, 1, 1, tz="UTC"),
    catchup=False,
    tags=["ml", "training"],
)
def ml_training_pipeline():

    preprocess = KubernetesPodOperator(
        task_id="preprocess_features",
        image="myregistry.io/feature-engineering:1.4.2",
        name="airflow-preprocess-{{ ds_nodash }}",     # unique pod name per run
        namespace="airflow",
        image_pull_policy="IfNotPresent",
        get_logs=True,
        log_events_on_failure=True,
        is_delete_operator_pod=True,                   # clean up pod after completion
        arguments=["--date={{ ds }}", "--output=s3://ml-data/features/{{ ds }}/"],
        env_vars=[
            k8s.V1EnvVar(name="AWS_REGION", value="eu-west-1"),
            k8s.V1EnvVar(
                name="DB_PASSWORD",
                value_from=k8s.V1EnvVarSource(
                    secret_key_ref=k8s.V1SecretKeySelector(
                        name="ml-db-secret", key="password"
                    )
                ),
            ),
        ],
        container_resources=k8s.V1ResourceRequirements(   # cncf-kubernetes provider >= 4.0
            requests={"cpu": "1", "memory": "4Gi"},
            limits={"cpu": "4", "memory": "16Gi"},
        ),
        node_selector={"workload": "data-processing"},
        tolerations=[
            k8s.V1Toleration(key="data-processing", operator="Exists", effect="NoSchedule")
        ],
    )

    train = KubernetesPodOperator(
        task_id="train_model",
        image="myregistry.io/model-training:2.1.0",
        name="airflow-train-{{ ds_nodash }}",
        namespace="airflow",
        get_logs=True,
        is_delete_operator_pod=True,
        arguments=[
            "--features=s3://ml-data/features/{{ ds }}/",
            "--output=s3://ml-models/{{ ds }}/",
            "--experiment=weekly-churn-v2",
        ],
        container_resources=k8s.V1ResourceRequirements(
            requests={"cpu": "4", "memory": "32Gi", "nvidia.com/gpu": "1"},
            limits={"cpu": "8", "memory": "64Gi", "nvidia.com/gpu": "1"},
        ),
        node_selector={"accelerator": "nvidia-t4"},
    )

    preprocess >> train


ml_training_pipeline()

Deploying Airflow with Helm — Production Configuration

The official Apache Airflow Helm chart supports all executors, Git-sync for DAG delivery, external secret backends, and configurable resource profiles. The key production decisions are: executor type (KubernetesExecutor for isolation, CeleryExecutor for lower pod-startup latency), DAG delivery (Git-sync sidecar vs persistent volume), and metadata database (externally managed PostgreSQL — never the bundled chart-managed Postgres for production).

# airflow-values.yaml — production Helm configuration
# helm repo add apache-airflow https://airflow.apache.org
# helm install airflow apache-airflow/airflow \
#   -f airflow-values.yaml -n airflow --version 1.13.1

executor: KubernetesExecutor

# External PostgreSQL — never use the bundled Postgres in production
data:
  metadataConnection:
    user: airflow
    pass: ~                           # injected via Kubernetes secret
    protocol: postgresql
    host: airflow-postgres.db.internal
    port: 5432
    db: airflow

postgresql:
  enabled: false                      # disable bundled Postgres chart

redis:
  enabled: false                      # only needed for CeleryExecutor

# Fernet key encrypts sensitive fields in the metadata DB
# Generate: python -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())"
fernetKeySecretName: airflow-fernet-key

# Git-sync sidecar: DAGs are pulled from a Git repo every 60 seconds
dags:
  gitSync:
    enabled: true
    repo: https://github.com/myorg/airflow-dags.git
    branch: main
    rev: HEAD
    depth: 1
    subPath: "dags"
    period: 60s
    credentialsSecret: airflow-git-credentials

webserver:
  replicas: 2
  resources:
    requests: { cpu: "500m", memory: "1Gi" }
    limits:   { cpu: "1000m", memory: "2Gi" }

scheduler:
  replicas: 2                         # HA scheduler — requires PostgreSQL backend
  resources:
    requests: { cpu: "1000m", memory: "2Gi" }
    limits:   { cpu: "2000m", memory: "4Gi" }

logs:
  persistence:
    enabled: true
    size: 50Gi
    storageClassName: standard

# Secrets backend: pull Connections and Variables from HashiCorp Vault
env:
  - name: AIRFLOW__SECRETS__BACKEND
    value: "airflow.providers.hashicorp.secrets.vault.VaultBackend"
  - name: AIRFLOW__SECRETS__BACKEND_KWARGS
    valueFrom:
      secretKeyRef:
        name: airflow-vault-config
        key: backend-kwargs

config:
  core:
    max_active_runs_per_dag: "5"
    parallelism: "64"
    dag_discovery_safe_mode: "True"
  scheduler:
    min_file_process_interval: "30"
    dag_dir_list_interval: "60"
  webserver:
    expose_config: "False"            # never expose airflow.cfg in the UI

CI/CD for DAGs — Linting, Unit Tests, and Deployment Gates

DAG files are Python code and should be treated with the same rigour as application code. A CI pipeline for an Airflow DAG repository should parse all DAG files to catch import errors before deployment, run unit tests for custom operators and task logic, and validate DAG structure — no cycles, expected task IDs, correct catchup and max_active_runs settings.

# .github/workflows/airflow-ci.yaml

name: Airflow DAGs CI

on:
  pull_request:
    branches: [main]
    paths: ["dags/**", "plugins/**", "tests/**", "requirements*.txt"]

jobs:
  lint-and-test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with: { python-version: "3.11", cache: pip }

      - name: Install dependencies
        run: pip install -r requirements-dev.txt

      - name: Lint with Ruff
        run: ruff check dags/ plugins/ tests/

      - name: Type check
        run: mypy dags/ plugins/ --ignore-missing-imports

      - name: Parse all DAG files (catches import errors before deployment)
        run: |
          python - <<'EOF'
          import importlib.util, sys, glob
          errors = []
          for f in glob.glob("dags/**/*.py", recursive=True):
              spec = importlib.util.spec_from_file_location("dag", f)
              mod = importlib.util.module_from_spec(spec)
              try:
                  spec.loader.exec_module(mod)
              except Exception as e:
                  errors.append(f"{f}: {e}")
          if errors:
              print("DAG parse errors:", *errors, sep="
"); sys.exit(1)
          EOF

      - name: Run unit tests
        run: pytest tests/ -v --tb=short -x

      - name: Validate DAG structure (enforce conventions)
        run: |
          python - <<'EOF'
          from airflow.models import DagBag
          db = DagBag(dag_folder="dags/", include_examples=False)
          if db.import_errors:
              import pprint, sys; pprint.pprint(db.import_errors); sys.exit(1)
          for dag_id, dag in db.dags.items():
              assert not dag.catchup, f"{dag_id}: catchup must be False"
              assert dag.max_active_runs <= 5, f"{dag_id}: max_active_runs > 5"
          print(f"All {len(db.dags)} DAGs validated successfully")
          EOF

# tests/test_order_summary_dag.py — structural and unit tests

import pendulum
import pytest
from unittest.mock import patch
from airflow.models import DagBag


@pytest.fixture
def dagbag():
    return DagBag(dag_folder="dags/", include_examples=False)


def test_dag_loads(dagbag):
    assert "order_summary_daily" in dagbag.dags
    assert not dagbag.import_errors


def test_task_count(dagbag):
    dag = dagbag.dags["order_summary_daily"]
    assert len(dag.tasks) == 3


def test_task_ids(dagbag):
    dag = dagbag.dags["order_summary_daily"]
    assert {t.task_id for t in dag.tasks} == {
        "extract_orders", "transform_orders", "load_summary"
    }


def test_dependency_order(dagbag):
    dag = dagbag.dags["order_summary_daily"]
    extract_downstream = {t.task_id for t in dag.get_task("extract_orders").downstream_list}
    assert "transform_orders" in extract_downstream
    transform_downstream = {t.task_id for t in dag.get_task("transform_orders").downstream_list}
    assert "load_summary" in transform_downstream


def test_catchup_disabled(dagbag):
    assert dagbag.dags["order_summary_daily"].catchup is False


def test_max_active_runs(dagbag):
    assert dagbag.dags["order_summary_daily"].max_active_runs == 1


@patch("dags.order_summary.upload_to_s3", return_value="s3://bucket/path.parquet")
@patch("dags.order_summary.PostgresHook")
def test_extract_returns_s3_path(mock_hook, mock_s3):
    mock_hook.return_value.get_records.return_value = [(1, 42, 99.0, "2026-05-11")]
    result = {"s3_path": "s3://bucket/path.parquet", "row_count": 1}
    assert result["row_count"] == 1
    assert result["s3_path"].startswith("s3://")

Apache Airflow Production Checklist

All tasks are idempotent — writes use UPSERT, not INSERT

A task re-run for the same execution interval must produce identical output. Use INSERT ... ON CONFLICT DO UPDATE, MERGE, or overwrite semantics. Append-only writes will duplicate data on every retry and every backfill, producing incorrect aggregates that are hard to detect and expensive to fix.
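
A minimal sketch of that write pattern against Postgres; the table, columns, and conn_id match the illustrative examples above:

# Idempotent load: re-running the same interval updates rows instead of appending
from airflow.providers.postgres.hooks.postgres import PostgresHook

UPSERT_SQL = """
    INSERT INTO customer_order_summary (customer_id, order_count, total_usd, summary_date)
    VALUES (%s, %s, %s, %s)
    ON CONFLICT (customer_id, summary_date)
    DO UPDATE SET order_count = EXCLUDED.order_count,
                  total_usd   = EXCLUDED.total_usd
"""


def load_daily_summary(rows: list[tuple]) -> None:
    hook = PostgresHook(postgres_conn_id="warehouse_db")
    with hook.get_conn() as conn, conn.cursor() as cur:
        # executing this twice for the same summary_date leaves the table unchanged
        cur.executemany(UPSERT_SQL, rows)
    # psycopg2 commits on clean exit from the connection context manager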

catchup=False is the default; backfill strategy is documented per DAG

New DAGs ship with catchup=False until the team has explicitly tested backfill behaviour. Document in the DAG's description which date range is safe to backfill, whether concurrent backfill runs are safe, and what cleanup is needed if a backfill produces wrong output.

max_active_runs=1 on DAGs with shared write targets

Concurrent runs of the same DAG cause race conditions when writing to shared tables without row-level idempotency, or when calling APIs that are not safe to call concurrently. max_active_runs=1 prevents this and is a safe default for most batch pipelines.

Sensors use mode='reschedule'

Sensors in poke mode hold a worker slot for the entire polling duration. With mode='reschedule', the slot is released between polls and the task is rescheduled for the next check. Four sensors polling for 30 minutes in poke mode occupy four worker slots for the full half hour; in reschedule mode they hold a slot only for the few seconds each check takes.

Variable.get() is called at task runtime, not at module level

Variable.get() at module level runs on every DAG parse (every 30–60 seconds per scheduler cycle). This hammers the metadata database with queries proportional to DAG count multiplied by parse frequency. Call Variable.get() inside PythonOperator callables, or use Jinja template variables instead.

Connections are backed by an external secret backend

Airflow stores connections in its metadata database by default. In production, back connections and variables with HashiCorp Vault or your cloud provider's secret manager. This separates secret rotation from DAG deployment and enables least-privilege access per service account.

XCom payloads are small — exchange paths, not data

XComs are stored as blobs in the Airflow metadata database (PostgreSQL). Passing DataFrames, large JSON payloads, or binary data via XCom bloats the database and slows scheduler queries. Pass S3 or GCS paths via XCom; pass data via storage. The XCom payload should rarely exceed a few kilobytes.

DAG CI pipeline parses all files on every PR

A DAG with a broken import silently fails to load — the scheduler skips it without alerting anyone. A CI step that instantiates all DAG files via DagBag catches Python errors, missing imports, and broken decorators before they reach the scheduler. This is the single highest-ROI check in an Airflow CI pipeline.

Two scheduler replicas with an external PostgreSQL metadata database

Airflow 2.0+ supports active-active HA scheduling: multiple scheduler replicas coordinate through row-level locks (SELECT ... FOR UPDATE SKIP LOCKED) on the metadata database, which must therefore be PostgreSQL or MySQL 8+ (never SQLite). The bundled in-cluster chart Postgres is not production-grade — a single pod failure takes down the entire Airflow installation.

Work with us

Running Apache Airflow in production and hitting scaling or reliability challenges with your DAG design?

We design and implement production-grade Airflow pipelines — from idempotent DAG architecture and dependency management to KubernetesExecutor setup, Helm deployments on Kubernetes, DAG factory patterns for multi-tenant pipelines, and CI/CD pipelines for safe DAG testing and deployment. Let’s talk.

