Tags: OpenTelemetry, Data Pipelines, Observability, Distributed Tracing, Airflow, Spark, Python, Data Engineering

OpenTelemetry for Data Pipelines — Distributed Tracing and Observability Beyond APIs

A comprehensive guide to applying OpenTelemetry in data engineering: TracerProvider and MeterProvider setup with BatchSpanProcessor and OTLP gRPC exporter, custom span design with data-specific semantic conventions (pipeline.stage, pipeline.records.read/written/failed, pipeline.partition.key, pipeline.bytes.processed), W3C TraceContext propagation across batch boundaries via Kafka message headers, S3 object metadata, and Airflow XComs, Airflow 2.7+ native OTel integration with task-level child spans and XCom-based context chaining, driver-side PySpark instrumentation with Spark REST API metrics enrichment and Java agent for executor suppression, data quality checks as OTel metrics (freshness, completeness ratio, row count, validity), OTel Collector tail-based sampling (keep all errors, 10% of healthy traces), Grafana Tempo and Jaeger backend routing, dbt run_results.json to OTel span conversion, Prometheus alerting rules for pipeline SLOs, and a 12-point production checklist covering orphaned spans, PII leakage, sampler configuration, and SDK shutdown.

2026-06-04

The Observability Gap — Why API Tracing Alone Fails for Data Pipelines

OpenTelemetry has become the default observability standard for microservices and APIs. Auto-instrumentation libraries attach to HTTP frameworks, database clients, and gRPC stubs, producing traces that describe request flows in milliseconds. The tooling works beautifully for synchronous, request-scoped systems where a trace has a clear start and end.

Data pipelines are structurally different. A single Airflow DAG run may spawn dozens of tasks across multiple workers over three hours, processing 500 million records through intermediate stages, each writing to different storage systems, each running as a separate OS process with no shared in-memory trace context. A Spark job fans out across 200 executors in parallel and then collapses back to a driver. A dbt run compiles SQL, executes 80 models in dependency order, and runs schema tests — and the only feedback on failure is a log line 40 minutes in.

The observability gaps this creates are painful and concrete. When a pipeline delivers stale data, you have no trace showing which stage introduced the latency. When row counts drop 30% in the output table, you have no span attribute recording where rows were dropped and why. When an Airflow task retried three times before succeeding, each retry is a separate log event with no causal link to the upstream Kafka consumer that sent malformed records. OpenTelemetry can close all of these gaps — but it requires deliberate, data-aware instrumentation rather than a one-line auto-instrument call.

This guide covers the complete picture: OTel Python SDK setup, custom span design with data-specific attributes, context propagation across batch boundaries (Kafka headers, S3 metadata, Airflow XComs), Airflow and Spark instrumentation patterns, data-quality metrics as OTel metrics, exporting to Jaeger and Grafana Tempo, and building pipeline SLO dashboards.

OTel Python SDK Setup — TracerProvider, MeterProvider, and OTLP Exporter

The OpenTelemetry Python SDK separates signal creation from signal export via a provider/exporter architecture. The TracerProvider manages trace creation; MeterProvider manages metric instruments; both push to an OTLPExporter that forwards signals to a collector endpoint. For data pipeline use, configure the SDK once per process in a shared module that all pipeline stages import.

# pip install opentelemetry-sdk opentelemetry-exporter-otlp-proto-grpc
# pip install opentelemetry-instrumentation-requests opentelemetry-instrumentation-sqlalchemy

# pipeline_otel.py — shared OTel bootstrap module
import os
from opentelemetry import trace, metrics
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter
from opentelemetry.sdk.resources import Resource, SERVICE_NAME, SERVICE_VERSION

OTEL_ENDPOINT = os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:4317")
PIPELINE_NAME = os.getenv("PIPELINE_NAME", "data-pipeline")
PIPELINE_ENV  = os.getenv("ENV", "production")

def setup_otel(service_name: str = PIPELINE_NAME) -> tuple[trace.Tracer, metrics.Meter]:
    resource = Resource.create({
        SERVICE_NAME:    service_name,
        SERVICE_VERSION: os.getenv("GIT_SHA", "unknown"),
        "deployment.environment": PIPELINE_ENV,
        "pipeline.team":          os.getenv("PIPELINE_TEAM", "data-engineering"),
    })

    # ── Traces ──
    tracer_provider = TracerProvider(resource=resource)
    tracer_provider.add_span_processor(
        BatchSpanProcessor(
            OTLPSpanExporter(endpoint=OTEL_ENDPOINT, insecure=True),
            max_queue_size=4096,
            max_export_batch_size=512,
            export_timeout_millis=30_000,
        )
    )
    trace.set_tracer_provider(tracer_provider)

    # ── Metrics ──
    metric_reader = PeriodicExportingMetricReader(
        OTLPMetricExporter(endpoint=OTEL_ENDPOINT, insecure=True),
        export_interval_millis=60_000,   # push every 60 s
    )
    meter_provider = MeterProvider(resource=resource, metric_readers=[metric_reader])
    metrics.set_meter_provider(meter_provider)

    tracer = trace.get_tracer(service_name)
    meter  = metrics.get_meter(service_name)
    return tracer, meter

# Usage in any pipeline module:
# from pipeline_otel import setup_otel
# tracer, meter = setup_otel("orders-etl")

Note

Use BatchSpanProcessor for all data pipeline work — never SimpleSpanProcessor. Batch processing buffers spans in memory and flushes asynchronously, so a slow OTLP endpoint does not add latency to your pipeline stages. Set max_queue_size high enough to absorb bursts from parallel Spark-style fan-out operations; at 512 bytes per span, 4096 spans ≈ 2 MB of buffer.
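
The buffer estimate in the note is plain arithmetic, sketched below. The 512-byte average span size is the note's own rough figure rather than a measured value, and estimate_queue_bytes and to_mib are hypothetical helper names used only for illustration.

```python
def estimate_queue_bytes(max_queue_size: int, avg_span_bytes: int = 512) -> int:
    """Rough upper bound on memory held by queued spans.

    Ignores Python object overhead, so real usage will be somewhat higher.
    """
    return max_queue_size * avg_span_bytes


def to_mib(n_bytes: int) -> float:
    """Convert a byte count to mebibytes."""
    return n_bytes / (1024 * 1024)


if __name__ == "__main__":
    # Compare a few candidate queue sizes before picking max_queue_size.
    for queue_size in (2048, 4096, 16384):
        print(queue_size, f"{to_mib(estimate_queue_bytes(queue_size)):.1f} MiB")
```

If your spans carry many attributes or events, measure a realistic average size first and pass it as avg_span_bytes.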

Custom Span Design — Data-Specific Attributes and Semantic Conventions

Standard OTel semantic conventions were designed mainly around request-style work such as HTTP, RPC, database, and messaging calls. For data pipelines you need a richer attribute vocabulary. The pipeline.* names used in this guide are custom attributes rather than part of the official semantic conventions, so treat them as a team-level convention. Standardise these across your team so dashboards and alerts can be written once and applied to all pipelines.

Key data pipeline attributes to attach to every span:

  • pipeline.stage — logical stage name (extract, transform, load, validate)
  • pipeline.source.system and pipeline.sink.system — e.g. kafka, postgres, s3, bigquery
  • pipeline.records.read, pipeline.records.written, pipeline.records.failed (row counts, set as span attributes once the stage has finished its work)
  • pipeline.partition.key — the partition or date slice being processed (e.g. 2026-06-04)
  • pipeline.bytes.processed — bytes read or written (set on span end)
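
One way to keep these names consistent is to define them once and build attribute dictionaries through a small helper. The sketch below is an assumption about how a team might do that: StageAttributes and ALLOWED_STAGES are hypothetical names, not part of OpenTelemetry, and the module uses only the standard library.

```python
from dataclasses import dataclass
from typing import Optional

# Attribute keys from the list above, spelled once so every pipeline agrees.
STAGE = "pipeline.stage"
SOURCE_SYSTEM = "pipeline.source.system"
SINK_SYSTEM = "pipeline.sink.system"
PARTITION_KEY = "pipeline.partition.key"
RECORDS_READ = "pipeline.records.read"
RECORDS_WRITTEN = "pipeline.records.written"
RECORDS_FAILED = "pipeline.records.failed"
BYTES_PROCESSED = "pipeline.bytes.processed"

# Stage names taken from the examples in the list; extend as needed.
ALLOWED_STAGES = {"extract", "transform", "load", "validate"}


@dataclass(frozen=True)
class StageAttributes:
    stage: str
    partition_key: str
    source_system: Optional[str] = None
    sink_system: Optional[str] = None

    def as_dict(self) -> dict[str, str]:
        """Return span-start attributes, rejecting unknown stage names."""
        if self.stage not in ALLOWED_STAGES:
            raise ValueError(f"unknown pipeline stage: {self.stage!r}")
        attrs = {STAGE: self.stage, PARTITION_KEY: self.partition_key}
        # Omit unset values: span attributes should not be None.
        if self.source_system:
            attrs[SOURCE_SYSTEM] = self.source_system
        if self.sink_system:
            attrs[SINK_SYSTEM] = self.sink_system
        return attrs
```

The resulting dictionary can be passed as the attributes argument when starting a span, and the RECORDS_* constants can be used with set_attribute once counts are known.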

# etl_pipeline.py: a fully instrumented ETL stage
import time
from opentelemetry import trace
from opentelemetry.trace import StatusCode
from pipeline_otel import setup_otel

tracer, meter = setup_otel("orders-etl")

# ── Metric instruments ──
records_counter = meter.create_counter(
    "pipeline.records.processed",
    unit="records",
    description="Total records processed by this pipeline stage",
)
stage_duration = meter.create_histogram(
    "pipeline.stage.duration_seconds",
    unit="s",
    description="Wall-clock duration of each pipeline stage",
)
data_quality_gauge = meter.create_up_down_counter(
    "pipeline.data_quality.failed_records",
    unit="records",
    description="Records that failed validation in this stage",
)


def extract_from_postgres(
    conn,
    table: str,
    partition_date: str,
) -> list[dict]:
    with tracer.start_as_current_span(
        "extract",
        attributes={
            "pipeline.stage":         "extract",
            "pipeline.source.system": "postgres",
            "pipeline.source.table":  table,
            "pipeline.partition.key": partition_date,
            "db.system":              "postgresql",
        },
    ) as span:
        t0 = time.monotonic()
        try:
            rows = conn.execute(
                f"SELECT * FROM {table} WHERE date_trunc('day', created_at) = %s",
                (partition_date,),
            ).fetchall()

            record_count = len(rows)
            bytes_est    = record_count * 512  # rough estimate

            # Attach data volume attributes at span end
            span.set_attribute("pipeline.records.read",     record_count)
            span.set_attribute("pipeline.bytes.processed",  bytes_est)
            span.set_status(StatusCode.OK)

            # Increment metric counter with stage/source labels
            records_counter.add(
                record_count,
                {"pipeline.stage": "extract", "pipeline.source": "postgres"},
            )
            return rows

        except Exception as exc:
            span.set_status(StatusCode.ERROR, str(exc))
            span.record_exception(exc)
            raise
        finally:
            stage_duration.record(
                time.monotonic() - t0,
                {"pipeline.stage": "extract"},
            )


def transform_orders(rows: list[dict], partition_date: str) -> tuple[list[dict], int]:
    with tracer.start_as_current_span(
        "transform",
        attributes={
            "pipeline.stage":         "transform",
            "pipeline.partition.key": partition_date,
            "pipeline.records.read":  len(rows),
        },
    ) as span:
        t0 = time.monotonic()
        output, failed = [], 0

        for row in rows:
            try:
                # Apply business logic
                transformed = {
                    "order_id":   row["id"],
                    "revenue_usd": float(row["amount"]) / 100,
                    "status":     row["status"].lower().strip(),
                    "date":       partition_date,
                }
                if transformed["revenue_usd"] < 0:
                    raise ValueError(f"Negative revenue: {row['id']}")
                output.append(transformed)
            except Exception as exc:
                failed += 1
                # Record validation failure as a span event (not an error)
                span.add_event(
                    "record_validation_failed",
                    attributes={
                        "record.id":     str(row.get("id", "unknown")),
                        "failure.reason": str(exc),
                    },
                )

        span.set_attribute("pipeline.records.written", len(output))
        span.set_attribute("pipeline.records.failed",  failed)

        data_quality_gauge.add(failed, {"pipeline.stage": "transform"})
        stage_duration.record(time.monotonic() - t0, {"pipeline.stage": "transform"})
        return output, failed


def load_to_warehouse(records: list[dict], partition_date: str) -> None:
    with tracer.start_as_current_span(
        "load",
        attributes={
            "pipeline.stage":        "load",
            "pipeline.sink.system":  "bigquery",
            "pipeline.sink.table":   "analytics.orders",
            "pipeline.partition.key": partition_date,
        },
    ) as span:
        t0 = time.monotonic()
        try:
            # ... actual BQ insert logic here ...
            span.set_attribute("pipeline.records.written", len(records))
            span.set_status(StatusCode.OK)
            records_counter.add(len(records), {"pipeline.stage": "load", "pipeline.sink": "bigquery"})
        except Exception as exc:
            span.set_status(StatusCode.ERROR, str(exc))
            span.record_exception(exc)
            raise
        finally:
            stage_duration.record(time.monotonic() - t0, {"pipeline.stage": "load"})

Context Propagation Across Batch Boundaries — Kafka, S3, and Airflow XComs

The hardest part of data pipeline tracing is stitching together causally related work that runs in separate processes, hours apart, with no shared memory. OpenTelemetry solves this with W3C TraceContext propagation: a traceparent header that encodes the trace ID and parent span ID as a string that can be serialised into any carrier — HTTP headers, Kafka message headers, S3 object metadata, or database columns.
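
To make the carrier format concrete, here is a minimal standard-library sketch of the traceparent string: version, 32-hex trace ID, 16-hex parent span ID, and 2-hex flags, joined by hyphens. It is an illustration only, not the SDK's propagator; format_traceparent and parse_traceparent are hypothetical helpers, and the parser accepts exactly four fields, which is a simplification of the W3C rules for future versions.

```python
import re
from typing import NamedTuple, Optional

_TRACEPARENT_RE = re.compile(
    r"^([0-9a-f]{2})-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$"
)


class TraceParent(NamedTuple):
    version: str
    trace_id: str
    parent_span_id: str
    sampled: bool


def format_traceparent(trace_id: str, span_id: str, sampled: bool = True) -> str:
    """Build a version-00 traceparent value from hex-encoded IDs."""
    return f"00-{trace_id}-{span_id}-{'01' if sampled else '00'}"


def parse_traceparent(header: str) -> Optional[TraceParent]:
    """Parse a traceparent value; return None if it is malformed."""
    match = _TRACEPARENT_RE.match(header.strip())
    if match is None:
        return None
    version, trace_id, span_id, flags = match.groups()
    # Version "ff" and all-zero IDs are invalid in the W3C specification.
    if version == "ff" or set(trace_id) == {"0"} or set(span_id) == {"0"}:
        return None
    # Bit 0 of the flags byte is the "sampled" flag.
    return TraceParent(version, trace_id, span_id, bool(int(flags, 16) & 0x01))
```

Because the whole context fits in one short ASCII string, it can be stored anywhere a pipeline already keeps run metadata. In real code, prefer the SDK propagator so that tracestate and future format versions are handled for you.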

For batch pipelines, the cleanest propagation strategy is to inject the trace context into the pipeline run record at the trigger boundary and extract it in each downstream stage. Here is how to propagate through Kafka message headers and through Airflow XComs:

# context_propagation.py
from opentelemetry import trace, propagate
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator

propagator = TraceContextTextMapPropagator()


# ── Kafka producer: inject trace context into message headers ──
def produce_with_trace(producer, topic: str, key: str, value: bytes) -> None:
    carrier: dict[str, str] = {}
    propagator.inject(carrier)   # fills {"traceparent": "00-<trace_id>-<span_id>-01"}

    headers = [(k, v.encode()) for k, v in carrier.items()]
    producer.produce(topic, key=key.encode(), value=value, headers=headers)


# ── Kafka consumer: extract trace context from message headers ──
def consume_and_trace(msg):
    """Return a context manager for a span parented on the producer's span.

    Use it as: with consume_and_trace(msg) as span: ...
    """
    # Reconstruct carrier dict from Kafka header list (header values may be None)
    carrier = {k: v.decode() for k, v in (msg.headers() or []) if v is not None}
    parent_ctx = propagator.extract(carrier)

    tracer = trace.get_tracer("orders-consumer")
    # start_as_current_span returns a context manager, not a Span; the span
    # starts as a child of the producer's span when the caller enters it.
    return tracer.start_as_current_span(
        "consume_order_event",
        context=parent_ctx,
        attributes={
            "messaging.system":           "kafka",
            "messaging.destination.name": msg.topic(),
            "messaging.kafka.partition":  msg.partition(),
            "messaging.kafka.offset":     msg.offset(),
        },
    )


# ── Airflow: propagate trace context via XCom ──
# In the trigger/entry task:
def push_trace_context_to_xcom(ti, **kwargs) -> None:
    carrier: dict[str, str] = {}
    propagator.inject(carrier)
    ti.xcom_push(key="otel_trace_context", value=carrier)


# In a downstream task:
def downstream_task_with_parent_trace(ti, **kwargs) -> None:
    carrier = ti.xcom_pull(key="otel_trace_context", task_ids="trigger_task")
    parent_ctx = propagator.extract(carrier or {})

    tracer = trace.get_tracer("airflow-pipeline")
    with tracer.start_as_current_span(
        "downstream_processing",
        context=parent_ctx,
        attributes={"airflow.task_id": kwargs["task_instance"].task_id},
    ) as span:
        # ... task logic ...
        span.set_attribute("pipeline.records.processed", 12_000)


# ── S3: propagate via object metadata ──
import boto3

def upload_with_trace(s3_client, bucket: str, key: str, data: bytes) -> None:
    carrier: dict[str, str] = {}
    propagator.inject(carrier)

    s3_client.put_object(
        Bucket=bucket,
        Key=key,
        Body=data,
        Metadata={f"otel-{k}": v for k, v in carrier.items()},
    )

def download_and_extract_trace(s3_client, bucket: str, key: str) -> dict[str, str]:
    response = s3_client.head_object(Bucket=bucket, Key=key)
    meta = response.get("Metadata", {})
    return {k[5:]: v for k, v in meta.items() if k.startswith("otel-")}

Note

Airflow's XCom-based propagation creates a logical trace that spans the full DAG run, even across multi-hour gaps between task executions. The trade-off is that the trace timeline will show idle time between spans — this is correct and intentional. A trace duration of three hours with twenty minutes of active work is useful data: it tells you that the pipeline spent 2h40m waiting for upstream dependencies, not processing records. Filter by span duration to distinguish scheduler wait from compute work.
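
The split between active work and waiting can be computed from span start and end times. The following is a minimal sketch under the assumption that you have exported the task-level spans of one trace as (start, end) offsets in seconds; active_and_idle is a hypothetical helper, and the sample numbers are illustrative rather than measured.

```python
def active_and_idle(spans: list[tuple[float, float]]) -> tuple[float, float]:
    """Return (active_seconds, idle_seconds) for one trace.

    spans holds (start, end) offsets of task-level spans. Overlapping spans
    are merged so parallel work is not counted twice. Pass task or leaf spans
    only: a root span covering the whole run would hide all idle time.
    """
    if not spans:
        return 0.0, 0.0
    ordered = sorted(spans)
    trace_start = ordered[0][0]
    trace_end = max(end for _, end in ordered)

    active = 0.0
    cur_start, cur_end = ordered[0]
    for start, end in ordered[1:]:
        if start <= cur_end:
            # Overlaps the current busy window: extend it.
            cur_end = max(cur_end, end)
        else:
            # Gap found: close the current window and open a new one.
            active += cur_end - cur_start
            cur_start, cur_end = start, end
    active += cur_end - cur_start
    return active, (trace_end - trace_start) - active


if __name__ == "__main__":
    # Three tasks spread over a three-hour DAG run (offsets in seconds).
    tasks = [(0, 300), (3600, 4200), (10500, 10800)]
    active, idle = active_and_idle(tasks)
    print(f"active={active / 60:.0f} min, idle={idle / 60:.0f} min")
```

With these sample offsets the active time is 20 minutes and the idle time is 2 hours 40 minutes, matching the scenario described in the note.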

Airflow DAG Instrumentation — Task-Level Spans and Data Volume Events

Airflow ships built-in OpenTelemetry support through the otel extra. OpenTelemetry metrics arrived in Airflow 2.7 and the [traces] configuration shown below in Airflow 2.10, so check the release notes for the version you run. When tracing is enabled, Airflow creates a span per task instance with standard attributes like airflow.dag_id, airflow.task_id, and airflow.run_id. For richer data-volume context, supplement the built-in spans with custom child spans inside your task callables.

# airflow.cfg (or environment variables)
# [traces]
# otel_on = True
# otel_host = otel-collector
# otel_port = 4318   # HTTP/protobuf endpoint (not gRPC for Airflow 2.7)
# otel_ssl_active = False

# dags/orders_etl_dag.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from opentelemetry import trace
from pipeline_otel import setup_otel

tracer, meter = setup_otel("airflow-orders-etl")

records_written = meter.create_counter(
    "airflow.task.records_written",
    unit="records",
)


def extract_task(partition_date: str, ti, **kwargs) -> None:
    # Airflow already created the parent span — start a child span
    with tracer.start_as_current_span(
        "extract_orders_from_db",
        attributes={
            "pipeline.stage":         "extract",
            "pipeline.partition.key": partition_date,
            "pipeline.source.system": "postgres",
        },
    ) as span:
        # push trace context for downstream tasks
        carrier: dict[str, str] = {}
        from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
        TraceContextTextMapPropagator().inject(carrier)
        ti.xcom_push(key="otel_context", value=carrier)

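        rows = _fetch_orders(partition_date)  # placeholder for your own query helper
        span.set_attribute("pipeline.records.read", len(rows))
        ti.xcom_push(key="record_count", value=len(rows))
        # Do not return rows: a callable's return value is pushed to XCom,
        # which is meant for small metadata, not bulk data.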


def transform_task(ti, **kwargs) -> None:
    carrier = ti.xcom_pull(key="otel_context", task_ids="extract")
    from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
    parent_ctx = TraceContextTextMapPropagator().extract(carrier or {})

    with tracer.start_as_current_span(
        "transform_orders",
        context=parent_ctx,
        attributes={"pipeline.stage": "transform"},
    ) as span:
        raw_count = ti.xcom_pull(key="record_count", task_ids="extract") or 0
        # ... transformation logic ...
        output_count = int(raw_count * 0.98)   # mock: 2% filtered

        span.set_attribute("pipeline.records.read",    raw_count)
        span.set_attribute("pipeline.records.written", output_count)
        span.set_attribute("pipeline.records.failed",  raw_count - output_count)
        records_written.add(output_count, {"pipeline.stage": "transform"})


with DAG(
    dag_id="orders_etl",
    start_date=datetime(2026, 1, 1),
    schedule="@daily",
    catchup=False,
    default_args={
        "retries": 2,
        "retry_delay": timedelta(minutes=5),
        "retry_exponential_backoff": True,
    },
) as dag:

    extract = PythonOperator(
        task_id="extract",
        python_callable=extract_task,
        op_kwargs={"partition_date": "{{ ds }}"},
    )

    transform = PythonOperator(
        task_id="transform",
        python_callable=transform_task,
    )

    extract >> transform

Spark Job Instrumentation — Driver Spans, Stage Metrics, and the Listener API

Spark's distributed execution model poses a unique propagation challenge: task closures are serialised and shipped to executors, which run as separate JVM (and Python worker) processes, so the driver's active span context does not travel with them unless you propagate it explicitly. The practical pattern is to instrument at the driver level, creating spans around each Spark action (collect(), write(), count()) and enriching those spans with Spark query execution metrics after the action completes. The Spark monitoring REST API (served by the Spark UI, on port 4040 by default while the application runs) exposes per-stage row counts, shuffle bytes, spill sizes, and task durations.

# spark_otel.py — driver-side OTel instrumentation for PySpark jobs
import time
import requests
from pyspark.sql import SparkSession, DataFrame
from opentelemetry import trace
from opentelemetry.trace import StatusCode
from pipeline_otel import setup_otel

tracer, meter = setup_otel("spark-orders-job")

spark_rows_read    = meter.create_counter("spark.rows.read",    unit="rows")
spark_shuffle_mb   = meter.create_histogram("spark.shuffle.mb", unit="MB")
spark_stage_dur    = meter.create_histogram("spark.stage.duration_seconds", unit="s")


def _fetch_spark_stage_metrics(app_id: str, stage_id: int) -> dict:
    try:
        r = requests.get(
            f"http://localhost:4040/api/v1/applications/{app_id}/stages/{stage_id}",
            timeout=5,
        )
        return r.json()[0] if r.ok else {}
    except Exception:
        return {}


def run_spark_stage(
    spark: SparkSession,
    df: DataFrame,
    stage_name: str,
    output_path: str,
    partition_date: str,
) -> None:
    with tracer.start_as_current_span(
        stage_name,
        attributes={
            "pipeline.stage":               stage_name,
            "pipeline.partition.key":       partition_date,
            "pipeline.sink.system":         "s3",
            "pipeline.sink.path":           output_path,
            "spark.executor.instances":     spark.conf.get("spark.executor.instances", "unknown"),
            "spark.executor.memory":        spark.conf.get("spark.executor.memory", "unknown"),
        },
    ) as span:
        t0 = time.monotonic()
        try:
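            # Trigger a Spark action (this is where work actually happens).
            # DataFrameWriter.parquet() returns None, so it cannot supply a row count.
            df.write.mode("overwrite").parquet(output_path)

            # Count rows from the files just written instead of calling df.count(),
            # which would re-run the whole transformation a second time.
            written_count = spark.read.parquet(output_path).count()

            # Stage-level enrichment would go here. getActiveStageIds() is empty once
            # the action has finished, so resolve completed stage IDs first (for
            # example via a job group) before calling _fetch_spark_stage_metrics().
            app_id = spark.sparkContext.applicationId

            span.set_status(StatusCode.OK)
            span.set_attribute("spark.app.id", app_id)
            span.set_attribute("pipeline.records.written", written_count)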

        except Exception as exc:
            span.set_status(StatusCode.ERROR, str(exc))
            span.record_exception(exc)
            raise
        finally:
            elapsed = time.monotonic() - t0
            spark_stage_dur.record(elapsed, {"pipeline.stage": stage_name})
            span.set_attribute("pipeline.stage.duration_seconds", round(elapsed, 3))


# ── Full PySpark job with parent span ──
def main(partition_date: str) -> None:
    spark = (
        SparkSession.builder
        .appName(f"orders-etl-{partition_date}")
        .config("spark.sql.adaptive.enabled", "true")
        .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
        .getOrCreate()
    )

    with tracer.start_as_current_span(
        "spark_job",
        attributes={
            "pipeline.job":           "orders-etl",
            "pipeline.partition.key": partition_date,
            "spark.app.id":           spark.sparkContext.applicationId,
            "spark.app.name":         spark.sparkContext.appName,
        },
    ) as root_span:
        try:
            raw_df = spark.read.parquet(
                f"s3://my-lake/raw/orders/date={partition_date}/"
            )
            root_span.set_attribute("pipeline.records.read", raw_df.count())

            transformed_df = (
                raw_df
                .filter("amount > 0")
                .selectExpr(
                    "id AS order_id",
                    "customer_id",
                    "CAST(amount / 100 AS DOUBLE) AS revenue_usd",
                    "status",
                )
            )

            run_spark_stage(
                spark, transformed_df,
                stage_name="load_to_warehouse",
                output_path=f"s3://my-lake/warehouse/orders/date={partition_date}/",
                partition_date=partition_date,
            )
            root_span.set_status(StatusCode.OK)
        except Exception as exc:
            root_span.set_status(StatusCode.ERROR, str(exc))
            root_span.record_exception(exc)
            raise
        finally:
            spark.stop()
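
Once a stage payload has been fetched from the REST API, it still needs to be mapped onto span attributes. The sketch below shows one possible mapping. The JSON field names are the ones Spark's stage endpoint exposes as far as I know, so verify them against your Spark version; the spark.stage.* attribute names and stage_metrics_to_attributes are this sketch's own assumptions.

```python
from typing import Any

# Left: field names from the stage payload. Right: hypothetical attribute names.
_FIELD_TO_ATTRIBUTE = {
    "inputRecords": "spark.stage.input_records",
    "outputRecords": "spark.stage.output_records",
    "shuffleReadBytes": "spark.stage.shuffle_read_bytes",
    "shuffleWriteBytes": "spark.stage.shuffle_write_bytes",
    "memoryBytesSpilled": "spark.stage.memory_bytes_spilled",
    "diskBytesSpilled": "spark.stage.disk_bytes_spilled",
    "executorRunTime": "spark.stage.executor_run_time_ms",
}


def stage_metrics_to_attributes(stage: dict[str, Any]) -> dict[str, int]:
    """Pick numeric stage metrics out of a REST payload.

    Missing or non-numeric fields are skipped, so an empty payload (for
    example after a failed request) yields an empty dict.
    """
    attrs: dict[str, int] = {}
    for field, attribute in _FIELD_TO_ATTRIBUTE.items():
        value = stage.get(field)
        if isinstance(value, (int, float)) and not isinstance(value, bool):
            attrs[attribute] = int(value)
    return attrs
```

Each key and value in the returned dictionary can then be applied to the stage span with set_attribute. Skipping missing fields keeps the instrumentation from failing the job when the UI port is unreachable.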

Note

For Scala/Java Spark jobs, use the OpenTelemetry Java agent (-javaagent:opentelemetry-javaagent.jar) on the driver JVM. It auto-instruments JDBC connections, HTTP clients, and thread pool operations. Disable the executor-side agent with spark.executor.extraJavaOptions=-Dotel.sdk.disabled=true to avoid flooding the collector with per-task spans and the associated overhead.

Data Quality Metrics — Freshness, Completeness, and Validity as OTel Signals

Observability in data systems is not only about latency and error rates. The signals that matter most are often about the data itself: is the latest partition complete, are there unexpected nulls, is the row count within historical bounds? These data quality checks map naturally to OpenTelemetry metrics — the same infrastructure that alerts on p99 latency can alert on a completeness ratio dropping below 0.99.

# data_quality_metrics.py — emit data quality checks as OTel metrics
import time
from opentelemetry import metrics
from pipeline_otel import setup_otel

_, meter = setup_otel("data-quality-monitor")

# ── Define metric instruments once at module level ──
freshness_gauge    = meter.create_gauge(
    "dq.freshness_seconds",
    unit="s",
    description="Seconds since the latest record in the table was ingested",
)
completeness_gauge = meter.create_gauge(
    "dq.completeness_ratio",
    unit="1",
    description="Ratio of non-null values for a column (0.0–1.0)",
)
row_count_gauge    = meter.create_gauge(
    "dq.row_count",
    unit="rows",
    description="Row count of a table or partition",
)
validity_gauge     = meter.create_gauge(
    "dq.validity_ratio",
    unit="1",
    description="Ratio of records passing domain validity rules",
)


def run_data_quality_checks(conn, table: str, partition_date: str) -> dict:
    results = {}

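    # ── 1. Freshness: seconds since latest record ──
    age_seconds = conn.execute(
        f"SELECT EXTRACT(EPOCH FROM (NOW() - MAX(created_at))) FROM {table} "
        f"WHERE DATE(created_at) = %s",
        (partition_date,),
    ).scalar()
    # NULL means the partition has no rows; report a large sentinel so alerts fire.
    # Test for None explicitly so a legitimate age of 0 seconds is not replaced.
    freshness = float(age_seconds) if age_seconds is not None else 999_999.0
    freshness_gauge.set(
        freshness,
        {"dq.table": table, "dq.partition": partition_date},
    )
    results["freshness_seconds"] = freshness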

    # ── 2. Row count ──
    count = conn.execute(
        f"SELECT COUNT(*) FROM {table} WHERE DATE(created_at) = %s",
        (partition_date,),
    ).scalar()
    row_count_gauge.set(
        int(count),
        {"dq.table": table, "dq.partition": partition_date},
    )
    results["row_count"] = int(count)

    # ── 3. Completeness: non-null ratio for critical columns ──
    critical_columns = ["order_id", "customer_id", "amount"]
    for col in critical_columns:
        non_null = conn.execute(
            f"SELECT COUNT({col})::float / NULLIF(COUNT(*), 0) "
            f"FROM {table} WHERE DATE(created_at) = %s",
            (partition_date,),
        ).scalar()
        ratio = float(non_null or 0.0)
        completeness_gauge.set(
            ratio,
            {"dq.table": table, "dq.partition": partition_date, "dq.column": col},
        )
        results[f"completeness_{col}"] = ratio

    # ── 4. Validity: amount must be positive ──
    # The rule goes inside the aggregate, not the WHERE clause: filtering in WHERE
    # would shrink the denominator too, and the ratio would always be 1.0.
    valid_ratio = conn.execute(
        f"SELECT SUM(CASE WHEN amount > 0 THEN 1 ELSE 0 END)::float "
        f"/ NULLIF(COUNT(*), 0) "
        f"FROM {table} WHERE DATE(created_at) = %s",
        (partition_date,),
    ).scalar()
    validity_gauge.set(
        float(valid_ratio or 0.0),
        {"dq.table": table, "dq.rule": "amount_positive", "dq.partition": partition_date},
    )
    results["validity_amount_positive"] = float(valid_ratio or 0.0)

    return results


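# ── Prometheus alerting rules (via OTel → Prometheus bridge) ──
# Metric names carry the "otel_" prefix when the collector's prometheus exporter
# is configured with namespace: "otel"; dots in names and labels become underscores.
# alert: DataFreshnessExceeded
#   expr: otel_dq_freshness_seconds{dq_table="orders"} > 7200
#   for:  5m
#   annotations:
#     summary: "orders table is stale (> 2h since latest record)"
#
# alert: CompletenessDropped
#   expr: otel_dq_completeness_ratio{dq_column="order_id"} < 0.999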
#   for:  5m
#   annotations:
#     summary: "order_id completeness below 99.9% — possible upstream schema change"
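
A ratio check is only informative when its denominator counts every row in the partition, including rows that fail the rule. The sketch below tries the completeness and validity calculations against an in-memory SQLite table so they can be run without a warehouse. The table, its sample rows, and column_ratios are illustrative assumptions, not part of the pipeline above.

```python
import sqlite3


def column_ratios(
    conn: sqlite3.Connection, table: str, column: str, rule_sql: str
) -> tuple[float, float]:
    """Return (completeness_ratio, validity_ratio) for one column.

    table, column and rule_sql are interpolated into the SQL text, so they
    must come from trusted configuration, never from user input.
    """
    total, non_null, valid = conn.execute(
        f"SELECT COUNT(*), COUNT({column}), "
        f"SUM(CASE WHEN {rule_sql} THEN 1 ELSE 0 END) FROM {table}"
    ).fetchone()
    if not total:
        # Empty table or partition: report zero rather than dividing by zero.
        return 0.0, 0.0
    return non_null / total, (valid or 0) / total


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE orders (order_id INTEGER, amount REAL)")
    conn.executemany(
        "INSERT INTO orders VALUES (?, ?)",
        [(1, 10.0), (2, 20.0), (3, None), (4, -5.0)],
    )
    completeness, validity = column_ratios(conn, "orders", "amount", "amount > 0")
    print(f"completeness={completeness:.2f} validity={validity:.2f}")
```

Both ratios share the same COUNT(*) denominator, which is what makes a drop in either one meaningful to alert on.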

OTel Collector, Grafana Tempo, and Jaeger — Routing Pipeline Telemetry

The OpenTelemetry Collector is the recommended deployment pattern: pipelines push to a local collector, which fans out to multiple backends (Jaeger for ad-hoc trace exploration, Grafana Tempo for long-term retention, Prometheus for metrics alerting). The collector also handles batching, retry-on-failure, and tail-based sampling: it keeps every trace where a stage recorded an error or exceeded a latency threshold, plus a fixed percentage of the remaining healthy traces.

# docker-compose.yml — local OTel stack for development
version: "3.9"
services:

  otel-collector:
    image: otel/opentelemetry-collector-contrib:0.101.0
    command: ["--config=/etc/otelcol-contrib/config.yaml"]
    volumes:
      - ./otel-collector-config.yaml:/etc/otelcol-contrib/config.yaml
    ports:
      - "4317:4317"   # gRPC OTLP receiver (used by Python SDK)
      - "4318:4318"   # HTTP OTLP receiver (used by Airflow 2.7+)
      - "8888:8888"   # Prometheus metrics for the collector itself
    depends_on: [jaeger, tempo, prometheus]

  jaeger:
    image: jaegertracing/all-in-one:1.57
    ports:
      - "16686:16686"  # Jaeger UI
      - "4317"         # OTLP gRPC (internal)

  tempo:
    image: grafana/tempo:2.4.1
    command: ["-config.file=/etc/tempo.yaml"]
    volumes:
      - ./tempo.yaml:/etc/tempo.yaml
      - tempo-data:/var/tempo
    ports:
      - "3200:3200"    # Tempo HTTP API (Grafana datasource)

  prometheus:
    image: prom/prometheus:v2.51.0
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
    ports:
      - "9090:9090"

  grafana:
    image: grafana/grafana:10.4.2
    ports:
      - "3000:3000"
    environment:
      GF_AUTH_ANONYMOUS_ENABLED: "true"
      GF_AUTH_ANONYMOUS_ORG_ROLE: "Admin"

volumes:
  tempo-data:

---
# otel-collector-config.yaml
receivers:
  otlp:
    protocols:
      grpc:
        endpoint: "0.0.0.0:4317"
      http:
        endpoint: "0.0.0.0:4318"

processors:
  batch:
    timeout: 5s
    send_batch_size: 1024

  # Tail-based sampling: keep all error traces + 10% of healthy traces
  tail_sampling:
    decision_wait: 30s
    policies:
      - name: errors-policy
        type: status_code
        status_code: {status_codes: [ERROR]}
      - name: latency-policy
        type: latency
        latency: {threshold_ms: 30000}
      - name: probabilistic-healthy
        type: probabilistic
        probabilistic: {sampling_percentage: 10}

  # Add pipeline team and environment resource attributes
  resource:
    attributes:
      - key: "deployment.environment"
        value: "production"
        action: insert

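exporters:
  # The dedicated "jaeger" exporter is no longer shipped in recent contrib
  # releases such as the 0.101.0 image used above. Jaeger accepts OTLP
  # natively, so an OTLP exporter is used instead.
  otlp/jaeger:
    endpoint: "jaeger:4317"
    tls:
      insecure: true

  otlp/tempo:
    endpoint: "tempo:4317"
    tls:
      insecure: true

  prometheus:
    endpoint: "0.0.0.0:8889"
    namespace: "otel"

service:
  pipelines:
    traces:
      receivers:  [otlp]
      # Run tail_sampling before batch so sampling decisions are made
      # before spans are re-batched for export.
      processors: [tail_sampling, resource, batch]
      exporters:  [otlp/jaeger, otlp/tempo]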
    metrics:
      receivers:  [otlp]
      processors: [batch]
      exporters:  [prometheus]

dbt Run Instrumentation — Model-Level Spans via dbt Artifacts

dbt does not yet have a native OTel integration, but it produces machine-readable run artifacts that can be converted into spans after each run. The run_results.json artifact (written to the target/ directory after every dbt run) contains per-model timing, row counts (for supported adapters), and pass/fail status for every model and test. A post-run hook can parse this file and emit OTel spans with backdated timestamps.
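
Backdating a span requires explicit start and end timestamps in nanoseconds since the Unix epoch. The sketch below converts the ISO 8601 strings in a result's timing list into that form using only the standard library. It assumes timestamps shaped like 2026-06-04T01:00:00.000000Z, and iso_to_unix_nanos and execute_window are hypothetical helper names.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

_EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def iso_to_unix_nanos(ts: str) -> int:
    """Convert an ISO 8601 timestamp to integer nanoseconds since the epoch."""
    # Older Python versions reject a trailing "Z", so normalise it first.
    dt = datetime.fromisoformat(ts.replace("Z", "+00:00"))
    if dt.tzinfo is None:
        # Assumption: naive timestamps are UTC.
        dt = dt.replace(tzinfo=timezone.utc)
    # Integer arithmetic avoids float rounding on large epoch values.
    return ((dt - _EPOCH) // timedelta(microseconds=1)) * 1_000


def execute_window(result: dict) -> Optional[tuple[int, int]]:
    """Return (start_ns, end_ns) of the "execute" phase, or None if absent."""
    for entry in result.get("timing", []):
        if (
            entry.get("name") == "execute"
            and entry.get("started_at")
            and entry.get("completed_at")
        ):
            return (
                iso_to_unix_nanos(entry["started_at"]),
                iso_to_unix_nanos(entry["completed_at"]),
            )
    return None
```

The two integers can be supplied as start_time when creating a span and end_time when ending it, so the span appears on the timeline where the model actually ran instead of when the artifact was parsed. Results without an execute entry, such as skipped models, return None and can be emitted as zero-duration events or left out.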

# dbt_otel_exporter.py: convert dbt run_results.json into OTel spans
import json
from datetime import datetime, timezone
from typing import Optional, Tuple

from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.trace import StatusCode

OTLP_ENDPOINT = "http://otel-collector:4317"
FAILED_STATUSES = {"error", "fail", "runtime error"}
OK_STATUSES = {"success", "pass"}


def to_ns(ts: Optional[str]) -> Optional[int]:
    """Convert a dbt ISO-8601 timestamp (UTC, usually with a trailing Z) to epoch nanoseconds."""
    if not ts:
        return None
    dt = datetime.fromisoformat(ts.replace("Z", "+00:00"))
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)
    return int(dt.timestamp() * 1_000_000_000)


def node_window(result: dict) -> Tuple[Optional[int], Optional[int]]:
    """Return (start_ns, end_ns) covering the node's compile and execute phases."""
    # dbt timing: list of {name, started_at, completed_at}; empty for skipped nodes
    timing = result.get("timing") or []
    starts = [to_ns(t.get("started_at")) for t in timing if t.get("started_at")]
    ends = [to_ns(t.get("completed_at")) for t in timing if t.get("completed_at")]
    return (min(starts) if starts else None, max(ends) if ends else None)


def emit_dbt_spans(run_results_path: str = "target/run_results.json") -> None:
    with open(run_results_path) as f:
        run = json.load(f)
    metadata = run.get("metadata", {})
    results = run.get("results", [])

    provider = TracerProvider()
    provider.add_span_processor(
        BatchSpanProcessor(OTLPSpanExporter(endpoint=OTLP_ENDPOINT, insecure=True))
    )
    tracer = provider.get_tracer("dbt")

    # Backdate the root span to the earliest start and latest end of any node.
    # (metadata.generated_at is when the artifact was written, not when the run began.)
    windows = [node_window(r) for r in results]
    run_start = min((s for s, _ in windows if s), default=None)
    run_end = max((e for _, e in windows if e), default=None)

    with tracer.start_as_current_span(
        "dbt_run",
        start_time=run_start,
        end_on_exit=False,  # ended manually below with a backdated end time
        attributes={
            "dbt.invocation_id":      metadata.get("invocation_id") or "unknown",
            "dbt.dbt_schema_version": metadata.get("dbt_schema_version") or "",
            "pipeline.tool":          "dbt",
        },
    ) as root:
        try:
            for result, (start_ns, end_ns) in zip(results, windows):
                node_id   = result.get("unique_id") or ""
                node_name = node_id.split(".")[-1] if node_id else "unknown"
                status    = result.get("status") or ""
                message   = result.get("message") or ""  # may be null in the artifact
                rows_affected = (result.get("adapter_response") or {}).get("rows_affected")

                attributes = {
                    "dbt.node_id":       node_id,
                    "dbt.node_name":     node_name,
                    "dbt.resource_type": node_id.split(".")[0] if "." in node_id else "unknown",
                    "dbt.status":        status,
                    "dbt.message":       message,
                }
                if rows_affected is not None:
                    attributes["pipeline.records.written"] = rows_affected

                # Child of dbt_run via the current context; nodes without timing
                # (for example skipped ones) become zero-length spans at run end.
                span = tracer.start_span(
                    f"dbt.{node_name}", start_time=start_ns or run_end, attributes=attributes
                )
                if status in FAILED_STATUSES:
                    span.set_status(StatusCode.ERROR, message)
                    # Record only the size of the compiled SQL, never the SQL itself
                    compiled_code = result.get("compiled_code") or result.get("compiled_sql") or ""
                    if compiled_code:
                        span.set_attribute("dbt.compiled_sql_length", len(compiled_code))
                elif status in OK_STATUSES:
                    span.set_status(StatusCode.OK)
                span.end(end_time=end_ns or start_ns or run_end)
        finally:
            root.end(end_time=run_end)

    # shutdown() flushes the BatchSpanProcessor before returning
    provider.shutdown()


if __name__ == "__main__":
    emit_dbt_spans()
    print("dbt OTel spans exported.")

Note

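Run this exporter as a separate step immediately after dbt exits, for example from a wrapper script or as a post-step in your CI/CD pipeline. dbt's own on-run-end hooks execute SQL and macros inside the dbt invocation, before run_results.json is written, so they are not a suitable place for it. For Airflow-orchestrated dbt, call it in a PythonOperator task after the BashOperator that runs dbt run, and propagate the Airflow trace context so dbt model spans appear as children of the DAG run span in Grafana Tempo.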
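
Propagating the Airflow trace context means handing a W3C traceparent value from the upstream task to the exporter, for instance through an XCom. The sketch below is a stdlib-only check of that value before it is trusted; parse_traceparent and ParentRef are hypothetical helper names, and the parser deliberately accepts only the version-00 layout (version, 32-hex trace ID, 16-hex parent span ID, 2-hex flags).

```python
import re
from typing import NamedTuple, Optional

# version 00: "00-<32 hex trace-id>-<16 hex parent-id>-<2 hex flags>"
_TRACEPARENT = re.compile(r"^00-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$")


class ParentRef(NamedTuple):
    trace_id: str
    span_id: str
    sampled: bool


def parse_traceparent(value: Optional[str]) -> Optional[ParentRef]:
    """Return the parent reference, or None if the value is missing or malformed."""
    if not value:
        return None
    match = _TRACEPARENT.match(value.strip())
    if not match:
        return None
    trace_id, span_id, flags = match.groups()
    # All-zero IDs are explicitly invalid in the W3C Trace Context spec
    if trace_id == "0" * 32 or span_id == "0" * 16:
        return None
    return ParentRef(trace_id, span_id, bool(int(flags, 16) & 0x01))


parent = parse_traceparent("00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01")
print(parent)
```

When the value validates, it can be wrapped in a carrier dict such as {"traceparent": value} and passed to the OTel propagator's extract() call; the resulting context is then supplied as the context argument when the dbt_run root span is started. When it does not validate, starting a fresh root trace is usually safer than failing the task.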

Pipeline SLO Dashboards — Grafana Queries for Latency, Volume, and Quality

With traces in Tempo and metrics in Prometheus, you can build pipeline SLO dashboards that replace ad-hoc log grep sessions. The key panels for a data pipeline SLO dashboard are: end-to-end pipeline duration (histogram over the root span), records processed per run (counter), data quality ratios (gauges), and error rate per stage (derived from span status).
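
The duration panel depends on a p95 estimated from histogram buckets, and for multi-hour pipelines the bucket boundaries matter more than they do for APIs. The sketch below is a simplified Python model of how a quantile is interpolated from cumulative buckets; the function name mirrors PromQL's histogram_quantile for readability, but the code and the sample bucket layouts are illustrative assumptions, not Prometheus source.

```python
import math
from typing import List, Tuple


def histogram_quantile(q: float, buckets: List[Tuple[float, float]]) -> float:
    """Estimate a quantile from (upper_bound, cumulative_count) pairs.

    Buckets must be sorted by upper bound and end with an +Inf bucket.
    """
    if len(buckets) < 2 or buckets[-1][1] == 0:
        return math.nan
    rank = q * buckets[-1][1]
    lower_bound, lower_count = 0.0, 0.0
    for upper_bound, count in buckets:
        if count >= rank:
            if math.isinf(upper_bound):
                # Rank lands in the +Inf bucket: the best available answer
                # is the highest finite bound
                return buckets[-2][0]
            if count == lower_count:
                return upper_bound
            # Linear interpolation inside the bucket
            fraction = (rank - lower_count) / (count - lower_count)
            return lower_bound + (upper_bound - lower_bound) * fraction
        lower_bound, lower_count = upper_bound, count
    return math.nan


# API-style buckets (ms) top out at 15 s; 19 of 20 runs took longer than that
api_buckets = [(5_000, 0), (15_000, 1), (math.inf, 20)]
# Pipeline-sized buckets (ms): 1 min, 10 min, 30 min, 60 min
pipeline_buckets = [(60_000, 2), (600_000, 10), (1_800_000, 18),
                    (3_600_000, 20), (math.inf, 20)]

print(histogram_quantile(0.95, api_buckets))
print(histogram_quantile(0.95, pipeline_buckets))
```

With the API-style layout the estimate is capped at the 15-second bound no matter how long the runs actually took, so a 30-minute threshold would never be crossed. With pipeline-sized buckets the same 20 runs yield an estimate of about 45 minutes. It is worth checking that the span-metrics histogram is configured with buckets that extend past the SLO threshold.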

# Grafana dashboard panels — Prometheus queries

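# ── Panel 1: Pipeline p95 end-to-end duration (last 24h) ──
# Metric and label names follow the OTel Collector spanmetrics connector
# (namespace "traces.spanmetrics"). Tempo's metrics generator instead emits
# traces_spanmetrics_latency_bucket (in seconds) with a "service" label; adjust to your setup.
# Batch jobs finish a few times a day, so use a wide range: a 5m window is usually empty.
histogram_quantile(
  0.95,
  sum(rate(
    traces_spanmetrics_duration_milliseconds_bucket{
      service_name="orders-etl",
      span_name="spark_job"
    }[24h]
  )) by (le)
) / 1000  # convert ms → s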

# ── Panel 2: Records processed per hour (by stage) ──
sum by (pipeline_stage) (
  increase(
    otel_pipeline_records_processed_total[1h]
  )
)

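# ── Panel 3: Data quality completeness (last run) ──
# If the dq_* metrics reach Prometheus through the Collector's prometheus exporter
# with namespace "otel", they are exposed as otel_dq_*; adjust the names accordingly.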
min by (dq_table, dq_column) (
  dq_completeness_ratio
)

# ── Panel 4: Data freshness heatmap ──
dq_freshness_seconds

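# ── Panel 5: Error rate per stage (span-derived metric) ──
# Requires pipeline.stage to be configured as an extra span-metrics dimension;
# otherwise the pipeline_stage label is absent and all stages collapse into one series.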
sum by (pipeline_stage) (
  rate(
    traces_spanmetrics_calls_total{
      service_name="orders-etl",
      status_code="STATUS_CODE_ERROR"
    }[5m]
  )
)
/
sum by (pipeline_stage) (
  rate(
    traces_spanmetrics_calls_total{
      service_name="orders-etl"
    }[5m]
  )
)

---
# Prometheus alerting rules (prometheus/rules/pipeline-slos.yml)
groups:
  - name: pipeline_slos
    rules:

      - alert: PipelineStageTooSlow
        expr: |
          histogram_quantile(0.95,
            sum(rate(traces_spanmetrics_duration_milliseconds_bucket{
              service_name="orders-etl"
            }[30m])) by (le, span_name)
          ) > 1800000    # 30 minutes in ms
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "Pipeline stage {{ $labels.span_name }} p95 > 30 min"

      - alert: DataFreshnessViolation
        expr: dq_freshness_seconds{dq_table="orders"} > 7200
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "Orders table stale — last record > 2h ago"

      - alert: RecordCountAnomaly
        expr: |
          abs(
            dq_row_count{dq_table="orders"} -
            avg_over_time(dq_row_count{dq_table="orders"}[7d])
          ) / avg_over_time(dq_row_count{dq_table="orders"}[7d]) > 0.30
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Orders row count deviates > 30% from 7-day average"
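
The RecordCountAnomaly expression is a relative deviation from a rolling mean. As a worked example of the arithmetic, the Python sketch below applies the same formula to a list of past row counts; row_count_deviation and is_anomalous are hypothetical helper names and the sample counts are made up.

```python
from statistics import fmean
from typing import Sequence


def row_count_deviation(current: float, history: Sequence[float]) -> float:
    """Relative deviation of the current row count from the historical mean."""
    baseline = fmean(history)
    if baseline == 0:
        # No meaningful baseline: any non-zero count is an infinite deviation
        return float("inf") if current else 0.0
    return abs(current - baseline) / baseline


def is_anomalous(current: float, history: Sequence[float], threshold: float = 0.30) -> bool:
    """Mirror of the alert condition: deviation strictly above the threshold."""
    return row_count_deviation(current, history) > threshold


last_week = [1_000_000] * 7                 # seven daily loads of one million rows
print(is_anomalous(650_000, last_week))     # 35% below the mean
print(is_anomalous(800_000, last_week))     # 20% below the mean
```

One difference from the alert rule is worth keeping in mind: avg_over_time over 7d includes the most recent samples, so a sustained drop slowly pulls the baseline toward itself and the alert eventually clears even if the problem persists.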

Sampling Strategy — Head-Based vs Tail-Based for High-Throughput Pipelines

A Spark job processing 500 million records should not generate 500 million spans. Sampling is essential for keeping telemetry costs proportional to operational value. The two main strategies are head-based sampling (decide at span start, keep a fixed percentage) and tail-based sampling (decide after the full trace completes, keep based on outcome).

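For data pipelines, tail-based sampling at the collector is almost always the right choice. A healthy daily pipeline run is not interesting; you want 100% retention for runs that errored, exceeded latency thresholds, or produced anomalous record counts. The collector configuration shown in the earlier section covers the first two cases: it keeps all error-status traces, all traces longer than 30 seconds, and a 10% probabilistic sample of everything else. Keeping runs with anomalous record counts would need an additional attribute-based policy. One caveat for long-running pipelines: decision_wait is counted from the first span the collector receives for a trace, so with a 30-second wait a multi-hour run is decided long before it finishes, and errors that surface later can be missed unless the wait is raised (at the cost of collector memory).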

At the SDK level, set a parent-based sampler so that all spans within a sampled trace are retained together:

# Sampler configuration in the TracerProvider (Python SDK)
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.sampling import (
    ALWAYS_OFF,
    ALWAYS_ON,
    Decision,
    ParentBased,
    Sampler,
    SamplingResult,
    TraceIdRatioBased,
)
from opentelemetry.trace import get_current_span

resource = Resource.create({"service.name": "orders-etl"})

# Option A, head-based: sample 20% of root spans; child spans inherit the decision.
# The four parent arguments are spelled out for clarity; they match ParentBased's defaults.
head_sampler = ParentBased(
    root=TraceIdRatioBased(0.20),          # 20% of new traces
    remote_parent_sampled=ALWAYS_ON,        # always include if parent was sampled
    remote_parent_not_sampled=ALWAYS_OFF,   # drop if parent was not sampled
    local_parent_sampled=ALWAYS_ON,
    local_parent_not_sampled=ALWAYS_OFF,
)

# Option B, for use with tail sampling at the collector: record everything in the SDK
tracer_provider = TracerProvider(
    sampler=ALWAYS_ON,   # send everything to collector; collector decides
    resource=resource,
)
# Then configure tail_sampling processor in otel-collector-config.yaml (see above)

# ── Custom sampler: keep spans flagged as failing at start, sample the rest ──
class ErrorAlwaysSampler(Sampler):
    """Force-keep spans whose start-time attributes already report failures.

    Samplers run when a span starts, so they only see attributes passed to
    start_span(). Attributes set later, such as final record counts, are
    invisible here; retaining those traces requires tail sampling.
    """

    def __init__(self, ratio: float = 0.10):
        self._ratio_sampler = TraceIdRatioBased(ratio)

    def should_sample(self, parent_context, trace_id, name, kind=None,
                      attributes=None, links=None, trace_state=None):
        if attributes and attributes.get("pipeline.records.failed", 0) > 0:
            # SamplingResult's third argument is the trace state (not links),
            # so pass the parent's state through unchanged
            parent_state = get_current_span(parent_context).get_span_context().trace_state
            return SamplingResult(Decision.RECORD_AND_SAMPLE, attributes, parent_state)
        return self._ratio_sampler.should_sample(
            parent_context, trace_id, name, kind, attributes, links, trace_state
        )

    def get_description(self) -> str:
        return "ErrorAlwaysSampler"
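
A ratio-based head sampler stays consistent across processes because the decision is a pure function of the trace ID rather than a random draw. The sketch below models that comparison in plain Python: the low 64 bits of the trace ID are compared against ratio × 2^64. This follows my understanding of how the Python SDK's TraceIdRatioBased behaves; treat the exact constants as an assumption, and ratio_decision as a hypothetical name.

```python
TRACE_ID_LIMIT = (1 << 64) - 1  # mask for the low 64 bits of a 128-bit trace ID


def ratio_decision(trace_id: int, ratio: float) -> bool:
    """Return True if a trace with this ID falls inside the sampled fraction."""
    bound = round(ratio * (TRACE_ID_LIMIT + 1))
    return (trace_id & TRACE_ID_LIMIT) < bound


trace_id = int("4bf92f3577b34da6a3ce929d0e0e4736", 16)

# The same trace ID always yields the same answer, in any process
print(ratio_decision(trace_id, 0.20))
print(ratio_decision(trace_id, 0.70))
```

Since the low 64 bits of this example ID sit at roughly 64% of the range, the trace is dropped at a 20% ratio and kept at 70%. An Airflow worker and a Spark driver that both receive the propagated trace ID reach the same verdict without coordinating, which is what lets ParentBased keep whole traces together.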

Production Checklist — Twelve Things to Get Right Before You Ship

Observability instrumentation is easy to add and easy to do badly. Before you ship OTel instrumentation to production data pipelines, verify these twelve points:

Production observability checklist for data pipelines

 1. SDK bootstrap is idempotent
    → Call setup_otel() once per process; repeated calls create duplicate exporters.
    → Use module-level singletons (tracer, meter) — do not call get_tracer() per function.

 2. Spans always close (even on exception)
    → Always use "with tracer.start_as_current_span(...) as span:" context managers.
    → Avoid manual start_span() / span.end() pairs; an exception between them skips end().

 3. Error status is set explicitly
    → span.set_status(StatusCode.ERROR, message) + span.record_exception(exc)
    → A span with no status is UNSET, not OK — be explicit.

 4. Record counts are set at span end, not start
    → You don't know how many records will be written until the write completes.
    → Set span.set_attribute("pipeline.records.written", n) just before span exits.

 5. Trace context is propagated across process boundaries
    → Kafka headers, XCom, S3 metadata — choose one canonical carrier per boundary.
    → Test propagation with a two-process integration test before production.

 6. Sensitive data never enters span attributes
    → PII (email, user IDs, IP addresses) must NOT appear in span attributes.
    → Use record counts, not record values. If you must debug data, use log references.

 7. Tail-based sampling is configured at the collector
    → Never send 100% of traces from a high-throughput job without collector sampling.
    → Keep all error traces; sample healthy traces at 5–20%.

 8. BatchSpanProcessor flush is called before process exit
    → tracer_provider.force_flush() + tracer_provider.shutdown()
    → The SDK's default atexit hook covers a normal interpreter exit, but not os._exit(), SIGKILL, or an OOM kill; flush explicitly at the end of each task so buffered spans survive.

 9. Metric instruments are created once, used many times
    → Create instruments with meter.create_counter() at module import time, not per call; per-call creation adds avoidable overhead and duplicate-registration warnings.
    → Creating duplicate instruments with the same name raises a warning and may merge.

10. Data quality metrics use consistent label keys
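    → dq.table, dq.column, dq.partition must be identical across all emit sites (they surface in Prometheus as dq_table, dq_column, dq_partition).
    → Inconsistent label keys split one metric into unrelated series, which breaks Prometheus queries and Grafana panels.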

11. Collector availability does not block pipeline execution
    → Export failures must be logged, never raised into pipeline code; BatchSpanProcessor exports on a background thread and drops spans when its queue fills.
    → The OTLP exporter already retries with backoff; your code should not add retries.

12. OTel adds < 1% overhead to pipeline wall-clock time
    → Benchmark a representative pipeline stage with and without instrumentation.
    → If overhead exceeds 1%, reduce attribute cardinality or increase batch size.
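
Checklist item 6 is the one most easily enforced in code. A minimal sketch, assuming a project-specific denylist: safe_attributes, BLOCKED_KEYS, and the email pattern below are hypothetical and would need tuning to your own schemas before being relied on.

```python
import re
from typing import Any, Dict

# Hypothetical denylist, matched against the last dotted segment of each key
BLOCKED_KEYS = {"email", "user_id", "ip", "ip_address", "phone"}
# Deliberately loose pattern: anything that looks like an email address
EMAIL_RE = re.compile(r"[^@\s]+@[^@\s]+\.[^@\s]+")


def safe_attributes(attrs: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of attrs with likely-PII keys and values removed."""
    clean: Dict[str, Any] = {}
    for key, value in attrs.items():
        leaf = key.rsplit(".", 1)[-1].lower()
        if leaf in BLOCKED_KEYS:
            continue  # key name suggests PII
        if isinstance(value, str) and EMAIL_RE.search(value):
            continue  # value contains something email-shaped
        clean[key] = value
    return clean


raw = {
    "pipeline.stage": "load",
    "pipeline.records.written": 1200,
    "customer.email": "jane@example.com",
    "dbt.message": "failed row for jane@example.com",
}
print(safe_attributes(raw))
```

Passing every attribute dict through a filter like this before span.set_attributes() keeps the decision in one place. It is a safety net, not a substitute for choosing counts and identifiers of non-personal entities as attributes to begin with.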

Note

A common production issue with OTel in data pipelines is orphaned spans: spans that are started but never ended because an exception skipped the span.end() call. Python's with statement guarantees __exit__ is always called, so prefer the context manager form over manual tracer.start_span() / span.end() pairs. With start_as_current_span, __exit__ ends the span even when the body raises, and by default it also records the exception and sets the span status to ERROR (record_exception=True, set_status_on_exception=True). Call span.record_exception() yourself only for exceptions you catch and handle inside the block.
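
The guarantee described here comes from Python itself, not from OTel, which makes it easy to demonstrate without the SDK. In the sketch below, FakeSpan is a hypothetical stand-in that only records whether it was ended and which status it received; it is not the SDK's span class.

```python
class FakeSpan:
    """Stand-in for a span: tracks whether it ended and with what status."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.ended = False
        self.status = "UNSET"

    def __enter__(self) -> "FakeSpan":
        return self

    def __exit__(self, exc_type, exc, tb) -> bool:
        if exc is not None:
            self.status = "ERROR"  # mimic set_status_on_exception
        self.ended = True          # runs on success and on failure alike
        return False               # never swallow the exception


def run_stage(span: FakeSpan, fail: bool) -> None:
    with span:
        if fail:
            raise ValueError("bad partition")


ok_span = FakeSpan("load")
run_stage(ok_span, fail=False)

failed_span = FakeSpan("load")
try:
    run_stage(failed_span, fail=True)
except ValueError:
    pass  # the pipeline's own error handling takes over here

print(ok_span.ended, ok_span.status)
print(failed_span.ended, failed_span.status)
```

Both spans end, and only the failing one carries an error status. The successful span stays UNSET, which matches checklist item 3: if you want OK recorded, set it explicitly.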
