Temporal, Workflow Orchestration, Durable Execution, Python, Go, Microservices, Fault Tolerance, Distributed Systems, Activities, Event-Driven

Temporal for Durable Workflow Orchestration — Activities, Signals, and Fault Tolerance

A practical guide to Temporal for durable workflow orchestration. It covers the event-sourcing execution model, in which workflows replay from an append-only event history so that worker crashes, server restarts, and network partitions are transparent to application code. It also covers the server architecture of frontend, history, matching, and internal worker services backed by Cassandra or PostgreSQL; Workers polling Task Queues for workflow and activity tasks; and the Client SDK for starting, signaling, and querying workflows.

The Python walkthrough starts with installing the SDK (pip install temporalio) and running a local development server via temporal server start-dev or Docker Compose with the auto-setup image. It then covers three building blocks:
workflow classes defined with the @workflow.defn and @workflow.run decorators, along with the determinism constraints (no random, no system clocks, no direct I/O) that Temporal's replay engine depends on;
@activity.defn functions for all non-deterministic operations, including HTTP calls, database queries, and file I/O, which run outside the sandbox with their own retry lifecycle;
Worker registration, which binds workflows and activities to a named task queue and runs on the asyncio event loop.

For activities, it explains the timeout options. start_to_close_timeout is the deadline for a single attempt, heartbeat_timeout detects stalled long-running activities, and schedule_to_close_timeout caps the total time including all retry attempts. It covers RetryPolicy with initial_interval, backoff_coefficient, maximum_interval, maximum_attempts, and non_retryable_error_types for business errors that should not be retried. It also covers activity heartbeating with activity.heartbeat(progress_dict), which lets an activity processing millions of records report progress and resume from the last checkpoint after a worker restart instead of starting over.

Beyond activities, it covers:
@workflow.signal handlers for sending named, typed events into running workflows, enabling human-in-the-loop approval patterns where the workflow durably waits (with workflow.wait_condition) without consuming compute;
@workflow.query handlers for reading current workflow state from external observers without modifying execution history;
child workflows via workflow.start_child_workflow or workflow.execute_child_workflow, with independent retry policies and parent close policies;
workflow.sleep for durable timers persisted in event history, which survive worker restarts with no external cron or Redis dependency;
the Temporal Schedules API, with ScheduleSpec cron expressions, fixed intervals, and jitter, for managed recurring execution including backfill and manual triggers;
workflow.patched for safe zero-downtime changes to in-flight workflows, branching old and new code paths on a string patch ID without waiting for those workflows to complete.

The deployment material includes a Docker Compose setup with a PostgreSQL backend, Kubernetes Deployment manifests for worker pools with separate task queues for order, notification, and analytics workflows, and tctl namespace create for environment isolation. It ends with a 10-point production checklist: determinism rules, heartbeat cadence matched to activity duration, a non_retryable_error_types taxonomy, namespace-per-environment isolation, worker versioning, workflow ID uniqueness guarantees, Prometheus metrics scraping, Temporal Cloud migration criteria, signal handler idempotency, and schedule overlap policy configuration.

2026-06-26

What is Temporal and Why It Exists

Temporal is a durable execution platform that solves a fundamental distributed systems problem: how do you write multi-step business processes that are guaranteed to complete even when the underlying infrastructure fails? The standard answer — combine a message queue, a cron job, database state flags, and retry logic spread across multiple services — produces code where the failure-handling logic dwarfs the business logic. Temporal inverts this by making durability a property of the runtime rather than something you bolt on. Prefect and Airflow solve the orchestration problem for data pipelines with a DAG model — Temporal targets general-purpose business process workflows where workflows may wait for human approval, interact with external APIs, and run for days or months.

The core mechanism is event sourcing applied to workflow execution. Every state transition in a workflow — activity scheduled, activity completed, signal received, timer fired — is appended as an immutable event to a history log stored in Temporal's database (Cassandra or PostgreSQL). If a worker crashes mid-execution, the workflow resumes on any available worker by replaying the history log to reconstruct in-memory state exactly. From the workflow code's perspective, the crash never happened.
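A toy model makes the replay idea concrete. The sketch below is not Temporal's implementation; `greet_workflow`, `run_with_history`, and the activity table are hypothetical. The workflow is a Python generator that yields activity requests. For each request, the driver does one of two things:
if the history has no entry for that step yet, it executes the activity and appends the result to the history;
if the history already holds a result for that step, it hands the recorded result back without calling the activity.

```python
from typing import Any, Callable, Dict, Generator, List, Tuple

# Toy model only: this is not how Temporal stores or replays history internally.
Event = Tuple[str, Any]                                # (activity name, recorded result)
WorkflowGen = Generator[Tuple[str, Any], Any, Any]     # yields (activity name, argument)


def greet_workflow(name: str) -> WorkflowGen:
    """Hypothetical two-step workflow: look up a user, then build a greeting."""
    user = yield ("lookup_user", name)
    greeting = yield ("format_greeting", user)
    return greeting


# Hypothetical "activities": the side-effecting steps the workflow asks for.
ACTIVITIES: Dict[str, Callable[[Any], Any]] = {
    "lookup_user": lambda n: {"name": n.title()},
    "format_greeting": lambda u: f"Hello, {u['name']}!",
}


def run_with_history(
    workflow: Callable[[Any], WorkflowGen],
    arg: Any,
    history: List[Event],
    calls: List[str],
) -> Any:
    """Drive the workflow, reading results from history and executing only new steps."""
    gen = workflow(arg)
    step = 0
    try:
        name, activity_arg = next(gen)
        while True:
            if step < len(history):
                # Replay: the result is already recorded, so the activity is NOT called again.
                recorded_name, result = history[step]
                if recorded_name != name:
                    raise RuntimeError(
                        f"step {step}: history has {recorded_name}, code asked for {name}"
                    )
            else:
                # New step: run the activity and append its result to the history.
                calls.append(name)
                result = ACTIVITIES[name](activity_arg)
                history.append((name, result))
            step += 1
            name, activity_arg = gen.send(result)
    except StopIteration as done:
        return done.value


if __name__ == "__main__":
    # Simulate a crash after the first activity finished: history holds one event.
    history: List[Event] = [("lookup_user", {"name": "Ada"})]
    calls: List[str] = []
    print(run_with_history(greet_workflow, "ada", history, calls))
    print(calls)  # only the step missing from history was executed
```

Calling `run_with_history` again with the now-complete history executes nothing and still returns the same greeting. That property is what lets any worker rebuild a workflow's state from its history.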

Durable Execution

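Workflows survive worker crashes, server restarts, and network partitions automatically. There is no manual checkpoint logic and no state recovery code: the runtime records each activity result and timer in history exactly once, and durable timers fire reliably. Activities themselves execute at least once. A retry after a timeout can re-run an attempt whose result was never recorded, so activities with side effects should be idempotent.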

Replay-Based Model

Workflow history is an append-only event log. Replaying events reconstructs workflow state deterministically on any worker. This means workflow code must be deterministic — all non-deterministic operations move to activities.

Multi-Language SDKs

Official SDKs for Python, Go, Java, TypeScript, .NET, and PHP. Workers in different languages can share the same Temporal cluster — a Python worker handles one task queue while a Go worker handles another.

Architecture — Server, Workers, and Task Queues

Temporal's architecture separates the durable state management from the business logic execution. The Temporal Server stores and schedules work; your Worker processes contain the actual workflow and activity code. Communication happens through Task Queues — named logical channels that Workers poll for work.

1

Temporal Server

A cluster of services with four roles:
frontend (gRPC API);
history (workflow state machines);
matching (task queue dispatch);
the internal worker service (system workflows such as archival).
The services keep no durable state of their own. Everything lives in the persistence layer: PostgreSQL for most deployments, Cassandra for very large scale. The server never runs workflow code directly.

2

Task Queues

Named queues for routing work to workers. They are created on first use and do not guarantee strict FIFO ordering. Multiple workers can listen on the same task queue for horizontal scaling. Separate task queues isolate different workload types, for example an orders task queue for high-priority payment workflows and an analytics task queue for background reporting.
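The competing-consumer behaviour of a task queue can be approximated in-process with an asyncio.Queue shared by several pollers. This is only an analogy: real task queues live in the matching service and are polled over gRPC. The worker names and integer task IDs here are hypothetical. The sketch does show two properties: each task is delivered to exactly one poller, and adding pollers needs no coordination between them.

```python
import asyncio
from collections import Counter
from typing import List, Tuple

# In-process analogy of a task queue with competing consumers (not the Temporal API).


async def poll_worker(name: str, queue: "asyncio.Queue[int]", done: List[Tuple[str, int]]) -> None:
    """Take tasks from the shared queue until cancelled; each task reaches one poller."""
    while True:
        task_id = await queue.get()
        await asyncio.sleep(0)  # stand-in for running workflow or activity code
        done.append((name, task_id))
        queue.task_done()


async def run_task_queue(num_workers: int, num_tasks: int) -> List[Tuple[str, int]]:
    queue: "asyncio.Queue[int]" = asyncio.Queue()
    done: List[Tuple[str, int]] = []
    pollers = [
        asyncio.create_task(poll_worker(f"worker-{i}", queue, done))
        for i in range(num_workers)
    ]
    for task_id in range(num_tasks):
        queue.put_nowait(task_id)
    await queue.join()        # wait until every queued task has been marked done
    for poller in pollers:
        poller.cancel()       # stop polling, like scaling the worker pool to zero
    await asyncio.gather(*pollers, return_exceptions=True)
    return done


if __name__ == "__main__":
    results = asyncio.run(run_task_queue(num_workers=3, num_tasks=12))
    print(Counter(worker for worker, _ in results))
```

Going from three pollers to four changes only the `num_workers` argument. This mirrors scaling Temporal workers by adding replicas that listen on the same task queue.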

3

Workers

Processes you deploy that poll a task queue, execute workflow and activity code, and report results back to the server. Workers have no persistent state — they are stateless compute that receive tasks, run code, and return results. Scale workers by adding pods without coordination.

4

Workflows

Deterministic, long-running functions that orchestrate activities and timers. Workflow code runs inside the Temporal worker's sandbox where time, random, and I/O calls are intercepted. Each workflow instance has a unique Workflow ID and produces a queryable execution history.

5

Activities

Non-deterministic operations: HTTP calls, database queries, file I/O, external API integrations. Activities run with their own retry lifecycle, timeouts, and heartbeat mechanism. Activity failures trigger retries defined in RetryPolicy without any workflow code involvement.

Installation and Local Development

Recent releases of the Temporal Python SDK require Python 3.9 or newer. The local development server bundles the Temporal Server, SQLite persistence, and the web UI in a single binary, so no external database is needed for development.

# Install the Python SDK
pip install temporalio

# Install the Temporal CLI (local dev server + tctl)
# macOS
brew install temporal

# Linux / WSL — download from releases
curl -sSf https://temporal.download/cli.sh | sh

# Start a local development server (SQLite, web UI on :8233)
temporal server start-dev

# Verify connectivity
temporal workflow list --namespace default
# Docker Compose for local dev with PostgreSQL backend
# docker-compose.yml
version: "3.5"
services:
  postgresql:
    image: postgres:16-alpine
    environment:
      POSTGRES_PASSWORD: temporal
      POSTGRES_USER: temporal
    ports:
      - "5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data

  temporal:
    image: temporalio/auto-setup:1.25
    depends_on:
      - postgresql
    environment:
      # PostgreSQL driver name expected by recent auto-setup images
      - DB=postgres12
      - DB_PORT=5432
      - POSTGRES_USER=temporal
      - POSTGRES_PWD=temporal
      - POSTGRES_SEEDS=postgresql
    ports:
      - "7233:7233"

  temporal-ui:
    image: temporalio/ui:latest
    depends_on:
      - temporal
    environment:
      - TEMPORAL_ADDRESS=temporal:7233
      - TEMPORAL_CORS_ORIGINS=http://localhost:3000
    ports:
      - "8080:8080"

volumes:
  postgres_data:

Defining Your First Workflow

Temporal workflows are Python classes decorated with @workflow.defn. The main entry point is the @workflow.run async method. Activities are plain async functions decorated with @activity.defn. The separation is strict: workflow code must be deterministic, activity code handles all side effects.

# workflows/onboarding.py
import asyncio
from datetime import timedelta
from temporalio import workflow, activity
from temporalio.client import Client
from temporalio.worker import Worker


# Activities: non-deterministic operations go here
@activity.defn
async def create_user_account(user_id: str, email: str) -> dict:
    """Call user service API to provision account."""
    # HTTP calls, DB writes, external API integrations all live here
    import httpx
    async with httpx.AsyncClient() as client:
        response = await client.post(
            "http://user-service/api/users",
            json={"id": user_id, "email": email},
            timeout=10.0,
        )
        response.raise_for_status()
        return response.json()


@activity.defn
async def send_welcome_email(user_id: str, email: str) -> str:
    """Send welcome email via email service."""
    import httpx
    async with httpx.AsyncClient() as client:
        response = await client.post(
            "http://email-service/api/send",
            json={"to": email, "template": "welcome", "user_id": user_id},
            timeout=10.0,
        )
        response.raise_for_status()
        return response.json()["message_id"]


@activity.defn
async def provision_trial_resources(user_id: str) -> dict:
    """Provision trial workspace, storage quota, and default dashboards."""
    import httpx
    async with httpx.AsyncClient() as client:
        response = await client.post(
            "http://provisioning-service/api/trial",
            json={"user_id": user_id, "plan": "trial"},
            timeout=30.0,
        )
        response.raise_for_status()
        return response.json()


# Workflow: orchestrates activities, enforces determinism
@workflow.defn
class OnboardingWorkflow:
    @workflow.run
    async def run(self, user_id: str, email: str) -> dict:
        # Step 1: create account (retried automatically under the default RetryPolicy:
        # exponential backoff with no attempt limit)
        account = await workflow.execute_activity(
            create_user_account,
            args=[user_id, email],
            start_to_close_timeout=timedelta(seconds=30),
        )

        # Step 2 — send welcome email in parallel with provisioning
        email_task = workflow.execute_activity(
            send_welcome_email,
            args=[user_id, email],
            start_to_close_timeout=timedelta(seconds=10),
        )
        provision_task = workflow.execute_activity(
            provision_trial_resources,
            user_id,
            start_to_close_timeout=timedelta(minutes=2),
        )
        message_id, resources = await asyncio.gather(email_task, provision_task)

        # Step 3: durable 7-day timer before completing (survives worker restarts)
        await workflow.sleep(timedelta(days=7))

        return {
            "user_id": user_id,
            "account": account,
            "message_id": message_id,
            "resources": resources,
            "status": "onboarded",
        }
# Run the worker and start a workflow
# worker.py
import asyncio
from temporalio.client import Client
from temporalio.worker import Worker
from workflows.onboarding import (
    OnboardingWorkflow,
    create_user_account,
    send_welcome_email,
    provision_trial_resources,
)


async def run_worker():
    client = await Client.connect("localhost:7233")
    async with Worker(
        client,
        task_queue="onboarding",
        workflows=[OnboardingWorkflow],
        activities=[
            create_user_account,
            send_welcome_email,
            provision_trial_resources,
        ],
    ):
        print("Worker started — polling task queue 'onboarding'")
        await asyncio.Future()  # run forever


# start_workflow.py
from temporalio.client import Client
from workflows.onboarding import OnboardingWorkflow

async def start_onboarding(user_id: str, email: str):
    client = await Client.connect("localhost:7233")
    handle = await client.start_workflow(
        OnboardingWorkflow.run,
        args=[user_id, email],
        id=f"onboarding-{user_id}",   # one ID per user: a second start fails while this run is open
        task_queue="onboarding",
    )
    print(f"Started workflow: {handle.id} (run: {handle.result_run_id})")

    # Wait for completion (optional — caller can detach and poll later)
    result = await handle.result()
    print(f"Completed: {result}")
    return result


if __name__ == "__main__":
    asyncio.run(run_worker())

Note

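Workflow code must be deterministic. Do not call datetime.now(), random.random(), or uuid.uuid4(), and do not make I/O calls inside workflow code. Use workflow.now(), workflow.random(), and workflow.uuid4() instead, and move all other side effects to activities. A worker rebuilds a workflow's state by re-executing the workflow code against the recorded history, for example after a restart or when the workflow is evicted from the worker's cache. If the code now issues different commands than the history recorded, replay fails with a nondeterminism error (NondeterminismError in the Python SDK). A random value or a clock reading that sends execution down a different branch is the typical cause.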
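A toy illustration of why this rule exists: the sketch records the commands a workflow issues, re-runs it, and compares the two sequences. This is roughly what a replaying worker checks against history. The workflow, command strings, and `check_replay` helper are hypothetical. The two clock arguments play different roles:
passing the same clock value on both runs mimics workflow.now(), which is derived from the workflow's history and is therefore identical on replay;
passing a different value mimics datetime.now() read on a worker that replays hours later.

```python
from typing import Callable, List


class NondeterminismDetected(Exception):
    """Toy counterpart of the SDK's nondeterminism error."""


def greeting_workflow(current_hour: Callable[[], int]) -> List[str]:
    """Hypothetical workflow whose commands depend on the hour it is given."""
    commands = ["schedule_activity:load_user"]
    if current_hour() < 12:
        commands.append("schedule_activity:send_morning_email")
    else:
        commands.append("schedule_activity:send_evening_email")
    return commands


def check_replay(
    workflow: Callable[[Callable[[], int]], List[str]],
    original_clock: Callable[[], int],
    replay_clock: Callable[[], int],
) -> List[str]:
    """Run once to record commands, re-run, and compare the two command sequences."""
    history = workflow(original_clock)
    replayed = workflow(replay_clock)
    for index, (recorded, issued) in enumerate(zip(history, replayed)):
        if recorded != issued:
            raise NondeterminismDetected(
                f"command {index}: history has {recorded!r}, replay issued {issued!r}"
            )
    if len(history) != len(replayed):
        raise NondeterminismDetected("replay issued a different number of commands")
    return history


if __name__ == "__main__":
    # Like workflow.now(): replay sees the same recorded hour (9), so it passes.
    print(check_replay(greeting_workflow, lambda: 9, lambda: 9))
    # Like datetime.now(): a worker replaying at 15:00 takes the other branch.
    try:
        check_replay(greeting_workflow, lambda: 9, lambda: 15)
    except NondeterminismDetected as err:
        print("replay failed:", err)
```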

Activities — Timeouts, Retry Policies, and Heartbeating

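Activities are the primary mechanism for interacting with the outside world. Temporal gives you four independent activity timeouts, plus a configurable retry policy per activity execution:
schedule_to_start_timeout: how long a task may wait in the queue before a worker starts it;
start_to_close_timeout: one attempt;
heartbeat_timeout: the longest allowed gap between heartbeats;
schedule_to_close_timeout: the whole activity, including all retries.
Understanding the difference between them is critical for production reliability. At least one of start_to_close_timeout or schedule_to_close_timeout must be set.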

from datetime import timedelta
from temporalio import workflow, activity
from temporalio.common import RetryPolicy


@activity.defn
async def charge_payment(order_id: str, amount_cents: int) -> dict:
    """Charge via Stripe API. Non-idempotent — must use idempotency key."""
    import stripe
    # Report progress for heartbeat_timeout monitoring
    activity.heartbeat({"step": "initiating_charge", "order_id": order_id})

    charge = await stripe.Charge.create_async(
        amount=amount_cents,
        currency="usd",
        idempotency_key=f"charge-{order_id}",  # safe to retry
        metadata={"order_id": order_id},
    )
    activity.heartbeat({"step": "charge_complete", "charge_id": charge.id})
    return {"charge_id": charge.id, "status": charge.status}


@workflow.defn
class OrderWorkflow:
    @workflow.run
    async def run(self, order_id: str, amount_cents: int) -> dict:
        return await workflow.execute_activity(
            charge_payment,
            args=[order_id, amount_cents],
            # Maximum time for a single attempt; a timed-out attempt is retried
            start_to_close_timeout=timedelta(seconds=30),
            # Maximum gap between heartbeats; if none arrives in time (for example
            # because the worker crashed), the attempt fails and is retried
            heartbeat_timeout=timedelta(seconds=10),
            # Maximum total time including ALL retry attempts and backoff
            schedule_to_close_timeout=timedelta(minutes=5),
            retry_policy=RetryPolicy(
                # First retry after 1 second
                initial_interval=timedelta(seconds=1),
                # Double the interval each retry: 1s, 2s, 4s, 8s...
                backoff_coefficient=2.0,
                # Cap retry interval at 30 seconds
                maximum_interval=timedelta(seconds=30),
                # Maximum number of attempts (1 initial + 4 retries = 5 total)
                maximum_attempts=5,
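                # These error types are NOT retried (fail fast). For ordinary Python
                # exceptions the type is the unqualified class name. To make a single
                # failure non-retryable, raise ApplicationError(..., non_retryable=True).
                non_retryable_error_types=[
                    "CardError",    # stripe.error.CardError: card declined
                    "ValueError",   # validation error
                ],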
            ),
        )
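The retry timing this RetryPolicy produces can be worked out by hand. The sketch below is a simplified model. It assumes the exponential formula the options describe: the delay before retry n is initial_interval × backoff_coefficient^(n−1), capped at maximum_interval. It ignores scheduling overhead. The helper names are hypothetical and not part of the temporalio API.

```python
from datetime import timedelta
from typing import List


def retry_delays(
    initial_interval: timedelta,
    backoff_coefficient: float,
    maximum_interval: timedelta,
    maximum_attempts: int,
) -> List[timedelta]:
    """Backoff before attempts 2..maximum_attempts: initial * coefficient**(n-1), capped."""
    if maximum_attempts < 1:
        # In Temporal, maximum_attempts=0 means "unlimited"; this helper needs a bound.
        raise ValueError("maximum_attempts must be >= 1 for this calculation")
    return [
        min(initial_interval * (backoff_coefficient ** retry), maximum_interval)
        for retry in range(maximum_attempts - 1)
    ]


def worst_case_duration(delays: List[timedelta], start_to_close: timedelta) -> timedelta:
    """Every attempt runs until its start_to_close timeout, plus all backoff delays."""
    return start_to_close * (len(delays) + 1) + sum(delays, timedelta())


if __name__ == "__main__":
    delays = retry_delays(timedelta(seconds=1), 2.0, timedelta(seconds=30), 5)
    print([d.total_seconds() for d in delays])
    total = worst_case_duration(delays, timedelta(seconds=30))
    print(total, total <= timedelta(minutes=5))
```

For the OrderWorkflow policy above, the retries wait 1, 2, 4, and 8 seconds. The worst case is 5 × 30 s + 15 s = 2 min 45 s, which fits inside the 5-minute schedule_to_close_timeout. With a tighter budget, schedule_to_close_timeout would end the activity before maximum_attempts was reached.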
# Heartbeating for long-running activities
# Activities that process large datasets, call slow APIs, or run ML inference
# must heartbeat regularly to signal liveness to Temporal

from temporalio import activity
import asyncio


@activity.defn
async def process_large_dataset(
    bucket: str,
    key: str,
    batch_size: int = 10_000,
) -> dict:
    """
    Process a large Parquet file row by row.
    Heartbeat every batch so Temporal knows the activity is alive
    and can resume from the last checkpoint if the worker restarts.
    """
    # Check for details from a previous heartbeat on worker restart
    details = activity.info().heartbeat_details
    start_offset = details[0]["offset"] if details else 0

    import pyarrow.parquet as pq
    import s3fs

    fs = s3fs.S3FileSystem()
    table = pq.read_table(f"{bucket}/{key}", filesystem=fs)
    total_rows = len(table)
    processed = start_offset
    errors = 0

    while processed < total_rows:
        batch_end = min(processed + batch_size, total_rows)
        batch = table.slice(processed, batch_end - processed)

        # Process this batch
        for i in range(len(batch)):
            try:
                row = {col: batch[col][i].as_py() for col in batch.schema.names}
                await process_single_row(row)  # hypothetical per-row coroutine, defined elsewhere
            except Exception:
                errors += 1

        processed = batch_end

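        # Heartbeat with progress checkpoint.
        # If this worker crashes, the next attempt (on any worker) resumes from
        # "offset". Heartbeats are throttled, so the recorded offset can lag the
        # true position slightly; per-row work should tolerate repeats.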
        activity.heartbeat({
            "offset": processed,
            "total": total_rows,
            "errors": errors,
            "pct": round(processed / total_rows * 100, 1),
        })

        # Yield control to the event loop
        await asyncio.sleep(0)

    return {"processed": processed, "errors": errors, "total": total_rows}
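The checkpoint-and-resume logic can be exercised without a Temporal server by replacing the SDK calls with plain callables. In the sketch below:
`heartbeat` stands in for activity.heartbeat();
`previous_details` stands in for activity.info().heartbeat_details;
a simulated crash is raised after a chosen number of batches.
All of these names are hypothetical. Because the SDK throttles heartbeats, a real retry may resume slightly before the true crash point, which this toy does not model.

```python
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple


class SimulatedCrash(Exception):
    """Stands in for a worker process dying in the middle of an activity attempt."""


def process_rows(
    rows: Sequence[int],
    batch_size: int,
    heartbeat: Callable[[Dict[str, Any]], None],   # stand-in for activity.heartbeat
    previous_details: Sequence[Dict[str, Any]],    # stand-in for info().heartbeat_details
    processed_log: List[int],
    crash_after_batches: Optional[int] = None,
) -> Dict[str, int]:
    # Resume from the last checkpoint if an earlier attempt heartbeated one.
    offset = previous_details[0]["offset"] if previous_details else 0
    batches = 0
    while offset < len(rows):
        end = min(offset + batch_size, len(rows))
        processed_log.extend(rows[offset:end])     # stand-in for the real per-row work
        offset = end
        heartbeat({"offset": offset})              # checkpoint after each finished batch
        batches += 1
        if crash_after_batches is not None and batches == crash_after_batches:
            raise SimulatedCrash(f"worker died at offset {offset}")
    return {"processed": offset, "total": len(rows)}


def crash_then_resume(
    total_rows: int, batch_size: int, crash_after_batches: int
) -> Tuple[Dict[str, int], List[int]]:
    rows = list(range(total_rows))
    heartbeats: List[Dict[str, Any]] = []
    log: List[int] = []
    try:
        process_rows(rows, batch_size, heartbeats.append, [], log, crash_after_batches)
    except SimulatedCrash:
        pass
    # The retry attempt is handed the most recent heartbeat details.
    last = [heartbeats[-1]] if heartbeats else []
    result = process_rows(rows, batch_size, heartbeats.append, last, log)
    return result, log


if __name__ == "__main__":
    print(crash_then_resume(total_rows=25, batch_size=10, crash_after_batches=2))
```

In this run the second attempt starts at offset 20. Rows 0 to 19 are therefore not reprocessed after the crash.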

Signals and Queries — External Workflow Communication

Signals allow external systems to send typed events into a running workflow — triggering state transitions, providing input, or unblocking a waiting workflow. Queries read current workflow state without modifying execution history. AI agent workflows that require human-in-the-loop approval benefit directly from Temporal signals — the workflow durably waits for an approval signal without polling or consuming compute while the approval request lives in the workflow event history.

from dataclasses import dataclass
from datetime import timedelta
import asyncio
from temporalio import workflow, activity
from temporalio.client import Client


@dataclass
class ApprovalDecision:
    approved: bool
    approver_id: str
    comment: str


@workflow.defn
class PurchaseOrderWorkflow:
    def __init__(self) -> None:
        self._status = "pending_approval"
        self._decision: ApprovalDecision | None = None

    @workflow.signal
    async def approve(self, decision: ApprovalDecision) -> None:
        """Signal handler — called when an approver sends a decision."""
        self._decision = decision
        self._status = "approved" if decision.approved else "rejected"

    @workflow.signal
    async def escalate(self, reason: str) -> None:
        """Escalate to senior approver — sets escalation flag."""
        self._status = f"escalated:{reason}"

    @workflow.query
    def get_status(self) -> str:
        """Query handler — returns current status without modifying history."""
        return self._status

    @workflow.query
    def get_decision(self) -> dict | None:
        if self._decision is None:
            return None
        return {
            "approved": self._decision.approved,
            "approver_id": self._decision.approver_id,
            "comment": self._decision.comment,
        }

    @workflow.run
    async def run(self, po_id: str, amount: float, requester_id: str) -> dict:
        # Notify approver via activity
        await workflow.execute_activity(
            notify_approver,
            args=[po_id, amount, requester_id],
            start_to_close_timeout=timedelta(seconds=30),
        )

        # Durably wait for approval signal — up to 48 hours
        # Survives worker restarts with no polling overhead
        try:
            await workflow.wait_condition(
                lambda: self._decision is not None,
                timeout=timedelta(hours=48),
            )
        except asyncio.TimeoutError:
            return {"po_id": po_id, "status": "expired", "reason": "approval_timeout"}

        assert self._decision is not None
        if not self._decision.approved:
            return {
                "po_id": po_id,
                "status": "rejected",
                "approver_id": self._decision.approver_id,
                "comment": self._decision.comment,
            }

        # Proceed with approved order
        result = await workflow.execute_activity(
            submit_purchase_order,
            args=[po_id, amount],
            start_to_close_timeout=timedelta(minutes=2),
        )
        return {"po_id": po_id, "status": "submitted", **result}
# Sending signals and running queries from the client

async def approve_purchase_order(
    po_id: str,
    approver_id: str,
    approved: bool,
    comment: str,
):
    client = await Client.connect("localhost:7233")

    handle = client.get_workflow_handle(
        workflow_id=f"purchase-order-{po_id}",
    )

    # Send an approval signal to the running workflow
    await handle.signal(
        PurchaseOrderWorkflow.approve,
        ApprovalDecision(
            approved=approved,
            approver_id=approver_id,
            comment=comment,
        ),
    )
    print(f"Signal sent for PO {po_id}: {'approved' if approved else 'rejected'}")


async def get_po_status(po_id: str) -> str:
    client = await Client.connect("localhost:7233")
    handle = client.get_workflow_handle(
        workflow_id=f"purchase-order-{po_id}",
    )
    # Query does not appear in workflow history — read-only
    status = await handle.query(PurchaseOrderWorkflow.get_status)
    return status
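Outside Temporal, the same "wait for a decision or time out" shape can be written with plain asyncio. This is useful for unit-testing the decision logic in isolation. It is an analogy, not the SDK: here the wait lives in process memory and would be lost on a crash, whereas workflow.wait_condition is backed by a durable timer in event history. The `ApprovalGate` class and the `simulate` helper are hypothetical.

```python
import asyncio
from dataclasses import dataclass
from typing import Optional


@dataclass
class ApprovalDecision:
    approved: bool
    approver_id: str
    comment: str


class ApprovalGate:
    """In-memory analogue of PurchaseOrderWorkflow's signal handler + wait_condition."""

    def __init__(self) -> None:
        self._decision: Optional[ApprovalDecision] = None
        self._arrived = asyncio.Event()

    def approve(self, decision: ApprovalDecision) -> None:
        # Like the signal handler: store the decision and wake up the waiter.
        self._decision = decision
        self._arrived.set()

    async def wait_for_decision(self, timeout_s: float) -> str:
        try:
            await asyncio.wait_for(self._arrived.wait(), timeout=timeout_s)
        except asyncio.TimeoutError:
            return "expired"
        assert self._decision is not None
        return "approved" if self._decision.approved else "rejected"


async def simulate(decision_after_s: Optional[float], approved: bool, timeout_s: float) -> str:
    gate = ApprovalGate()

    async def approver() -> None:
        await asyncio.sleep(decision_after_s or 0)
        gate.approve(ApprovalDecision(approved, "approver-1", "looks fine"))

    # decision_after_s=None models an approver who never responds.
    sender = asyncio.create_task(approver()) if decision_after_s is not None else None
    status = await gate.wait_for_decision(timeout_s)
    if sender is not None:
        sender.cancel()
        await asyncio.gather(sender, return_exceptions=True)
    return status


if __name__ == "__main__":
    print(asyncio.run(simulate(0.01, True, 1.0)))
    print(asyncio.run(simulate(None, True, 0.05)))
```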

Child Workflows and Cancellation

Child workflows allow decomposing complex processes into independently managed sub-workflows. Each child workflow has its own execution history, retry logic, and worker assignment. Parent workflows can cancel child workflows when they are cancelled, or detach children to run independently.

from temporalio import workflow
from temporalio.workflow import ChildWorkflowHandle
from datetime import timedelta
from dataclasses import dataclass


@dataclass
class OrderLine:
    product_id: str
    quantity: int
    warehouse_id: str


@workflow.defn
class FulfillmentWorkflow:
    @workflow.run
    async def run(self, order_line: OrderLine) -> dict:
        # Reserve inventory
        reserved = await workflow.execute_activity(
            reserve_inventory,
            args=[order_line.product_id, order_line.quantity, order_line.warehouse_id],
            start_to_close_timeout=timedelta(seconds=30),
        )
        # Ship from warehouse
        shipped = await workflow.execute_activity(
            create_shipment,
            args=[order_line.warehouse_id, reserved["reservation_id"]],
            start_to_close_timeout=timedelta(minutes=2),
        )
        return {"order_line": order_line.product_id, **shipped}


@workflow.defn
class OrderWorkflow:
    @workflow.run
    async def run(self, order_id: str, lines: list[OrderLine]) -> dict:
        # Start child workflows for each line in parallel
        # Each child has its own history, retry policy, and task queue
        child_handles: list[ChildWorkflowHandle] = []
        for line in lines:
            handle = await workflow.start_child_workflow(
                FulfillmentWorkflow.run,
                line,
                id=f"fulfillment-{order_id}-{line.product_id}",
                task_queue="fulfillment",
                # When the parent closes for any reason, children still running
                # are terminated (TERMINATE is also the default policy)
                parent_close_policy=workflow.ParentClosePolicy.TERMINATE,
            )
            child_handles.append(handle)

        # Wait for all children to complete; each ChildWorkflowHandle is
        # awaitable (calling .result() on a pending handle would raise)
        import asyncio
        results = await asyncio.gather(*child_handles)

        return {
            "order_id": order_id,
            "fulfillments": results,
            "status": "fulfilled",
        }


# Cancellation from the client
from temporalio.client import Client

async def cancel_order(order_id: str):
    client = await Client.connect("localhost:7233")
    handle = client.get_workflow_handle(workflow_id=f"order-{order_id}")
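    # Cancels the parent; children it is awaiting are asked to cancel, and the
    # TERMINATE parent close policy ends any still running when the parent closes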
    await handle.cancel()
    print(f"Order {order_id} cancellation requested")
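One detail in the fan-out above deserves care. Child workflow IDs are built from order_id and product_id, so an order containing the same product twice (for example, shipped from two warehouses) produces two identical IDs. The second start then fails because a workflow with that ID is already running. Deriving the ID from the line's position avoids the collision while keeping IDs deterministic on replay. The helper names and ID format below are assumptions for illustration.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class OrderLine:
    product_id: str
    quantity: int
    warehouse_id: str


def naive_child_ids(order_id: str, lines: List[OrderLine]) -> List[str]:
    # The scheme used in OrderWorkflow above: collides on repeated product IDs.
    return [f"fulfillment-{order_id}-{line.product_id}" for line in lines]


def child_workflow_ids(order_id: str, lines: List[OrderLine]) -> List[str]:
    # Include the line index: unique within the order, and derived only from
    # workflow inputs, so the same IDs are produced again on replay.
    return [
        f"fulfillment-{order_id}-{index}-{line.product_id}"
        for index, line in enumerate(lines)
    ]


if __name__ == "__main__":
    lines = [
        OrderLine("sku-1", 1, "wh-east"),
        OrderLine("sku-1", 2, "wh-west"),
        OrderLine("sku-2", 1, "wh-east"),
    ]
    print(naive_child_ids("o-42", lines))
    print(child_workflow_ids("o-42", lines))
```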

Durable Timers and Temporal Schedules

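workflow.sleep() creates a durable timer stored in Temporal's event history. The timer survives worker restarts: the workflow resumes when the timer fires, regardless of how many times workers restarted in between. In the Python SDK, awaiting asyncio.sleep() inside workflow code is intercepted and becomes the same kind of durable timer. Only outside workflow code, for example in an activity, is asyncio.sleep() an ordinary in-process delay that a crash would lose. For recurring workflows, Temporal Schedules replace cron jobs with managed execution, including backfill, pause, and manual trigger.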

from datetime import timedelta
from temporalio import workflow


@workflow.defn
class TrialExpiryWorkflow:
    @workflow.run
    async def run(self, user_id: str, trial_days: int) -> dict:
        # Start of trial: send welcome
        await workflow.execute_activity(
            send_onboarding_email,
            user_id,
            start_to_close_timeout=timedelta(seconds=30),
        )

        # Two days before the trial ends (day 5 of a 7-day trial): expiry warning.
        # Durable timer, no cron needed; assumes trial_days >= 2.
        await workflow.sleep(timedelta(days=trial_days - 2))
        await workflow.execute_activity(
            send_trial_expiry_warning,
            user_id,
            start_to_close_timeout=timedelta(seconds=30),
        )

        # End of trial (day 7 of a 7-day trial)
        await workflow.sleep(timedelta(days=2))
        subscription = await workflow.execute_activity(
            check_subscription_status,
            user_id,
            start_to_close_timeout=timedelta(seconds=10),
        )

        if not subscription["has_subscription"]:
            await workflow.execute_activity(
                downgrade_to_free_tier,
                user_id,
                start_to_close_timeout=timedelta(seconds=30),
            )
            return {"user_id": user_id, "status": "downgraded"}

        return {"user_id": user_id, "status": "converted"}
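The timer arithmetic in TrialExpiryWorkflow is easy to check offline. This sketch computes when each step would fire for a given start time and trial length, assuming activities complete instantly. In practice each sleep starts after the preceding activity finishes, so real fire times drift later by the activity durations. It also rejects trial lengths under two days, for which `trial_days - 2` would be a negative sleep. The helper name is hypothetical.

```python
from datetime import datetime, timedelta, timezone
from typing import Dict


def trial_timeline(start: datetime, trial_days: int) -> Dict[str, datetime]:
    """When each TrialExpiryWorkflow step fires, assuming activities take no time."""
    if trial_days < 2:
        # trial_days - 2 would be a negative sleep for the expiry warning.
        raise ValueError("trial_days must be at least 2")
    warning_at = start + timedelta(days=trial_days - 2)   # first workflow.sleep
    trial_end = warning_at + timedelta(days=2)            # second workflow.sleep
    return {"welcome": start, "expiry_warning": warning_at, "trial_end": trial_end}


if __name__ == "__main__":
    start = datetime(2026, 6, 1, 12, 0, tzinfo=timezone.utc)
    for step, when in trial_timeline(start, 7).items():
        print(f"{step:>15}: {when.isoformat()}")
```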
# Temporal Schedules — managed recurring workflow execution
# Replaces cron jobs with full lifecycle management: backfill, pause, trigger

from temporalio.client import (
    Client,
    Schedule,
    ScheduleActionStartWorkflow,
    ScheduleIntervalSpec,
    ScheduleOverlapPolicy,
    SchedulePolicy,
    ScheduleSpec,
    ScheduleState,
)
from datetime import timedelta

# RevenueReportWorkflow is assumed to be defined elsewhere and registered
# on a worker polling the "analytics" task queue.


async def create_daily_report_schedule():
    client = await Client.connect("localhost:7233")

    await client.create_schedule(
        "daily-revenue-report",
        Schedule(
            action=ScheduleActionStartWorkflow(
                RevenueReportWorkflow.run,
                args=["daily"],
                # Temporal appends the scheduled time to this ID for each run,
                # so every run gets a distinct workflow ID
                id="daily-revenue-report",
                task_queue="analytics",
            ),
            spec=ScheduleSpec(
                # Cron expression: 9am UTC Monday-Friday
                cron_expressions=["0 9 * * 1-5"],
                # Or use interval-based scheduling:
                # intervals=[ScheduleIntervalSpec(every=timedelta(hours=6))],
                # Jitter delays each run by a random amount up to 5 minutes,
                # preventing a thundering herd when many schedules fire at once
                jitter=timedelta(minutes=5),
            ),
            policy=SchedulePolicy(
                # If the previous run is still active, skip this run
                overlap=ScheduleOverlapPolicy.SKIP,
                # How far back runs missed during an outage are caught up
                # (the SDK default is one year); catch-up still obeys `overlap`
                catchup_window=timedelta(days=1),
            ),
            state=ScheduleState(
                note="Daily revenue report — runs Monday-Friday at 9am UTC",
            ),
        ),
    )


# Manage schedules
async def manage_schedule():
    client = await Client.connect("localhost:7233")
    handle = client.get_schedule_handle("daily-revenue-report")

    # Pause the schedule temporarily
    await handle.pause(note="Paused for Q2 close — resume manually")

    # Trigger an immediate run (outside normal schedule)
    await handle.trigger(overlap=ScheduleOverlapPolicy.ALLOW_ALL)

    # Backfill missed runs for a date range
    from temporalio.client import ScheduleBackfill
    import datetime
    await handle.backfill(
        ScheduleBackfill(
            start_at=datetime.datetime(2026, 6, 1, tzinfo=datetime.timezone.utc),
            end_at=datetime.datetime(2026, 6, 7, tzinfo=datetime.timezone.utc),
            overlap=ScheduleOverlapPolicy.ALLOW_ALL,
        )
    )

    # Resume
    await handle.unpause(note="Q2 close complete")
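The sketch below shows what `0 9 * * 1-5` with a 5-minute jitter means in practice. It enumerates the nominal fire times for that one expression and delays each by a random offset between zero and the jitter bound, as described for ScheduleSpec.jitter. It is a hand-written interpreter for this specific expression, not a general cron parser. The real schedule applies its own randomness on the Temporal server. The function names and the seeded random generator are assumptions for illustration.

```python
import random
from datetime import datetime, timedelta, timezone
from typing import List


def weekday_9am_utc(after: datetime, count: int) -> List[datetime]:
    """Next `count` nominal fire times for the cron expression "0 9 * * 1-5"."""
    if after.tzinfo is None:
        raise ValueError("pass a timezone-aware datetime")
    candidate = after.astimezone(timezone.utc).replace(hour=9, minute=0, second=0, microsecond=0)
    if candidate <= after:
        candidate += timedelta(days=1)          # today's 09:00 has already passed
    times: List[datetime] = []
    while len(times) < count:
        if candidate.weekday() < 5:             # Monday=0 ... Friday=4 (cron 1-5)
            times.append(candidate)
        candidate += timedelta(days=1)
    return times


def with_jitter(times: List[datetime], jitter: timedelta, rng: random.Random) -> List[datetime]:
    """Delay each nominal time by a random offset between 0 and `jitter`."""
    return [t + timedelta(seconds=rng.uniform(0, jitter.total_seconds())) for t in times]


if __name__ == "__main__":
    start = datetime(2026, 6, 26, 10, 0, tzinfo=timezone.utc)   # a Friday, after 09:00
    nominal = weekday_9am_utc(start, 3)
    jittered = with_jitter(nominal, timedelta(minutes=5), random.Random(1))
    for planned, actual in zip(nominal, jittered):
        print(planned.isoformat(), "->", actual.isoformat())
```

Starting from Friday 26 June 2026 at 10:00 UTC, the nominal runs fall on Monday 29 June, Tuesday 30 June, and Wednesday 1 July at 09:00 UTC. Jitter shifts each run by up to five minutes.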

Workflow Versioning — Zero-Downtime Code Deploys

Because workflow code is replayed from event history, deploying new workflow code while instances are in flight requires care. If you change the sequence of activities in a running workflow, replay produces a different command sequence than the one recorded in history, and fails with a NonDeterminismError. workflow.patched() creates a versioned branch point. Histories recorded before the patch existed replay down the old path. Any execution that reaches the branch point on the new code takes the new path.

from temporalio import workflow
from datetime import timedelta


@workflow.defn
class OrderFulfillmentWorkflow:
    @workflow.run
    async def run(self, order_id: str) -> dict:
        # Original code path — all pre-existing in-flight workflows use this
        # New code path uses workflow.patched() to branch safely

        # Step 1 — validate order (existing in all versions)
        validation = await workflow.execute_activity(
            validate_order,
            order_id,
            start_to_close_timeout=timedelta(seconds=30),
        )

        # Step 2 — NEW: fraud check added in v2 deploy
        # workflow.patched("fraud-check-v2") returns True (and records a marker)
        # when this point runs for the first time on the new code, and False
        # when replaying a history that passed this point without the marker
        if workflow.patched("fraud-check-v2"):
            fraud_result = await workflow.execute_activity(
                check_fraud_signals,
                order_id,
                start_to_close_timeout=timedelta(seconds=15),
            )
            if fraud_result["flagged"]:
                return {"order_id": order_id, "status": "blocked_fraud"}

        # Step 3 — charge payment (existing in all versions)
        payment = await workflow.execute_activity(
            process_payment,
            order_id,
            start_to_close_timeout=timedelta(seconds=30),
        )

        # Step 4 — NEW: loyalty points added in v3 deploy
        if workflow.patched("loyalty-points-v3"):
            await workflow.execute_activity(
                award_loyalty_points,
                args=[order_id, payment["amount_cents"]],
                start_to_close_timeout=timedelta(seconds=10),
            )

        return {"order_id": order_id, "status": "fulfilled", "payment": payment}


# After all in-flight workflows complete and no old-code workers remain,
# replace workflow.patched() branches with workflow.deprecate_patch()
# to clean up the versioning scaffolding:
#
# if workflow.deprecate_patch("fraud-check-v2"):  # always True, no branching
#     ...
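To see why the patched() branch keeps old histories replayable, here is a toy replay engine in plain Python. It is a simplified model, not Temporal's implementation: commands are plain strings, and ToyReplayer, order_workflow and order_workflow_unpatched are hypothetical names. It covers three cases: an old history replays down the old path, a new execution records a marker and takes the new path, and the same change made without patched() fails on an old history.

```python
from __future__ import annotations

from dataclasses import dataclass, field


class NonDeterminismError(Exception):
    """Replayed code produced a different command than the recorded history."""


@dataclass
class ToyReplayer:
    # Commands recorded by an earlier run; None means a brand-new execution.
    history: list[str] | None = None
    produced: list[str] = field(default_factory=list)

    def _replaying(self) -> bool:
        # Replaying while the code has not yet caught up with recorded history.
        return self.history is not None and len(self.produced) < len(self.history)

    def _record(self, command: str) -> None:
        if self._replaying():
            expected = self.history[len(self.produced)]
            if expected != command:
                raise NonDeterminismError(
                    f"history has {expected!r}, code produced {command!r}"
                )
        self.produced.append(command)

    def patched(self, patch_id: str) -> bool:
        marker = f"marker:{patch_id}"
        if self._replaying() and self.history[len(self.produced)] != marker:
            return False  # history predates the patch: stay on the old path
        self._record(marker)  # first run past this point, or replay of a patched run
        return True

    def activity(self, name: str) -> None:
        self._record(f"activity:{name}")


def order_workflow(ctx: ToyReplayer) -> list[str]:
    ctx.activity("validate_order")
    if ctx.patched("fraud-check-v2"):
        ctx.activity("check_fraud_signals")
    ctx.activity("process_payment")
    return ctx.produced


def order_workflow_unpatched(ctx: ToyReplayer) -> list[str]:
    # Same change without patched(): histories recorded by v1 no longer replay.
    ctx.activity("validate_order")
    ctx.activity("check_fraud_signals")
    ctx.activity("process_payment")
    return ctx.produced


v1_history = ["activity:validate_order", "activity:process_payment"]
print(order_workflow(ToyReplayer(history=v1_history)))  # old path, no marker
print(order_workflow(ToyReplayer()))  # new path, marker recorded
```

The unpatched variant reproduces what a production NonDeterminismError reports: the code asked for check_fraud_signals where the history recorded process_payment.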

Note

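workflow.patched() makes the new code safe for histories recorded by the old code: executions that were already past the patch point when the deploy happened replay down the old path, while everything else records a patch marker and takes the new path. The reverse does not hold. An old worker that picks up a workflow task for an execution already advanced down the new path cannot replay it, so keep the window in which old and new workers share a task queue short, or use Worker Versioning (checklist item 6) to keep old executions on old workers. Switch a patch to workflow.deprecate_patch() only after every execution that started before it has completed.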

Production Kubernetes Deployment

In production, Temporal Server and your Worker pools are separate deployments. Workers keep no durable state (everything needed to resume a workflow lives in the server's event history), so they scale horizontally: add replicas to increase throughput. Separate task queues for different workflow types give independent scaling and fault isolation. Unlike Airflow or Dagster, which co-locate scheduling logic with infrastructure, Temporal separates the durable state server from your Worker compute entirely: Workers can run anywhere that can reach the Temporal Server's gRPC endpoint.

# Temporal Worker Deployment — orders domain
# k8s/temporal-worker-orders.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: temporal-worker-orders
  namespace: temporal
spec:
  replicas: 3
  selector:
    matchLabels:
      app: temporal-worker-orders
  template:
    metadata:
      labels:
        app: temporal-worker-orders
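    spec:
      # Give in-flight activities time to finish after SIGTERM. Keep this longer
      # than the worker's graceful_shutdown_timeout so Kubernetes does not send
      # SIGKILL in the middle of the drain.
      terminationGracePeriodSeconds: 90
      containers: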
      - name: worker
        image: myapp/temporal-worker:1.25.0
        command: ["python", "-m", "workers.orders"]
        env:
        - name: TEMPORAL_HOST
          value: "temporal-frontend.temporal.svc.cluster.local:7233"
        - name: TEMPORAL_NAMESPACE
          value: "production"
        - name: TASK_QUEUE
          value: "orders"
        - name: MAX_CONCURRENT_ACTIVITIES
          value: "50"
        - name: MAX_CONCURRENT_WORKFLOWS
          value: "100"
        resources:
          requests:
            cpu: "500m"
            memory: "512Mi"
          limits:
            cpu: "2"
            memory: "2Gi"
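        # Placeholder: this command always exits 0, so the probe only proves the
        # container can start a Python process. Replace it with a check that
        # reflects worker health before relying on it.
        livenessProbe:
          exec:
            command: ["python", "-c", "import sys; sys.exit(0)"]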
          initialDelaySeconds: 30
          periodSeconds: 30
---
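# HPA for automatic scaling based on task queue backlog. An External metric needs
# an external-metrics adapter (for example Prometheus Adapter or KEDA); the name
# temporal_task_queue_depth must match whatever that adapter exposes.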
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: temporal-worker-orders-hpa
  namespace: temporal
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: temporal-worker-orders
  minReplicas: 2
  maxReplicas: 20
  metrics:
  - type: External
    external:
      metric:
        name: temporal_task_queue_depth
        selector:
          matchLabels:
            task_queue: orders
      target:
        type: AverageValue
        averageValue: "10"
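The HPA above divides the external backlog metric across the current replicas and compares the per-pod value with averageValue. A rough model of that calculation helps sanity-check averageValue before deploying. It follows the formula in the Kubernetes HPA documentation (desired = ceil(current × currentValue / target), skipped when the ratio is within the default 10% tolerance, then clamped to minReplicas/maxReplicas), ignores stabilization windows and scaling policies, and desired_replicas is an illustrative name.

```python
import math


def desired_replicas(
    current_replicas: int,
    backlog: float,
    target_average: float,
    min_replicas: int = 2,
    max_replicas: int = 20,
    tolerance: float = 0.1,
) -> int:
    """Approximate one HPA decision for an External metric with an AverageValue target.

    Defaults mirror the HPA manifest above (minReplicas 2, maxReplicas 20).
    Stabilization windows and scaling policies are deliberately ignored.
    """
    if current_replicas <= 0:
        raise ValueError("current_replicas must be positive")
    per_pod = backlog / current_replicas  # AverageValue: metric divided across pods
    ratio = per_pod / target_average
    if abs(ratio - 1.0) <= tolerance:
        return current_replicas  # close enough to target: no change
    proposed = math.ceil(current_replicas * ratio)
    return max(min_replicas, min(max_replicas, proposed))


# 4 pods, 100 queued tasks, target 10 per pod -> scale to 10
print(desired_replicas(current_replicas=4, backlog=100, target_average=10))
```

A backlog of 42 across 4 pods (10.5 per pod) stays within tolerance and leaves the Deployment at 4 replicas, while an empty queue drops it to minReplicas rather than zero.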

# Worker with configurable concurrency and graceful shutdown
# workers/orders.py
import asyncio
import os
import signal
from datetime import timedelta
from pathlib import Path

from temporalio.client import Client, TLSConfig
from temporalio.worker import Worker
from workflows.order_workflow import OrderWorkflow
from activities.payment import charge_payment, refund_payment
from activities.fulfillment import reserve_inventory, create_shipment
from activities.notification import send_order_confirmation


async def run_worker():
    # TLS connection to Temporal Server (Temporal Cloud or self-hosted with TLS)
    tls_config: TLSConfig | bool = False
    if os.environ.get("TEMPORAL_TLS_CERT"):
        tls_config = TLSConfig(
            client_cert=Path(os.environ["TEMPORAL_TLS_CERT"]).read_bytes(),
            client_private_key=Path(os.environ["TEMPORAL_TLS_KEY"]).read_bytes(),
        )

    client = await Client.connect(
        os.environ.get("TEMPORAL_HOST", "localhost:7233"),
        namespace=os.environ.get("TEMPORAL_NAMESPACE", "default"),
        tls=tls_config,
    )

    worker = Worker(
        client,
        task_queue=os.environ.get("TASK_QUEUE", "orders"),
        workflows=[OrderWorkflow],
        activities=[
            charge_payment,
            refund_payment,
            reserve_inventory,
            create_shipment,
            send_order_confirmation,
        ],
        max_concurrent_activities=int(os.environ.get("MAX_CONCURRENT_ACTIVITIES", "50")),
        max_concurrent_workflow_tasks=int(os.environ.get("MAX_CONCURRENT_WORKFLOWS", "100")),
        # The SDK default is zero: running activities are cancelled as soon as
        # shutdown starts. Keep this below the pod's terminationGracePeriodSeconds.
        # GRACEFUL_SHUTDOWN_SECONDS is an illustrative variable name.
        graceful_shutdown_timeout=timedelta(
            seconds=int(os.environ.get("GRACEFUL_SHUTDOWN_SECONDS", "60"))
        ),
    )

    # Graceful shutdown on SIGTERM (Kubernetes sends SIGTERM, then SIGKILL once
    # terminationGracePeriodSeconds expires). loop.add_signal_handler runs the
    # callback on the event loop, so setting the asyncio.Event is safe.
    shutdown_event = asyncio.Event()
    loop = asyncio.get_running_loop()
    for sig in (signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(sig, shutdown_event.set)

    async with worker:
        print(f"Worker started on task queue '{worker.task_queue}'")
        await shutdown_event.wait()
        print("Shutdown requested, draining in-flight tasks...")
        # Leaving the block calls worker.shutdown(): polling stops and running
        # activities get graceful_shutdown_timeout to finish before cancellation


if __name__ == "__main__":
    asyncio.run(run_worker())
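Kubernetes sends SIGTERM, waits terminationGracePeriodSeconds, then sends SIGKILL, so the worker's drain has to fit inside that window. The Python SDK's Worker exposes the drain budget as graceful_shutdown_timeout. The stdlib sketch below models the idea with asyncio: give in-flight tasks a grace period, then cancel whatever is still running. drain and simulate_shutdown are illustrative names, not SDK internals.

```python
from __future__ import annotations

import asyncio


async def drain(tasks: list[asyncio.Task], grace_seconds: float) -> tuple[int, int]:
    """Let in-flight tasks run for up to grace_seconds, then cancel the rest.

    Returns (finished, cancelled). Models the idea behind the Worker's
    graceful_shutdown_timeout; it is not the SDK's implementation.
    """
    if not tasks:
        return 0, 0
    done, pending = await asyncio.wait(tasks, timeout=grace_seconds)
    for task in pending:
        task.cancel()
    # Wait until the cancellations have been processed so nothing is left running.
    await asyncio.gather(*pending, return_exceptions=True)
    return len(done), len(pending)


async def simulate_shutdown(durations: list[float], grace_seconds: float) -> tuple[int, int]:
    # Each simulated "activity" just sleeps for its duration in seconds.
    tasks = [asyncio.create_task(asyncio.sleep(d)) for d in durations]
    return await drain(tasks, grace_seconds)


finished, cancelled = asyncio.run(simulate_shutdown([0.01, 0.02, 5.0], grace_seconds=0.5))
print(f"finished={finished} cancelled={cancelled}")
```

Here the two short tasks finish inside the 0.5 s grace period and the 5 s task is cancelled. The same budgeting applies in the cluster: the worker's grace period plus shutdown overhead must stay below the pod's terminationGracePeriodSeconds (Kubernetes defaults to 30 seconds).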

# Create a production namespace with the Temporal CLI
temporal operator namespace create \
  --namespace production \
  --retention 30d \
  --description "Production workflows"

# Create a staging namespace
temporal operator namespace create \
  --namespace staging \
  --retention 7d \
  --description "Staging workflows"

# List namespaces
temporal operator namespace list

# Describe a namespace
temporal operator namespace describe --namespace production

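# View workflow history for a specific execution
temporal workflow show \
  --workflow-id "order-12345" \
  --namespace production

# Reset a workflow to a specific event (disaster recovery)
temporal workflow reset \
  --workflow-id "order-12345" \
  --namespace production \
  --event-id 15 \
  --reason "activity returned incorrect result, reprocessing"

# List running workflows of one type, then terminate them in bulk (use with caution)
temporal workflow list \
  --query 'WorkflowType="OrderWorkflow" AND ExecutionStatus="Running"' \
  --namespace production
temporal workflow terminate \
  --query 'WorkflowType="OrderWorkflow" AND ExecutionStatus="Running"' \
  --namespace production \
  --reason "bulk termination of stuck workflows"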

Production Checklist

1

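Enforce workflow determinism in CI by running a determinism linter or by writing unit tests that replay captured event histories (in Python, with temporalio.worker.Replayer) and fail on any mismatch. At minimum, audit workflow code for datetime.now(), random, uuid.uuid4(), and any direct I/O, and use workflow.now(), workflow.random(), and workflow.uuid4() instead. In the Python SDK, asyncio.sleep() inside a workflow is already a durable timer; in Go, use workflow.Sleep() rather than time.Sleep(). A NonDeterminismError in production causes workflow tasks to fail continuously until the code is fixed and deployed.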

2

Set heartbeat_timeout longer than your heartbeat interval on any activity that runs longer than 30 seconds. If an activity heartbeats every 10 seconds, set heartbeat_timeout to 15–20 seconds so a single delayed heartbeat does not fail the attempt. Without heartbeat_timeout, a crashed worker holds the activity task invisibly until start_to_close_timeout expires, delaying recovery by the full attempt timeout (often minutes) instead of seconds.

3

Build a non_retryable_error_types taxonomy from your business error catalog before deploying. Card declines, validation errors, duplicate order IDs, and permission denials should not be retried; they need human intervention or a different workflow path. Retrying them wastes activity slots and triggers false alerts. To mark an error as non-retryable from activity code, raise ApplicationError("card declined", type="CardDeclined", non_retryable=True); the type string is what non_retryable_error_types matches against.

4

Use one Temporal Namespace per environment (development, staging, production) — not one namespace with prefixed workflow IDs. Namespaces have independent visibility, retention policies, and access controls. Cross-namespace workflow queries or signals are not supported; this enforces clean environment isolation.

5

Set schedule_to_close_timeout on activities that call external services with SLAs. start_to_close_timeout caps a single attempt; schedule_to_close_timeout caps the total time, including retries and the backoff between them. For a payment activity with a 5-minute SLA, set start_to_close_timeout=30s, maximum_attempts=5 in its RetryPolicy, and schedule_to_close_timeout=5min, so the activity gives up within the SLA even if the payment service is degraded.

6

Use Worker Versioning (Build IDs) when deploying breaking workflow changes, so new workers pick up only new executions while old workers drain existing ones. Without Worker Versioning, old and new workers compete for tasks; if the old workers retire before in-flight workflows complete, new workers pick up those executions and their replays fail with NonDeterminismError.

7

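Use deterministic Workflow IDs and the Workflow ID reuse policy to prevent duplicate workflow starts in distributed systems. Starting a workflow whose ID is already running is rejected by default (WorkflowAlreadyStartedError in the Python SDK). The reuse policy governs IDs whose previous run has closed: ALLOW_DUPLICATE, the default, permits a new run; ALLOW_DUPLICATE_FAILED_ONLY permits one only if the previous run did not complete successfully; REJECT_DUPLICATE refuses reuse while the closed run is retained. Always derive the Workflow ID from the entity (e.g., order-{order_id}) so retried client calls are idempotent.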

8

Scrape Prometheus metrics from both the Temporal Server services and your Workers. The temporal_-prefixed metrics below are emitted by the SDK inside each Worker or client process, so every Worker needs its own metrics endpoint. Key metrics: temporal_request_latency (latency of SDK calls to the server), temporal_activity_poll_succeed_total (worker throughput), temporal_workflow_task_schedule_to_start_latency (task queue backlog indicator), and temporal_workflow_failed_total by workflow type for SLA alerting.

9

Make signal handlers idempotent for signals that may arrive more than once. A sender that retries after an ambiguous failure (for example, a client timeout after the server already accepted the signal) can deliver the same logical signal twice, so a handler that increments a counter must deduplicate. The handler receives only the signal's arguments, so include an idempotency key in the payload and keep a dict or set of keys already processed.

10

Evaluate Temporal Cloud for teams with fewer than 3 platform engineers. Temporal Cloud eliminates server operation, persistence management, TLS certificate rotation, and Cassandra/PostgreSQL upgrades. Billing is per action (workflow started, activity scheduled, signal sent) — typically lower total cost than self-hosting once engineering time is included. Self-hosting makes sense for compliance requirements, very high volumes (>1B actions/month), or custom visibility backends.
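Checklist item 5 is easiest to verify with arithmetic. Temporal waits initial_interval before the first retry, multiplies the wait by backoff_coefficient after each retry, and caps it at maximum_interval (100 × initial_interval when unset); maximum_attempts counts the first attempt too. The sketch below uses a plain dataclass (RetrySketch, a hypothetical name) rather than temporalio.common.RetryPolicy. It adds up the worst case, every attempt running until start_to_close_timeout plus every backoff wait, and checks the total against schedule_to_close_timeout. maximum_attempts defaults to 5 here to match item 5; Temporal's own default is unlimited.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import timedelta


@dataclass(frozen=True)
class RetrySketch:
    # Field names mirror temporalio.common.RetryPolicy; this stand-in only does arithmetic.
    initial_interval: timedelta = timedelta(seconds=1)
    backoff_coefficient: float = 2.0
    maximum_interval: timedelta | None = None  # None: 100 x initial_interval
    maximum_attempts: int = 5  # includes the first attempt

    def waits(self) -> list[timedelta]:
        """Backoff waits between attempts: one fewer than the number of attempts."""
        cap = self.maximum_interval or self.initial_interval * 100
        return [
            min(self.initial_interval * (self.backoff_coefficient ** retry), cap)
            for retry in range(self.maximum_attempts - 1)
        ]


def worst_case_duration(policy: RetrySketch, start_to_close: timedelta) -> timedelta:
    """Every attempt runs until start_to_close_timeout, plus every backoff wait."""
    return start_to_close * policy.maximum_attempts + sum(policy.waits(), timedelta())


def fits_budget(policy: RetrySketch, start_to_close: timedelta, schedule_to_close: timedelta) -> bool:
    return worst_case_duration(policy, start_to_close) <= schedule_to_close


policy = RetrySketch()
print(worst_case_duration(policy, timedelta(seconds=30)))  # 0:02:45
```

With these values the worst case is 5 × 30 s plus 1 + 2 + 4 + 8 s of backoff, 165 s, inside a 5-minute schedule_to_close_timeout. If the total exceeded the budget, schedule_to_close_timeout would end the activity before the last attempts ran, which is the behaviour item 5 relies on to fail within the SLA.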
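For checklist item 9, the deduplication logic can live in plain Python and be called from the signal handler. In this sketch, ApprovalState is a hypothetical class, request_id is an assumed idempotency key carried in the signal payload, and approve() is what a @workflow.signal method would delegate to; the workflow could then block with await workflow.wait_condition(lambda: state.approved).

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class ApprovalState:
    """Approval bookkeeping a signal handler can delegate to (plain-Python sketch)."""

    required_approvals: int = 2
    # request_id -> approver; request_id is an assumed idempotency key in the payload
    approvals: dict[str, str] = field(default_factory=dict)

    def approve(self, request_id: str, approver: str) -> None:
        # setdefault keeps the first delivery and ignores any redelivery
        self.approvals.setdefault(request_id, approver)

    @property
    def approved(self) -> bool:
        # Count distinct approvers so one person cannot approve twice
        return len(set(self.approvals.values())) >= self.required_approvals


state = ApprovalState()
state.approve("req-1", "alice")
state.approve("req-1", "alice")  # duplicate delivery of the same signal
state.approve("req-2", "bob")
print(state.approved)  # True
```

Keying on request_id makes redelivery harmless, and counting distinct approvers stops one person from satisfying a two-person rule by sending two different requests.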

Running distributed processes that fail halfway through multi-step operations with no way to resume from the failure point, using a combination of cron jobs, message queues, and database flags to track workflow state, or losing in-flight business processes when a service restarts?

We design and implement Temporal workflow systems, from workflow and activity design with determinism audits and heartbeat cadence planning, RetryPolicy taxonomy mapping business error types to non-retryable exceptions, signal-driven human-in-the-loop approval workflow patterns, child workflow decomposition for complex multi-domain processes, Temporal Schedules migration from cron-based recurring pipelines, workflow.patched versioning strategy for zero-downtime deploys to in-flight instances, Worker pool sizing and task queue topology for throughput isolation, Kubernetes Deployment configurations with separate worker pools per domain, Temporal CLI namespace setup for production and staging environment isolation, Prometheus metrics integration and Grafana dashboard configuration for workflow success rate, schedule-to-close latency, and activity retry rate alerting, and Temporal Cloud migration planning for teams that want managed infrastructure without operating the Temporal cluster. Let’s talk.



DataSOps Consulting

Need help implementing this in production?

We build and operate data pipelines, AI systems, and observability stacks for engineering teams. Reach out for a free 30-minute architecture review.