OpenLineage, Data Lineage, Data Observability, Marquez, Airflow, dbt, Spark, Data Engineering, Metadata, Open Source

OpenLineage — Dataset-Level Lineage, Facets, and Ecosystem Integration

A practical guide to OpenLineage, the open standard for data lineage: the RunEvent data model carrying a Job, a Run, and arrays of InputDataset and OutputDataset objects each identified by namespace and name, the facet extension mechanism where DatasetFacets attach schema field lists, DataQualityAssertions, and custom domain metadata while RunFacets carry nominalTime, parentRun, and errorMessage, the openlineage-python client library for emitting START, COMPLETE, FAIL, and ABORT lifecycle events with SchemaDatasetFacet, SQLJobFacet, JobTypeJobFacet, and NominalTimeRunFacet built-in facets from structured Python dataclasses, Marquez as the reference OpenLineage backend running on PostgreSQL with a REST graph traversal API at /api/v1/lineage and a web UI for visualizing job and dataset nodes, the apache-airflow-providers-openlineage package that hooks into Airflow listener callbacks to emit RunEvents for every task instance lifecycle transition without any DAG code changes, with BigQueryInsertJobOperator and SnowflakeOperator automatically populating input and output dataset namespaces and SQL facets, the openlineage-dbt integration that processes manifest.json and run_results.json post-dbt-run to emit per-model RunEvents with schema facets from compiled column metadata and FAIL events for failed models, openlineage-spark using SparkListener and QueryExecutionListener to extract input and output datasets from logical query plans at execution time including reads and writes to S3, Delta Lake, and Iceberg tables with column-level lineage from the resolved operator tree, custom facet authoring by extending BaseFacet with domain-specific fields for data quality metrics, cost attribution, PII classification, and pipeline context with _producer and _schemaURL registration, the /api/v1/lineage graph traversal API with nodeId, depth, and withDownstream parameters for impact analysis before schema changes, the Kafka transport alternative to HTTP for decoupling lineage 
emission from pipeline throughput, parent run facets for preserving hierarchical Airflow-to-dbt and Airflow-to-Spark lineage relationships, and a 10-point production checklist covering namespace convention consistency, Marquez PostgreSQL production setup with managed database and retention policies, Kafka transport for high-throughput pipelines, parent run facet injection for hierarchical lineage, Marquez API indexing for graph traversal, lineage coverage audits against the data catalog, column lineage for fine-grained schema change impact analysis, CI pipeline lineage validation, OpenLineage events as SLA monitoring data source, and custom facet schema versioning in a shared registry.

2026-07-05

Why a Lineage Standard Matters

Data lineage means knowing which dataset produced which other dataset, through which job, and at what time. It underpins incident response, compliance audits, and impact analysis in a data platform. The problem is not capturing lineage; most modern data tools already record some form of it. The problem is that each tool records it in its own format. Airflow keeps task dependencies in its metadata database. dbt writes a manifest JSON with model-to-model dependencies. Spark exposes lineage only implicitly, through the query plans of each job. When a source table schema changes, tracing the impact requires querying three incompatible systems and manually correlating the results.

OpenLineage is an open standard (a specification, not a product) that defines a common event format for data lineage. Every tool that emits OpenLineage events produces the same JSON structure: a RunEvent containing a Job, a Run, and arrays of input and output Dataset objects. A backend that receives these events can build a unified lineage graph spanning every tool in your stack, queryable through a single API. Marquez is the reference backend; Monte Carlo also ingests OpenLineage events to populate its automated field-level lineage graph without requiring per-table configuration.

Open Standard

A specification, not a vendor product. Any tool can emit events and any backend can consume them, so switching backends does not require re-instrumenting pipelines.

Facet-Based

The core event carries minimal required fields. Rich metadata — column schemas, data quality assertions, SQL queries, custom domain attributes — attaches as typed facet objects.

Ecosystem Coverage

Native integrations for Airflow, dbt, Spark, Flink, Hive, and Trino. Custom integrations use the Python or Java client library to emit events from any pipeline code.

The OpenLineage Data Model

Every OpenLineage event is a RunEvent. The event type — START, COMPLETE, FAIL, or ABORT — marks the lifecycle transition being recorded. Three objects hang off every RunEvent:

  • Job — a logical processing step, identified by a namespace and name. An Airflow task, a dbt model run, a Spark application — each is a Job. Jobs persist across runs; their history is the sequence of Runs attributed to them.
  • Run — a single execution of a Job, identified by a UUID. Runs carry RunFacets: metadata specific to this execution, such as the nominal scheduled time, the parent run that triggered it, or an error message on failure.
  • Dataset — an input or output data source, identified by a namespace (e.g. bigquery://project.dataset) and name (e.g. table_name). Datasets carry DatasetFacets: schema, data quality assertions, data source connection details, and custom domain metadata.
# Minimal OpenLineage RunEvent — raw JSON structure
{
  "eventType": "COMPLETE",
  "eventTime": "2026-07-05T08:00:00.000Z",
  "producer": "https://github.com/OpenLineage/OpenLineage/tree/1.0.0/client/python",
  "schemaURL": "https://openlineage.io/spec/1-0-5/OpenLineage.json",
  "job": {
    "namespace": "airflow",
    "name": "my_dag.transform_orders"
  },
  "run": {
    "runId": "d3f2a9c4-1e5b-4c8d-9f2a-3b6c7e8d9f0a",
    "facets": {
      "nominalTime": {
        "_producer": "...",
        "_schemaURL": "...",
        "nominalStartTime": "2026-07-05T08:00:00Z",
        "nominalEndTime":   "2026-07-05T08:05:00Z"
      }
    }
  },
  "inputs": [
    {
      "namespace": "bigquery://my-project.raw",
      "name": "orders",
      "facets": {
        "schema": {
          "_producer": "...",
          "_schemaURL": "...",
          "fields": [
            { "name": "order_id",    "type": "INTEGER" },
            { "name": "customer_id", "type": "INTEGER" },
            { "name": "total",       "type": "FLOAT"   }
          ]
        }
      }
    }
  ],
  "outputs": [
    {
      "namespace": "bigquery://my-project.analytics",
      "name": "orders_daily",
      "facets": {
        "schema": {
          "_producer": "...",
          "_schemaURL": "...",
          "fields": [
            { "name": "date",         "type": "DATE"    },
            { "name": "total_orders", "type": "INTEGER" },
            { "name": "total_revenue","type": "FLOAT"   }
          ]
        }
      }
    }
  ]
}

Note

Every facet object must include _producer (a URI identifying the software that created the facet) and _schemaURL (a URI pointing to the JSON Schema that validates the facet). The _producer field is what allows backends like Marquez to distinguish facets emitted by the Airflow provider from those emitted by a custom Python integration — useful for debugging lineage gaps when different tools disagree on dataset identity.
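
As a rough illustration of that rule, the sketch below walks a RunEvent dictionary and reports every facet that lacks one of the two fields. The helper names `iter_facets` and `facets_missing_metadata` are hypothetical, and a real check would validate against the published JSON Schema instead.

```python
from typing import Iterator, Tuple

REQUIRED_FACET_KEYS = ("_producer", "_schemaURL")


def iter_facets(event: dict) -> Iterator[Tuple[str, dict]]:
    """Yield (path, facet) pairs for run, job, and dataset facets in an event."""
    for owner in ("run", "job"):
        for name, facet in event.get(owner, {}).get("facets", {}).items():
            yield f"{owner}.{name}", facet
    for side in ("inputs", "outputs"):
        for i, dataset in enumerate(event.get(side, [])):
            for name, facet in dataset.get("facets", {}).items():
                yield f"{side}[{i}].{name}", facet


def facets_missing_metadata(event: dict) -> list:
    """Return the paths of facets that lack _producer or _schemaURL."""
    return [
        path
        for path, facet in iter_facets(event)
        if any(key not in facet for key in REQUIRED_FACET_KEYS)
    ]


# Illustrative event: the input schema facet omits both fields on purpose.
event = {
    "run": {"runId": "d3f2a9c4-1e5b-4c8d-9f2a-3b6c7e8d9f0a", "facets": {
        "nominalTime": {"_producer": "https://example.com/producer",
                        "_schemaURL": "https://example.com/schema.json",
                        "nominalStartTime": "2026-07-05T08:00:00Z"},
    }},
    "job": {"namespace": "airflow", "name": "my_dag.transform_orders"},
    "inputs": [{"namespace": "bigquery://my-project.raw", "name": "orders",
                "facets": {"schema": {"fields": []}}}],
    "outputs": [],
}
print(facets_missing_metadata(event))
```

A check like this is cheap enough to run in a unit test for any custom emitter before events reach a backend.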

Marquez: The Reference OpenLineage Backend

Marquez is the reference implementation of an OpenLineage backend — an open-source metadata service that stores RunEvents, builds a lineage graph, and exposes it through a REST API and web UI. It stores jobs, runs, and datasets in PostgreSQL and serves the lineage graph as a queryable API. Marquez is the easiest way to get started: run it locally with Docker Compose to validate your OpenLineage integrations before pointing them at a production backend.

# docker-compose.yml for local Marquez
version: "3"

services:
  marquez:
    image: marquezproject/marquez:latest
    ports:
      - "5000:5000"   # HTTP API
      - "5001:5001"   # Admin/health
    environment:
      MARQUEZ_PORT: 5000
      MARQUEZ_ADMIN_PORT: 5001
      MARQUEZ_DB_HOST: marquez-db
      MARQUEZ_DB_PORT: 5432
      MARQUEZ_DB_USER: marquez
      MARQUEZ_DB_PASSWORD: marquez
    depends_on:
      - marquez-db

  marquez-web:
    image: marquezproject/marquez-web:latest
    ports:
      - "3000:3000"
    environment:
      MARQUEZ_HOST: marquez
      MARQUEZ_PORT: 5000

  marquez-db:
    image: postgres:14
    environment:
      POSTGRES_USER: marquez
      POSTGRES_PASSWORD: marquez
      POSTGRES_DB: marquez
    volumes:
      - marquez-db-data:/var/lib/postgresql/data

volumes:
  marquez-db-data:

# Start Marquez
docker compose up -d

# Verify the API is ready
curl http://localhost:5000/api/v1/namespaces

# Web UI available at http://localhost:3000
# API docs at http://localhost:5000/api/v1/docs
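
Once the containers are up, ingestion can be smoke-tested without any client library. The sketch below only builds the POST request for the `api/v1/lineage` endpoint used elsewhere in this guide; `build_lineage_request` is a hypothetical helper, and the event passed in is a stub rather than a complete RunEvent.

```python
import json
import urllib.request


def build_lineage_request(event: dict, base_url: str = "http://localhost:5000") -> urllib.request.Request:
    """Build (but do not send) the POST that delivers one RunEvent to the backend."""
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/api/v1/lineage",
        data=json.dumps(event).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


request = build_lineage_request({"eventType": "START"})
print(request.get_method(), request.full_url)
# To send it against a running instance:
#   urllib.request.urlopen(request, timeout=10)
```

Keeping request construction separate from sending makes the payload easy to inspect before anything touches the network.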

Python Client: Emitting Lineage Events Directly

The openlineage-python client library provides Python classes for every RunEvent, facet, and dataset type defined in the specification. Use it when you have custom pipeline code — a Python ETL script, a custom Airflow operator, or a data processing tool without a native OpenLineage integration — and want to emit lineage events without serializing raw JSON by hand.

pip install openlineage-python

import uuid
from datetime import datetime, timezone
from openlineage.client import OpenLineageClient
from openlineage.client.event_v2 import (
    RunEvent, RunState, Run, Job,
    Dataset, InputDataset, OutputDataset,
)
from openlineage.client.facet_v2 import (
    job_type_job,
    schema_dataset,
    sql_job,
    nominal_time_run,
    parent_run,
    error_message_run,
    data_quality_assertions_dataset,
)

# Point the client at your Marquez (or other) backend
client = OpenLineageClient(url="http://localhost:5000")

# --- Emit START event when the job begins ---
run_id = str(uuid.uuid4())
start_time = datetime.now(timezone.utc).isoformat()

client.emit(
    RunEvent(
        eventType=RunState.START,
        eventTime=start_time,
        run=Run(
            runId=run_id,
            facets={
                "nominalTime": nominal_time_run.NominalTimeRunFacet(
                    nominalStartTime=start_time,
                ),
            },
        ),
        job=Job(
            namespace="my-pipeline",
            name="etl.transform_sessions",
            facets={
                "jobType": job_type_job.JobTypeJobFacet(
                    processingType="BATCH",
                    integration="PYTHON",
                    jobType="SCRIPT",
                ),
                "sql": sql_job.SQLJobFacet(
                    query="""
                        SELECT
                            session_id,
                            user_id,
                            SUM(page_views) AS total_views,
                            MIN(started_at)  AS session_start
                        FROM raw.sessions
                        WHERE DATE(started_at) = CURRENT_DATE
                        GROUP BY 1, 2
                    """
                ),
            },
        ),
        inputs=[
            InputDataset(
                namespace="postgresql://prod-db:5432/analytics",
                name="raw.sessions",
                facets={
                    "schema": schema_dataset.SchemaDatasetFacet(
                        fields=[
                            schema_dataset.SchemaDatasetFacetFields(name="session_id",  type="UUID"),
                            schema_dataset.SchemaDatasetFacetFields(name="user_id",     type="INTEGER"),
                            schema_dataset.SchemaDatasetFacetFields(name="page_views",  type="INTEGER"),
                            schema_dataset.SchemaDatasetFacetFields(name="started_at",  type="TIMESTAMP"),
                        ]
                    ),
                },
            )
        ],
        outputs=[
            OutputDataset(
                namespace="postgresql://prod-db:5432/analytics",
                name="mart.daily_sessions",
                facets={
                    "schema": schema_dataset.SchemaDatasetFacet(
                        fields=[
                            schema_dataset.SchemaDatasetFacetFields(name="session_id",   type="UUID"),
                            schema_dataset.SchemaDatasetFacetFields(name="user_id",      type="INTEGER"),
                            schema_dataset.SchemaDatasetFacetFields(name="total_views",  type="INTEGER"),
                            schema_dataset.SchemaDatasetFacetFields(name="session_start", type="TIMESTAMP"),
                        ]
                    ),
                },
            )
        ],
    )
)

# --- Run your actual ETL logic ---
try:
    # ... your pipeline code here ...
    pass

    # Emit COMPLETE event when finished
    client.emit(
        RunEvent(
            eventType=RunState.COMPLETE,
            eventTime=datetime.now(timezone.utc).isoformat(),
            run=Run(runId=run_id),
            job=Job(namespace="my-pipeline", name="etl.transform_sessions"),
            inputs=[],
            outputs=[],
        )
    )

except Exception as exc:
    # Emit FAIL event with error details
    client.emit(
        RunEvent(
            eventType=RunState.FAIL,
            eventTime=datetime.now(timezone.utc).isoformat(),
            run=Run(
                runId=run_id,
                facets={
                    "errorMessage": error_message_run.ErrorMessageRunFacet(
                        message=str(exc),
                        programmingLanguage="PYTHON",
                    ),
                },
            ),
            job=Job(namespace="my-pipeline", name="etl.transform_sessions"),
            inputs=[],
            outputs=[],
        )
    )
    raise

Note

The Python client ships several transports, including HTTP (a synchronous POST to the lineage API, used when OPENLINEAGE_URL is set) and Kafka (events are published to a lineage topic). For high-throughput pipelines, the Kafka transport avoids blocking the pipeline on lineage API latency. Set OPENLINEAGE_URL for the HTTP endpoint; for Kafka, describe the transport (type kafka, the topic, and the producer config) in an openlineage.yml file referenced by OPENLINEAGE_CONFIG. The environment variable names for individual transport settings have changed between client releases, so confirm them against the version you install. A consumer on the backend side then processes events asynchronously, so pipeline throughput is not coupled to lineage storage latency.
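
The decoupling idea can be shown without Kafka. The sketch below is not the client's Kafka transport; it is an in-process stand-in that uses a bounded queue and a worker thread, so `emit()` returns immediately while delivery happens in the background. `BufferedEmitter` and its `send` callable are hypothetical names.

```python
import queue
import threading


class BufferedEmitter:
    """Hypothetical wrapper: emit() enqueues and returns; a worker thread delivers."""

    def __init__(self, send, max_pending: int = 1000):
        self._send = send  # any callable that delivers one event
        self._pending = queue.Queue(maxsize=max_pending)
        self._worker = threading.Thread(target=self._drain, daemon=True)
        self._worker.start()

    def emit(self, event: dict) -> bool:
        """Queue the event without blocking; return False if the buffer is full."""
        try:
            self._pending.put_nowait(event)
            return True
        except queue.Full:
            return False

    def _drain(self) -> None:
        while True:
            event = self._pending.get()
            try:
                self._send(event)
            except Exception:
                pass  # a delivery failure must not crash the pipeline
            finally:
                self._pending.task_done()

    def flush(self) -> None:
        """Block until every queued event has been handed to send()."""
        self._pending.join()


delivered = []
emitter = BufferedEmitter(send=delivered.append)
emitter.emit({"eventType": "START"})
emitter.emit({"eventType": "COMPLETE"})
emitter.flush()
print([e["eventType"] for e in delivered])
```

Unlike a Kafka topic, this buffer is lost if the process dies, which is the main reason a durable broker is preferred for production lineage.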

Airflow Integration — Automatic DAG-Level Lineage

The apache-airflow-providers-openlineage package adds automatic lineage emission to Airflow without modifying any DAG code. It works through a listener mechanism: when a task instance transitions to RUNNING, SUCCESS, or FAILED, the provider constructs and emits the appropriate RunEvent. Operators that understand their own lineage — BigQueryInsertJobOperator, SnowflakeOperator, SparkSubmitOperator — emit input and output dataset metadata; others emit job and run metadata only.

The Airflow production guide covers task dependency design, sensor patterns, and backfill strategies. All of these benefit from OpenLineage because the lineage graph makes cross-DAG dependencies visible in Marquez without requiring teams to read each other's DAG code: when a shared dataset is an output of DAG A and an input of DAG B, Marquez surfaces that dependency automatically from the emitted events.
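
How a backend can derive such a cross-DAG dependency from events alone is sketched below. This is an illustration over plain dictionaries, not Marquez's implementation; `cross_job_dependencies` and the job and dataset names are hypothetical.

```python
from collections import defaultdict


def cross_job_dependencies(events: list) -> set:
    """Return (producer_job, dataset, consumer_job) triples linked by a shared dataset."""
    producers = defaultdict(set)
    consumers = defaultdict(set)
    for event in events:
        job = event["job"]["name"]
        for ds in event.get("outputs", []):
            producers[(ds["namespace"], ds["name"])].add(job)
        for ds in event.get("inputs", []):
            consumers[(ds["namespace"], ds["name"])].add(job)
    return {
        (producer, f"{ns}:{name}", consumer)
        for (ns, name), jobs in producers.items()
        for producer in jobs
        for consumer in consumers.get((ns, name), ())
        if producer != consumer
    }


orders = {"namespace": "bigquery://my-project.raw", "name": "orders"}
events = [
    {"job": {"name": "dag_a.load_orders"}, "inputs": [], "outputs": [orders]},
    {"job": {"name": "dag_b.transform_revenue"}, "inputs": [orders], "outputs": []},
]
for dependency in sorted(cross_job_dependencies(events)):
    print(dependency)
```

The join key is the (namespace, name) pair, which is why the two DAGs must agree on dataset naming for the edge to appear.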

# Install the provider
pip install apache-airflow-providers-openlineage

# airflow.cfg — enable OpenLineage
[openlineage]
transport = {"type": "http", "url": "http://marquez:5000", "endpoint": "api/v1/lineage"}
namespace = airflow-production
disabled = false

# Or via environment variables (preferred for containerized deployments):
# AIRFLOW__OPENLINEAGE__TRANSPORT='{"type":"http","url":"http://marquez:5000","endpoint":"api/v1/lineage"}'
# AIRFLOW__OPENLINEAGE__NAMESPACE=airflow-production
# AIRFLOW__OPENLINEAGE__DISABLED=false

# Example DAG: lineage emitted automatically, no DAG code changes needed
from datetime import datetime
from airflow import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

with DAG(
    dag_id="daily_revenue_pipeline",
    schedule="0 6 * * *",
    start_date=datetime(2026, 1, 1),
    catchup=False,
) as dag:

    # BigQueryInsertJobOperator automatically emits:
    # - Input dataset for the `my-project.raw.orders` table
    # - Output dataset for the `my-project.analytics.revenue_daily` table
    # - SQL query as a SQLJobFacet
    transform_revenue = BigQueryInsertJobOperator(
        task_id="transform_revenue",
        configuration={
            "query": {
                "query": """
                    INSERT INTO `my-project.analytics.revenue_daily`
                    SELECT
                        DATE(created_at)           AS date,
                        COUNT(*)                   AS order_count,
                        SUM(total_amount)          AS revenue
                    FROM `my-project.raw.orders`
                    WHERE DATE(created_at) = CURRENT_DATE - 1
                    GROUP BY 1
                """,
                "useLegacySql": False,
            }
        },
        gcp_conn_id="google_cloud_default",
    )

# Verify lineage was emitted by querying the Marquez API
# (runs are listed under /namespaces/{namespace}/jobs/{job}/runs, wrapped in a "runs" array)
curl "http://localhost:5000/api/v1/namespaces/airflow-production/jobs/daily_revenue_pipeline.transform_revenue/runs" \
  | jq '.runs[0] | {runId: .id, state: .state, inputs: .inputVersions, outputs: .outputVersions}'

# Get the full lineage graph for a dataset and list the node IDs it contains
curl "http://localhost:5000/api/v1/lineage?nodeId=dataset:bigquery://my-project.analytics:revenue_daily&depth=5" \
  | jq '.graph[].id'

dbt Integration — Model-Level and Column-Level Lineage

The openlineage-dbt integration parses dbt's manifest.json and run_results.json artifacts after a dbt run completes and emits OpenLineage events for each model. Because dbt already resolves the DAG of model dependencies, the integration can emit accurate input-output relationships without parsing SQL: for example, model.staging.orders reads from raw.orders and writes to analytics.orders_cleaned. The schema facet is populated from dbt's compiled column metadata, and the SQL job facet includes the compiled SQL for each model.

pip install openlineage-dbt

# Set environment variables before running dbt
export OPENLINEAGE_URL=http://marquez:5000
export OPENLINEAGE_NAMESPACE=dbt-production

# Run dbt and emit lineage in one command (dbt-ol wraps the dbt CLI)
dbt-ol run --target prod

# Or emit lineage separately from existing artifacts (for CI integration)
dbt run --target prod
dbt-ol process --manifest target/manifest.json --run-results target/run_results.json

# profiles.yml: the dbt target used for the run (no OpenLineage-specific keys)
my_project:
  outputs:
    prod:
      type: bigquery
      method: service-account
      project: my-gcp-project
      dataset: analytics
      keyfile: /secrets/bq-service-account.json
      threads: 4
      timeout_seconds: 300

  target: prod

# GitHub Actions CI: emit lineage after dbt production run
name: dbt Production Run

on:
  schedule:
    - cron: "0 5 * * *"

jobs:
  dbt-run:
    runs-on: ubuntu-latest
    env:
      OPENLINEAGE_URL: ${{ secrets.MARQUEZ_URL }}
      OPENLINEAGE_NAMESPACE: dbt-production
    steps:
      - uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: "3.11"

      - name: Install dependencies
        run: pip install dbt-bigquery openlineage-dbt

      - name: Run dbt
        run: dbt run --target prod --profiles-dir .

      - name: Emit lineage
        if: always()  # run even when the dbt step fails, so FAIL events are emitted
        run: dbt-ol process --manifest target/manifest.json --run-results target/run_results.json

      - name: Run dbt tests
        run: dbt test --target prod --profiles-dir .

Note

openlineage-dbt processes artifacts post-run, so it captures actual run outcomes — pass or fail per model — rather than planned lineage. When a model fails in a dbt run, its entry in run_results.json carries an error status, and the integration emits a FAIL RunEvent for that model. Marquez records the failure, making failed model runs queryable in the lineage graph alongside successful ones — useful for incident timelines that need to pinpoint which model failure caused downstream staleness.
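
The mapping from artifacts to events can be sketched over in-memory dictionaries shaped like the two dbt artifacts. This is a simplified illustration, not the integration's code: `model_events` is a hypothetical helper, the project name `shop` is made up, and skipping models with status `skipped` is a choice made for this sketch.

```python
def model_events(manifest: dict, run_results: dict) -> list:
    """Derive one (model, event type, upstream nodes) record per executed model."""
    records = []
    for result in run_results["results"]:
        unique_id = result["unique_id"]
        node = manifest["nodes"].get(unique_id)
        if node is None or node.get("resource_type") != "model":
            continue  # ignore tests, seeds, and anything absent from the manifest
        if result["status"] == "skipped":
            continue  # sketch assumption: a skipped model never ran, so nothing to report
        records.append({
            "model": unique_id,
            "eventType": "COMPLETE" if result["status"] == "success" else "FAIL",
            "inputs": node["depends_on"]["nodes"],
        })
    return records


manifest = {"nodes": {
    "model.shop.orders_cleaned": {
        "resource_type": "model",
        "depends_on": {"nodes": ["source.shop.raw.orders"]},
    },
    "model.shop.revenue_daily": {
        "resource_type": "model",
        "depends_on": {"nodes": ["model.shop.orders_cleaned"]},
    },
}}
run_results = {"results": [
    {"unique_id": "model.shop.orders_cleaned", "status": "success"},
    {"unique_id": "model.shop.revenue_daily", "status": "error"},
]}

events = model_events(manifest, run_results)
for record in events:
    print(record["model"], record["eventType"], record["inputs"])
```

Because the dependency edges come from `depends_on` in the manifest, no SQL parsing is involved at any point.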

Spark Integration — Job-Level Lineage from Query Plans

The openlineage-spark integration hooks into Spark's QueryExecutionListener and SparkListener APIs to extract lineage from logical query plans at execution time. It does not require modifying Spark job code. It infers input and output datasets from the resolved logical plan, including reads from S3, GCS, HDFS, Delta Lake tables, and Iceberg tables, and writes to the same targets. Column-level lineage is extracted from the resolved operator tree, showing which input columns map to which output columns through projections, aggregations, and joins.

# Spark submit with openlineage-spark JAR
spark-submit \
  --conf "spark.jars.packages=io.openlineage:openlineage-spark_2.12:1.9.0" \
  --conf "spark.extraListeners=io.openlineage.spark.agent.OpenLineageSparkListener" \
  --conf "spark.openlineage.transport.type=http" \
  --conf "spark.openlineage.transport.url=http://marquez:5000" \
  --conf "spark.openlineage.transport.endpoint=api/v1/lineage" \
  --conf "spark.openlineage.namespace=spark-cluster" \
  --conf "spark.openlineage.parentJobName=nightly-etl" \
  my_spark_job.py

# PySpark job: lineage extracted automatically from query plan
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("daily-aggregations") \
    .getOrCreate()

# OpenLineage captures: reads from s3://my-bucket/raw/events/ as input dataset
raw_events = spark.read.parquet("s3://my-bucket/raw/events/date=2026-07-05/")

# Transformation: column lineage tracked through aggregation
daily_counts = raw_events \
    .filter("event_type = 'purchase'") \
    .groupBy("user_id", "product_id") \
    .agg(
        {"amount": "sum", "event_id": "count"}
    ) \
    .withColumnRenamed("sum(amount)", "total_spend") \
    .withColumnRenamed("count(event_id)", "purchase_count")

# OpenLineage captures: writes to s3://my-bucket/analytics/daily-counts/ as output dataset
daily_counts.write \
    .mode("overwrite") \
    .partitionBy("user_id") \
    .parquet("s3://my-bucket/analytics/daily-counts/date=2026-07-05/")

spark.stop()

# For Databricks: add to cluster init script or cluster configuration
# Cluster spark config:
spark.jars.packages io.openlineage:openlineage-spark_2.12:1.9.0
spark.extraListeners io.openlineage.spark.agent.OpenLineageSparkListener
spark.openlineage.transport.type http
spark.openlineage.transport.url https://marquez.internal.example.com
spark.openlineage.transport.endpoint api/v1/lineage
spark.openlineage.namespace databricks-production

Custom Facets — Extending the Standard for Domain Metadata

Custom facets let you attach domain-specific metadata to any RunEvent, Job, Run, or Dataset using the same extensible structure as built-in facets. A data quality facet might carry the row count, null rate, and anomaly score for an output dataset. A compliance facet might carry PII classification and retention policy. A business facet might carry the team owner, cost center, and SLA tier. Any JSON-serializable structure is valid as a custom facet as long as it includes the required _producer and _schemaURL fields pointing to a schema you publish.

from typing import Dict, List

import attr
from openlineage.client.facet_v2 import BaseFacet

# The client's facet classes are attrs classes, so custom facets are declared
# with @attr.define rather than @dataclass; otherwise the extra fields are not
# serialized. BaseFacet fills in _producer and _schemaURL after construction
# and does not accept them as constructor arguments, so the overrides below
# point both fields at values you control.
PRODUCER = "https://github.com/my-org/data-platform/tree/main/lineage"


# Custom DatasetFacet for data quality metrics
@attr.define
class DataQualityFacet(BaseFacet):
    """Carries per-column quality metrics for a dataset."""
    rowCount: int
    nullCounts: Dict[str, int]  # {"column_name": null_count, ...}
    anomalyScore: float         # 0.0 (no anomaly) to 1.0 (severe)
    qualityChecks: List[dict]   # [{"name": "...", "passed": bool, "value": ...}]

    def __attrs_post_init__(self) -> None:
        BaseFacet.__attrs_post_init__(self)
        self._producer = PRODUCER

    @staticmethod
    def _get_schema() -> str:
        return "https://schemas.my-org.com/openlineage/data-quality-facet.json"


# Custom RunFacet for pipeline environment context
@attr.define
class PipelineContextFacet(BaseFacet):
    """Records infrastructure context for cost attribution."""
    team: str
    costCenter: str
    environment: str           # prod, staging, dev
    clusterSize: str           # small, medium, large
    estimatedCostUsd: float

    def __attrs_post_init__(self) -> None:
        BaseFacet.__attrs_post_init__(self)
        self._producer = PRODUCER

    @staticmethod
    def _get_schema() -> str:
        return "https://schemas.my-org.com/openlineage/pipeline-context-facet.json"


# Use the custom facets when emitting events
import uuid
from datetime import datetime, timezone
from openlineage.client import OpenLineageClient
from openlineage.client.event_v2 import RunEvent, RunState, Run, Job, OutputDataset

client = OpenLineageClient(url="http://marquez:5000")
run_id = str(uuid.uuid4())

# After pipeline completes — attach quality and context facets to COMPLETE event
quality = DataQualityFacet(
    rowCount=1_482_301,
    nullCounts={"customer_id": 0, "amount": 12, "product_id": 3},
    anomalyScore=0.04,
    qualityChecks=[
        {"name": "row_count_within_range", "passed": True,  "value": 1482301},
        {"name": "null_rate_amount_below_threshold", "passed": True, "value": 0.0000081},
        {"name": "revenue_total_matches_source", "passed": True, "value": 4839201.50},
    ],
)

context = PipelineContextFacet(
    team="data-platform",
    costCenter="CC-1042",
    environment="prod",
    clusterSize="large",
    estimatedCostUsd=1.82,
)

client.emit(
    RunEvent(
        eventType=RunState.COMPLETE,
        eventTime=datetime.now(timezone.utc).isoformat(),
        run=Run(
            runId=run_id,
            facets={"pipelineContext": context},
        ),
        job=Job(namespace="my-pipeline", name="etl.daily_revenue"),
        inputs=[],
        outputs=[
            OutputDataset(
                namespace="bigquery://my-project.analytics",
                name="revenue_daily",
                facets={"dataQuality": quality},
            )
        ],
    )
)

Querying the Lineage Graph

Marquez exposes a graph traversal API at /api/v1/lineage. Given a node ID — either a job or a dataset — it returns the surrounding graph up to a specified depth. This is the API that powers impact analysis: when a source table schema changes, query the lineage graph rooted at that dataset with depth 5 to discover every downstream dataset and job affected before the schema change reaches production. The response is a graph of nodes and edges that you can render, alert on, or feed into a dependency validator in CI.

# Get upstream lineage for a dataset (what feeds into it)
curl "http://localhost:5000/api/v1/lineage?nodeId=dataset:bigquery://my-project.analytics:revenue_daily&depth=5&withDownstream=false"   | python3 -c "
import json, sys
data = json.load(sys.stdin)
for node in data['graph']:
    ntype = node.get('type', 'unknown')
    nid   = node.get('id', '')
    print(f'  [{ntype:8}] {nid}')
"

# Get downstream lineage (what depends on this dataset)
# The Python snippet is wrapped in single quotes so its double quotes survive the shell
curl "http://localhost:5000/api/v1/lineage?nodeId=dataset:bigquery://my-project.raw:orders&depth=5&withDownstream=true" \
  | python3 -c '
import json, sys
data = json.load(sys.stdin)
datasets = [n for n in data["graph"] if n.get("type") == "DATASET"]
jobs     = [n for n in data["graph"] if n.get("type") == "JOB"]
print(f"Downstream datasets ({len(datasets)}):")
for d in datasets:
    print("  " + d["id"])
print(f"Downstream jobs ({len(jobs)}):")
for j in jobs:
    print("  " + j["id"])
'

# List all recent runs for a job (the response wraps the list in a "runs" key)
curl "http://localhost:5000/api/v1/namespaces/airflow-production/jobs/daily_revenue_pipeline.transform_revenue/runs?limit=10" \
  | python3 -c '
import json, sys
runs = json.load(sys.stdin)["runs"]
for r in runs:
    print("  {}  state={}  start={}".format(r["id"], r["state"], r.get("startedAt", "-")))
'

# Search for datasets matching a pattern
curl "http://localhost:5000/api/v1/search?q=orders&filter=DATASET&limit=20" \
  | python3 -c '
import json, sys
results = json.load(sys.stdin)
for item in results.get("results", []):
    print("  {}  ({})".format(item["name"], item["namespace"]))
'

# Python helper for impact analysis: run before schema changes
import requests

MARQUEZ_URL = "http://localhost:5000"

def get_downstream_impact(namespace: str, dataset_name: str, depth: int = 5) -> dict:
    """Returns all downstream datasets and jobs for a given dataset."""
    node_id = f"dataset:{namespace}:{dataset_name}"
    resp = requests.get(
        f"{MARQUEZ_URL}/api/v1/lineage",
        params={"nodeId": node_id, "depth": depth, "withDownstream": "true"},
        timeout=10,
    )
    resp.raise_for_status()
    graph = resp.json().get("graph", [])

    return {
        "datasets": [n["id"] for n in graph if n.get("type") == "DATASET" and n["id"] != node_id],
        "jobs": [n["id"] for n in graph if n.get("type") == "JOB"],
    }


# Before dropping or altering raw.orders:
impact = get_downstream_impact(
    namespace="bigquery://my-project.raw",
    dataset_name="orders",
)
print(f"Dropping raw.orders would affect:")
print(f"  {len(impact['datasets'])} downstream datasets")
print(f"  {len(impact['jobs'])} downstream jobs")
for d in impact["datasets"]:
    print(f"    dataset: {d}")
for j in impact["jobs"]:
    print(f"    job:     {j}")
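
If the response contains both upstream and downstream nodes, the downstream side can be isolated by walking the edges explicitly. The sketch below does a depth-limited breadth-first traversal and assumes each node in `graph` carries an `outEdges` list whose entries have a `destination` node ID; check that shape against the responses your Marquez version returns. `downstream_nodes` is a hypothetical helper.

```python
from collections import deque


def downstream_nodes(graph: list, start_id: str, max_depth: int = 5) -> list:
    """Breadth-first walk along outEdges, returning reachable node IDs in visit order."""
    out_edges = {
        node["id"]: [edge["destination"] for edge in node.get("outEdges", [])]
        for node in graph
    }
    seen = {start_id}
    order = []
    frontier = deque([(start_id, 0)])
    while frontier:
        node_id, depth = frontier.popleft()
        if depth == max_depth:
            continue  # do not expand nodes that sit at the depth limit
        for nxt in out_edges.get(node_id, []):
            if nxt not in seen:
                seen.add(nxt)
                order.append(nxt)
                frontier.append((nxt, depth + 1))
    return order


# Illustrative graph: ingest -> raw orders -> transform -> revenue_daily
graph = [
    {"id": "job:etl:ingest",
     "outEdges": [{"origin": "job:etl:ingest", "destination": "dataset:raw:orders"}]},
    {"id": "dataset:raw:orders",
     "outEdges": [{"origin": "dataset:raw:orders", "destination": "job:etl:transform"}]},
    {"id": "job:etl:transform",
     "outEdges": [{"origin": "job:etl:transform", "destination": "dataset:analytics:revenue_daily"}]},
    {"id": "dataset:analytics:revenue_daily", "outEdges": []},
]
print(downstream_nodes(graph, "dataset:raw:orders"))
```

The upstream `job:etl:ingest` node never appears in the result because the walk only follows outgoing edges from the starting dataset.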

Production Checklist

1

Use consistent namespace conventions across all integrations. The namespace is the primary key that links datasets across tools — a BigQuery table emitted from Airflow with namespace bigquery://project.dataset must use the exact same namespace string when the same table is emitted from dbt or Spark. Establish a namespace registry: bigquery://project.dataset for BigQuery, postgresql://host:5432/db for Postgres, s3://bucket/prefix for object storage. Document the convention before connecting the first integration, because renaming namespaces later requires re-emitting historical events.
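
One way to make the registry enforceable is to keep the templates in a single shared module that every custom emitter imports. The sketch below encodes the three conventions named above; `NAMESPACE_TEMPLATES` and `dataset_namespace` are hypothetical names.

```python
NAMESPACE_TEMPLATES = {
    "bigquery": "bigquery://{project}.{dataset}",
    "postgres": "postgresql://{host}:{port}/{database}",
    "s3": "s3://{bucket}/{prefix}",
}


def dataset_namespace(system: str, **parts: object) -> str:
    """Render the agreed namespace string for a storage system, or raise if unknown."""
    try:
        template = NAMESPACE_TEMPLATES[system]
    except KeyError:
        raise ValueError(f"no namespace convention registered for {system!r}") from None
    return template.format(**parts)


print(dataset_namespace("bigquery", project="my-project", dataset="raw"))
print(dataset_namespace("postgres", host="prod-db", port=5432, database="analytics"))
```

Raising on an unregistered system forces a new storage type to go through the registry before anyone emits events for it.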

2

Run Marquez against a production-grade PostgreSQL instance, not the throwaway Postgres container from the local Docker Compose setup. Marquez stores every RunEvent, every dataset version, and every facet in Postgres. For a moderately active data platform emitting 10,000 events per day, the database can grow by several gigabytes per month, depending on how large the facets are. Use a managed Postgres service (Cloud SQL, RDS, or Aurora) with automated backups, point-in-time recovery, and connection pooling via PgBouncer. Configure Marquez's retention policy to purge run events older than 90 days to bound storage growth while preserving dataset and job metadata indefinitely.

3

Use the Kafka transport for high-throughput pipelines. The HTTP transport is synchronous: the pipeline waits for the Marquez API to acknowledge each event. Under high concurrency, this can add hundreds of milliseconds of latency per task. The Kafka transport publishes events to a topic and does not wait for the lineage backend. A separate consumer service reads from the topic and writes the events to Marquez at its own pace. This decouples pipeline throughput from lineage storage capacity and makes lineage emission resilient to Marquez downtime: events queue in Kafka until the backend recovers.

4

Emit parent run facets to preserve hierarchical lineage. When Airflow triggers a dbt run or a Spark job as a task, the child run's RunEvent should include a parentRun facet pointing to the Airflow task's run ID. Without it, Airflow and dbt lineage appear as disconnected subgraphs — you can see that dbt models ran and that Airflow tasks ran, but not that Airflow triggered the dbt run. The Airflow OpenLineage provider automatically injects parent run facets into child runs when using the SparkSubmitOperator or DbtCloudOperator. For custom operators, inject the parent run ID from the task instance context.
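
For a custom operator, the injection amounts to adding one run facet to the child event. The sketch below works on plain dictionaries and stores the facet under the `parent` key of the run's facets; `with_parent` is a hypothetical helper, and the producer and schema URLs are placeholders to replace with your own producer and the facet schema version you target.

```python
import uuid


def with_parent(child_event: dict, parent_run_id: str, parent_namespace: str,
                parent_job: str, producer: str, schema_url: str) -> dict:
    """Return a copy of child_event whose run carries a parent facet."""
    facet = {
        "_producer": producer,
        "_schemaURL": schema_url,
        "run": {"runId": parent_run_id},
        "job": {"namespace": parent_namespace, "name": parent_job},
    }
    run = dict(child_event["run"])
    run["facets"] = {**run.get("facets", {}), "parent": facet}
    return {**child_event, "run": run}


airflow_run_id = str(uuid.uuid4())  # run ID of the Airflow task that triggers the child
child = {
    "eventType": "START",
    "run": {"runId": str(uuid.uuid4())},
    "job": {"namespace": "dbt-production", "name": "model.shop.revenue_daily"},
}
linked = with_parent(
    child,
    parent_run_id=airflow_run_id,
    parent_namespace="airflow-production",
    parent_job="daily_revenue_pipeline.run_dbt",
    producer="https://example.com/my-org/lineage",         # placeholder
    schema_url="https://example.com/ParentRunFacet.json",  # placeholder
)
print(linked["run"]["facets"]["parent"]["job"]["name"])
```

The function copies instead of mutating, so the same child template can be reused across several parent runs.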

5

Index the Marquez database for your most common queries. The default Marquez schema has indexes on dataset name, job name, and run ID. Impact analysis queries that traverse the lineage graph by depth are expensive on large graphs without additional indexes. If you frequently query downstream lineage for popular datasets, add a composite index on the source and target columns of the table that stores lineage edges, for example (source_node_id, target_node_id); verify the actual table and column names against the schema of your Marquez version before creating it. Profile slow Marquez API calls with pg_stat_statements and add indexes before the graph grows too large to traverse efficiently.

6

Validate lineage coverage regularly with a completeness audit. Write a script that queries Marquez for all datasets that are outputs of any job run in the last 24 hours and cross-references them against your data catalog's list of expected production tables. Datasets in the catalog with no lineage entry indicate a missing integration — a Spark job running without the OpenLineage listener, a dbt project not uploading artifacts, or a custom ETL script that hasn't been instrumented. Run this audit daily and alert on coverage regressions.
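The core of such an audit is a set difference. The sketch below assumes the events have already been fetched from the backend as RunEvent dictionaries and that the catalog export is a set of (namespace, name) pairs; how you fetch either one is left out, and the function names are hypothetical.

```python
from datetime import datetime, timedelta, timezone

Dataset = tuple[str, str]  # (namespace, name)


def observed_outputs(events: list[dict], since: datetime) -> set[Dataset]:
    """Collect every output dataset from events emitted at or after `since`."""
    seen: set[Dataset] = set()
    for event in events:
        # eventTime is ISO 8601; normalize a trailing "Z" for older Python versions.
        event_time = datetime.fromisoformat(event["eventTime"].replace("Z", "+00:00"))
        if event_time >= since:
            for ds in event.get("outputs", []):
                seen.add((ds["namespace"], ds["name"]))
    return seen


def coverage_gaps(expected: set[Dataset], observed: set[Dataset]) -> list[Dataset]:
    """Catalog tables with no lineage in the audit window, sorted for stable alerts."""
    return sorted(expected - observed)
```

A daily job would call `observed_outputs(events, datetime.now(timezone.utc) - timedelta(hours=24))` and alert whenever `coverage_gaps` returns a non-empty list.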

7

Use the column lineage facet for fine-grained impact analysis of schema changes. The basic RunEvent records which datasets are inputs and outputs of a job. The column lineage facet records which specific columns in the output derive from which columns in the inputs. With column lineage, a schema change impact analysis can answer not just 'which tables depend on orders?' but 'which tables use the customer_id column from orders?'. The openlineage-spark integration extracts column lineage from Spark's logical plan automatically. For SQL-based pipelines, the openlineage-sql library parses SQL strings to extract column-level dependencies without executing the query.
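Answering the customer_id question comes down to scanning the `columnLineage` facet on each output dataset. The sketch below works on RunEvent dictionaries and assumes the facet layout from the OpenLineage spec as I understand it (`fields` keyed by output column, each with a list of `inputFields`); the function name is hypothetical.

```python
def dependents_of_column(events: list[dict], namespace: str, dataset: str, column: str) -> list[tuple[str, str, str]]:
    """Return (namespace, dataset, column) for every output column derived from the given input column."""
    hits = set()
    for event in events:
        for output in event.get("outputs", []):
            fields = output.get("facets", {}).get("columnLineage", {}).get("fields", {})
            for out_column, detail in fields.items():
                for src in detail.get("inputFields", []):
                    if (src["namespace"], src["name"], src["field"]) == (namespace, dataset, column):
                        hits.add((output["namespace"], output["name"], out_column))
    return sorted(hits)
```

This only finds direct dependents; a full impact analysis repeats the lookup on each hit until no new columns appear.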

8

Test lineage emission in your CI pipeline before merging pipeline changes. Add a lineage validation step to your CI workflow: after running the pipeline in a test environment pointed at a test Marquez instance, query Marquez to assert that the expected datasets appear as inputs and outputs, that schema facets contain the expected columns, and that no FAIL events were emitted for any run. This catches integration regressions — a refactored Spark job that drops the OpenLineage listener jar, a dbt model renamed without updating the corresponding Airflow task — before they reach production and create gaps in the lineage graph.
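The three assertions can be bundled into one check that returns a list of problems, so the CI step fails when the list is non-empty. The sketch below operates on RunEvent dictionaries collected from the test backend; the function names and the shape of the `expected_*` arguments are assumptions chosen for illustration.

```python
def _ids(datasets: list[dict]) -> set[tuple[str, str]]:
    """Reduce dataset objects to (namespace, name) pairs."""
    return {(d["namespace"], d["name"]) for d in datasets}


def lineage_problems(events, expected_inputs, expected_outputs, expected_columns) -> list[str]:
    """Compare emitted events against expectations; an empty list means the check passed."""
    problems = []
    seen_inputs, seen_outputs, seen_columns = set(), set(), {}
    for event in events:
        if event.get("eventType") == "FAIL":
            problems.append(f"FAIL event for job {event['job']['name']}")
        seen_inputs |= _ids(event.get("inputs", []))
        seen_outputs |= _ids(event.get("outputs", []))
        for ds in event.get("inputs", []) + event.get("outputs", []):
            fields = ds.get("facets", {}).get("schema", {}).get("fields", [])
            key = (ds["namespace"], ds["name"])
            seen_columns.setdefault(key, set()).update(f["name"] for f in fields)
    for ds in sorted(expected_inputs - seen_inputs):
        problems.append(f"missing input {ds}")
    for ds in sorted(expected_outputs - seen_outputs):
        problems.append(f"missing output {ds}")
    for ds, columns in sorted(expected_columns.items()):
        missing = sorted(columns - seen_columns.get(ds, set()))
        if missing:
            problems.append(f"{ds} schema facet lacks columns {missing}")
    return problems
```

Returning every problem at once, rather than failing on the first, gives a reviewer the whole picture of what a refactor broke.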

9

Use OpenLineage events as the data source for SLA monitoring. Every COMPLETE event carries a timestamp; every FAIL event carries an error message. A monitoring script that polls Marquez for jobs that have not emitted a COMPLETE event within their expected window can serve as a freshness SLA monitor — without any additional instrumentation in pipeline code. This gives you SLA monitoring for all pipelines that emit OpenLineage events, including third-party tools and custom scripts, using the lineage infrastructure you already have rather than a separate monitoring system.
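The freshness check itself is a comparison of timestamps against per-job windows. The sketch below assumes you have already reduced the events to the latest COMPLETE time per job; the polling of the backend is omitted, and the function name and argument shapes are hypothetical.

```python
from datetime import datetime, timedelta, timezone


def stale_jobs(last_complete: dict[str, datetime], max_age: dict[str, timedelta], now: datetime) -> list[str]:
    """Jobs whose latest COMPLETE event is missing or older than their allowed window."""
    stale = []
    for job, window in max_age.items():
        completed_at = last_complete.get(job)
        if completed_at is None or now - completed_at > window:
            stale.append(job)
    return sorted(stale)
```

Jobs that have never completed are treated as stale on purpose, so a newly registered SLA surfaces an uninstrumented pipeline immediately.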

10

Document custom facet schemas in a shared registry. When teams add custom facets for cost attribution, PII classification, or data quality, they need to publish the JSON Schema for those facets at the _schemaURL they include in events. Create a shared schema registry — a Git repository or an internal web server — that hosts all custom facet schemas with versioned URLs. When a custom facet schema changes, increment the version in the URL rather than modifying the existing schema, so that events emitted with the old schema remain valid and old consumers continue to parse them correctly.
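Keeping the version inside the URL is easiest when the URL is built by one helper rather than typed by hand. The sketch below uses a hypothetical registry host, producer URI, facet name, and field names; only the `_producer` and `_schemaURL` keys come from the OpenLineage facet convention.

```python
# Hypothetical registry host and producer URI; substitute your own.
SCHEMA_BASE = "https://schemas.example.com/openlineage/facets"
PRODUCER = "https://example.com/pipelines/cost-tagger"


def schema_url(facet_name: str, version: str) -> str:
    """Versioned, immutable URL for a custom facet schema."""
    return f"{SCHEMA_BASE}/{version}/{facet_name}.json"


def cost_attribution_facet(team: str, cost_usd: float, version: str = "1-0-0") -> dict:
    """Illustrative custom run facet; the field names are assumptions."""
    return {
        "costAttribution": {
            "_producer": PRODUCER,
            "_schemaURL": schema_url("CostAttributionRunFacet", version),
            "team": team,
            "costUsd": cost_usd,
        }
    }
```

When the schema gains a field, emitters move to a new version string and the registry publishes a new file; the old URL keeps serving the old schema unchanged.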

Do your data incidents require hours of manual tracing through Airflow logs, dbt manifest files, and warehouse query history to identify which downstream tables are affected by a source schema change? Is your compliance team unable to say which pipelines touch PII columns without reading every ETL script? Does your data catalog show dataset names without telling you what process produced each dataset or what depends on it?

We design and implement OpenLineage programs — from namespace convention design and Marquez production deployment on managed PostgreSQL with retention policies and PgBouncer connection pooling, through apache-airflow-providers-openlineage installation and transport configuration for HTTP and Kafka backends, openlineage-dbt artifact upload pipeline integration in GitHub Actions CI after dbt production runs, openlineage-spark listener JAR configuration for Databricks clusters and on-premise Spark submit jobs with S3 and Iceberg dataset tracking, custom facet design for data quality metrics, cost attribution, and PII classification with JSON Schema registry setup, graph traversal API integration for automated schema change impact analysis before migrations, parent run facet wiring for hierarchical Airflow-to-dbt and Airflow-to-Spark lineage, lineage coverage audit scripts that cross-reference Marquez against your data catalog and alert on gaps, and CI lineage validation steps that assert expected input and output datasets appear in Marquez after every pipeline deployment. Let’s talk.
