Apache Airflow, Airflow 3.0, Workflow Orchestration, Python, Data Engineering, ETL, Data Pipelines, Migration


2026-06-16

What Is Airflow 3.0?

Apache Airflow 3.0 is the first major version bump since Airflow 2.0 in December 2020. The headline changes are architectural: the Task SDK is now a separate installable package decoupled from the Airflow core, Datasets are renamed to Assets with richer event-driven scheduling semantics, DAG versioning makes backfills reproducible even when DAG code changes between runs, the web UI is rewritten in React, and REST API v2 replaces the v1 REST API that shipped with Airflow 2.

Airflow 3.0 also introduces breaking changes that require code updates before upgrading: several import paths changed, the execution_date context variable is replaced by logical_date, and operators moved out of core into provider packages. For teams running Airflow 2.x in production with complex DAG portfolios, the migration is non-trivial but well-documented — this article covers each change and the steps needed to upgrade safely.

Task SDK

Tasks now run in a separate process using the standalone apache-airflow-task-sdk package. Worker nodes no longer need a full Airflow installation.

Assets

Datasets renamed to Assets with richer semantics: aliases, conditional scheduling, and watchers that trigger DAGs on external asset updates.

DAG Versioning

Each DAG code change creates a new version. Backfills and reruns execute against the version that originally ran, not the current code.

Airflow 3.0 vs 2.x — Key Differences

| Area | Airflow 2.x | Airflow 3.0 |
| --- | --- | --- |
| Task execution | LocalExecutor / CeleryExecutor in core | Task SDK + Edge Executor for remote workers |
| Dataset triggers | Dataset (simple outlet scheduling) | Asset with aliases, watchers, and conditions |
| DAG versioning | No versioning; latest code always runs | Immutable versions per code change |
| REST API | REST API v1 (/api/v1) | Stable REST API v2 (OpenAPI 3.1) |
| Web UI | Flask/Jinja2 (Airflow UI) | React SPA with improved Grid/Graph views |
| context['execution_date'] | Available (legacy) | Removed; use logical_date |
| Operator location | Many operators in airflow.operators.* | Operators in provider packages only |

The New Task SDK

In Airflow 2.x, every worker node running tasks needed a full Airflow installation — the same codebase as the scheduler. This created version-lock between scheduler upgrades and worker deployments and meant worker containers were large. Airflow 3.0 separates task execution into the apache-airflow-task-sdk package.

The Task SDK is a lightweight package that provides the decorator API (@task), XCom access, and task context — without pulling in the full Airflow dependency tree. DAG authors who use the TaskFlow API can write task code that imports only from airflow.sdk and deploy it to worker environments that install only apache-airflow-task-sdk.

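# Install the full Airflow (scheduler, API server, triggerer)
pip install "apache-airflow==3.0.0"

# Install only the Task SDK (workers, task containers).
# The SDK is versioned separately from apache-airflow; 1.0.0 is the
# release paired with Airflow 3.0.0.
pip install "apache-airflow-task-sdk==1.0.0"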

# ── Writing tasks with the SDK ─────────────────────────────────────────
# Airflow 3.0 — import from airflow.sdk (recommended)
from airflow.sdk import dag, task, Asset
from datetime import datetime, timedelta

# Legacy import path still works but is deprecated:
# from airflow.decorators import dag, task

orders_asset = Asset("s3://warehouse/orders/daily/")

@dag(
    dag_id="process_orders",
    schedule=orders_asset,          # trigger when this Asset is updated
    start_date=datetime(2026, 1, 1),
    catchup=False,
    tags=["orders", "production"],
)
def process_orders_dag():

    @task
    def validate_orders(ti=None) -> dict:
        # ti is the TaskInstance context object
        logical_date = ti.logical_date   # replaces execution_date
        print(f"Validating orders for {logical_date.date()}")
        return {"date": str(logical_date.date()), "valid": True}

    @task
    def transform_orders(validation: dict) -> int:
        if not validation["valid"]:
            raise ValueError("Validation failed")
        # Simulate transformation
        return 1_000

    @task(outlets=[Asset("s3://warehouse/orders_processed/daily/")])
    def load_orders(count: int) -> None:
        print(f"Loaded {count} order records")

    result = validate_orders()
    count  = transform_orders(result)
    load_orders(count)

dag_instance = process_orders_dag()

Note

The airflow.sdk import path is the stable public API surface for DAG authors in Airflow 3.0. Code that imports from legacy paths such as airflow.decorators or airflow.models will still work in 3.0, but those paths emit deprecation warnings and may be removed in 4.0. Migrate to airflow.sdk now to make future upgrades easier.

Airflow Assets — Datasets Redesigned

Airflow 2.4 introduced Datasets as a way to trigger downstream DAGs when an upstream DAG updated a named data resource. The concept was powerful but limited: triggers were binary (updated or not), there was no way to express conditional logic, and the namespace was flat strings. Airflow 3.0 renames Datasets to Assets and significantly expands the model.

from airflow.sdk import Asset, AssetAlias, dag, task
from airflow.sdk.definitions.asset import AssetWatcher
from datetime import datetime

# ── Basic asset definition ─────────────────────────────────────────────
raw_orders    = Asset("s3://raw/orders/")
clean_orders  = Asset("s3://clean/orders/")
orders_report = Asset("s3://reports/orders/")

# ── Asset alias: decouple producer from consumer ──────────────────────
# Producers write to the alias; consumers schedule on the alias.
# Swap producers without changing downstream DAG schedules.
orders_alias = AssetAlias("orders-daily")

@dag(schedule="@daily", start_date=datetime(2026, 1, 1), catchup=False)
def ingest_orders():
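    @task(outlets=[raw_orders, orders_alias])
    def fetch_and_store(*, outlet_events) -> None:
        print("Ingesting orders → raw_orders + orders-daily alias")
        # Attach the concrete asset to the alias at runtime so that
        # DAGs scheduled on the alias receive the update event.
        outlet_events[orders_alias].add(raw_orders)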

    fetch_and_store()

# ── Consumer: schedule on the alias ──────────────────────────────────
@dag(schedule=orders_alias, start_date=datetime(2026, 1, 1), catchup=False)
def transform_on_orders_alias():
    @task(outlets=[clean_orders])
    def transform() -> None:
        print("Triggered via orders-daily alias")

    transform()

# ── Conditional scheduling: require multiple assets ──────────────────
from airflow.sdk.definitions.asset import AssetAll, AssetAny

# Run only when BOTH assets have been updated
@dag(
    schedule=AssetAll(clean_orders, Asset("s3://clean/payments/")),
    start_date=datetime(2026, 1, 1),
    catchup=False,
)
def orders_payments_join():
    @task(outlets=[orders_report])
    def join_and_report() -> None:
        print("Both orders and payments ready — running join")

    join_and_report()

# Run when EITHER asset updates
@dag(
    schedule=AssetAny(raw_orders, Asset("s3://raw/orders_v2/")),
    start_date=datetime(2026, 1, 1),
    catchup=False,
)
def ingest_any_source():
    @task
    def ingest() -> None:
        print("At least one source updated")

    ingest()

# ── Asset watcher: trigger on external updates ───────────────────────
# Trigger a DAG when an external system updates an asset
# (no outlet from an Airflow task required)
external_crm_export = Asset(
    "s3://crm/exports/daily/",
    watchers=[
        AssetWatcher(
            name="crm-export-watcher",
            trigger=...,   # S3EventTrigger or custom trigger
        )
    ],
)

Note

Asset aliases are the recommended pattern for decoupling producer and consumer DAGs in large organizations. Without aliases, renaming the S3 path in a producer DAG requires updating every consumer that schedules on that asset. With aliases, producers emit to a logical name and consumers depend on the name — the physical path is an implementation detail.
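To make the AssetAll / AssetAny semantics concrete, here is a minimal pure-Python sketch of how such a condition tree could be evaluated against the set of assets that have been updated. The AllOf and AnyOf classes, the _evaluate helper, and the payments_v2 URI are hypothetical stand-ins for illustration; this is not Airflow's implementation.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class AllOf:
    """Hypothetical stand-in for AssetAll: every member must be satisfied."""
    members: tuple

    def evaluate(self, updated: set) -> bool:
        return all(_evaluate(m, updated) for m in self.members)


@dataclass(frozen=True)
class AnyOf:
    """Hypothetical stand-in for AssetAny: one satisfied member is enough."""
    members: tuple

    def evaluate(self, updated: set) -> bool:
        return any(_evaluate(m, updated) for m in self.members)


def _evaluate(node, updated: set) -> bool:
    # Leaves are plain URI strings; inner nodes are AllOf / AnyOf.
    if isinstance(node, str):
        return node in updated
    return node.evaluate(updated)


# orders AND (payments OR payments_v2)
condition = AllOf((
    "s3://clean/orders/",
    AnyOf(("s3://clean/payments/", "s3://clean/payments_v2/")),
))

only_orders = condition.evaluate({"s3://clean/orders/"})
orders_and_v2 = condition.evaluate({"s3://clean/orders/", "s3://clean/payments_v2/"})
print(only_orders, orders_and_v2)  # False True
```

Nesting the two node types is what lets a schedule express "orders and either payments source" without a separate DAG per combination.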

DAG Versioning

In Airflow 2.x, a backfill always executed against the current version of the DAG code. If you changed a DAG between the original run date and the backfill, the backfill used the new logic — making historical runs non-reproducible. Airflow 3.0 solves this with immutable DAG versions: every time the serialized DAG changes (new task, changed parameters, updated schedule), Airflow creates a new version and stores the old one.

Backfills in Airflow 3.0 default to running against the version that was active at each logical_date. You can override this to backfill with the current version if you explicitly need the new behavior. This feature is especially important for compliance workloads where historical reruns must match original processing logic.

# ── DAG versioning — CLI commands ────────────────────────────────────
# List all versions of a DAG
airflow dags versions list --dag-id process_orders

# Output:
# Version  Active  Created                  Hash
# 3        True    2026-06-16T10:00:00Z     a1b2c3d4
# 2        False   2026-06-10T08:00:00Z     e5f6g7h8
# 1        False   2026-06-01T00:00:00Z     i9j0k1l2

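# ── Backfill a date range ────────────────────────────────────────────
# In 3.0 backfills are created with `airflow backfill create`
# (the 2.x `airflow dags backfill` command no longer exists).
# Default: backfill uses the version active at each logical_date
airflow backfill create --dag-id process_orders \
  --from-date 2026-06-01 \
  --to-date 2026-06-10

# Force backfill with current (latest) version.
# The flag name below is an assumption; confirm it with
# `airflow backfill create --help` on your release.
airflow backfill create --dag-id process_orders \
  --from-date 2026-06-01 \
  --to-date 2026-06-10 \
  --use-current-dag-version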

# ── View DAG diff between versions ──────────────────────────────────
airflow dags versions diff --dag-id process_orders --version-a 2 --version-b 3
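The rule described in this section (a new version only when the serialized DAG actually changes) can be sketched in a few lines. This is an illustrative model, not Airflow's storage code: dag_hash, VersionStore, and the dictionary layout are all hypothetical names chosen for the example.

```python
import hashlib
import json


def dag_hash(serialized_dag: dict) -> str:
    """Content hash of a serialized DAG; keys are sorted so ordering is irrelevant."""
    payload = json.dumps(serialized_dag, sort_keys=True).encode("utf-8")
    return hashlib.sha256(payload).hexdigest()[:8]


class VersionStore:
    """Hypothetical in-memory store that appends a version only when the hash changes."""

    def __init__(self):
        self.versions = []  # list of (version_number, hash, serialized_dag)

    def record(self, serialized_dag: dict) -> int:
        digest = dag_hash(serialized_dag)
        if self.versions and self.versions[-1][1] == digest:
            return self.versions[-1][0]  # unchanged: reuse the latest version
        number = len(self.versions) + 1
        self.versions.append((number, digest, serialized_dag))
        return number


store = VersionStore()
v1 = store.record({"dag_id": "process_orders", "tasks": ["validate", "load"]})
# Same content with a different key order: no new version is created.
same = store.record({"tasks": ["validate", "load"], "dag_id": "process_orders"})
# Adding a task changes the serialized form, so a new version is recorded.
v2 = store.record({"dag_id": "process_orders", "tasks": ["validate", "transform", "load"]})
print(v1, same, v2)  # 1 1 2
```

Because old entries are never overwritten, a rerun can look up the exact serialized form that was current when the original run happened.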

Breaking Changes from Airflow 2.x

Airflow 3.0 removes or renames several APIs that were deprecated in 2.x. Most changes are in import paths and context variable names; the underlying task logic rarely needs modification. Before upgrading, lint your DAG codebase with Ruff's Airflow rules (ruff check dags/ --select AIR301,AIR302) to get a report of removed and relocated APIs; depending on your Ruff version these rules may require the --preview flag. The staged deprecate-then-remove approach is comparable to how Dagster handles breaking API changes with compatibility shims before final removal.

# ── 1. execution_date removed — use logical_date ─────────────────────

# Airflow 2.x (broken in 3.0):
def my_task(**context):
    execution_date = context["execution_date"]          # KeyError in 3.0
    ds = context["ds"]                                  # still works
    next_ds = context["next_ds"]                        # also removed in 3.0

# Airflow 3.0 (correct):
def my_task(**context):
    logical_date = context["logical_date"]              # pendulum.DateTime
    ds = context["ds"]                                  # date string — unchanged

# With TaskFlow API, request context variables as keyword arguments:
@task
def my_task(logical_date=None, data_interval_start=None, data_interval_end=None):
    print(logical_date, data_interval_start, data_interval_end)

# ── 2. Import path changes ────────────────────────────────────────────

# Airflow 2.x → Airflow 3.0 import mapping:
# airflow.decorators.dag         → airflow.sdk.dag
# airflow.decorators.task        → airflow.sdk.task
# airflow.models.dag.DAG         → airflow.sdk.DAG (or keep models for non-SDK use)
# airflow.datasets.Dataset       → airflow.sdk.Asset
# airflow.timetables.*           → airflow.sdk.timetables.*

# ── 3. Operators moved to providers ──────────────────────────────────

# Airflow 2.x (broken in 3.0 — removed from core):
from airflow.operators.bash        import BashOperator      # REMOVED from core
from airflow.operators.python      import PythonOperator    # REMOVED from core
from airflow.operators.email       import EmailOperator     # REMOVED from core
from airflow.sensors.filesystem    import FileSensor        # REMOVED from core

# Airflow 3.0 (install provider packages):
# pip install apache-airflow-providers-standard apache-airflow-providers-smtp
from airflow.providers.standard.operators.bash     import BashOperator
from airflow.providers.standard.operators.python   import PythonOperator
from airflow.providers.standard.sensors.filesystem import FileSensor
from airflow.providers.smtp.operators.smtp         import EmailOperator   # email lives in the SMTP provider

# ── 4. XCom push/pull API changes ────────────────────────────────────

# Airflow 2.x (still works but deprecated):
def push_value(ti, **context):
    ti.xcom_push(key="result", value=42)

def pull_value(ti, **context):
    val = ti.xcom_pull(task_ids="push_task", key="result")

# Airflow 3.0 TaskFlow (recommended — type-safe, no manual push/pull):
@task
def compute() -> int:
    return 42

@task
def consume(value: int) -> None:
    print(f"Got: {value}")

# result is passed automatically between tasks:
consume(compute())

# ── 5. SubDAGs removed ───────────────────────────────────────────────
# SubDAGs are removed in 3.0. Migrate to TaskGroups (same UI grouping,
# no separate DAG run, no separate executor slot overhead):

# Airflow 2.x (SubDAG — removed in 3.0):
# from airflow.operators.subdag import SubDagOperator  # REMOVED

# Airflow 3.0 (TaskGroup):
from datetime import datetime

from airflow.sdk import TaskGroup, dag, task

@dag(schedule="@daily", start_date=datetime(2026, 1, 1), catchup=False)
def my_pipeline():
    # Tasks created inside the context manager are grouped in the UI
    with TaskGroup("validation"):
        @task
        def check_schema() -> bool:
            return True

        @task
        def check_nulls() -> bool:
            return True

        schema_ok = check_schema()
        nulls_ok  = check_nulls()

    @task
    def load(schema: bool, nulls: bool) -> None:
        print(f"Loading. Schema: {schema}, Nulls: {nulls}")

    # Pass the grouped tasks' outputs straight to the downstream task
    load(schema_ok, nulls_ok)

my_pipeline()
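As a rough complement to a linter, the patterns listed in this section can also be detected with Python's own ast module. The sketch below is a simplified illustration, not a replacement for a real compatibility checker: DEPRECATED_MODULES and find_issues are hypothetical names, and the module list covers only the paths mentioned in this article.

```python
import ast

# Import paths this article lists as moved or removed in 3.0 (not exhaustive).
DEPRECATED_MODULES = {
    "airflow.operators.bash",
    "airflow.operators.python",
    "airflow.operators.email",
    "airflow.operators.subdag",
    "airflow.sensors.filesystem",
    "airflow.datasets",
}


def find_issues(source: str) -> list:
    """Return sorted (line, message) pairs for legacy patterns found in DAG source."""
    issues = []
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, ast.ImportFrom) and node.module in DEPRECATED_MODULES:
            issues.append((node.lineno, f"deprecated import: {node.module}"))
        elif isinstance(node, ast.Constant) and node.value == "execution_date":
            # Catches context["execution_date"] and context.get("execution_date")
            issues.append((node.lineno, "execution_date context key"))
    return sorted(issues)


sample = '''
from airflow.operators.bash import BashOperator

def my_task(**context):
    return context["execution_date"]
'''

issues = find_issues(sample)
for line, message in issues:
    print(line, message)
```

Parsing the file rather than grepping it avoids false positives from comments, although string constants that merely mention execution_date would still be flagged.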

Step-by-Step Migration from Airflow 2.x

Migrating a production Airflow installation requires careful sequencing: you cannot upgrade the scheduler in-place without verifying that all DAGs are compatible first. The safest approach is to run the upgrade check on a copy of your DAG repository before touching the production scheduler.

# ── Step 1: Lint your DAG repo for Airflow 3 incompatibilities ──────
pip install "apache-airflow==3.0.0" \
  --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-3.0.0/constraints-3.12.txt"

# Airflow 3 compatibility checks ship as Ruff rules
# (AIR301: removed APIs, AIR302: code moved to providers).
# Older Ruff releases may need --preview to enable them.
pip install ruff
ruff check dags/ --select AIR301,AIR302
# Reports detected compatibility issues such as:
# - execution_date usage
# - deprecated import paths
# - SubDAG usage
# - Removed operators
# Fix ALL reported issues before proceeding.

# ── Step 2: Update provider packages ─────────────────────────────────
# Map old operators to new provider package imports.
# Install the standard provider for bash/python/email/filesystem:
pip install "apache-airflow-providers-standard>=1.0.0"

# Google Cloud:
pip install "apache-airflow-providers-google>=10.0.0"

# AWS:
pip install "apache-airflow-providers-amazon>=8.0.0"

# Check provider compatibility matrix:
# https://airflow.apache.org/docs/apache-airflow-providers/index.html

# ── Step 3: Migrate Dataset → Asset in all DAG files ─────────────────
# Automated sed replacement (backup first!):
find dags/ -name "*.py" -exec sed -i \
  's/from airflow\.datasets import Dataset/from airflow.sdk import Asset/g' {} \;
find dags/ -name "*.py" -exec sed -i 's/Dataset(/Asset(/g' {} \;

# ── Step 4: Replace execution_date with logical_date ─────────────────
# The square brackets must be escaped, otherwise sed reads them as a character class.
find dags/ -name "*.py" -exec sed -i \
  's/context\["execution_date"\]/context["logical_date"]/g' {} \;
find dags/ -name "*.py" -exec sed -i \
  's/context\.get("execution_date")/context.get("logical_date")/g' {} \;

# ── Step 5: Upgrade the database schema ──────────────────────────────
# Run AFTER stopping the Airflow 2.x scheduler (never run against live scheduler)
airflow db migrate

# Verify the database is reachable and all migrations have been applied:
airflow db check
airflow db check-migrations

# ── Step 6: Start Airflow 3.0 services ───────────────────────────────
# Scheduler (single or HA with multiple replicas)
airflow scheduler

# New in 3.0: DAG processor is a separate process
airflow dag-processor

# API server: serves both the REST API and the React UI in 3.0
# (replaces the 2.x `airflow webserver` command)
airflow api-server

# Triggerer (unchanged from 2.x)
airflow triggerer
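The search-and-replace in Steps 3 and 4 can also be done in Python, which makes it easier to handle both quote styles and to avoid touching unrelated names such as MyDataset. The following is a sketch under the assumption that your DAGs use the plain `context[...]` and `context.get(...)` forms; REWRITES and migrate_source are hypothetical names, and the output should still be reviewed in a diff before committing.

```python
import re

# (pattern, replacement) pairs mirroring the sed commands in Steps 3 and 4.
REWRITES = [
    (re.compile(r"from\s+airflow\.datasets\s+import\s+Dataset\b"),
     "from airflow.sdk import Asset"),
    # \b keeps names that merely end in "Dataset" (e.g. MyDataset) untouched.
    (re.compile(r"\bDataset\("), "Asset("),
    # \1 re-uses whichever quote character the original code used.
    (re.compile(r"""context\[(["'])execution_date\1\]"""),
     r"context[\1logical_date\1]"),
    (re.compile(r"""context\.get\((["'])execution_date\1\)"""),
     r"context.get(\1logical_date\1)"),
]


def migrate_source(source: str) -> str:
    """Apply every rewrite in order and return the updated source text."""
    for pattern, replacement in REWRITES:
        source = pattern.sub(replacement, source)
    return source


before = '''from airflow.datasets import Dataset
orders = Dataset("s3://raw/orders/")
when = context["execution_date"]
also = context.get('execution_date')
other = MyDataset("untouched")
'''

after = migrate_source(before)
print(after)
```

To apply it to a repository, read each file under dags/, pass the text through migrate_source, and write the result back only when it differs from the input.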

Note

Airflow 3.0 replaces the airflow webserver process with airflow api-server, a FastAPI application that serves both the REST API and the React UI. In Kubernetes deployments this means swapping the webserver Deployment for an api-server Deployment. The DAG processor is also a separate process; this prevents slow DAG parsing from blocking the scheduler loop, a common production pain point in Airflow 2.x with large DAG repositories.

After migrating, validate your DAGs with contract tests and schema assertions. See Data Pipeline Testing — Contract Tests, Great Expectations, and Schema Validation for patterns that catch regressions introduced during executor or provider changes.

Edge Executor — Remote Task Execution Without Celery

Airflow 2.x required either CeleryExecutor (with Redis or RabbitMQ as broker) or KubernetesExecutor (spawning a pod per task) for distributed execution. Both carry operational overhead. Airflow 3.0 introduces the Edge Executor: a lightweight HTTP-based executor where remote workers poll the Airflow API server for tasks, run them using the Task SDK, and report results back.

Edge workers require only apache-airflow-task-sdk and apache-airflow-providers-edge3 — no message broker, no full Airflow installation. This is particularly useful for running tasks in restricted environments (on-premises, edge locations, isolated VPCs) that can reach the Airflow API server over HTTPS but cannot access a Celery broker.
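The pull-based model is easier to see in a toy simulation. The sketch below is an in-memory illustration only: FakeApiServer, fetch_job, report, and run_worker are hypothetical names and do not mirror the Edge provider's actual endpoints or classes. It shows the two properties described above: the worker initiates every exchange, and it only receives tasks from the queues it subscribes to.

```python
from collections import deque


class FakeApiServer:
    """Hypothetical stand-in for the API server's job queue."""

    def __init__(self):
        self.queues = {}    # queue name -> deque of (task_id, callable)
        self.results = {}   # task_id -> final state

    def enqueue(self, queue, task_id, fn):
        self.queues.setdefault(queue, deque()).append((task_id, fn))

    def fetch_job(self, queues):
        # Worker-initiated: hand out the next job from any queue the worker serves.
        for name in queues:
            pending = self.queues.get(name)
            if pending:
                return pending.popleft()
        return None

    def report(self, task_id, state):
        self.results[task_id] = state


def run_worker(api, queues):
    """Poll until no matching work is left, reporting each task's final state."""
    while (job := api.fetch_job(queues)) is not None:
        task_id, fn = job
        try:
            fn()
            api.report(task_id, "success")
        except Exception:
            api.report(task_id, "failed")


api = FakeApiServer()
api.enqueue("high-priority", "critical_transform", lambda: {"status": "ok"})
api.enqueue("default", "write_results", lambda: 1 / 0)  # fails on purpose
api.enqueue("gpu", "train", lambda: None)               # no worker serves this queue

run_worker(api, ["high-priority", "default"])
print(api.results)
```

The "train" task stays queued because this worker never asks for the gpu queue, which is the same mechanism that lets you pin tasks to workers in a particular network zone.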

# ── Install Edge worker (minimal dependencies) ────────────────────────
# The Task SDK is versioned separately; 1.0.0 is the release paired with Airflow 3.0.0.
pip install "apache-airflow-task-sdk==1.0.0"
pip install "apache-airflow-providers-edge3>=1.0.0"

# ── Configure Airflow scheduler to use Edge Executor ─────────────────
# airflow.cfg:
[core]
executor = airflow.providers.edge3.executors.edge_executor.EdgeExecutor

[edge]
api_url = https://airflow.internal:8080   # Airflow API server URL
api_token = ${AIRFLOW_EDGE_API_TOKEN}     # JWT token for worker auth
concurrency = 16                          # max concurrent tasks per worker

# ── Start an edge worker (on remote host/container/VM) ────────────────
# The worker polls the API server — no inbound connections needed
airflow edge worker \
  --queue default \
  --queue high-priority \
  --concurrency 8

# ── Tag tasks to run on specific queues ───────────────────────────────
from airflow.sdk import dag, task
from datetime import datetime

@dag(schedule="@hourly", start_date=datetime(2026, 1, 1), catchup=False)
def distributed_pipeline():

    @task(queue="high-priority")
    def critical_transform() -> dict:
        return {"status": "ok"}

    @task(queue="default")
    def write_results(result: dict) -> None:
        print(f"Writing: {result}")

    write_results(critical_transform())

# ── Kubernetes Helm chart for Airflow 3.0 with Edge Executor ──────────
# values.yaml:
executor: "EdgeExecutor"

workers:
  replicas: 3
  resources:
    requests:
      cpu: "500m"
      memory: "1Gi"
    limits:
      cpu: "2000m"
      memory: "4Gi"

dagProcessor:
  enabled: true       # new in Airflow 3.0 Helm chart
  replicas: 1

apiServer:
  enabled: true       # replaces the webserver in 3.0
  replicas: 2

REST API v2 — Stable and OpenAPI 3.1

Airflow 2.x shipped its stable REST API at /api/v1 (the older experimental API was already deprecated by then). Airflow 3.0 replaces it with a v2 API at /api/v2 built on FastAPI with OpenAPI 3.1 spec generation. The v1 API is removed, so clients that call /api/v1 must be updated.

# ── Common REST API v2 operations ────────────────────────────────────
AIRFLOW_URL="https://airflow.internal:8080"
TOKEN="eyJhbGciOiJIUzI1NiJ9..."   # JWT from API server

# Trigger a DAG run with parameters
curl -X POST "${AIRFLOW_URL}/api/v2/dags/process_orders/dagRuns" \
  -H "Authorization: Bearer ${TOKEN}" \
  -H "Content-Type: application/json" \
  -d '{
    "logical_date": "2026-06-16T00:00:00Z",
    "conf": {"source": "s3://raw/2026-06-16/", "dry_run": false}
  }'

# List DAG runs with filters
curl "${AIRFLOW_URL}/api/v2/dags/process_orders/dagRuns?state=failed&limit=10" \
  -H "Authorization: Bearer ${TOKEN}"

# Get task instance logs (RUN_ID is the dag_run_id returned when the run was created)
curl "${AIRFLOW_URL}/api/v2/dags/process_orders/dagRuns/${RUN_ID}/taskInstances/validate_orders/logs/1" \
  -H "Authorization: Bearer ${TOKEN}"

# Pause / unpause a DAG
curl -X PATCH "${AIRFLOW_URL}/api/v2/dags/process_orders" \
  -H "Authorization: Bearer ${TOKEN}" \
  -H "Content-Type: application/json" \
  -d '{"is_paused": true}'

# List Assets and their last update
curl "${AIRFLOW_URL}/api/v2/assets?limit=50" \
  -H "Authorization: Bearer ${TOKEN}"

# Manually mark an Asset as updated (trigger downstream DAGs)
curl -X POST "${AIRFLOW_URL}/api/v2/assets/events" \
  -H "Authorization: Bearer ${TOKEN}" \
  -H "Content-Type: application/json" \
  -d '{"asset_uri": "s3://warehouse/orders/daily/", "extra": {"source": "manual"}}'

# ── Python client (generated from OpenAPI spec) ───────────────────────
# pip install apache-airflow-client
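import os

from airflow_client.client import ApiClient, Configuration
from airflow_client.client.api import dag_run_api

config = Configuration(
    host="https://airflow.internal:8080/api/v2",
    # Shell-style ${VAR} is not expanded inside a Python string;
    # read the JWT from the environment instead.
    access_token=os.environ["AIRFLOW_TOKEN"],
)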

with ApiClient(config) as api_client:
    api = dag_run_api.DAGRunApi(api_client)
    run = api.trigger_dag_run(
        dag_id="process_orders",
        trigger_dag_run_post_body={
            "logical_date": "2026-06-16T00:00:00Z",
            "conf": {"dry_run": False},
        },
    )
    print(f"Started run: {run.dag_run_id}")
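If you would rather not depend on the generated client, the same trigger call can be built with the standard library. This is a minimal sketch that mirrors the curl example above: build_trigger_request is a hypothetical helper, and the URL and token are placeholders. It only constructs the request, so it can be inspected or unit-tested without a running Airflow instance.

```python
import json
import urllib.request


def build_trigger_request(base_url: str, dag_id: str, token: str,
                          logical_date: str, conf: dict) -> urllib.request.Request:
    """Build (but do not send) a POST request that creates a DAG run."""
    body = json.dumps({"logical_date": logical_date, "conf": conf}).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/api/v2/dags/{dag_id}/dagRuns",
        data=body,
        method="POST",
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
    )


req = build_trigger_request(
    "https://airflow.internal:8080/",   # placeholder host from the examples above
    "process_orders",
    "PLACEHOLDER_TOKEN",                # substitute a real JWT
    "2026-06-16T00:00:00Z",
    {"dry_run": True},
)
print(req.get_method(), req.full_url)
# Send it with urllib.request.urlopen(req) once the URL and token are real.
```

Keeping request construction separate from sending makes it straightforward to assert on the path and payload in tests.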

The stable REST API v2 makes it practical to build AI agents that autonomously monitor, trigger, and repair pipelines. See Agentic Data Workflows — Using AI Agents to Automate Pipeline Orchestration for concrete tool definitions and integration patterns.

Production Configuration Changes

Several airflow.cfg settings were renamed or removed in 3.0. Before upgrading, compare your current configuration against the 3.0 defaults. The most impactful changes are in the [core], [webserver], and [scheduler] sections. The Airflow 2.x production patterns for scheduler high-availability and DAG serialization still apply, but the configuration keys behind them have changed.

# ── airflow.cfg — notable changes from 2.x to 3.0 ───────────────────

[core]
# 2.x: load_examples = True
# 3.0: unchanged — still True by default; set False in production
load_examples = False

# store_serialized_dags (legacy key)
# 3.0: not a valid option; serialization is always on and cannot be disabled
# store_serialized_dags = True   ← delete this line

# 2.x: min_serialized_dag_update_interval = 30
# 3.0: key name unchanged
min_serialized_dag_update_interval = 30   # still valid

# 2.x: executor = LocalExecutor
# 3.0: EdgeExecutor is an additional option for distributed execution
executor = LocalExecutor   # still valid for single-node

[scheduler]
# 2.x: parsing_processes = 2
# 3.0: moved to [dag_processor] section
# parsing_processes = 2   ← DELETE

[dag_processor]
# Configure the DAG processor (separate process in 3.0)
parsing_processes = 4      # number of parallel DAG parsing workers
refresh_interval = 30      # seconds between scans for new DAG files (2.x: [scheduler] dag_dir_list_interval)
file_parsing_sort_mode = modified_time   # prioritize recently changed DAGs

[api]
# 3.0: `airflow api-server` serves both the REST API and the React UI.
# The 2.x [webserver] web_server_host / web_server_port keys map to host / port here.
host = 0.0.0.0
port = 8080
workers = 4              # API server worker processes

[database]
# sql_alchemy_conn lives in [database]; it moved here from [core] during the 2.x series,
# so remove any leftover [core] sql_alchemy_conn entry
sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@postgres/airflow
sql_engine_encoding = utf-8

# ── Environment variable overrides (recommended pattern) ─────────────
# All cfg settings: AIRFLOW__{SECTION}__{KEY} (double underscore)
export AIRFLOW__DATABASE__SQL_ALCHEMY_CONN="postgresql+psycopg2://..."
export AIRFLOW__CORE__FERNET_KEY="${FERNET_KEY}"
export AIRFLOW__CORE__EXECUTOR="LocalExecutor"
export AIRFLOW__DAG_PROCESSOR__PARSING_PROCESSES="4"
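The AIRFLOW__{SECTION}__{KEY} naming rule is mechanical, so an existing cfg fragment can be converted to environment overrides with the standard library. The helper below is a small sketch; cfg_to_env is a hypothetical name and the sample settings are taken from the block above.

```python
import configparser


def cfg_to_env(cfg_text: str) -> dict:
    """Map every [section] key = value pair to its AIRFLOW__SECTION__KEY variable."""
    parser = configparser.ConfigParser(interpolation=None)  # keep % characters literal
    parser.read_string(cfg_text)
    return {
        f"AIRFLOW__{section.upper()}__{key.upper()}": value
        for section in parser.sections()
        for key, value in parser[section].items()
    }


sample_cfg = """
[core]
executor = LocalExecutor
load_examples = False

[dag_processor]
parsing_processes = 4
"""

env = cfg_to_env(sample_cfg)
for name, value in env.items():
    print(f'export {name}="{value}"')
```

Generating the variables from one source file helps keep container environments and airflow.cfg from drifting apart.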

Airflow 3.0 Migration Checklist

1. Lint your entire dags/ directory with Ruff's Airflow rules (ruff check dags/ --select AIR301,AIR302) on a branch before touching production; fix every reported issue and rerun the check until it passes cleanly

2. Replace all execution_date references with logical_date throughout DAG code; execution_date is removed from the task context in 3.0, so any leftover lookup raises KeyError when the task runs

3. Migrate from airflow.operators.bash, airflow.operators.python, and airflow.sensors.filesystem to apache-airflow-providers-standard; these operators are no longer part of Airflow core

4. Rename all Dataset() usages to Asset() and update imports from airflow.datasets to airflow.sdk; the migration is a search-replace, but test thoroughly because Asset scheduling semantics have changed

5. Remove any SubDAG patterns and replace them with TaskGroup; SubDAGs are removed in 3.0 and TaskGroups provide the same UI grouping without a separate DAG run or executor slot

6. Update airflow.cfg: remove store_serialized_dags (always on in 3.0), move parsing_processes from [scheduler] to [dag_processor], and move the [webserver] host and port settings to the [api] section used by the api-server process

7. Add the dag-processor as a separate Kubernetes Deployment or systemd service; in Airflow 3.0 DAG parsing runs in its own process and must be started explicitly

8. Update any direct REST API integrations from /api/v1 to /api/v2; the v1 API is removed, so generate a new Python client from the OpenAPI 3.1 spec at /api/v2/openapi.json

9. Pin provider package versions in requirements.txt alongside apache-airflow==3.0.0; provider packages follow their own release cadence and can introduce breaking changes independently

10. Test backfills after migration by running a historical backfill on a non-critical DAG and verifying that versioned reruns produce identical output to the original run
