What is Temporal and Why It Exists
Temporal is a durable execution platform that solves a fundamental distributed systems problem: how do you write multi-step business processes that are guaranteed to complete even when the underlying infrastructure fails? The standard answer — combine a message queue, a cron job, database state flags, and retry logic spread across multiple services — produces code where the failure-handling logic dwarfs the business logic. Temporal inverts this by making durability a property of the runtime rather than something you bolt on. Prefect and Airflow solve the orchestration problem for data pipelines with a DAG model — Temporal targets general-purpose business process workflows where workflows may wait for human approval, interact with external APIs, and run for days or months.
The core mechanism is event sourcing applied to workflow execution. Every state transition in a workflow — activity scheduled, activity completed, signal received, timer fired — is appended as an immutable event to a history log stored in Temporal's database (Cassandra or PostgreSQL). If a worker crashes mid-execution, the workflow resumes on any available worker by replaying the history log to reconstruct in-memory state exactly. From the workflow code's perspective, the crash never happened.
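The replay idea can be illustrated with a toy model. This is a minimal sketch, not Temporal's implementation: the `History` class, the `rebuild_state` function, and the simplified event names are hypothetical stand-ins for the real history log and its event types.

```python
from dataclasses import dataclass, field


@dataclass
class History:
    """Append-only log of (event_type, payload) tuples for one workflow run."""
    events: list = field(default_factory=list)

    def append(self, event_type: str, payload: dict) -> None:
        self.events.append((event_type, payload))


def rebuild_state(history: History) -> dict:
    """Fold the event log into the state a worker would hold in memory."""
    state = {"completed": {}, "signals": [], "timers_fired": 0}
    for event_type, payload in history.events:
        if event_type == "ActivityCompleted":
            # The recorded result is reused; the activity is not run again.
            state["completed"][payload["name"]] = payload["result"]
        elif event_type == "SignalReceived":
            state["signals"].append(payload["name"])
        elif event_type == "TimerFired":
            state["timers_fired"] += 1
    return state


history = History()
history.append("ActivityScheduled", {"name": "create_user_account"})
history.append("ActivityCompleted", {"name": "create_user_account", "result": {"id": "u1"}})
history.append("SignalReceived", {"name": "approve"})
history.append("TimerFired", {"timer_id": "1"})

# Two different "workers" replaying the same log arrive at the same state.
state_on_worker_a = rebuild_state(history)
state_on_worker_b = rebuild_state(history)
```

Because the state is a pure function of the log, it does not matter which worker performs the replay or how many times it happens.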
Durable Execution
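Workflows survive worker crashes, server restarts, and network partitions automatically. No manual checkpoint logic, no state recovery code: the runtime records each completed step and delivers timers durably. Activities are retried under their retry policy, so a single activity can run more than once (at-least-once execution) and should be written to be idempotent.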
Replay-Based Model
Workflow history is an append-only event log. Replaying events reconstructs workflow state deterministically on any worker. This means workflow code must be deterministic — all non-deterministic operations move to activities.
Multi-Language SDKs
Official SDKs for Python, Go, Java, TypeScript, .NET, and PHP. Workers in different languages can share the same Temporal cluster — a Python worker handles one task queue while a Go worker handles another.
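Why replay requires deterministic workflow code can be sketched in a few lines. This is a simplified illustration under stated assumptions, not the SDK's mechanism: `replay`, `NonDeterminismDetected`, `FixedRandom`, and the command strings are all hypothetical names invented for the example.

```python
import random


class NonDeterminismDetected(Exception):
    """Raised when replayed code issues a command the history did not record."""


class FixedRandom:
    """Stand-in for the random module that always returns a preset value."""

    def __init__(self, value: float) -> None:
        self.value = value

    def random(self) -> float:
        return self.value


def replay(workflow_fn, recorded_commands):
    """Re-run workflow_fn and compare each command with the recorded log."""
    for position, command in enumerate(workflow_fn()):
        if position >= len(recorded_commands) or command != recorded_commands[position]:
            raise NonDeterminismDetected(f"command {position}: {command!r}")
    return True


def deterministic_workflow():
    yield "schedule:create_user_account"
    yield "schedule:send_welcome_email"


def coin_flip_workflow(rng=random):
    yield "schedule:create_user_account"
    # Branching on a random value: the next command can differ between runs.
    if rng.random() < 0.5:
        yield "schedule:send_welcome_email"
    else:
        yield "schedule:send_sms"


# The original run chose the email branch; the replay chooses the SMS branch.
recorded = list(coin_flip_workflow(FixedRandom(0.1)))
try:
    replay(lambda: coin_flip_workflow(FixedRandom(0.9)), recorded)
    diverged = False
except NonDeterminismDetected:
    diverged = True
```

Moving the coin flip into an activity would record its result in the history, so a replay would read the recorded value instead of flipping again.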
Architecture — Server, Workers, and Task Queues
Temporal's architecture separates the durable state management from the business logic execution. The Temporal Server stores and schedules work; your Worker processes contain the actual workflow and activity code. Communication happens through Task Queues — named logical channels that Workers poll for work.
Temporal Server
A cluster of stateless services — frontend (gRPC API), history (workflow state machine), matching (task queue dispatch), and internal worker (archival, visibility). Backed by a persistence layer: PostgreSQL for most deployments, Cassandra for very large scale. The server never runs workflow code directly.
Task Queues
Named queues for routing work to workers. Dispatch is roughly first-in, first-out, but Temporal does not guarantee strict ordering. Multiple workers can listen on the same task queue for horizontal scaling. Separate task queues isolate different workload types, for example an orders task queue for high-priority payment workflows and an analytics task queue for background reporting.
Workers
Processes you deploy that poll a task queue, execute workflow and activity code, and report results back to the server. Workers have no persistent state — they are stateless compute that receive tasks, run code, and return results. Scale workers by adding pods without coordination.
Workflows
Deterministic, long-running functions that orchestrate activities and timers. Workflow code runs inside the Temporal worker's sandbox where time, random, and I/O calls are intercepted. Each workflow instance has a unique Workflow ID and produces a queryable execution history.
Activities
Non-deterministic operations: HTTP calls, database queries, file I/O, external API integrations. Activities run with their own retry lifecycle, timeouts, and heartbeat mechanism. Activity failures trigger retries defined in RetryPolicy without any workflow code involvement.
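The relationship between task queues and workers can be modelled with plain asyncio queues. This is only an analogy, assuming in-process queues in place of the Temporal Server; the worker names and task strings are hypothetical.

```python
import asyncio


async def worker(name: str, queue: asyncio.Queue, results: list) -> None:
    """Poll one task queue forever, run each task, record who handled it."""
    while True:
        task = await queue.get()
        results.append((name, task))
        queue.task_done()


async def main() -> list:
    # One named queue per workload type, as in the orders/analytics example.
    queues = {"orders": asyncio.Queue(), "analytics": asyncio.Queue()}
    results: list = []
    workers = [
        asyncio.create_task(worker("orders-worker-1", queues["orders"], results)),
        asyncio.create_task(worker("orders-worker-2", queues["orders"], results)),
        asyncio.create_task(worker("analytics-worker-1", queues["analytics"], results)),
    ]
    for task in ["charge-1", "charge-2", "charge-3"]:
        queues["orders"].put_nowait(task)
    queues["analytics"].put_nowait("daily-report")

    # Wait until every queued task has been handled, then stop the pollers.
    await asyncio.gather(*(q.join() for q in queues.values()))
    for w in workers:
        w.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return results


handled = asyncio.run(main())
```

Adding a second worker to the orders queue needs no coordination: both simply poll the same queue, and the analytics work never competes with them.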
Installation and Local Development
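The Temporal Python SDK requires Python 3.8+ (newer SDK releases may raise the minimum, so check the package metadata for the version you install). The local development server bundles the Temporal Server, SQLite persistence, and the web UI in a single binary, so no external database is needed for development.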
# Install the Python SDK
pip install temporalio
# Install the Temporal CLI (includes the local dev server; it replaces the older tctl tool)
# macOS
brew install temporal
# Linux / WSL: install script from temporal.download
curl -sSf https://temporal.download/cli.sh | sh
# Start a local development server (SQLite, web UI on :8233)
temporal server start-dev
# Verify connectivity
temporal workflow list --namespace default
# Docker Compose for local dev with PostgreSQL backend
# docker-compose.yml
version: "3.5"
services:
  postgresql:
    image: postgres:16-alpine
    environment:
      POSTGRES_PASSWORD: temporal
      POSTGRES_USER: temporal
    ports:
      - "5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data
  temporal:
    image: temporalio/auto-setup:1.25
    depends_on:
      - postgresql
    environment:
      # Recent auto-setup images expect postgres12 here; older images accepted postgresql
      - DB=postgres12
      - DB_PORT=5432
      - POSTGRES_USER=temporal
      - POSTGRES_PWD=temporal
      - POSTGRES_SEEDS=postgresql
    ports:
      - "7233:7233"
  temporal-ui:
    image: temporalio/ui:latest
    depends_on:
      - temporal
    environment:
      - TEMPORAL_ADDRESS=temporal:7233
      - TEMPORAL_CORS_ORIGINS=http://localhost:3000
    ports:
      - "8080:8080"
volumes:
  postgres_data:
Defining Your First Workflow
Temporal workflows are Python classes decorated with @workflow.defn. The main entry point is the @workflow.run async method. Activities are plain async functions decorated with @activity.defn. The separation is strict: workflow code must be deterministic, activity code handles all side effects.
# workflows/onboarding.py
import asyncio
from datetime import timedelta
from temporalio import workflow, activity


# Activities: non-deterministic operations go here
@activity.defn
async def create_user_account(user_id: str, email: str) -> dict:
    """Call user service API to provision account."""
    # HTTP calls, DB writes, external API integrations all live here
    import httpx
    async with httpx.AsyncClient() as client:
        response = await client.post(
            "http://user-service/api/users",
            json={"id": user_id, "email": email},
            timeout=10.0,
        )
        response.raise_for_status()
        return response.json()


@activity.defn
async def send_welcome_email(user_id: str, email: str) -> str:
    """Send welcome email via email service."""
    import httpx
    async with httpx.AsyncClient() as client:
        response = await client.post(
            "http://email-service/api/send",
            json={"to": email, "template": "welcome", "user_id": user_id},
            timeout=10.0,
        )
        response.raise_for_status()
        return response.json()["message_id"]


@activity.defn
async def provision_trial_resources(user_id: str) -> dict:
    """Provision trial workspace, storage quota, and default dashboards."""
    import httpx
    async with httpx.AsyncClient() as client:
        response = await client.post(
            "http://provisioning-service/api/trial",
            json={"user_id": user_id, "plan": "trial"},
            timeout=30.0,
        )
        response.raise_for_status()
        return response.json()


# Workflow: orchestrates activities, enforces determinism
@workflow.defn
class OnboardingWorkflow:
    @workflow.run
    async def run(self, user_id: str, email: str) -> dict:
        # Step 1: create account. No retry_policy is passed, so the default
        # policy applies: retries with backoff and no attempt limit.
        # Pass a RetryPolicy with maximum_attempts to cap the retries.
        account = await workflow.execute_activity(
            create_user_account,
            args=[user_id, email],
            start_to_close_timeout=timedelta(seconds=30),
        )
        # Step 2: send welcome email in parallel with provisioning
        email_task = workflow.execute_activity(
            send_welcome_email,
            args=[user_id, email],
            start_to_close_timeout=timedelta(seconds=10),
        )
        provision_task = workflow.execute_activity(
            provision_trial_resources,
            user_id,
            start_to_close_timeout=timedelta(minutes=2),
        )
        message_id, resources = await asyncio.gather(email_task, provision_task)
        # Step 3: wait 7 days, then check in (durable timer, survives restarts)
        await workflow.sleep(timedelta(days=7))
        return {
            "user_id": user_id,
            "account": account,
            "message_id": message_id,
            "resources": resources,
            "status": "onboarded",
        }
# Run the worker and start a workflow
# worker.py
import asyncio
from temporalio.client import Client
from temporalio.worker import Worker
from workflows.onboarding import (
    OnboardingWorkflow,
    create_user_account,
    send_welcome_email,
    provision_trial_resources,
)


async def run_worker():
    client = await Client.connect("localhost:7233")
    async with Worker(
        client,
        task_queue="onboarding",
        workflows=[OnboardingWorkflow],
        activities=[
            create_user_account,
            send_welcome_email,
            provision_trial_resources,
        ],
    ):
        print("Worker started, polling task queue 'onboarding'")
        await asyncio.Future()  # run forever


if __name__ == "__main__":
    asyncio.run(run_worker())

# start_workflow.py
import asyncio
from temporalio.client import Client
from workflows.onboarding import OnboardingWorkflow


async def start_onboarding(user_id: str, email: str):
    client = await Client.connect("localhost:7233")
    handle = await client.start_workflow(
        OnboardingWorkflow.run,
        args=[user_id, email],
        id=f"onboarding-{user_id}",  # unique per user, prevents duplicate starts
        task_queue="onboarding",
    )
    print(f"Started workflow: {handle.id} (run: {handle.result_run_id})")
    # Wait for completion (optional: the caller can detach and poll later)
    result = await handle.result()
    print(f"Completed: {result}")
    return result
Note
Never call datetime.now(), random.random(), uuid.uuid4(), or make any I/O calls inside a workflow function. Use workflow.now() for the current time and move all side effects to activities. Temporal's replay engine re-executes workflow code whenever it has to rebuild a workflow's state, for example after a worker restart; non-deterministic calls produce different values on replay, causing a NonDeterminismError.
Activities: Timeouts, Retry Policies, and Heartbeating
Activities are the primary mechanism for interacting with the outside world. Temporal gives you four independent timeout dimensions and a configurable retry policy per activity execution. Understanding the difference between each timeout is critical for production reliability.
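The four timeouts measure different intervals, which a small checker can make concrete. This is a sketch under simplifying assumptions (times are plain seconds, and the `Timeouts` class and `exceeded_timeouts` function are hypothetical helpers, not SDK APIs): schedule-to-start covers the wait for a worker to pick the task up, start-to-close covers a single attempt, schedule-to-close covers the whole execution including retries, and the heartbeat timeout covers the gap since the last heartbeat.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Timeouts:
    schedule_to_start: Optional[float] = None
    start_to_close: Optional[float] = None
    schedule_to_close: Optional[float] = None
    heartbeat: Optional[float] = None


def exceeded_timeouts(
    t: Timeouts,
    now: float,
    first_scheduled_at: float,
    attempt_scheduled_at: float,
    attempt_started_at: Optional[float] = None,
    last_heartbeat_at: Optional[float] = None,
) -> list:
    """Return the names of the timeouts that have elapsed at time `now`."""
    hits = []
    # Total budget across all attempts, measured from the first scheduling.
    if t.schedule_to_close is not None and now - first_scheduled_at > t.schedule_to_close:
        hits.append("schedule_to_close")
    if attempt_started_at is None:
        # No worker has picked the task up yet.
        if t.schedule_to_start is not None and now - attempt_scheduled_at > t.schedule_to_start:
            hits.append("schedule_to_start")
        return hits
    # Budget for this single attempt.
    if t.start_to_close is not None and now - attempt_started_at > t.start_to_close:
        hits.append("start_to_close")
    # Gap since the worker last reported in (or since the attempt started).
    if t.heartbeat is not None:
        last_seen = last_heartbeat_at if last_heartbeat_at is not None else attempt_started_at
        if now - last_seen > t.heartbeat:
            hits.append("heartbeat")
    return hits


# A worker started the attempt at t=100, heartbeated at t=105, then crashed.
with_heartbeat = Timeouts(start_to_close=300, schedule_to_close=900, heartbeat=10)
without_heartbeat = Timeouts(start_to_close=300, schedule_to_close=900)
detected_at_120 = exceeded_timeouts(with_heartbeat, 120, 99, 99, 100, 105)
silent_at_120 = exceeded_timeouts(without_heartbeat, 120, 99, 99, 100, 105)
```

In this scenario the crash is detected 15 seconds after the last heartbeat when a heartbeat timeout is set, and not until the 300-second start-to-close window ends when it is not.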
from datetime import timedelta
from temporalio import workflow, activity
from temporalio.common import RetryPolicy


@activity.defn
async def charge_payment(order_id: str, amount_cents: int) -> dict:
    """Charge via Stripe API. Non-idempotent: must use idempotency key."""
    import stripe
    # Report progress for heartbeat_timeout monitoring
    activity.heartbeat({"step": "initiating_charge", "order_id": order_id})
    charge = await stripe.Charge.create_async(
        amount=amount_cents,
        currency="usd",
        idempotency_key=f"charge-{order_id}",  # safe to retry
        metadata={"order_id": order_id},
    )
    activity.heartbeat({"step": "charge_complete", "charge_id": charge.id})
    return {"charge_id": charge.id, "status": charge.status}


@workflow.defn
class OrderWorkflow:
    @workflow.run
    async def run(self, order_id: str, amount_cents: int) -> dict:
        return await workflow.execute_activity(
            charge_payment,
            args=[order_id, amount_cents],
            # Maximum time for a single execution attempt (each retry gets a fresh window)
            start_to_close_timeout=timedelta(seconds=30),
            # Maximum gap between heartbeats: if none arrives within this window
            # (for example because the worker crashed), the attempt times out and is retried
            heartbeat_timeout=timedelta(seconds=10),
            # Maximum total time including ALL retry attempts
            schedule_to_close_timeout=timedelta(minutes=5),
            retry_policy=RetryPolicy(
                # First retry after 1 second
                initial_interval=timedelta(seconds=1),
                # Double the interval each retry: 1s, 2s, 4s, 8s...
                backoff_coefficient=2.0,
                # Cap retry interval at 30 seconds
                maximum_interval=timedelta(seconds=30),
                # Maximum number of attempts (1 initial + 4 retries = 5 total)
                maximum_attempts=5,
                # These errors should NOT be retried: fail fast.
                # Types are matched by name; for exceptions raised in Python
                # activities this is assumed to be the exception class name.
                non_retryable_error_types=[
                    "CardError",   # card declined
                    "ValueError",  # validation error
                ],
                # For business logic errors, raise
                # ApplicationError(..., non_retryable=True) from the activity instead.
            ),
        )
# Heartbeating for long-running activities
# Activities that process large datasets, call slow APIs, or run ML inference
# must heartbeat regularly to signal liveness to Temporal
import asyncio
from temporalio import activity


@activity.defn
async def process_large_dataset(
    bucket: str,
    key: str,
    batch_size: int = 10_000,
) -> dict:
    """
    Process a large Parquet file row by row.
    Heartbeat every batch so Temporal knows the activity is alive
    and can resume from the last checkpoint if the worker restarts.
    """
    # Check for details from a previous heartbeat on worker restart
    details = activity.info().heartbeat_details
    start_offset = details[0]["offset"] if details else 0
    import pyarrow.parquet as pq
    import s3fs
    fs = s3fs.S3FileSystem()
    table = pq.read_table(f"{bucket}/{key}", filesystem=fs)
    total_rows = len(table)
    processed = start_offset
    errors = 0
    while processed < total_rows:
        batch_end = min(processed + batch_size, total_rows)
        batch = table.slice(processed, batch_end - processed)
        # Process this batch (process_single_row is assumed to be defined elsewhere)
        for i in range(len(batch)):
            try:
                row = {col: batch[col][i].as_py() for col in batch.schema.names}
                await process_single_row(row)
            except Exception:
                errors += 1
        processed = batch_end
        # Heartbeat with progress checkpoint
        # If this worker crashes, a new worker picks up from 'processed'
        activity.heartbeat({
            "offset": processed,
            "total": total_rows,
            "errors": errors,
            "pct": round(processed / total_rows * 100, 1),
        })
        # Yield control to the event loop
        await asyncio.sleep(0)
    return {"processed": processed, "errors": errors, "total": total_rows}
Signals and Queries: External Workflow Communication
Signals allow external systems to send typed events into a running workflow — triggering state transitions, providing input, or unblocking a waiting workflow. Queries read current workflow state without modifying execution history. AI agent workflows that require human-in-the-loop approval benefit directly from Temporal signals — the workflow durably waits for an approval signal without polling or consuming compute while the approval request lives in the workflow event history.
from dataclasses import dataclass
from datetime import timedelta
import asyncio
from temporalio import workflow, activity
from temporalio.client import Client

# notify_approver and submit_purchase_order are activities assumed to be defined elsewhere


@dataclass
class ApprovalDecision:
    approved: bool
    approver_id: str
    comment: str


@workflow.defn
class PurchaseOrderWorkflow:
    def __init__(self) -> None:
        self._status = "pending_approval"
        self._decision: ApprovalDecision | None = None

    @workflow.signal
    async def approve(self, decision: ApprovalDecision) -> None:
        """Signal handler, called when an approver sends a decision."""
        self._decision = decision
        self._status = "approved" if decision.approved else "rejected"

    @workflow.signal
    async def escalate(self, reason: str) -> None:
        """Escalate to senior approver: sets escalation flag."""
        self._status = f"escalated:{reason}"

    @workflow.query
    def get_status(self) -> str:
        """Query handler: returns current status without modifying history."""
        return self._status

    @workflow.query
    def get_decision(self) -> dict | None:
        if self._decision is None:
            return None
        return {
            "approved": self._decision.approved,
            "approver_id": self._decision.approver_id,
            "comment": self._decision.comment,
        }

    @workflow.run
    async def run(self, po_id: str, amount: float, requester_id: str) -> dict:
        # Notify approver via activity
        await workflow.execute_activity(
            notify_approver,
            args=[po_id, amount, requester_id],
            start_to_close_timeout=timedelta(seconds=30),
        )
        # Durably wait for approval signal, up to 48 hours
        # Survives worker restarts with no polling overhead
        try:
            await workflow.wait_condition(
                lambda: self._decision is not None,
                timeout=timedelta(hours=48),
            )
        except asyncio.TimeoutError:
            return {"po_id": po_id, "status": "expired", "reason": "approval_timeout"}
        assert self._decision is not None
        if not self._decision.approved:
            return {
                "po_id": po_id,
                "status": "rejected",
                "approver_id": self._decision.approver_id,
                "comment": self._decision.comment,
            }
        # Proceed with approved order
        result = await workflow.execute_activity(
            submit_purchase_order,
            args=[po_id, amount],
            start_to_close_timeout=timedelta(minutes=2),
        )
        return {"po_id": po_id, "status": "submitted", **result}
# Sending signals and running queries from the client
async def approve_purchase_order(
    po_id: str,
    approver_id: str,
    approved: bool,
    comment: str,
):
    client = await Client.connect("localhost:7233")
    handle = client.get_workflow_handle(
        workflow_id=f"purchase-order-{po_id}",
    )
    # Send an approval signal to the running workflow
    await handle.signal(
        PurchaseOrderWorkflow.approve,
        ApprovalDecision(
            approved=approved,
            approver_id=approver_id,
            comment=comment,
        ),
    )
    print(f"Signal sent for PO {po_id}: {'approved' if approved else 'rejected'}")


async def get_po_status(po_id: str) -> str:
    client = await Client.connect("localhost:7233")
    handle = client.get_workflow_handle(
        workflow_id=f"purchase-order-{po_id}",
    )
    # Query does not appear in workflow history: read-only
    status = await handle.query(PurchaseOrderWorkflow.get_status)
    return status
Child Workflows and Cancellation
Child workflows allow decomposing complex processes into independently managed sub-workflows. Each child workflow has its own execution history, retry logic, and worker assignment. Parent workflows can cancel child workflows when they are cancelled, or detach children to run independently.
import asyncio
from dataclasses import dataclass
from datetime import timedelta
from temporalio import workflow
from temporalio.client import Client
from temporalio.workflow import ChildWorkflowHandle

# reserve_inventory and create_shipment are activities assumed to be defined elsewhere


@dataclass
class OrderLine:
    product_id: str
    quantity: int
    warehouse_id: str


@workflow.defn
class FulfillmentWorkflow:
    @workflow.run
    async def run(self, order_line: OrderLine) -> dict:
        # Reserve inventory
        reserved = await workflow.execute_activity(
            reserve_inventory,
            args=[order_line.product_id, order_line.quantity, order_line.warehouse_id],
            start_to_close_timeout=timedelta(seconds=30),
        )
        # Ship from warehouse
        shipped = await workflow.execute_activity(
            create_shipment,
            args=[order_line.warehouse_id, reserved["reservation_id"]],
            start_to_close_timeout=timedelta(minutes=2),
        )
        return {"order_line": order_line.product_id, **shipped}


@workflow.defn
class OrderWorkflow:
    @workflow.run
    async def run(self, order_id: str, lines: list[OrderLine]) -> dict:
        # Start child workflows for each line in parallel
        # Each child has its own history, retry policy, and task queue
        child_handles: list[ChildWorkflowHandle] = []
        for line in lines:
            handle = await workflow.start_child_workflow(
                FulfillmentWorkflow.run,
                line,
                id=f"fulfillment-{order_id}-{line.product_id}",
                task_queue="fulfillment",
                # When the parent closes for any reason, the server terminates this child.
                # REQUEST_CANCEL would ask it to cancel gracefully; ABANDON lets it keep running.
                parent_close_policy=workflow.ParentClosePolicy.TERMINATE,
            )
            child_handles.append(handle)
        # Wait for all children to complete. A ChildWorkflowHandle is itself
        # awaitable (it is an asyncio task), so gather the handles directly.
        results = await asyncio.gather(*child_handles)
        return {
            "order_id": order_id,
            "fulfillments": results,
            "status": "fulfilled",
        }


# Cancellation from the client
async def cancel_order(order_id: str):
    client = await Client.connect("localhost:7233")
    handle = client.get_workflow_handle(workflow_id=f"order-{order_id}")
    # Once the parent closes, its children are handled according to their parent_close_policy
    await handle.cancel()
    print(f"Order {order_id} cancellation requested")
Durable Timers and Temporal Schedules
workflow.sleep() creates a durable timer recorded in the workflow's event history. In the Python SDK, workflow code runs on Temporal's own deterministic event loop, so asyncio.sleep() inside a workflow is backed by the same durable timer; what does not survive a restart is a sleep in ordinary, non-workflow code. The workflow resumes when the timer fires, regardless of how many times the worker restarted in between. For recurring workflows, Temporal Schedules replace cron jobs with managed execution including backfill, pause, and manual trigger.
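What makes a timer durable is that its fire time is recorded rather than held in a sleeping process. The sketch below is a toy model under that assumption; `start_timer`, `remaining`, and the dictionary-based event format are hypothetical, and in a real deployment the Temporal Server, not the worker, tracks and fires timers.

```python
from datetime import datetime, timedelta, timezone


def start_timer(history: list, now: datetime, duration: timedelta) -> None:
    """Record the absolute fire time; nothing is held in worker memory."""
    history.append({"type": "TimerStarted", "fire_at": now + duration})


def remaining(history: list, now: datetime) -> timedelta:
    """After a restart, derive the remaining wait from the recorded event."""
    fire_at = next(
        event["fire_at"]
        for event in reversed(history)
        if event["type"] == "TimerStarted"
    )
    return max(fire_at - now, timedelta(0))


history: list = []
t0 = datetime(2026, 1, 1, tzinfo=timezone.utc)
start_timer(history, t0, timedelta(days=7))

# A worker that restarts three days later still sees four days left...
after_restart = remaining(history, t0 + timedelta(days=3))
# ...and one that comes back after the deadline has nothing left to wait for.
overdue = remaining(history, t0 + timedelta(days=9))
```

Restarting the worker never resets the clock, because the clock was never in the worker to begin with.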
from datetime import timedelta
from temporalio import workflow

# The activities referenced below are assumed to be defined elsewhere


@workflow.defn
class TrialExpiryWorkflow:
    @workflow.run
    async def run(self, user_id: str, trial_days: int) -> dict:
        # Day 1: send welcome
        await workflow.execute_activity(
            send_onboarding_email,
            user_id,
            start_to_close_timeout=timedelta(seconds=30),
        )
        # Two days before the trial ends (day 5 of a 7-day trial): expiry warning
        # (durable timer, no cron needed)
        await workflow.sleep(timedelta(days=trial_days - 2))
        await workflow.execute_activity(
            send_trial_expiry_warning,
            user_id,
            start_to_close_timeout=timedelta(seconds=30),
        )
        # Last day (day 7 of a 7-day trial): trial ends
        await workflow.sleep(timedelta(days=2))
        subscription = await workflow.execute_activity(
            check_subscription_status,
            user_id,
            start_to_close_timeout=timedelta(seconds=10),
        )
        if not subscription["has_subscription"]:
            await workflow.execute_activity(
                downgrade_to_free_tier,
                user_id,
                start_to_close_timeout=timedelta(seconds=30),
            )
            return {"user_id": user_id, "status": "downgraded"}
        return {"user_id": user_id, "status": "converted"}
# Temporal Schedules: managed recurring workflow execution
# Replaces cron jobs with full lifecycle management: backfill, pause, trigger
from datetime import datetime, timedelta, timezone
from temporalio.client import (
    Client,
    Schedule,
    ScheduleActionStartWorkflow,
    ScheduleBackfill,
    ScheduleIntervalSpec,
    ScheduleOverlapPolicy,
    SchedulePolicy,
    ScheduleSpec,
    ScheduleState,
)

# RevenueReportWorkflow is assumed to be defined elsewhere


async def create_daily_report_schedule():
    client = await Client.connect("localhost:7233")
    await client.create_schedule(
        "daily-revenue-report",
        Schedule(
            action=ScheduleActionStartWorkflow(
                RevenueReportWorkflow.run,
                args=["daily"],
                # Base workflow ID; Temporal appends the scheduled time for each run
                id="daily-revenue-report",
                task_queue="analytics",
            ),
            spec=ScheduleSpec(
                # Cron expression: 9am UTC Monday-Friday
                cron_expressions=["0 9 * * 1-5"],
                # Or use interval-based scheduling:
                # intervals=[ScheduleIntervalSpec(every=timedelta(hours=6))],
                # Jitter prevents thundering herd when many schedules fire at once
                jitter=timedelta(minutes=5),
            ),
            policy=SchedulePolicy(
                # If previous run is still active, skip this run
                overlap=ScheduleOverlapPolicy.SKIP,
                # Runs missed during an outage are caught up only within
                # catchup_window; set it here if the default does not fit
            ),
            state=ScheduleState(
                note="Daily revenue report, runs Monday-Friday at 9am UTC",
            ),
        ),
    )


# Manage schedules
async def manage_schedule():
    client = await Client.connect("localhost:7233")
    handle = client.get_schedule_handle("daily-revenue-report")
    # Pause the schedule temporarily
    await handle.pause(note="Paused for Q2 close, resume manually")
    # Trigger an immediate run (outside normal schedule)
    await handle.trigger(overlap=ScheduleOverlapPolicy.ALLOW_ALL)
    # Backfill missed runs for a date range
    await handle.backfill(
        ScheduleBackfill(
            start_at=datetime(2026, 6, 1, tzinfo=timezone.utc),
            end_at=datetime(2026, 6, 7, tzinfo=timezone.utc),
            overlap=ScheduleOverlapPolicy.ALLOW_ALL,
        )
    )
    # Resume
    await handle.unpause(note="Q2 close complete")
Workflow Versioning: Zero-Downtime Code Deploys
Because workflow code is replayed from event history, deploying new workflow code while instances are in-flight requires care. If you change the sequence of activities in a running workflow, replay will produce a different event sequence from the history — a NonDeterminismError. workflow.patched() creates a versioned branch point: old executions that predate the patch follow the old path, new executions follow the new path.
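The branching rule behind a patch can be modelled in a few lines. This is a simplified sketch, not the SDK's implementation: the `PatchContext` class and `order_steps` function are hypothetical, and the model treats a run as either entirely replayed or entirely new, whereas a real workflow can replay part of its history and then continue executing.

```python
class PatchContext:
    """Toy model of how a patch marker decides which branch a run takes."""

    def __init__(self, recorded_markers, replaying: bool) -> None:
        self.recorded_markers = set(recorded_markers)
        self.replaying = replaying
        self.new_markers = []

    def patched(self, patch_id: str) -> bool:
        if self.replaying:
            # History decides: only runs that recorded the marker take the new path.
            return patch_id in self.recorded_markers
        # First-time execution: record the marker and take the new path.
        if patch_id not in self.recorded_markers:
            self.recorded_markers.add(patch_id)
            self.new_markers.append(patch_id)
        return True


def order_steps(ctx: PatchContext) -> list:
    steps = ["validate_order"]
    if ctx.patched("fraud-check-v2"):
        steps.append("check_fraud_signals")
    steps.append("process_payment")
    return steps


# History recorded before the patch existed: replay follows the old path.
old_run = order_steps(PatchContext([], replaying=True))
# Fresh execution on new code: the marker is recorded, new path taken.
fresh_ctx = PatchContext([], replaying=False)
new_run = order_steps(fresh_ctx)
# Replaying that fresh execution later finds the marker and matches it.
new_run_replayed = order_steps(PatchContext(fresh_ctx.new_markers, replaying=True))
```

The marker is what keeps a replay consistent with the path the run originally took, whichever version of the code first executed it.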
from datetime import timedelta
from temporalio import workflow

# The activities referenced below are assumed to be defined elsewhere


@workflow.defn
class OrderFulfillmentWorkflow:
    @workflow.run
    async def run(self, order_id: str) -> dict:
        # Original code path: all pre-existing in-flight workflows use this
        # New code path uses workflow.patched() to branch safely
        # Step 1: validate order (existing in all versions)
        validation = await workflow.execute_activity(
            validate_order,
            order_id,
            start_to_close_timeout=timedelta(seconds=30),
        )
        # Step 2 (NEW): fraud check added in v2 deploy
        # workflow.patched("fraud-check-v2") returns True when this code runs for
        # the first time, and False when replaying history that was recorded
        # before this patch was deployed
        if workflow.patched("fraud-check-v2"):
            fraud_result = await workflow.execute_activity(
                check_fraud_signals,
                order_id,
                start_to_close_timeout=timedelta(seconds=15),
            )
            if fraud_result["flagged"]:
                return {"order_id": order_id, "status": "blocked_fraud"}
        # Step 3: charge payment (existing in all versions)
        payment = await workflow.execute_activity(
            process_payment,
            order_id,
            start_to_close_timeout=timedelta(seconds=30),
        )
        # Step 4 (NEW): loyalty points added in v3 deploy
        if workflow.patched("loyalty-points-v3"):
            await workflow.execute_activity(
                award_loyalty_points,
                args=[order_id, payment["amount_cents"]],
                start_to_close_timeout=timedelta(seconds=10),
            )
        return {"order_id": order_id, "status": "fulfilled", "payment": payment}


# After all in-flight workflows that predate the patch have completed and no
# old-code workers remain, remove the branch: call workflow.deprecate_patch()
# as a plain statement (it returns nothing) and run the new code unconditionally:
#
#     workflow.deprecate_patch("fraud-check-v2")
#     fraud_result = await workflow.execute_activity(check_fraud_signals, ...)
Note
workflow.patched() ensures both old and new workers can handle the same workflow class during the overlap period.
Production Kubernetes Deployment
In production, Temporal Server and your Worker pools are separate deployments. Workers are stateless and horizontally scalable — add replicas to increase throughput. Separate task queues for different workflow types give independent scaling and fault isolation. Unlike Airflow or Dagster which co-locate scheduling logic with infrastructure, Temporal separates the durable state server from your Worker compute entirely — Workers can run anywhere that can reach the Temporal Server gRPC endpoint.
# Temporal Worker Deployment: orders domain
# k8s/temporal-worker-orders.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: temporal-worker-orders
  namespace: temporal
spec:
  replicas: 3
  selector:
    matchLabels:
      app: temporal-worker-orders
  template:
    metadata:
      labels:
        app: temporal-worker-orders
    spec:
      containers:
        - name: worker
          image: myapp/temporal-worker:1.25.0
          command: ["python", "-m", "workers.orders"]
          env:
            - name: TEMPORAL_HOST
              value: "temporal-frontend.temporal.svc.cluster.local:7233"
            - name: TEMPORAL_NAMESPACE
              value: "production"
            - name: TASK_QUEUE
              value: "orders"
            - name: MAX_CONCURRENT_ACTIVITIES
              value: "50"
            - name: MAX_CONCURRENT_WORKFLOWS
              value: "100"
          resources:
            requests:
              cpu: "500m"
              memory: "512Mi"
            limits:
              cpu: "2"
              memory: "2Gi"
          livenessProbe:
            # Placeholder: this command always exits 0, so it only proves that
            # Python can start in the container; swap in a real health check
            exec:
              command: ["python", "-c", "import sys; sys.exit(0)"]
            initialDelaySeconds: 30
            periodSeconds: 30
---
# HPA for automatic scaling based on task queue depth
# External metrics require a metrics adapter in the cluster; the metric name
# below depends on how that adapter is configured
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: temporal-worker-orders-hpa
  namespace: temporal
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: temporal-worker-orders
  minReplicas: 2
  maxReplicas: 20
  metrics:
    - type: External
      external:
        metric:
          name: temporal_task_queue_depth
          selector:
            matchLabels:
              task_queue: orders
        target:
          type: AverageValue
          averageValue: "10"
# Worker with configurable concurrency and graceful shutdown
# workers/orders.py
import asyncio
import os
import signal
from pathlib import Path
from temporalio.client import Client, TLSConfig
from temporalio.worker import Worker
from workflows.order_workflow import OrderWorkflow
from activities.payment import charge_payment, refund_payment
from activities.fulfillment import reserve_inventory, create_shipment
from activities.notification import send_order_confirmation


async def run_worker():
    # TLS connection to Temporal Server (Temporal Cloud or self-hosted with TLS)
    tls_config = None
    if os.environ.get("TEMPORAL_TLS_CERT"):
        tls_config = TLSConfig(
            client_cert=Path(os.environ["TEMPORAL_TLS_CERT"]).read_bytes(),
            client_private_key=Path(os.environ["TEMPORAL_TLS_KEY"]).read_bytes(),
        )
    client = await Client.connect(
        os.environ.get("TEMPORAL_HOST", "localhost:7233"),
        namespace=os.environ.get("TEMPORAL_NAMESPACE", "default"),
        # tls=False means a plaintext connection when no certificate is configured
        tls=tls_config if tls_config is not None else False,
    )
    worker = Worker(
        client,
        task_queue=os.environ.get("TASK_QUEUE", "orders"),
        workflows=[OrderWorkflow],
        activities=[
            charge_payment,
            refund_payment,
            reserve_inventory,
            create_shipment,
            send_order_confirmation,
        ],
        max_concurrent_activities=int(os.environ.get("MAX_CONCURRENT_ACTIVITIES", "50")),
        max_concurrent_workflow_tasks=int(os.environ.get("MAX_CONCURRENT_WORKFLOWS", "100")),
    )
    # Graceful shutdown on SIGTERM (Kubernetes sends this before SIGKILL)
    shutdown_event = asyncio.Event()

    def handle_shutdown_signal():
        print("Shutdown signal received, initiating graceful shutdown")
        shutdown_event.set()

    # Register through the event loop so the handler runs inside asyncio (Unix only)
    loop = asyncio.get_running_loop()
    for sig in (signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(sig, handle_shutdown_signal)

    async with worker:
        print(f"Worker started on task queue '{worker.task_queue}'")
        await shutdown_event.wait()
        print("Draining in-flight activities before shutdown...")
        # Worker.__aexit__ waits for in-progress tasks to complete


if __name__ == "__main__":
    asyncio.run(run_worker())
# Create a production namespace with the temporal CLI
temporal operator namespace create --namespace production --retention 30d --description "Production workflows"
# Create a staging namespace
temporal operator namespace create --namespace staging --retention 7d --description "Staging workflows"
# List namespaces
temporal operator namespace list
# Describe a namespace
temporal operator namespace describe --namespace production
# View workflow history for a specific execution
temporal workflow show --workflow-id "order-12345" --namespace production
# Reset a workflow to a specific event (disaster recovery)
temporal workflow reset --workflow-id "order-12345" --event-id 15 --reason "activity returned incorrect result — reprocessing"
# List running OrderWorkflow executions (review this list before terminating anything in bulk)
temporal workflow list --query 'WorkflowType="OrderWorkflow" AND ExecutionStatus="Running"' --namespace production
Production Checklist
Enforce workflow determinism in CI by running a determinism linter or writing unit tests that replay a captured event history and assert identical outcomes (the Python SDK ships a Replayer for this). At minimum, audit workflow code for datetime.now(), random, uuid4, blocking calls such as time.sleep(), and any direct I/O. In the Python SDK, asyncio.sleep() inside a workflow runs on Temporal's deterministic event loop and is backed by a durable timer, just like workflow.sleep(). A NonDeterminismError in production causes workflow tasks to fail continuously until code is fixed and deployed.
Set heartbeat_timeout longer than your heartbeat cadence on any activity that runs longer than 30 seconds. If an activity heartbeats every 10 seconds, set heartbeat_timeout to 15 to 20 seconds. Without heartbeat_timeout, the server cannot tell that a worker has crashed until start_to_close_timeout expires, which delays recovery by minutes instead of seconds.
Build a non_retryable_error_types taxonomy from your business error catalog before deploying. Card declines, validation errors, duplicate order IDs, and permission denials should not be retried: they need human intervention or a different workflow path. Retrying them wastes activity slots and triggers false alerting. Raise ApplicationError(message, non_retryable=True) from activity code to mark errors as non-retryable dynamically.
Use one Temporal Namespace per environment (development, staging, production) — not one namespace with prefixed workflow IDs. Namespaces have independent visibility, retention policies, and access controls. Cross-namespace workflow queries or signals are not supported; this enforces clean environment isolation.
Set schedule_to_close_timeout on activities that interact with external services with SLAs. start_to_close_timeout caps a single attempt; schedule_to_close_timeout caps the total execution time including retries. For a payment activity with a 5-minute SLA and 5 retry attempts, set start_to_close_timeout=30s and schedule_to_close_timeout=5min to guarantee the order fails fast if the payment service is degraded.
Implement Worker Versioning (Build IDs) when deploying breaking workflow changes to ensure new workers only pick up new executions while old workers drain existing ones. Without Worker Versioning, old and new workers compete for tasks — if the old worker retires before in-flight workflows complete, a new worker picks up replays that fail with NonDeterminismError.
Use Workflow ID reuse policies to prevent duplicate workflow starts in distributed systems. Starting a workflow whose ID belongs to an execution that is still running is rejected by default. For closed executions, the default policy ALLOW_DUPLICATE lets the ID be reused, ALLOW_DUPLICATE_FAILED_ONLY allows reuse only if the previous execution did not complete successfully, and REJECT_DUPLICATE never allows reuse. Always set a deterministic Workflow ID based on the entity (e.g., order-{order_id}) so retried client calls are idempotent.
Scrape Temporal's Prometheus metrics endpoint exposed by the Frontend and History services. Key metrics: temporal_request_latency (workflow task scheduling), temporal_activity_poll_succeed_total (worker throughput), temporal_workflow_task_schedule_to_start_latency (task queue depth indicator), and temporal_workflow_failed_total by WorkflowType for SLA alerting.
Make signal handlers idempotent for signals that may arrive more than once. A client that retries a signal call after a timeout or network error can deliver the same signal twice, so a handler that increments a counter must deduplicate or use set semantics. One option is to carry a unique ID in the signal payload and keep a dict keyed by that ID to deduplicate approval signals.
Evaluate Temporal Cloud for teams with fewer than 3 platform engineers. Temporal Cloud eliminates server operation, persistence management, TLS certificate rotation, and Cassandra/PostgreSQL upgrades. Billing is per action (workflow started, activity scheduled, signal sent) — typically lower total cost than self-hosting once engineering time is included. Self-hosting makes sense for compliance requirements, very high volumes (>1B actions/month), or custom visibility backends.
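The timeout budget in the payment example above can be checked with simple arithmetic. The following is a minimal sketch, assuming the retry settings from the OrderWorkflow example (1-second initial interval, coefficient 2.0, 30-second cap, 5 attempts) and ignoring time spent waiting in the task queue; `worst_case_duration` is a hypothetical helper, not an SDK function.

```python
from datetime import timedelta


def worst_case_duration(start_to_close, initial, coefficient, maximum, maximum_attempts):
    """Upper bound: every attempt runs to its timeout, plus all backoff waits."""
    total = start_to_close * maximum_attempts
    for retry_number in range(maximum_attempts - 1):  # no wait before the first attempt
        wait = initial * (coefficient ** retry_number)
        total += min(wait, maximum)
    return total


worst = worst_case_duration(
    start_to_close=timedelta(seconds=30),
    initial=timedelta(seconds=1),
    coefficient=2.0,
    maximum=timedelta(seconds=30),
    maximum_attempts=5,
)
# Five 30-second attempts plus waits of 1, 2, 4, and 8 seconds.
fits_in_sla = worst <= timedelta(minutes=5)
```

Under these assumptions the attempts exhaust themselves in 165 seconds, so maximum_attempts is the binding limit and the 5-minute schedule_to_close_timeout acts as a backstop; with more attempts or longer per-attempt timeouts, the overall timeout would cut the retries short instead.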