What Is Airflow 3.0?
Apache Airflow 3.0 is the first major version bump since Airflow 2.0 in December 2020. The headline changes are architectural: the Task SDK is now a separate installable package decoupled from the Airflow core, Datasets are renamed to Assets with richer event-driven scheduling semantics, DAG versioning makes backfills reproducible even when DAG code changes between runs, the web UI is rewritten in React, and REST API v2 replaces the v1 API that shipped with Airflow 2.
Airflow 3.0 also introduces breaking changes that require code updates before upgrading: several import paths changed, the execution_date context variable is replaced by logical_date, and operators moved out of core into provider packages. For teams running Airflow 2.x in production with complex DAG portfolios, the migration is non-trivial but well-documented — this article covers each change and the steps needed to upgrade safely.
Task SDK
Tasks now run in a separate process using the standalone apache-airflow-task-sdk package. Worker nodes no longer need a full Airflow installation.
Assets
Datasets are renamed to Assets and gain richer semantics: aliases, conditional scheduling, and watchers that trigger DAGs on external asset updates.
DAG Versioning
Each DAG code change creates a new version. Backfills and reruns execute against the version that originally ran, not the current code.
Airflow 3.0 vs 2.x — Key Differences
| Area | Airflow 2.x | Airflow 3.0 |
|---|---|---|
| Task execution | LocalExecutor / CeleryExecutor in core | Task SDK + Edge Executor for remote workers |
| Dataset triggers | Dataset (simple outlet scheduling) | Asset with aliases, watchers, and conditions |
| DAG versioning | No versioning — latest code always runs | Immutable versions per code change |
| REST API | REST API v1 (/api/v1) | REST API v2 (/api/v2, OpenAPI 3.1) |
| Web UI | Flask/Jinja2 (Airflow UI) | React SPA with improved Grid/Graph views |
| context['execution_date'] | Available (legacy) | Removed — use logical_date |
| Operator location | Many operators in airflow.operators.* | Operators in provider packages only |
The New Task SDK
In Airflow 2.x, every worker node running tasks needed a full Airflow installation — the same codebase as the scheduler. This created version-lock between scheduler upgrades and worker deployments and meant worker containers were large. Airflow 3.0 separates task execution into the apache-airflow-task-sdk package.
The Task SDK is a lightweight package that provides the decorator API (@task), XCom access, and task context — without pulling in the full Airflow dependency tree. DAG authors who use the TaskFlow API can write task code that imports only from airflow.sdk and deploy it to worker environments that install only apache-airflow-task-sdk.
# Install the full Airflow (scheduler, webserver, triggerer)
pip install "apache-airflow==3.0.0"
# Install only the Task SDK (workers, task containers)
pip install "apache-airflow-task-sdk==3.0.0"
# ── Writing tasks with the SDK ─────────────────────────────────────────
# Airflow 3.0 — import from airflow.sdk (recommended)
from airflow.sdk import dag, task, Asset
from datetime import datetime, timedelta
# Legacy import path still works but is deprecated:
# from airflow.decorators import dag, task
orders_asset = Asset("s3://warehouse/orders/daily/")
@dag(
    dag_id="process_orders",
    schedule=orders_asset,  # trigger when this Asset is updated
    start_date=datetime(2026, 1, 1),
    catchup=False,
    tags=["orders", "production"],
)
def process_orders_dag():
    @task
    def validate_orders(logical_date=None) -> dict:
        # TaskFlow injects context variables by parameter name;
        # logical_date replaces execution_date
        print(f"Validating orders for {logical_date.date()}")
        return {"date": str(logical_date.date()), "valid": True}

    @task
    def transform_orders(validation: dict) -> int:
        if not validation["valid"]:
            raise ValueError("Validation failed")
        # Simulate transformation
        return 1_000

    @task(outlets=[Asset("s3://warehouse/orders_processed/daily/")])
    def load_orders(count: int) -> None:
        print(f"Loaded {count} order records")

    result = validate_orders()
    count = transform_orders(result)
    load_orders(count)

dag_instance = process_orders_dag()
Note
The airflow.sdk import path is the stable public API surface for Airflow 3.0. Code that imports from airflow.decorators, airflow.operators.python, or airflow.models will still work in 3.0, but those paths emit deprecation warnings and may be removed in 4.0. Migrate to airflow.sdk now to make future upgrades easier.
Airflow Assets: Datasets Redesigned
Airflow 2.4 introduced Datasets as a way to trigger downstream DAGs when an upstream DAG updated a named data resource. The concept was powerful but limited: triggers were binary (updated or not), there was no way to express conditional logic, and the namespace was flat strings. Airflow 3.0 renames Datasets to Assets and significantly expands the model.
from airflow.sdk import Asset, AssetAlias, dag, task
from airflow.sdk.definitions.asset import AssetWatcher
from datetime import datetime
# ── Basic asset definition ─────────────────────────────────────────────
raw_orders = Asset("s3://raw/orders/")
clean_orders = Asset("s3://clean/orders/")
orders_report = Asset("s3://reports/orders/")
# ── Asset alias: decouple producer from consumer ──────────────────────
# Producers write to the alias; consumers schedule on the alias.
# Swap producers without changing downstream DAG schedules.
orders_alias = AssetAlias("orders-daily")
@dag(schedule="@daily", start_date=datetime(2026, 1, 1), catchup=False)
def ingest_orders():
    @task(outlets=[raw_orders, orders_alias])
    def fetch_and_store() -> None:
        print("Ingesting orders → raw_orders + orders-daily alias")

    fetch_and_store()

ingest_orders()  # calling the decorated function registers the DAG

# ── Consumer: schedule on the alias ──────────────────────────────────
@dag(schedule=orders_alias, start_date=datetime(2026, 1, 1), catchup=False)
def transform_on_orders_alias():
    @task(outlets=[clean_orders])
    def transform() -> None:
        print("Triggered via orders-daily alias")

    transform()

transform_on_orders_alias()

# ── Conditional scheduling: require multiple assets ──────────────────
from airflow.sdk.definitions.asset import AssetAll, AssetAny

# Run only when BOTH assets have been updated
@dag(
    schedule=AssetAll(clean_orders, Asset("s3://clean/payments/")),
    start_date=datetime(2026, 1, 1),
    catchup=False,
)
def orders_payments_join():
    @task(outlets=[orders_report])
    def join_and_report() -> None:
        print("Both orders and payments ready, running join")

    join_and_report()

orders_payments_join()

# Run when EITHER asset updates
@dag(
    schedule=AssetAny(raw_orders, Asset("s3://raw/orders_v2/")),
    start_date=datetime(2026, 1, 1),
    catchup=False,
)
def ingest_any_source():
    @task
    def ingest() -> None:
        print("At least one source updated")

    ingest()

ingest_any_source()

# ── Asset watcher: trigger on external updates ───────────────────────
# Trigger a DAG when an external system updates an asset
# (no outlet from an Airflow task required)
external_crm_export = Asset(
    "s3://crm/exports/daily/",
    watchers=[
        AssetWatcher(
            name="crm-export-watcher",
            trigger=...,  # S3EventTrigger or custom trigger
        )
    ],
)
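To make the AssetAll and AssetAny semantics above concrete, here is a minimal pure-Python model of how such a condition tree could be evaluated against the set of asset URIs that have pending update events. The class names (Leaf, AllOf, AnyOf) and the evaluate method are illustrative assumptions, not Airflow's implementation, and the sketch runs without Airflow installed.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class Leaf:
    """A single asset: satisfied when its URI has a pending update."""
    uri: str

    def evaluate(self, updated: set[str]) -> bool:
        return self.uri in updated


@dataclass(frozen=True)
class AllOf:
    """Satisfied only when every child condition is satisfied (AssetAll-like)."""
    children: tuple

    def evaluate(self, updated: set[str]) -> bool:
        return all(child.evaluate(updated) for child in self.children)


@dataclass(frozen=True)
class AnyOf:
    """Satisfied when at least one child condition is satisfied (AssetAny-like)."""
    children: tuple

    def evaluate(self, updated: set[str]) -> bool:
        return any(child.evaluate(updated) for child in self.children)


orders = Leaf("s3://clean/orders/")
payments = Leaf("s3://clean/payments/")
join_condition = AllOf((orders, payments))
either_condition = AnyOf((orders, payments))

# Only orders has been updated so far
pending = {"s3://clean/orders/"}
print(join_condition.evaluate(pending))    # False: payments is still missing
print(either_condition.evaluate(pending))  # True: one update is enough
```

Because the conditions nest, an expression such as AllOf((orders, AnyOf((a, b)))) models "orders plus at least one of two other sources" in the same way.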
DAG Versioning
In Airflow 2.x, a backfill always executed against the current version of the DAG code. If you changed a DAG between the original run date and the backfill, the backfill used the new logic — making historical runs non-reproducible. Airflow 3.0 solves this with immutable DAG versions: every time the serialized DAG changes (new task, changed parameters, updated schedule), Airflow creates a new version and stores the old one.
Backfills in Airflow 3.0 default to running against the version that was active at each logical_date. You can override this to backfill with the current version if you explicitly need the new behavior. This feature is especially important for compliance workloads where historical reruns must match original processing logic.
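The versioning rule described above (a new version whenever the serialized DAG changes) can be sketched with a content hash. This is a simplified illustration rather than Airflow's actual storage model; dag_hash and VersionStore are hypothetical names, and the serialized DAG is represented as a plain dictionary.

```python
import copy
import hashlib
import json


def dag_hash(serialized_dag: dict) -> str:
    """Hash a canonical JSON form so key order does not affect the result."""
    canonical = json.dumps(serialized_dag, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


class VersionStore:
    """Keeps every distinct serialized form of one DAG, oldest first."""

    def __init__(self) -> None:
        self._versions = []  # list of (version_number, digest, snapshot)

    def record(self, serialized_dag: dict) -> int:
        """Return the version number, creating a new version only on change."""
        digest = dag_hash(serialized_dag)
        if self._versions and self._versions[-1][1] == digest:
            return self._versions[-1][0]
        number = len(self._versions) + 1
        # Deep-copy so later edits to the caller's dict cannot alter history
        self._versions.append((number, digest, copy.deepcopy(serialized_dag)))
        return number

    def get(self, number: int) -> dict:
        return self._versions[number - 1][2]


store = VersionStore()
v1 = store.record({"dag_id": "process_orders", "tasks": ["validate", "load"]})
unchanged = store.record({"tasks": ["validate", "load"], "dag_id": "process_orders"})
v2 = store.record({"dag_id": "process_orders", "tasks": ["validate", "transform", "load"]})
print(v1, unchanged, v2)
print(store.get(1)["tasks"])
```

A rerun that targets an earlier version would look up its snapshot with get() instead of reading the latest definition.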
# ── DAG versioning — CLI commands ────────────────────────────────────
# List all versions of a DAG
airflow dags versions list --dag-id process_orders
# Output:
# Version Active Created Hash
# 3 True 2026-06-16T10:00:00Z a1b2c3d4
# 2 False 2026-06-10T08:00:00Z e5f6g7h8
# 1 False 2026-06-01T00:00:00Z i9j0k1l2
# ── Backfill against a specific version ──────────────────────────────
# Default: backfill uses the version active at each logical_date
airflow backfill create --dag-id process_orders --from-date 2026-06-01 --to-date 2026-06-10
# Force backfill with current (latest) version:
airflow backfill create --dag-id process_orders --from-date 2026-06-01 --to-date 2026-06-10 --use-current-dag-version
# ── View DAG diff between versions ──────────────────────────────────
airflow dags versions diff --dag-id process_orders --version-a 2 --version-b 3
Breaking Changes from Airflow 2.x
Airflow 3.0 removes or renames several APIs that were deprecated in 2.x. Most changes are in import paths and context variable names; the underlying task logic rarely needs modification. Before upgrading, lint your DAG codebase with Ruff's Airflow rules (ruff check dags/ --select AIR301,AIR302) to get a report of removed and relocated APIs; the airflow upgrade-check command belonged to the 1.10 to 2.0 migration and is not part of Airflow 3. Deprecating first and removing later is comparable to how Dagster handles breaking API changes with compatibility shims before final removal.
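A static check of this kind can be approximated with Python's ast module, which is handy for a quick look at a single file. The sketch below is an illustrative assumption rather than an official tool: find_issues and FLAGGED_MODULES are hypothetical names, and the module list simply mirrors the import paths this article describes as moved or removed.

```python
import ast

# Import paths this article lists as moved out of core or removed
FLAGGED_MODULES = {
    "airflow.operators.bash",
    "airflow.operators.python",
    "airflow.operators.email",
    "airflow.sensors.filesystem",
    "airflow.operators.subdag",
    "airflow.datasets",
}


def find_issues(source: str) -> list:
    """Return sorted (line_number, message) pairs for likely 3.0 incompatibilities."""
    issues = []
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, ast.ImportFrom) and node.module in FLAGGED_MODULES:
            issues.append((node.lineno, f"import from {node.module}"))
        elif isinstance(node, ast.Constant) and node.value == "execution_date":
            # Catches context["execution_date"] and context.get("execution_date")
            issues.append((node.lineno, "execution_date reference"))
    return sorted(issues)


sample = '''\
from airflow.operators.bash import BashOperator

def my_task(**context):
    return context["execution_date"]
'''

for lineno, message in find_issues(sample):
    print(f"line {lineno}: {message}")
```

The string check is deliberately broad: it flags any literal equal to "execution_date", so expect a few false positives and no coverage of Jinja templates stored in separate files.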
# ── 1. execution_date removed — use logical_date ─────────────────────
# Airflow 2.x (broken in 3.0):
def my_task(**context):
    execution_date = context["execution_date"]  # KeyError in 3.0
    ds = context["ds"]  # still works
    next_ds = context["next_ds"]  # also removed in 3.0

# Airflow 3.0 (correct):
def my_task(**context):
    logical_date = context["logical_date"]  # pendulum.DateTime
    ds = context["ds"]  # date string, unchanged

# With TaskFlow API (context variables are injected by parameter name):
@task
def my_task(logical_date=None, data_interval_start=None, data_interval_end=None):
    print(logical_date, data_interval_start, data_interval_end)

# ── 2. Import path changes ────────────────────────────────────────────
# Airflow 2.x → Airflow 3.0 import mapping:
# airflow.decorators.dag → airflow.sdk.dag
# airflow.decorators.task → airflow.sdk.task
# airflow.models.dag.DAG → airflow.sdk.DAG (or keep models for non-SDK use)
# airflow.datasets.Dataset → airflow.sdk.Asset
# airflow.timetables.* → airflow.sdk.timetables.*
# ── 3. Operators moved to providers ──────────────────────────────────
# Airflow 2.x import paths (moved out of core in 3.0):
from airflow.operators.bash import BashOperator  # moved to the standard provider
from airflow.operators.python import PythonOperator  # moved to the standard provider
from airflow.operators.email import EmailOperator  # moved to the SMTP provider
from airflow.sensors.filesystem import FileSensor  # moved to the standard provider

# Airflow 3.0 (install provider packages):
# pip install apache-airflow-providers-standard apache-airflow-providers-smtp
from airflow.providers.standard.operators.bash import BashOperator
from airflow.providers.standard.operators.python import PythonOperator
from airflow.providers.smtp.operators.smtp import EmailOperator
from airflow.providers.standard.sensors.filesystem import FileSensor

# ── 4. XCom push/pull API changes ────────────────────────────────────
# Airflow 2.x (still works but deprecated):
def push_value(ti, **context):
    ti.xcom_push(key="result", value=42)

def pull_value(ti, **context):
    val = ti.xcom_pull(task_ids="push_task", key="result")

# Airflow 3.0 TaskFlow (recommended: type-safe, no manual push/pull):
@task
def compute() -> int:
    return 42

@task
def consume(value: int) -> None:
    print(f"Got: {value}")

# result is passed automatically between tasks:
consume(compute())

# ── 5. SubDAGs removed ───────────────────────────────────────────────
# SubDAGs are removed in 3.0. Migrate to TaskGroups (same UI grouping,
# no separate DAG run, no separate executor slot overhead):
# Airflow 2.x (SubDAG — removed in 3.0):
# from airflow.operators.subdag import SubDagOperator # REMOVED
# Airflow 3.0 (TaskGroup):
from datetime import datetime

from airflow.sdk import TaskGroup, dag, task

@dag(schedule="@daily", start_date=datetime(2026, 1, 1), catchup=False)
def my_pipeline():
    with TaskGroup("validation") as validation_group:
        @task
        def check_schema() -> bool:
            return True

        @task
        def check_nulls() -> bool:
            return True

        schema_ok = check_schema()
        nulls_ok = check_nulls()

    @task
    def load(schema: bool, nulls: bool) -> None:
        print(f"Loading. Schema: {schema}, Nulls: {nulls}")

    # Pass the results of the grouped tasks to the downstream task
    load(schema_ok, nulls_ok)

my_pipeline()
Step-by-Step Migration from Airflow 2.x
Migrating a production Airflow installation requires careful sequencing: you cannot upgrade the scheduler in-place without verifying that all DAGs are compatible first. The safest approach is to run the upgrade check on a copy of your DAG repository before touching the production scheduler.
# ── Step 1: Run the compatibility checks on your DAG repo ────────────
pip install "apache-airflow==3.0.0" --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-3.0.0/constraints-3.12.txt"
pip install ruff
ruff check dags/ --select AIR301,AIR302 --preview
# Reports detected compatibility issues, including:
# - execution_date usage
# - deprecated import paths
# - SubDAG usage
# - Removed operators
# Fix ALL reported issues before proceeding.
# ── Step 2: Update provider packages ─────────────────────────────────
# Map old operators to new provider package imports.
# Install the standard provider for bash/python/email/filesystem:
pip install "apache-airflow-providers-standard>=1.0.0"
# Google Cloud:
pip install "apache-airflow-providers-google>=10.0.0"
# AWS:
pip install "apache-airflow-providers-amazon>=8.0.0"
# Check provider compatibility matrix:
# https://airflow.apache.org/docs/apache-airflow-providers/index.html
# ── Step 3: Migrate Dataset → Asset in all DAG files ─────────────────
# Automated sed replacement (backup first!):
find dags/ -name "*.py" -exec sed -i 's/from airflow\.datasets import Dataset/from airflow.sdk import Asset/g' {} \;
find dags/ -name "*.py" -exec sed -i 's/Dataset(/Asset(/g' {} \;
# ── Step 4: Replace execution_date with logical_date ─────────────────
# Brackets are escaped so sed matches them literally, not as a character class
find dags/ -name "*.py" -exec sed -i 's/context\["execution_date"\]/context["logical_date"]/g' {} \;
find dags/ -name "*.py" -exec sed -i 's/context\.get("execution_date")/context.get("logical_date")/g' {} \;
# ── Step 5: Upgrade the database schema ──────────────────────────────
# Run AFTER stopping the Airflow 2.x scheduler (never run against live scheduler)
airflow db migrate
# Verify that all migrations have been applied:
airflow db check-migrations
# ── Step 6: Start Airflow 3.0 services ───────────────────────────────
# Scheduler (single or HA with multiple replicas)
airflow scheduler
# New in 3.0: DAG processor is a separate process
airflow dag-processor
# API server (replaces the 2.x webserver; serves the REST API and the React UI)
airflow api-server
# Triggerer (unchanged from 2.x)
airflow triggerer
Note
Airflow 3.0 replaces the airflow webserver process with airflow api-server, which serves both the REST API and the React UI. In Kubernetes deployments this means the webserver Deployment becomes an api-server Deployment. The DAG processor is also a separate process: this prevents slow DAG parsing from blocking the scheduler loop, a common production pain point in Airflow 2.x with large DAG repositories.
After migrating, validate your DAGs with contract tests and schema assertions. See Data Pipeline Testing: Contract Tests, Great Expectations, and Schema Validation for patterns that catch regressions introduced during executor or provider changes.
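The sed replacements in steps 3 and 4 can also be expressed as a small Python function, which makes it easier to dry-run the rewrite and count changes before touching files. This is a sketch under the same assumptions as the sed commands (plain textual patterns, no understanding of Python syntax); rewrite_source and REWRITES are hypothetical names.

```python
import re

# (pattern, replacement) pairs mirroring the sed expressions in steps 3 and 4
REWRITES = [
    (re.compile(r"from airflow\.datasets import Dataset\b"),
     "from airflow.sdk import Asset"),
    # \b avoids rewriting names that merely end in "Dataset", e.g. MyDataset(
    (re.compile(r"\bDataset\("), "Asset("),
    # Accept either quote style and keep whichever one the file used
    (re.compile(r"""context\[(["'])execution_date\1\]"""),
     r"context[\g<1>logical_date\g<1>]"),
    (re.compile(r"""context\.get\((["'])execution_date\1\)"""),
     r"context.get(\g<1>logical_date\g<1>)"),
]


def rewrite_source(source: str) -> tuple:
    """Apply every rewrite and return (new_source, number_of_replacements)."""
    total = 0
    for pattern, replacement in REWRITES:
        source, count = pattern.subn(replacement, source)
        total += count
    return source, total


before = (
    "from airflow.datasets import Dataset\n"
    'orders = Dataset("s3://warehouse/orders/daily/")\n'
    'run_date = context["execution_date"]\n'
    "fallback = context.get('execution_date')\n"
)
after, changes = rewrite_source(before)
print(changes)
print(after)
```

Writing the result back only when the count is non-zero keeps file modification times stable for untouched DAGs.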
Edge Executor — Remote Task Execution Without Celery
Airflow 2.x required either CeleryExecutor (with Redis or RabbitMQ as broker) or KubernetesExecutor (spawning a pod per task) for distributed execution. Both carry operational overhead. Airflow 3.0 introduces the Edge Executor: a lightweight HTTP-based executor where remote workers poll the Airflow API server for tasks, run them using the Task SDK, and report results back.
Edge workers require only apache-airflow-task-sdk and apache-airflow-providers-edge3 — no message broker, no full Airflow installation. This is particularly useful for running tasks in restricted environments (on-premises, edge locations, isolated VPCs) that can reach the Airflow API server over HTTPS but cannot access a Celery broker.
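The pull-based model described above (workers poll, run, and report back) can be illustrated with an in-memory simulation. Nothing here is the edge3 provider's actual protocol: FakeApiServer and poll_once are hypothetical stand-ins, and real workers talk to the API server over HTTPS rather than calling methods directly.

```python
from __future__ import annotations

from collections import deque
from typing import Callable, Optional


class FakeApiServer:
    """In-memory stand-in for the API server (illustration only)."""

    def __init__(self) -> None:
        self._queues: dict = {}
        self.results: dict = {}

    def enqueue(self, queue: str, task_id: str, fn: Callable[[], object]) -> None:
        self._queues.setdefault(queue, deque()).append((task_id, fn))

    def fetch(self, queues: list) -> Optional[tuple]:
        # Hand out the oldest task from the first listed queue that has work
        for name in queues:
            pending = self._queues.get(name)
            if pending:
                return pending.popleft()
        return None

    def report(self, task_id: str, state: str, value: object) -> None:
        self.results[task_id] = (state, value)


def poll_once(api: FakeApiServer, queues: list) -> bool:
    """One worker iteration: fetch a task, run it, report the outcome."""
    job = api.fetch(queues)
    if job is None:
        return False  # nothing to do; a real worker would sleep and retry
    task_id, fn = job
    try:
        api.report(task_id, "success", fn())
    except Exception as exc:  # report the failure instead of crashing the worker
        api.report(task_id, "failed", repr(exc))
    return True


api = FakeApiServer()
api.enqueue("default", "write_results", lambda: "written")
api.enqueue("gpu", "train_model", lambda: "trained")

# This worker only subscribes to the "default" queue
while poll_once(api, ["default"]):
    pass
print(api.results)
```

The worker initiates every exchange, which is why this pattern needs only outbound connectivity from the worker's network.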
# ── Install Edge worker (minimal dependencies) ────────────────────────
pip install "apache-airflow-task-sdk==3.0.0"
pip install "apache-airflow-providers-edge3>=1.0.0"
# ── Configure Airflow scheduler to use Edge Executor ─────────────────
# airflow.cfg:
[core]
executor = airflow.providers.edge3.executors.edge_executor.EdgeExecutor
[edge]
api_url = https://airflow.internal:8080 # Airflow API server URL
api_token = ${AIRFLOW_EDGE_API_TOKEN} # JWT token for worker auth
concurrency = 16 # max concurrent tasks per worker
# ── Start an edge worker (on remote host/container/VM) ────────────────
# The worker polls the API server — no inbound connections needed
airflow edge worker --queue default --queue high-priority --concurrency 8
# ── Tag tasks to run on specific queues ───────────────────────────────
from airflow.sdk import dag, task
from datetime import datetime
@dag(schedule="@hourly", start_date=datetime(2026, 1, 1), catchup=False)
def distributed_pipeline():
    @task(queue="high-priority")
    def critical_transform() -> dict:
        return {"status": "ok"}

    @task(queue="default")
    def write_results(result: dict) -> None:
        print(f"Writing: {result}")

    write_results(critical_transform())

distributed_pipeline()
# ── Kubernetes Helm chart for Airflow 3.0 with Edge Executor ──────────
# values.yaml:
executor: "EdgeExecutor"
workers:
  replicas: 3
  resources:
    requests:
      cpu: "500m"
      memory: "1Gi"
    limits:
      cpu: "2000m"
      memory: "4Gi"
dagProcessor:
  enabled: true  # new in Airflow 3.0 Helm chart
  replicas: 1
apiServer:
  enabled: true  # replaces the webserver in 3.0
  replicas: 2
REST API v2: Stable and OpenAPI 3.1
Airflow 2.x exposed its REST API at /api/v1. Airflow 3.0 replaces it with a stable v2 API at /api/v2, built on FastAPI with OpenAPI 3.1 spec generation. The v1 API is removed, so clients that call /api/v1 must be updated.
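For clients that do not use a generated SDK, a v2 call is an ordinary authenticated JSON request. The sketch below builds (but does not send) a DAG-run trigger request with the standard library only; build_trigger_request is a hypothetical helper, and the host name and token are placeholders.

```python
import json
import urllib.parse
import urllib.request


def build_trigger_request(base_url, token, dag_id, logical_date, conf=None):
    """Build a POST request for /api/v2/dags/{dag_id}/dagRuns without sending it."""
    safe_dag_id = urllib.parse.quote(dag_id, safe="")
    url = f"{base_url.rstrip('/')}/api/v2/dags/{safe_dag_id}/dagRuns"
    body = {"logical_date": logical_date, "conf": conf or {}}
    return urllib.request.Request(
        url,
        data=json.dumps(body).encode("utf-8"),
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )


request = build_trigger_request(
    base_url="https://airflow.internal:8080",  # placeholder host
    token="example-token",                     # placeholder JWT
    dag_id="process_orders",
    logical_date="2026-06-16T00:00:00Z",
    conf={"dry_run": True},
)
print(request.get_method(), request.full_url)

# Sending is a separate, explicit step:
# with urllib.request.urlopen(request) as response:
#     print(json.load(response))
```

Keeping request construction separate from sending makes the path and payload easy to unit-test when porting integrations from /api/v1.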
# ── Common REST API v2 operations ────────────────────────────────────
AIRFLOW_URL="https://airflow.internal:8080"
TOKEN="eyJhbGciOiJIUzI1NiJ9..." # JWT from API server
# Trigger a DAG run with parameters
curl -X POST "${AIRFLOW_URL}/api/v2/dags/process_orders/dagRuns" -H "Authorization: Bearer ${TOKEN}" -H "Content-Type: application/json" -d '{
"logical_date": "2026-06-16T00:00:00Z",
"conf": {"source": "s3://raw/2026-06-16/", "dry_run": false}
}'
# List DAG runs with filters
curl "${AIRFLOW_URL}/api/v2/dags/process_orders/dagRuns?state=failed&limit=10" -H "Authorization: Bearer ${TOKEN}"
# Get task instance logs
curl "${AIRFLOW_URL}/api/v2/dags/process_orders/dagRuns/${RUN_ID}/taskInstances/validate_orders/logs/1" -H "Authorization: Bearer ${TOKEN}"
# Pause / unpause a DAG
curl -X PATCH "${AIRFLOW_URL}/api/v2/dags/process_orders" -H "Authorization: Bearer ${TOKEN}" -H "Content-Type: application/json" -d '{"is_paused": true}'
# List Assets and their last update
curl "${AIRFLOW_URL}/api/v2/assets?limit=50" -H "Authorization: Bearer ${TOKEN}"
# Manually mark an Asset as updated (trigger downstream DAGs)
curl -X POST "${AIRFLOW_URL}/api/v2/assets/events" -H "Authorization: Bearer ${TOKEN}" -H "Content-Type: application/json" -d '{"asset_uri": "s3://warehouse/orders/daily/", "extra": {"source": "manual"}}'
# ── Python client (generated from OpenAPI spec) ───────────────────────
# pip install apache-airflow-client
import os

from airflow_client.client import ApiClient, Configuration
from airflow_client.client.api import dag_run_api

config = Configuration(
    host="https://airflow.internal:8080/api/v2",
    # Read the JWT from the environment; Python does not expand "${AIRFLOW_TOKEN}" in a string literal
    access_token=os.environ["AIRFLOW_TOKEN"],
)
with ApiClient(config) as api_client:
    api = dag_run_api.DAGRunApi(api_client)
    run = api.trigger_dag_run(
        dag_id="process_orders",
        trigger_dag_run_post_body={
            "logical_date": "2026-06-16T00:00:00Z",
            "conf": {"dry_run": False},
        },
    )
    print(f"Started run: {run.dag_run_id}")
The stable REST API v2 makes it practical to build AI agents that autonomously monitor, trigger, and repair pipelines. See Agentic Data Workflows: Using AI Agents to Automate Pipeline Orchestration for concrete tool definitions and integration patterns.
Production Configuration Changes
Several airflow.cfg settings were renamed or removed in 3.0. Before upgrading, compare your current configuration against the 3.0 defaults. The most impactful changes are in the [core], [webserver], and [scheduler] sections. The Airflow 2.x production patterns for scheduler high availability and DAG serialization still apply, but the configuration keys behind them have changed.
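Comparing a configuration file against a list of known changes is easy to automate. The following sketch uses configparser to flag two of the changes this article describes (the removed store_serialized_dags key and the relocated parsing_processes key) and to derive the environment-variable name for a setting. The function names and the REMOVED/MOVED tables are illustrative assumptions and cover only those two examples.

```python
import configparser

# Changes taken from this article; extend the tables for your own audit
REMOVED = {("core", "store_serialized_dags")}
MOVED = {("scheduler", "parsing_processes"): ("dag_processor", "parsing_processes")}


def check_config(text: str) -> list:
    """Return human-readable findings for removed or relocated settings."""
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(text)
    findings = []
    for section, key in sorted(REMOVED):
        if parser.has_option(section, key):
            findings.append(f"remove [{section}] {key}")
    for (old_section, old_key), (new_section, new_key) in sorted(MOVED.items()):
        if parser.has_option(old_section, old_key):
            findings.append(
                f"move [{old_section}] {old_key} to [{new_section}] {new_key}"
            )
    return findings


def env_var_name(section: str, key: str) -> str:
    """Environment override name: AIRFLOW__{SECTION}__{KEY}."""
    return f"AIRFLOW__{section.upper()}__{key.upper()}"


legacy_cfg = """\
[core]
load_examples = False
store_serialized_dags = True

[scheduler]
parsing_processes = 2
"""

for finding in check_config(legacy_cfg):
    print(finding)
print(env_var_name("dag_processor", "parsing_processes"))
```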
# ── airflow.cfg: notable changes from 2.x to 3.0 ─────────────────────
[core]
# 2.x: load_examples = True
# 3.0: unchanged, still True by default; set False in production
load_examples = False
# 2.x: store_serialized_dags = True (legacy setting)
# 3.0: REMOVED, serialization is always on and cannot be disabled
# store_serialized_dags = True ← delete this line
# 2.x: min_serialized_dag_update_interval = 30
# 3.0: still valid
min_serialized_dag_update_interval = 30
# 2.x: executor = LocalExecutor
# 3.0: still valid for single-node; EdgeExecutor is an option for distributed execution
executor = LocalExecutor

[scheduler]
# 2.x: parsing_processes = 2
# 3.0: moved to [dag_processor] section
# parsing_processes = 2 ← DELETE

[dag_processor]
# Configures the DAG processor (separate process in 3.0)
# Number of parallel DAG parsing workers
parsing_processes = 4
# Seconds between scans of the dags/ folder (2.x: [scheduler] dag_dir_list_interval)
refresh_interval = 30
# Prioritize recently changed DAGs
file_parsing_sort_mode = modified_time

[api]
# 2.x: [webserver] web_server_host / web_server_port
# 3.0: the api-server serves both the React UI and the REST API
host = 0.0.0.0
port = 8080
# Number of API server worker processes
workers = 4

[database]
# 2.x: [core] sql_alchemy_conn was deprecated in favor of [database]
# 3.0: the key name is unchanged and lives under [database]
sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@postgres/airflow
sql_engine_encoding = utf-8

# ── Environment variable overrides (recommended pattern) ─────────────
# All cfg settings: AIRFLOW__{SECTION}__{KEY} (double underscore)
export AIRFLOW__DATABASE__SQL_ALCHEMY_CONN="postgresql+psycopg2://..."
export AIRFLOW__CORE__FERNET_KEY="${FERNET_KEY}"
export AIRFLOW__CORE__EXECUTOR="LocalExecutor"
export AIRFLOW__DAG_PROCESSOR__PARSING_PROCESSES="4"
Airflow 3.0 Migration Checklist
Run ruff check dags/ --select AIR301,AIR302 against your entire dags/ directory on a branch before touching production; fix every reported issue and rerun the check until it passes cleanly
Replace all execution_date references with logical_date throughout DAG code; execution_date is removed from the task context in 3.0, and leftover references raise KeyError when the task runs rather than when the DAG is parsed
Migrate from airflow.operators.bash, airflow.operators.python, and airflow.sensors.filesystem to apache-airflow-providers-standard — these operators are no longer part of Airflow core
Rename all Dataset() usages to Asset() and update imports from airflow.datasets to airflow.sdk — the migration is a search-replace but test thoroughly as Asset scheduling semantics have changed
Remove any SubDAG patterns and replace with TaskGroup — SubDAGs are removed in 3.0 and TaskGroups provide the same UI grouping without a separate DAG run or executor slot
Update airflow.cfg: remove store_serialized_dags (always on in 3.0), move parsing_processes from [scheduler] to [dag_processor], and move the web server host and port settings to the [api] section used by the api-server process
Add the dag-processor as a separate Kubernetes Deployment or systemd service — in Airflow 3.0 DAG parsing runs in its own process and must be started explicitly
Update any direct REST API integrations from /api/v1 to /api/v2 — the v1 API is removed; generate a new Python client from the OpenAPI 3.1 spec at /api/v2/openapi.json
Pin provider package versions in requirements.txt alongside apache-airflow==3.0.0 — provider packages now follow their own release cadence and can introduce breaking changes independently
Test backfills after migration by running a historical backfill on a non-critical DAG and verifying that versioned reruns produce identical output to the original run