Prefect, Workflow Orchestration, Python, Data Engineering, ETL, Data Pipelines, DevOps, Scheduling

Apache Airflow 3.0 — What Changed, Migration Guide, and New Task SDK

A practical guide to Apache Airflow 3.0: the standalone apache-airflow-task-sdk package decoupling task execution from the Airflow core so workers install only the Task SDK without a full Airflow installation, the new airflow.sdk import namespace replacing airflow.decorators and airflow.datasets, Airflow Assets replacing Datasets with AssetAlias for decoupling producer and consumer DAGs, AssetAll and AssetAny for conditional multi-asset scheduling, AssetWatcher for triggering on external system updates without an Airflow outlet, immutable DAG versioning so backfills replay against the code version that originally ran rather than current code, the breaking removal of execution_date from task context replaced by logical_date throughout all DAG code, BashOperator and PythonOperator and FileSensor moved from Airflow core to apache-airflow-providers-standard, SubDAGs removed in favour of TaskGroups, the Edge Executor replacing CeleryExecutor for remote task execution over HTTP with workers polling the API server using only the Task SDK and no message broker, a new [dag_processor] configuration section for the separate DAG parsing process decoupled from the scheduler loop, the stable REST API v2 built on FastAPI with OpenAPI 3.1 replacing the experimental v1 API, step-by-step migration playbook covering airflow upgrade-check, Dataset to Asset search-replace, execution_date to logical_date substitution, cfg section changes, and Kubernetes Helm chart updates for the new dag-processor and api-server Deployments, and a 10-point production migration checklist.

2026-06-15

Why Prefect 3?

Apache Airflow shaped a decade of data pipeline orchestration, but its design reflects the constraints of its era: DAGs defined as static Python code that the scheduler parses at fixed intervals, operators that embed execution logic and infrastructure concerns together, and a task model that, until dynamic task mapping arrived in Airflow 2.3, could not express fan-out over lists whose length is only known at runtime. Prefect 3 takes a different approach: workflows are plain Python functions decorated with @flow and @task. No DAG class, no operator inheritance, no XML or YAML describing your pipeline logic.

The key insight is that Prefect instruments existing Python code rather than replacing it. You can run a Prefect flow from your terminal during development — it executes exactly like a plain Python script but also emits structured state events, captures logs, stores results, and records run history. The same code is deployed to production with a single CLI command. For teams already owning complex data pipelines in Python, this is a lower adoption cost than migrating to a framework with its own DSL.

Prefect 3 (released 2024) is a significant rewrite: it finished the move from agents to work pools and workers (agents were removed), introduced a leaner task engine, improved async support, and revamped the deployment model. Compared to Dagster, which centers on software-defined assets and type-checked data lineage, Prefect focuses on the operational experience of running Python workflows: retries, caching, dynamic parallelism, and observability, without requiring you to restructure your code around an asset graph.

Plain Python

Flows are regular functions. No DAG classes, no operator inheritance. Existing Python scripts need minimal changes to become orchestrated workflows.

Dynamic DAGs

Task graphs are built at runtime, not parse time. Fan-out parallelism with .map() and .submit() over lists determined at execution time.

Hybrid execution

Work pools decouple scheduling from execution. Run workers in your VPC, Kubernetes, or serverless — the Prefect server never touches your data.

Prefect 3 vs Airflow vs Dagster

| Property | Prefect 3 | Apache Airflow 2 | Dagster |
|---|---|---|---|
| Workflow model | @flow / @task decorators | DAG + Operator classes | @asset / @op / @job |
| Dynamic tasks | Native: .map() / .submit() | Dynamic task mapping (2.3+) | Dynamic partitions |
| Execution model | Work pools + workers | Scheduler + executor | Daemon + run launchers |
| State management | Rich: Completed, Failed, Cached, Paused | Success, Failed, Skipped, Up for retry | Success, Failure, Skipped |
| Data lineage | Artifacts; no type system | XCom; no lineage | First-class asset lineage graph |
| Learning curve | Low: plain Python | Medium: operator model | Medium-High: asset graph |
| Best for | Python ETL, ML pipelines, dynamic fan-out | Batch scheduling, complex dependencies | Data asset management, type-safe pipelines |

Installation and First Flow

Prefect 3 requires Python 3.9+. Install it from PyPI, configure the API URL, and you can run flows immediately — either against a local ephemeral server, Prefect Cloud, or a self-hosted Prefect server.

# Install Prefect 3
pip install "prefect>=3.0"  # quoted so the shell doesn't treat > as a redirect

# Option A: Prefect Cloud (managed — recommended for teams)
prefect cloud login  # opens browser, stores API key in ~/.prefect/profiles.toml

# Option B: Self-hosted server
# (no extra package needed: the server ships with the prefect package)
prefect server start  # starts API + UI on http://localhost:4200

# Option C: Ephemeral mode (local only, no server)
# PREFECT_API_URL is unset — flow runs stored in ~/.prefect/

# Verify connection
prefect version
prefect config view
# ── hello_pipeline.py — your first Prefect 3 flow ────────────────────
from prefect import flow, task
from prefect.logging import get_run_logger

@task(retries=2, retry_delay_seconds=10)
def extract(source: str) -> list[dict]:
    logger = get_run_logger()
    logger.info(f"Extracting from {source}")
    # Replace with real extraction logic
    return [{"id": 1, "value": 100}, {"id": 2, "value": 200}]

@task
def transform(records: list[dict]) -> list[dict]:
    return [{"id": r["id"], "value": r["value"] * 2} for r in records]

@task
def load(records: list[dict], destination: str) -> int:
    logger = get_run_logger()
    logger.info(f"Loading {len(records)} records to {destination}")
    return len(records)

@flow(name="etl-pipeline", log_prints=True)
def etl_pipeline(source: str = "s3://bucket/input/", destination: str = "warehouse") -> int:
    raw       = extract(source)
    processed = transform(raw)
    count     = load(processed, destination)
    print(f"Loaded {count} records")
    return count

if __name__ == "__main__":
    result = etl_pipeline()  # run locally — identical to a plain Python call

Note

log_prints=True on a flow captures all print() statements as structured Prefect log events. Use get_run_logger() inside tasks and flows for structured logging with run IDs automatically attached — this makes log correlation across distributed runs trivial in the Prefect UI.

State Management and Retry Policies

Every Prefect task and flow run moves through a sequence of State objects (Scheduled, Pending, Running) and ends in a final state such as Completed, Failed, Crashed, or Cancelled; Cached is a variant of Completed, and Paused marks a run waiting to be resumed. States carry a result reference, a message, and optional metadata. You can inspect state in your flow logic to implement conditional branching, which is awkward in Airflow's trigger-rule model.

from prefect import flow, task
from prefect.tasks import exponential_backoff

# ── Fine-grained retry configuration ─────────────────────────────────
@task(
    retries=3,
    retry_delay_seconds=exponential_backoff(backoff_factor=2),
    # retry_delay_seconds can also be a list: [10, 30, 90]
    retry_jitter_factor=0.3,     # randomly lengthens each delay (up to ~30%) to avoid thundering herds
    timeout_seconds=120,         # fail if the task exceeds 2 minutes
)
def fetch_api_data(endpoint: str) -> dict:
    import httpx
    response = httpx.get(endpoint, timeout=30)
    response.raise_for_status()
    return response.json()

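# ── Retry on specific exceptions only ────────────────────────────────
import httpx
from prefect import task

def retry_on_transient_errors(task, task_run, state) -> bool:
    """Retry only on network timeouts and 5xx responses."""
    try:
        state.result()          # re-raises the exception the task raised
    except httpx.TimeoutException:
        return True
    except httpx.HTTPStatusError as exc:
        return exc.response.status_code >= 500
    except Exception:
        return False            # anything else (4xx, bugs) is permanent
    return False

@task(retries=2, retry_condition_fn=retry_on_transient_errors)
def resilient_fetch(url: str) -> bytes:
    response = httpx.get(url, timeout=10)
    response.raise_for_status()  # turn 4xx/5xx responses into HTTPStatusError
    return response.content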

# ── State inspection in flow logic ───────────────────────────────────
from prefect import flow, task

@task
def risky_operation(value: int) -> int:
    if value < 0:
        raise ValueError(f"Negative value: {value}")
    return value * 10

@flow
def conditional_pipeline(values: list[int]) -> dict:
    successes, failures = [], []

    for val in values:
        # return_state=True gives you the State object, never raises
        state = risky_operation(val, return_state=True)
        if state.is_completed():
            successes.append(state.result())
        else:
            failures.append({"value": val, "error": str(state.result(raise_on_failure=False))})

    return {"successes": successes, "failures": failures}
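It helps to know what delays the first task above will actually wait. The sketch below reproduces the schedule in plain Python, assuming exponential_backoff(backoff_factor) yields backoff_factor * 2**n seconds before the n-th retry (counting from 0), which is how Prefect describes it at the time of writing. The jitter function is a simplification: it stretches each delay by a uniform random fraction of at most retry_jitter_factor, whereas Prefect draws jittered delays from its own clamped distribution. backoff_delays and jittered are illustrative names, not Prefect APIs.

```python
import random


def backoff_delays(backoff_factor: float, retries: int) -> list[float]:
    """Base delay before each retry: backoff_factor * 2**n for retry n = 0, 1, ..."""
    return [backoff_factor * 2**n for n in range(retries)]


def jittered(delays: list[float], jitter_factor: float, rng: random.Random) -> list[float]:
    """Stretch each delay by a random fraction in [0, jitter_factor] (simplified model)."""
    return [d * (1 + rng.uniform(0, jitter_factor)) for d in delays]


base = backoff_delays(backoff_factor=2, retries=3)
print(base)  # [2, 4, 8]
print([round(d, 2) for d in jittered(base, jitter_factor=0.3, rng=random.Random(42))])
```

Three retries therefore wait 2 + 4 + 8 = 14 seconds before jitter, and at most about 18 seconds with a 30% stretch, which bounds how long a flaky endpoint can hold up the flow.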

Dynamic Task Mapping: .submit() and .map()

The most powerful Prefect 3 feature for data engineering is dynamic task mapping: running the same task over a collection whose size is only known at runtime. In Prefect, .map() submits one task run per element and returns a list of futures that resolve concurrently. Airflow has supported the same idea since 2.3 through dynamic task mapping with .expand(); before that, fan-out had to be fixed when the DAG file was parsed. Prefect's version needs no special operator API, because the mapped call is ordinary Python inside the flow body. Note that Apache Airflow 3.0 introduced a new Task SDK and an Edge Executor that bring Airflow's execution model closer to Prefect's approach, so teams evaluating a migration should check whether Airflow 3.0 closes the gaps that motivated the switch.

from prefect import flow, task
from prefect.futures import wait

# ── .submit(): explicit future-based concurrency ──────────────────────
@task
def process_file(path: str) -> int:
    # Simulate file processing — returns row count
    import time; time.sleep(1)
    return len(path) * 100   # placeholder

@flow
def parallel_file_pipeline(paths: list[str]) -> list[int]:
    # Submit all tasks concurrently — they run in parallel
    futures = [process_file.submit(path) for path in paths]

    # Wait for all futures and collect results
    results = [f.result() for f in futures]
    return results

# ── .map(): shorthand for the same pattern ────────────────────────────
@flow
def map_pipeline(paths: list[str]) -> list[int]:
    # Equivalent to the .submit() loop above
    future_list = process_file.map(paths)
    return [f.result() for f in future_list]

# ── Chained mapping: output of map feeds next map ─────────────────────
@task
def download_partition(partition_id: int) -> list[dict]:
    # Returns records for one partition
    return [{"partition": partition_id, "value": i} for i in range(10)]

@task
def validate_partition(records: list[dict]) -> int:
    valid = [r for r in records if r["value"] >= 0]
    return len(valid)

@task
def summarize(counts: list[int]) -> dict:
    return {"total": sum(counts), "partitions": len(counts)}

@flow
def partitioned_pipeline(partition_ids: list[int]) -> dict:
    # Stage 1: download all partitions concurrently
    download_futures = download_partition.map(partition_ids)

    # Stage 2: validate each downloaded partition (also concurrent)
    # Pass the list of futures directly — Prefect resolves them
    validate_futures = validate_partition.map(download_futures)

    # Stage 3: gather and summarize (waits for all validates)
    counts = [f.result() for f in validate_futures]
    summary = summarize(counts)
    return summary

# ── Mixed mapping: constant + variable args ───────────────────────────
from prefect import unmapped

@task
def enrich(record: dict, config: dict) -> dict:
    return {**record, "enriched": True, "config_version": config["version"]}

@flow
def enrichment_pipeline(records: list[dict]) -> list[dict]:
    config = {"version": "2.1", "env": "production"}
    # config is the same for every task — unmapped() broadcasts it
    futures = enrich.map(records, config=unmapped(config))
    return [f.result() for f in futures]
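The broadcasting rule behind .map() and unmapped() can be modeled with the standard library. In this sketch, which is an illustration and not Prefect's implementation, ordinary iterable arguments are zipped element by element, an argument wrapped in the hypothetical Unmapped marker is passed unchanged to every call, and a ThreadPoolExecutor stands in for the default thread-based task runner.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from dataclasses import dataclass
from typing import Any, Callable


@dataclass(frozen=True)
class Unmapped:
    """Marks an argument that is passed unchanged to every mapped call."""
    value: Any


def map_task(fn: Callable[..., Any], pool: ThreadPoolExecutor, *args: Any, **kwargs: Any) -> list[Future]:
    """Submit one call per element of the mapped arguments; Unmapped ones are broadcast."""
    mapped = [a for a in list(args) + list(kwargs.values()) if not isinstance(a, Unmapped)]
    lengths = {len(a) for a in mapped}
    if len(lengths) != 1:
        raise ValueError(f"mapped arguments must have one common length, got {sorted(lengths)}")
    n = lengths.pop()

    def pick(arg: Any, i: int) -> Any:
        # Broadcast unmapped values; index into mapped iterables
        return arg.value if isinstance(arg, Unmapped) else arg[i]

    return [
        pool.submit(fn, *(pick(a, i) for a in args), **{k: pick(v, i) for k, v in kwargs.items()})
        for i in range(n)
    ]


def enrich(record: dict, config: dict) -> dict:
    return {**record, "enriched": True, "config_version": config["version"]}


with ThreadPoolExecutor(max_workers=4) as pool:
    records = [{"id": 1}, {"id": 2}, {"id": 3}]
    futures = map_task(enrich, pool, records, config=Unmapped({"version": "2.1"}))
    print([f.result() for f in futures])
```

Like Prefect, the sketch rejects mapped iterables of different lengths. It leaves out future resolution: in a chained map such as validate_partition.map(download_futures), Prefect waits for each upstream future and passes its result to the downstream call.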

Note

Prefect's task runner controls how mapped and submitted tasks execute concurrently. The Prefect 3 default, ThreadPoolTaskRunner, uses Python threads, which suits I/O-bound work. For CPU-bound parallel work, install the prefect-dask or prefect-ray integration: DaskTaskRunner distributes tasks across a Dask cluster and RayTaskRunner across a Ray cluster. The task runner is set per flow with @flow(task_runner=...), not globally.

Subflows for Modular Pipeline Composition

Any flow can call another flow as a subflow. Subflows appear as nested run groups in the Prefect UI, each with their own state history, logs, and result. This enables modular pipeline composition — shared ETL primitives defined once and composed into domain-specific pipelines without code duplication.

from prefect import flow, task

# ── Reusable subflow: generic ingestion primitive ─────────────────────
@task
def read_source(uri: str) -> list[dict]:
    # Abstract over S3, local, API — caller doesn't care
    return [{"uri": uri, "row": i} for i in range(100)]

@task
def write_sink(records: list[dict], table: str) -> int:
    print(f"Writing {len(records)} rows to {table}")
    return len(records)

@flow(name="ingest-source")
def ingest_source(uri: str, target_table: str) -> int:
    records = read_source(uri)
    return write_sink(records, target_table)

# ── Domain pipeline: composes multiple subflows ───────────────────────
@task
def run_dbt_model(model: str) -> str:
    import subprocess
    result = subprocess.run(
        ["dbt", "run", "--select", model],
        capture_output=True, text=True, check=True
    )
    return result.stdout

@flow(name="daily-orders-pipeline")
def daily_orders_pipeline(date: str) -> dict:
    # Stage 1: ingest raw sources (subflows run sequentially here)
    orders_count   = ingest_source(
        uri=f"s3://raw/orders/dt={date}/",
        target_table="raw_orders"
    )
    payments_count = ingest_source(
        uri=f"s3://raw/payments/dt={date}/",
        target_table="raw_payments"
    )

    # Stage 2: transform with dbt. run_dbt_model uses check=True, so a
    # non-zero dbt exit raises and fails the flow before this point
    run_dbt_model("orders_daily")

    return {
        "date": date,
        "orders_ingested": orders_count,
        "payments_ingested": payments_count,
        "dbt_status": "success",
    }

# ── Concurrent subflow execution ──────────────────────────────────────
from prefect import unmapped

@task
def run_domain_pipeline(domain: str, date: str) -> dict:
    # Flows have no .submit(); calling the flow inside a task runs it as a
    # nested flow, and submitting or mapping the task provides concurrency
    return daily_orders_pipeline.with_options(name=f"{domain}-pipeline")(date)

@flow(name="multi-domain-pipeline")
def multi_domain_pipeline(date: str) -> list[dict]:
    domains = ["orders", "payments", "users"]
    futures = run_domain_pipeline.map(domains, date=unmapped(date))
    return [f.result() for f in futures]

Deployments: Scheduling and Remote Execution

A deployment is a server-side configuration that tells Prefect how to run a flow remotely: where the code lives, which work pool to use, what schedule to apply, and what parameters to pass. Deployments decouple the flow definition from its execution environment — you can deploy the same flow to run on Kubernetes, ECS, a local process, or a Docker container by changing only the work pool.

# ── prefect.yaml — declarative deployment definition ─────────────────
# Run: prefect deploy --all  (deploys all entries in this file)

name: datasops-pipelines
prefect-version: "3.*"

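build:
  - prefect.deployments.steps.run_shell_script:
      id: get-commit-hash
      script: git rev-parse --short HEAD
      stream_output: false
  - prefect_docker.deployments.steps.build_docker_image:
      id: build-image
      requires: prefect-docker>=0.4.0
      image_name: "{{ $REGISTRY_URL }}/datasops/pipelines"   # {{ $VAR }} reads an env var
      tag: "{{ get-commit-hash.stdout }}"
      dockerfile: Dockerfile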

push:
  - prefect_docker.deployments.steps.push_docker_image:
      requires: prefect-docker>=0.4.0
      image_name: "{{ build-image.image_name }}"
      tag: "{{ build-image.tag }}"

pull:
  - prefect.deployments.steps.set_working_directory:
      directory: /app

deployments:
  - name: daily-orders-pipeline
    version: "1.0"
    tags: ["orders", "production"]
    description: "Ingest orders and run dbt transformation"

    flow_name: daily-orders-pipeline
    entrypoint: pipelines/orders.py:daily_orders_pipeline

    work_pool:
      name: kubernetes-pool
      job_variables:
        image: "{{ build-image.image }}"
        cpu_request: "500m"
        memory_request: "512Mi"

    schedules:
      - cron: "0 6 * * *"        # 06:00 UTC daily
        timezone: "UTC"
        active: true

    parameters:
      date: null   # injected at runtime via trigger or override

  - name: etl-pipeline-dev
    flow_name: etl-pipeline
    entrypoint: pipelines/etl.py:etl_pipeline
    work_pool:
      name: local-process-pool   # runs directly on the worker host
    schedules:
      - interval: 3600           # every hour
# ── Deployment via Python API (alternative to prefect.yaml) ──────────
from prefect import flow
from prefect.client.schemas.schedules import CronSchedule
from prefect.runner.storage import GitRepository

# Deploy from a git repository — worker clones on each run
@flow
def ml_training_pipeline(experiment_name: str, epochs: int = 10) -> str:
    print(f"Training {experiment_name} for {epochs} epochs")
    return f"model-{experiment_name}-v1"

if __name__ == "__main__":
    ml_training_pipeline.from_source(
        source=GitRepository(
            url="https://github.com/myorg/ml-pipelines",
            branch="main",
        ),
        entrypoint="pipelines/training.py:ml_training_pipeline",
    ).deploy(
        name="nightly-training",
        work_pool_name="kubernetes-gpu-pool",
        schedules=[CronSchedule(cron="0 2 * * *", timezone="UTC")],
        parameters={"experiment_name": "bert-finetune", "epochs": 5},
        tags=["ml", "nightly"],
    )

# Trigger a run immediately with parameter override:
# prefect deployment run 'ml-training-pipeline/nightly-training' \
#   --param experiment_name=gpt-finetune --param epochs=3
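Both cron schedules above are daily patterns evaluated in UTC: "0 6 * * *" fires at minute 0 of hour 6 every day, and "0 2 * * *" at 02:00. The helper below computes the next fire time for that one fixed daily pattern using only the standard library. It illustrates the cron semantics under that assumption; it is not a cron parser, and Prefect computes real schedules on the server. next_daily_run is a hypothetical name.

```python
from datetime import datetime, timedelta, timezone


def next_daily_run(after: datetime, hour: int, minute: int = 0) -> datetime:
    """Next occurrence of a daily "M H * * *" cron strictly after `after`, in UTC."""
    after = after.astimezone(timezone.utc)
    candidate = after.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if candidate <= after:
        # Today's slot has already passed (or is now): fire tomorrow instead
        candidate += timedelta(days=1)
    return candidate


now = datetime(2026, 6, 15, 7, 30, tzinfo=timezone.utc)
print(next_daily_run(now, hour=6))  # 2026-06-16 06:00:00+00:00
print(next_daily_run(now, hour=2))  # 2026-06-16 02:00:00+00:00
```

An interval schedule such as interval: 3600 works differently: runs are spaced a fixed number of seconds apart from an anchor time rather than pinned to wall-clock fields.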

Work Pools and Workers

A work pool is a server-side queue that holds flow run submissions. A worker is a process running in your infrastructure that polls the work pool and launches flow runs. In this hybrid model the Prefect server (or Prefect Cloud) orchestrates and schedules while your workers execute: flow code and the data it processes stay in your network, and only orchestration metadata such as states, logs, and parameters is sent to the API.

# ── Create and start a work pool ─────────────────────────────────────
# Process pool: runs flows as subprocesses on the worker host
prefect work-pool create local-process-pool --type process

# Docker pool: runs each flow run in a fresh Docker container
prefect work-pool create docker-pool --type docker

# Kubernetes pool: runs each flow run as a Kubernetes Job
prefect work-pool create kubernetes-pool --type kubernetes

# ── Start a worker (runs on your infrastructure) ──────────────────────
# Install the appropriate extras for the work pool type
pip install prefect-kubernetes  # for kubernetes pool
pip install prefect-docker      # for docker pool

prefect worker start --pool kubernetes-pool --limit 10
# --limit 10: max concurrent flow runs this worker will launch
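# ── Kubernetes work pool configuration (via Prefect UI or API) ───────
# Start from the default template, edit it, then apply it:
# prefect work-pool get-default-base-job-template --type kubernetes > template.json
# prefect work-pool update kubernetes-pool --base-job-template template.json
# The Kubernetes Job spec lives under job_manifest. {{ placeholders }} are plain
# substitutions filled from "variables" (defaults go there; no Jinja filters),
# and the worker fills {{ env }} with the variables each flow run needs.

# template.json excerpt (other default fields and variables omitted):
{
  "job_configuration": {
    "namespace": "{{ namespace }}",
    "env": "{{ env }}",
    "stream_output": "{{ stream_output }}",
    "pod_watch_timeout_seconds": "{{ pod_watch_timeout_seconds }}",
    "job_manifest": {
      "apiVersion": "batch/v1",
      "kind": "Job",
      "metadata": {"namespace": "{{ namespace }}", "generateName": "{{ name }}-"},
      "spec": {
        "backoffLimit": 0,
        "template": {
          "spec": {
            "restartPolicy": "Never",
            "serviceAccountName": "{{ service_account_name }}",
            "tolerations": [
              {"key": "workload-type", "operator": "Equal", "value": "data-pipeline", "effect": "NoSchedule"}
            ],
            "containers": [{
              "name": "prefect-job",
              "image": "{{ image }}",
              "imagePullPolicy": "{{ image_pull_policy }}",
              "args": "{{ command }}",
              "env": "{{ env }}",
              "resources": {
                "requests": {"cpu": "{{ cpu_request }}", "memory": "{{ memory_request }}"},
                "limits": {"cpu": "{{ cpu_limit }}", "memory": "{{ memory_limit }}"}
              }
            }]
          }
        }
      }
    }
  },
  "variables": {
    "type": "object",
    "properties": {
      "namespace": {"type": "string", "default": "prefect"},
      "service_account_name": {"type": "string", "default": "prefect-worker"},
      "image_pull_policy": {"type": "string", "default": "Always"},
      "cpu_request": {"type": "string", "default": "500m"},
      "memory_request": {"type": "string", "default": "512Mi"},
      "cpu_limit": {"type": "string", "default": "2000m"},
      "memory_limit": {"type": "string", "default": "2Gi"}
    }
  }
}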

Note

Run the Prefect worker as a Kubernetes Deployment (not a Job) so it restarts automatically on failure. Use --limit to cap concurrent flow runs and prevent resource exhaustion. For Kubernetes, the worker itself is lightweight — it only polls the API and creates Job resources; the actual compute happens in the spawned Job pods.
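The contract between a work pool and a worker can be modeled as a queue plus a bounded set of launch slots. In this sketch, which assumes nothing about Prefect's internals, a queue.Queue stands in for the work pool, a polling loop plays the worker, and a semaphore enforces the equivalent of --limit. A real worker polls the Prefect API over HTTP, keeps polling when the pool is empty, and launches infrastructure (a process, a container, or a Kubernetes Job) instead of calling a function in-process; run_worker and Tracker are hypothetical names.

```python
import queue
import threading
import time
from typing import Callable


def run_worker(pool: "queue.Queue[str]", limit: int, launch: Callable[[str], None],
               poll_interval: float = 0.05) -> None:
    """Poll `pool` until it is empty, keeping at most `limit` runs in flight."""
    slots = threading.BoundedSemaphore(limit)
    running: list[threading.Thread] = []

    def supervise(run_id: str) -> None:
        try:
            launch(run_id)
        finally:
            slots.release()  # free the slot once the run finishes

    while True:
        slots.acquire()  # wait here while `limit` runs are already in flight
        try:
            run_id = pool.get(timeout=poll_interval)
        except queue.Empty:
            slots.release()
            break  # a real worker keeps polling; the sketch stops when idle
        thread = threading.Thread(target=supervise, args=(run_id,))
        thread.start()
        running.append(thread)
    for thread in running:
        thread.join()


class Tracker:
    """Records how many simulated flow runs were in flight at once."""

    def __init__(self) -> None:
        self.lock = threading.Lock()
        self.in_flight = 0
        self.peak = 0
        self.completed: list[str] = []

    def launch(self, run_id: str) -> None:
        with self.lock:
            self.in_flight += 1
            self.peak = max(self.peak, self.in_flight)
        time.sleep(0.05)  # stand-in for the flow run's actual work
        with self.lock:
            self.in_flight -= 1
            self.completed.append(run_id)


work_pool: "queue.Queue[str]" = queue.Queue()
for i in range(10):
    work_pool.put(f"flow-run-{i}")
tracker = Tracker()
run_worker(work_pool, limit=3, launch=tracker.launch)
print(len(tracker.completed), tracker.peak <= 3)  # 10 True
```

Acquiring the slot before taking a run off the queue is the point of the design: a saturated worker leaves pending runs in the pool, where another worker can pick them up.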

Result Caching and Persistence

Task results can be persisted and cached. On re-run, if the cache key matches a previous successful result, Prefect returns the stored result without re-executing the task. This is especially valuable for expensive extraction steps: when a downstream transformation fails and you re-run the pipeline, you pay the extraction cost only once.

Good result caching is one of the features that distinguishes Prefect from earlier orchestrators. Airflow skips already-successful upstream tasks only when you clear and re-run the failed task instance in place; it has no built-in cache keyed on task inputs, so a fresh DAG run re-executes every task. Prefect's caching also integrates naturally with data pipeline testing patterns: you can cache extraction in integration test runs to avoid repeated API calls.

from prefect import flow, task
from prefect.cache_policies import INPUTS, NO_CACHE
from prefect_aws.s3 import S3Bucket  # S3 storage block from the prefect-aws integration
from datetime import timedelta
import hashlib

# ── Basic caching: cache based on input arguments ─────────────────────
@task(cache_policy=INPUTS, cache_expiration=timedelta(hours=6))
def extract_from_api(endpoint: str, date: str) -> list[dict]:
    # Result stored locally by default; re-used on identical (endpoint, date)
    import httpx
    return httpx.get(f"{endpoint}?date={date}").json()["records"]

# ── Custom cache key: include external state in the key ───────────────
def source_version_cache_key(context, parameters):
    # Include the source object's ETag, so a changed file invalidates the cache
    import boto3
    s3 = boto3.client("s3")
    head = s3.head_object(Bucket="raw", Key=parameters["s3_key"])
    etag = head["ETag"].strip('"')
    return hashlib.sha256(f"{parameters['s3_key']}-{etag}".encode()).hexdigest()

@task(cache_key_fn=source_version_cache_key, cache_expiration=timedelta(days=1))
def read_s3_file(s3_key: str) -> list[dict]:
    import boto3, json
    s3 = boto3.client("s3")
    obj = s3.get_object(Bucket="raw", Key=s3_key)
    return json.loads(obj["Body"].read())

# ── Persist results to S3 ─────────────────────────────────────────────
# Set a default result storage block in your Prefect profile or via env:
# PREFECT_DEFAULT_RESULT_STORAGE_BLOCK=s3-bucket/my-results-block

# Or configure per-task:
@task(
    result_storage=S3Bucket.load("my-results-block"),
    result_serializer="json",
    persist_result=True,
    cache_policy=INPUTS,
    cache_expiration=timedelta(hours=12),
)
def expensive_transformation(records: list[dict]) -> list[dict]:
    # Result serialized to S3; subsequent runs with same input skip this
    return [{"processed": True, **r} for r in records]

# ── Disable caching for side-effecting tasks ──────────────────────────
@task(cache_policy=NO_CACHE)
def send_notification(message: str) -> None:
    # Never cache — side effects must always execute
    import httpx
    httpx.post("https://hooks.slack.com/...", json={"text": message})
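The idea behind cache_policy=INPUTS combined with cache_expiration can be sketched in plain Python: derive a key from the function and its bound arguments, and reuse a stored result only while it is younger than the expiration. This is a simplified in-memory model built around a hypothetical cached_by_inputs decorator; Prefect hashes inputs with its own serializer and keeps results in the configured result storage. The clock is injectable only so the example can demonstrate expiry without sleeping.

```python
import hashlib
import inspect
import json
import time
from datetime import timedelta
from functools import wraps
from typing import Any, Callable


def cached_by_inputs(expiration: timedelta, clock: Callable[[], float] = time.monotonic):
    """Reuse a result while the same inputs were computed less than `expiration` ago."""
    store: dict[str, tuple[float, Any]] = {}

    def decorator(fn: Callable[..., Any]) -> Callable[..., Any]:
        sig = inspect.signature(fn)

        @wraps(fn)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            bound = sig.bind(*args, **kwargs)
            bound.apply_defaults()
            # Key = function name + canonical JSON of the bound arguments
            payload = json.dumps({"fn": fn.__qualname__, "args": bound.arguments},
                                 sort_keys=True, default=str)
            key = hashlib.sha256(payload.encode()).hexdigest()
            now = clock()
            hit = store.get(key)
            if hit is not None and now - hit[0] < expiration.total_seconds():
                return hit[1]  # cache hit: skip the expensive call
            result = fn(*args, **kwargs)
            store[key] = (now, result)
            return result

        return wrapper

    return decorator


fake_now = [0.0]
calls: list[tuple[str, str]] = []


@cached_by_inputs(timedelta(hours=6), clock=lambda: fake_now[0])
def extract_from_api(endpoint: str, date: str) -> list[dict]:
    calls.append((endpoint, date))  # stand-in for the real HTTP call
    return [{"endpoint": endpoint, "date": date}]


extract_from_api("https://api.example.com", "2026-06-15")
extract_from_api("https://api.example.com", "2026-06-15")  # cache hit
fake_now[0] += 7 * 3600                                      # 7 hours later: expired
extract_from_api("https://api.example.com", "2026-06-15")
print(len(calls))  # 2
```

Binding arguments through inspect.signature is the design choice that matters here: positional and keyword calls with the same values map to the same key, which is what you want from any input-based cache.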

Artifacts, Events, and Observability

Prefect supports first-class artifacts: structured outputs that appear in the Prefect UI alongside the flow run. Artifacts can be tables, Markdown reports, or links to external systems. Unlike log messages, artifacts created with a key are versioned under that key, so the UI shows how the same report or metric changes from run to run.

from prefect import flow, task
from prefect.artifacts import (
    create_table_artifact,
    create_markdown_artifact,
    create_link_artifact,
)

@task
def process_orders(orders: list[dict]) -> dict:
    stats = {
        "total": len(orders),
        "high_value": sum(1 for o in orders if o.get("amount", 0) > 1000),
        "total_revenue": sum(o.get("amount", 0) for o in orders),
    }

    # Table artifact — visible in Prefect UI as a rendered table
    create_table_artifact(
        key="order-stats",
        table=[
            {"metric": "Total Orders", "value": stats["total"]},
            {"metric": "High-Value Orders (>$1k)", "value": stats["high_value"]},
            {"metric": "Total Revenue", "value": f"${stats['total_revenue']:,.2f}"},
        ],
        description="Order processing summary",
    )

    return stats

@task
def generate_report(stats: dict, date: str) -> str:
    report_url = f"https://grafana.internal/d/orders?date={date}"

    create_markdown_artifact(
        key="pipeline-report",
        markdown=f"""# Orders Pipeline — {date}

| Metric | Value |
|--------|-------|
| Total processed | {stats['total']:,} |
| High-value | {stats['high_value']:,} |
| Revenue | ${stats['total_revenue']:,.2f} |

Pipeline completed successfully.
""",
    )

    create_link_artifact(
        key="grafana-dashboard",
        link=report_url,
        link_text="View Grafana Dashboard",
        description="Orders dashboard for this run's date",
    )

    return report_url

# ── Emit custom events for automation ────────────────────────────────
from prefect.events import emit_event

@task
def validate_data_quality(records: list[dict]) -> bool:
    null_count = sum(1 for r in records if r.get("id") is None)
    null_rate = null_count / len(records) if records else 0.0  # guard empty input
    quality_ok = null_rate < 0.01  # <1% nulls

    emit_event(
        event="data.quality.check",
        resource={"prefect.resource.id": "orders-pipeline", "prefect.resource.name": "Orders"},
        payload={
            "null_rate": null_rate,
            "threshold": 0.01,
            "passed": quality_ok,
        },
    )

    return quality_ok
# ── Notifications via automations ────────────────────────────────────
# Configure in Prefect UI: Automations → Create Automation
# Or in code with the Automation class:

from prefect.automations import Automation
from prefect.events.actions import SendNotification
from prefect.events.schemas.automations import EventTrigger

def create_failure_notification() -> Automation:
    return Automation(
        name="Notify on pipeline failure",
        trigger=EventTrigger(
            # The flow run is the event's resource; its flow is a related
            # resource with role "flow", so match the flow name there
            match={"prefect.resource.id": "prefect.flow-run.*"},
            match_related={"prefect.resource.role": "flow",
                           "prefect.resource.name": "daily-orders-pipeline"},
            expect={"prefect.flow-run.Failed", "prefect.flow-run.Crashed"},
            posture="Reactive",
            threshold=1,
            within=0,
        ),
        actions=[
            SendNotification(
                block_document_id="<slack-webhook-block-id>",
                subject="Pipeline Failed: {{ event.resource.name }}",
                body="Run {{ event.resource.id }} failed at {{ event.occurred }}",
            )
        ],
    ).create()

if __name__ == "__main__":
    create_failure_notification()

Production Patterns: Secrets, Concurrency, and Error Handling

Production Prefect deployments need secret management, concurrency control to avoid resource contention, and structured error handling that distinguishes transient failures from permanent ones.

from prefect import flow, task
from prefect.blocks.system import Secret
from prefect.concurrency.sync import concurrency

# ── Secrets from Prefect Blocks (never hardcode credentials) ──────────
@task
def connect_to_database() -> object:
    # Secret block stored in Prefect; create it once in the UI or in Python:
    # Secret(value="...").save("db-password")
    db_password = Secret.load("db-password").get()
    import psycopg2
    return psycopg2.connect(
        host="postgres.internal",
        database="warehouse",
        user="pipeline_user",
        password=db_password,
    )

# ── Global concurrency limits — prevent overloading external APIs ─────
@task
def call_external_api(record_id: str) -> dict:
    # "external-api-calls" is a concurrency limit defined in Prefect UI
    # (Settings → Concurrency Limits → Create)
    # This blocks until a slot is available
    with concurrency("external-api-calls", occupy=1):
        import httpx, time
        response = httpx.get(f"https://api.example.com/records/{record_id}")
        time.sleep(0.1)   # rate limit: max 10 req/s with limit=1 + sleep
        return response.json()

# ── Structured error handling with custom states ───────────────────────
from prefect.states import Completed, Failed
from prefect import task

class DuplicateKeyError(Exception):
    """Placeholder: substitute your DB driver's unique-violation exception."""

def _db_upsert(record: dict, table: str) -> None:
    """Placeholder for your database upsert."""
    ...

@task(retries=3, retry_delay_seconds=10)
def idempotent_upsert(record: dict, table: str) -> str:
    try:
        _db_upsert(record, table)
        return f"upserted:{record['id']}"
    except DuplicateKeyError:
        # Not a real failure: the record already exists, so end Completed
        return Completed(message=f"Already exists: {record['id']}", data=None)
    except ConnectionError:
        # Transient: re-raise so the task's retries kick in
        raise
    except Exception as e:
        # Permanent: return a Failed state instead of raising; callers that
        # must keep going should call this task with return_state=True
        return Failed(message=f"Permanent failure for {record['id']}: {e}")

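# ── Flow-level timeout and safe teardown ──────────────────────────────
# State-change hooks receive (flow, flow_run, state); bodies are placeholders
def notify_completion(flow, flow_run, state):
    print(f"{flow_run.name} completed")

def notify_failure(flow, flow_run, state):
    print(f"{flow_run.name} failed: {state.message}")

def cleanup_temp_files(flow, flow_run, state):
    ...  # remove scratch files written by the run

def notify_ops_team(flow, flow_run, state):
    print(f"{flow_run.name} crashed (infrastructure failure): {state.message}")

@flow(
    name="bounded-etl",
    timeout_seconds=3600,      # hard stop at 1 hour
    on_completion=[notify_completion],
    on_failure=[notify_failure, cleanup_temp_files],
    on_crashed=[notify_ops_team],
)
def bounded_etl_pipeline(date: str) -> dict:
    ...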
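A global concurrency limit behaves like a named, shared semaphore: each caller entering concurrency("external-api-calls", occupy=1) in the call_external_api task above takes a slot and blocks when none are free. The sketch below models that inside one process with threading primitives; the limits registry, limited() and InFlight are hypothetical, whereas Prefect keeps slot counts on its server so the limit holds across every worker and machine. Here the limit is assumed to be 2 slots.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from contextlib import contextmanager
from typing import Iterator

# Hypothetical in-process registry: limit name -> semaphore with that many slots
limits = {"external-api-calls": threading.BoundedSemaphore(2)}


@contextmanager
def limited(name: str) -> Iterator[None]:
    """Hold one slot of the named limit for the duration of the block."""
    slot = limits[name]
    slot.acquire()  # blocks until a slot is free, like concurrency(..., occupy=1)
    try:
        yield
    finally:
        slot.release()


class InFlight:
    """Context manager that tracks the peak number of concurrent entries."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self.current = 0
        self.peak = 0

    def __enter__(self) -> "InFlight":
        with self._lock:
            self.current += 1
            self.peak = max(self.peak, self.current)
        return self

    def __exit__(self, *exc: object) -> None:
        with self._lock:
            self.current -= 1


gauge = InFlight()


def call_external_api(record_id: str) -> str:
    with limited("external-api-calls"), gauge:
        time.sleep(0.02)  # stand-in for the HTTP request
        return f"fetched:{record_id}"


with ThreadPoolExecutor(max_workers=8) as pool:
    results = list(pool.map(call_external_api, [str(i) for i in range(20)]))
print(len(results), gauge.peak <= 2)  # 20 True
```

Eight threads compete, but the gauge never sees more than two calls inside the limited block at once.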

Testing Prefect Flows

Prefect flows and tasks are plain Python functions — you can call them directly in tests without a running server. Use prefect.testing.utilities.prefect_test_harness as a context manager to enable full state tracking in tests without a real API endpoint.

import pytest
from prefect.testing.utilities import prefect_test_harness
from prefect.states import Completed, Failed
from unittest.mock import patch, MagicMock

# ── Use the test harness for full Prefect state tracking ──────────────
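@pytest.fixture(autouse=True, scope="session")
def prefect_harness():
    # One temporary Prefect database for the whole test session;
    # starting a harness per test is slow
    with prefect_test_harness():
        yield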

# ── Unit test: call task directly as a Python function ────────────────
from pipelines.orders import extract, transform, load

def test_transform_doubles_values():
    # .fn is the undecorated function: plain Python, no Prefect run created
    raw = [{"id": 1, "value": 50}, {"id": 2, "value": 100}]
    result = transform.fn(raw)
    assert result == [{"id": 1, "value": 100}, {"id": 2, "value": 200}]

# ── Integration test: run full flow with mocked I/O ───────────────────
from pipelines.orders import etl_pipeline

def test_etl_pipeline_succeeds():
    with patch("pipelines.orders.extract") as mock_extract, \
         patch("pipelines.orders.load") as mock_load:

        mock_extract.return_value = [{"id": 1, "value": 10}]
        mock_load.return_value = 1

        state = etl_pipeline(
            source="s3://test/",
            destination="test_table",
            return_state=True,
        )

        assert state.is_completed()
        assert state.result() == 1  # load returned 1 record

# ── Test retry behavior ───────────────────────────────────────────────
import httpx
from pipelines.orders import fetch_api_data

def test_retry_on_http_error():
    call_count = 0

    def flaky_get(*args, **kwargs):
        nonlocal call_count
        call_count += 1
        if call_count < 3:
            raise httpx.TimeoutException("timeout", request=None)
        return MagicMock(status_code=200, json=lambda: {"records": []})

    # Zero the exponential backoff so the test doesn't sleep between retries
    fast_fetch = fetch_api_data.with_options(retry_delay_seconds=0)
    with patch("httpx.get", side_effect=flaky_get):
        result = fast_fetch("https://api.example.com/data")

    assert call_count == 3   # retried twice before succeeding
    assert result == {"records": []}

# ── Test dynamic mapping result count ────────────────────────────────
from pipelines.orders import partitioned_pipeline

def test_partition_pipeline_covers_all_partitions():
    # Run the real tasks: patching a task with MagicMock also replaces its
    # .map(), so mocked return values would never reach the flow
    result = partitioned_pipeline([0, 1, 2, 3, 4])

    assert result["partitions"] == 5
    assert result["total"] == 50   # 10 valid records per partition

Prefect 3 Production Checklist

1

Pin your Prefect version in requirements.txt and your Docker image — minor Prefect releases occasionally change deployment schema in breaking ways; lock to a tested version and upgrade intentionally

2

Store all credentials as Prefect Secret blocks — never pass API keys or passwords as flow parameters, which are logged and visible in the UI to all workspace members

3

Configure global concurrency limits for shared external resources (APIs, databases) via Prefect UI Settings → Concurrency Limits — unthrottled .map() over thousands of records will exhaust connection pools

4

Set timeout_seconds on every production flow: without a timeout, a hung API call or stuck query keeps the flow run going indefinitely, tying up worker slots and blocking dependent pipelines

5

Use on_failure and on_crashed flow hooks for notifications — a Failed state means the flow raised an exception; a Crashed state means the flow run process died (OOM kill, node eviction) without cleanup

6

Enable result persistence for expensive extraction tasks with cache_policy=INPUTS — re-runs after downstream failures should not re-pay extraction costs that may involve external API rate limits

7

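Deploy workers as Kubernetes Deployments, start them with prefect worker start --with-healthcheck, and point a livenessProbe at the worker's /health endpoint so Kubernetes restarts a worker that has stopped polling the API (a readinessProbe only removes a pod from Service traffic; it never restarts it)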

8

Tag deployments with environment (production, staging) and domain (orders, ml, ingestion) — use these tags to filter runs and target automations without coupling deployment names to environments

9

Write flow-level integration tests with prefect_test_harness and mock I/O boundaries — verify that state is Completed, not just that the return value is correct

10

Monitor work pool health: a growing number of Late flow runs (scheduled runs that no worker picked up in time) means workers are undersized or offline for the scheduled load; track them in the UI or by querying flow runs through the REST API's filter endpoints

Running Apache Airflow 2.x in production and planning the upgrade to 3.0, or struggling with DAG compatibility issues, provider package migrations, or the new Task SDK architecture?

We design and execute Apache Airflow 3.0 migration projects — from running airflow upgrade-check across your DAG repository and fixing execution_date and SubDAG patterns, through migrating operator imports to provider packages and renaming Dataset to Asset with alias-based decoupling of producer and consumer DAGs, configuring the new dag-processor and api-server processes in Kubernetes Helm charts, setting up Edge Executor workers with the lightweight Task SDK, updating REST API v1 integrations to v2, and verifying backfill reproducibility with DAG versioning. Let’s talk.



DataSOps Consulting

Need help implementing this in production?

We build and operate data pipelines, AI systems, and observability stacks for engineering teams. Reach out for a free 30-minute architecture review.