Why a Lineage Standard Matters
Data lineage (knowing which dataset produced which other dataset, through which job, at what time) is the foundation of every incident response, compliance audit, and impact analysis in a data platform. The problem is not capturing lineage; every modern orchestration tool already emits some form of it. The problem is that each tool emits lineage in its own format. Airflow logs task dependencies in its metadata database. dbt writes a manifest JSON with model-to-model dependencies. Spark exposes lineage only through its query plans and listener events. When a source table schema changes, tracing the impact requires querying three incompatible systems and manually correlating the results.
OpenLineage is an open standard, a specification rather than a product, that defines a common event format for data lineage. Every tool that emits OpenLineage events produces the same JSON structure: a RunEvent containing a Job, a Run, and arrays of input and output Dataset objects. A backend that receives these events, such as Marquez (the reference implementation) or a commercial platform like Monte Carlo that ingests OpenLineage events into its own lineage graph, can build a unified lineage graph spanning every tool in your stack, queryable through a single API.
Open Standard
A specification, not a vendor product. Any tool can emit events and any backend can consume them, so switching backends does not mean re-instrumenting your pipelines.
Facet-Based
The core event carries minimal required fields. Rich metadata — column schemas, data quality assertions, SQL queries, custom domain attributes — attaches as typed facet objects.
Ecosystem Coverage
Native integrations for Airflow, dbt, Spark, Flink, Hive, and Trino. Custom integrations use the Python or Java client library to emit events from any pipeline code.
The OpenLineage Data Model
Every OpenLineage event is a RunEvent. The event type marks the lifecycle transition being recorded: START, COMPLETE, FAIL, and ABORT are the ones you will see most, and the spec also defines RUNNING and OTHER. Three kinds of object hang off every RunEvent:
- Job: a logical processing step, identified by a namespace and a name. An Airflow task, a dbt model, a Spark application: each is a Job. Jobs persist across runs; their history is the sequence of Runs attributed to them.
- Run: a single execution of a Job, identified by a UUID. Runs carry RunFacets, metadata specific to this execution such as the nominal scheduled time, the parent run that triggered it, or an error message on failure.
- Dataset: an input or output data source, identified by a namespace (e.g. bigquery://project.dataset) and a name (e.g. table_name). Datasets carry DatasetFacets: schema, data quality assertions, data source connection details, and custom domain metadata.
# Minimal OpenLineage RunEvent — raw JSON structure
{
  "eventType": "COMPLETE",
  "eventTime": "2026-07-05T08:00:00.000Z",
  "producer": "https://github.com/OpenLineage/OpenLineage/tree/1.0.0/client/python",
  "schemaURL": "https://openlineage.io/spec/1-0-5/OpenLineage.json#/definitions/RunEvent",
  "job": {
    "namespace": "airflow",
    "name": "my_dag.transform_orders"
  },
  "run": {
    "runId": "d3f2a9c4-1e5b-4c8d-9f2a-3b6c7e8d9f0a",
    "facets": {
      "nominalTime": {
        "_producer": "...",
        "_schemaURL": "...",
        "nominalStartTime": "2026-07-05T08:00:00Z",
        "nominalEndTime": "2026-07-05T08:05:00Z"
      }
    }
  },
  "inputs": [
    {
      "namespace": "bigquery://my-project.raw",
      "name": "orders",
      "facets": {
        "schema": {
          "_producer": "...",
          "_schemaURL": "...",
          "fields": [
            { "name": "order_id", "type": "INTEGER" },
            { "name": "customer_id", "type": "INTEGER" },
            { "name": "total", "type": "FLOAT" }
          ]
        }
      }
    }
  ],
  "outputs": [
    {
      "namespace": "bigquery://my-project.analytics",
      "name": "orders_daily",
      "facets": {
        "schema": {
          "_producer": "...",
          "_schemaURL": "...",
          "fields": [
            { "name": "date", "type": "DATE" },
            { "name": "total_orders", "type": "INTEGER" },
            { "name": "total_revenue", "type": "FLOAT" }
          ]
        }
      }
    }
  ]
}
Note
Every facet carries two required fields: _producer (a URI identifying the software that created the facet) and _schemaURL (a URI pointing to the JSON Schema that validates the facet). The _producer field is what allows backends like Marquez to distinguish facets emitted by the Airflow provider from those emitted by a custom Python integration, which is useful for debugging lineage gaps when different tools disagree on dataset identity.
Marquez: The Reference OpenLineage Backend
Marquez is the reference implementation of an OpenLineage backend: an open-source metadata service that stores RunEvents, builds a lineage graph from them, and exposes that graph through a REST API and a web UI. Jobs, runs, and datasets are stored in PostgreSQL. Marquez is the easiest way to get started: run it locally with Docker Compose to validate your OpenLineage integrations before pointing them at a production backend.
# docker-compose.yml for local Marquez
services:
  marquez:
    image: marquezproject/marquez:latest
    ports:
      - "5000:5000"   # HTTP API
      - "5001:5001"   # Admin/health
    environment:
      MARQUEZ_PORT: 5000
      MARQUEZ_ADMIN_PORT: 5001
      # Database connection; these variable names are assumed to match the config
      # bundled in the Marquez image. Check the image's config for your version.
      POSTGRES_HOST: marquez-db
      POSTGRES_PORT: 5432
      POSTGRES_DB: marquez
      POSTGRES_USER: marquez
      POSTGRES_PASSWORD: marquez
    depends_on:
      - marquez-db
    restart: on-failure   # retry if Postgres is not accepting connections yet
  marquez-web:
    image: marquezproject/marquez-web:latest
    ports:
      - "3000:3000"
    environment:
      MARQUEZ_HOST: marquez
      MARQUEZ_PORT: 5000
    depends_on:
      - marquez
  marquez-db:
    image: postgres:14
    environment:
      POSTGRES_USER: marquez
      POSTGRES_PASSWORD: marquez
      POSTGRES_DB: marquez
    volumes:
      - marquez-db-data:/var/lib/postgresql/data
volumes:
  marquez-db-data:
# Start Marquez
docker compose up -d
# Verify the API is ready
curl http://localhost:5000/api/v1/namespaces
# Admin health check (admin port)
curl http://localhost:5001/healthcheck
# Web UI available at http://localhost:3000
Python Client: Emitting Lineage Events Directly
The openlineage-python client library provides Python classes for every RunEvent, facet, and dataset type defined in the specification. Use it when you have custom pipeline code — a Python ETL script, a custom Airflow operator, or a data processing tool without a native OpenLineage integration — and want to emit lineage events without serializing raw JSON by hand.
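It helps to remember that an OpenLineage event is plain JSON, and that an HTTP transport simply POSTs it to the backend; Marquez accepts events at /api/v1/lineage. The sketch below builds a minimal RunEvent as a dictionary and prepares that POST using only the Python standard library. The helper names, the producer URI, and the http://localhost:5000 base URL are illustrative assumptions, and the request is built but not sent so you can inspect it first.

```python
import json
import urllib.request
import uuid
from datetime import datetime, timezone

# Hypothetical producer URI identifying this script; replace with your own.
PRODUCER = "https://github.com/my-org/data-platform/tree/main/lineage"
SCHEMA_URL = "https://openlineage.io/spec/1-0-5/OpenLineage.json#/definitions/RunEvent"


def build_run_event(event_type, run_id, job_namespace, job_name, inputs=(), outputs=()):
    """Return a minimal RunEvent dict; inputs/outputs are (namespace, name) pairs."""
    return {
        "eventType": event_type,
        "eventTime": datetime.now(timezone.utc).isoformat(),
        "producer": PRODUCER,
        "schemaURL": SCHEMA_URL,
        "run": {"runId": run_id},
        "job": {"namespace": job_namespace, "name": job_name},
        "inputs": [{"namespace": ns, "name": n} for ns, n in inputs],
        "outputs": [{"namespace": ns, "name": n} for ns, n in outputs],
    }


def build_lineage_request(event, base_url="http://localhost:5000"):
    """Prepare (but do not send) a POST of the event to Marquez's lineage endpoint."""
    return urllib.request.Request(
        url=f"{base_url}/api/v1/lineage",
        data=json.dumps(event).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


event = build_run_event(
    "START",
    str(uuid.uuid4()),
    "my-pipeline",
    "etl.transform_sessions",
    inputs=[("postgresql://prod-db:5432/analytics", "raw.sessions")],
    outputs=[("postgresql://prod-db:5432/analytics", "mart.daily_sessions")],
)
request = build_lineage_request(event)
# urllib.request.urlopen(request, timeout=10) would send it to a running Marquez.
```

The client library does the same job with typed classes, validation of facet shapes, and pluggable transports, which is why the rest of this section uses it.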
pip install openlineage-python
import uuid
from datetime import datetime, timezone

from openlineage.client import OpenLineageClient
from openlineage.client.event_v2 import (
    InputDataset, Job, OutputDataset, Run, RunEvent, RunState,
)
from openlineage.client.facet_v2 import (
    error_message_run,
    job_type_job,
    nominal_time_run,
    schema_dataset,
    sql_job,
)

# Point the client at your Marquez (or other) backend
client = OpenLineageClient(url="http://localhost:5000")

# --- Emit START event when the job begins ---
run_id = str(uuid.uuid4())
start_time = datetime.now(timezone.utc).isoformat()
client.emit(
    RunEvent(
        eventType=RunState.START,
        eventTime=start_time,
        run=Run(
            runId=run_id,
            facets={
                "nominalTime": nominal_time_run.NominalTimeRunFacet(
                    nominalStartTime=start_time,
                ),
            },
        ),
        job=Job(
            namespace="my-pipeline",
            name="etl.transform_sessions",
            facets={
                "jobType": job_type_job.JobTypeJobFacet(
                    processingType="BATCH",
                    integration="PYTHON",
                    jobType="SCRIPT",
                ),
                "sql": sql_job.SQLJobFacet(
                    query="""
                        SELECT
                            session_id,
                            user_id,
                            SUM(page_views) AS total_views,
                            MIN(started_at) AS session_start
                        FROM raw.sessions
                        WHERE DATE(started_at) = CURRENT_DATE
                        GROUP BY 1, 2
                    """
                ),
            },
        ),
        inputs=[
            InputDataset(
                namespace="postgresql://prod-db:5432/analytics",
                name="raw.sessions",
                facets={
                    "schema": schema_dataset.SchemaDatasetFacet(
                        fields=[
                            schema_dataset.SchemaDatasetFacetFields(name="session_id", type="UUID"),
                            schema_dataset.SchemaDatasetFacetFields(name="user_id", type="INTEGER"),
                            schema_dataset.SchemaDatasetFacetFields(name="page_views", type="INTEGER"),
                            schema_dataset.SchemaDatasetFacetFields(name="started_at", type="TIMESTAMP"),
                        ]
                    ),
                },
            )
        ],
        outputs=[
            OutputDataset(
                namespace="postgresql://prod-db:5432/analytics",
                name="mart.daily_sessions",
                facets={
                    "schema": schema_dataset.SchemaDatasetFacet(
                        fields=[
                            schema_dataset.SchemaDatasetFacetFields(name="session_id", type="UUID"),
                            schema_dataset.SchemaDatasetFacetFields(name="user_id", type="INTEGER"),
                            schema_dataset.SchemaDatasetFacetFields(name="total_views", type="INTEGER"),
                            schema_dataset.SchemaDatasetFacetFields(name="session_start", type="TIMESTAMP"),
                        ]
                    ),
                },
            )
        ],
    )
)

# --- Run your actual ETL logic ---
try:
    pass  # ... your pipeline code here ...
except Exception as exc:
    # Emit FAIL event with error details, then re-raise
    client.emit(
        RunEvent(
            eventType=RunState.FAIL,
            eventTime=datetime.now(timezone.utc).isoformat(),
            run=Run(
                runId=run_id,
                facets={
                    "errorMessage": error_message_run.ErrorMessageRunFacet(
                        message=str(exc),
                        programmingLanguage="PYTHON",
                    ),
                },
            ),
            job=Job(namespace="my-pipeline", name="etl.transform_sessions"),
            inputs=[],
            outputs=[],
        )
    )
    raise
else:
    # Emit COMPLETE event only when the pipeline code succeeded
    client.emit(
        RunEvent(
            eventType=RunState.COMPLETE,
            eventTime=datetime.now(timezone.utc).isoformat(),
            run=Run(runId=run_id),
            job=Job(namespace="my-pipeline", name="etl.transform_sessions"),
            inputs=[],
            outputs=[],
        )
    )
Note
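In production, configure the transport outside your code instead of hard-coding the URL: construct the client with OpenLineageClient() and set OPENLINEAGE_URL for the HTTP endpoint, or select the Kafka transport (topic and producer settings) through the client's YAML config file or environment variables. Recent client versions read nested settings from double-underscore variables such as OPENLINEAGE__TRANSPORT__TYPE=kafka; check the configuration docs for your client version. A consumer on the backend side then processes events asynchronously, so pipeline throughput is not coupled to lineage storage latency.
Airflow Integration — Automatic DAG-Level Lineage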
The apache-airflow-providers-openlineage package adds automatic lineage emission to Airflow without modifying any DAG code. It works through a listener mechanism: when a task instance transitions to RUNNING, SUCCESS, or FAILED, the provider constructs and emits the corresponding RunEvent (START, COMPLETE, or FAIL). Operators that implement OpenLineage support, such as BigQueryInsertJobOperator and the SQLExecuteQueryOperator-based operators for Snowflake or Postgres, also emit input and output dataset metadata; other operators emit job and run metadata only. The provider documentation lists the supported operators.
The Airflow production guide covers task dependency design, sensor patterns, and backfill strategies, all of which benefit from OpenLineage integration: the lineage graph makes cross-DAG dependencies visible in Marquez without requiring teams to read each other's DAG code. When a shared dataset is an output of DAG A and an input of DAG B, Marquez surfaces that dependency automatically from the emitted events.
# Install the provider
pip install apache-airflow-providers-openlineage
# airflow.cfg — enable OpenLineage
[openlineage]
transport = {"type": "http", "url": "http://marquez:5000", "endpoint": "api/v1/lineage"}
namespace = airflow-production
disabled = false
# Or via environment variables (preferred for containerized deployments):
# AIRFLOW__OPENLINEAGE__TRANSPORT='{"type":"http","url":"http://marquez:5000","endpoint":"api/v1/lineage"}'
# AIRFLOW__OPENLINEAGE__NAMESPACE=airflow-production
# AIRFLOW__OPENLINEAGE__DISABLED=false
# Example DAG: lineage is emitted automatically, no DAG code changes needed
from datetime import datetime

from airflow import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

with DAG(
    dag_id="daily_revenue_pipeline",
    schedule="0 6 * * *",
    start_date=datetime(2026, 1, 1),
    catchup=False,
) as dag:
    # With the OpenLineage provider installed, BigQueryInsertJobOperator emits:
    #   - Input dataset:  namespace "bigquery", name "my-project.raw.orders"
    #   - Output dataset: namespace "bigquery", name "my-project.analytics.revenue_daily"
    #   - The SQL query as a SQLJobFacet
    # The job is named "<dag_id>.<task_id>" in the configured namespace.
    transform_revenue = BigQueryInsertJobOperator(
        task_id="transform_revenue",
        configuration={
            "query": {
                "query": """
                    INSERT INTO `my-project.analytics.revenue_daily`
                    SELECT
                        DATE(created_at) AS date,
                        COUNT(*) AS order_count,
                        SUM(total_amount) AS revenue
                    FROM `my-project.raw.orders`
                    WHERE DATE(created_at) = CURRENT_DATE - 1
                    GROUP BY 1
                """,
                "useLegacySql": False,
            }
        },
        gcp_conn_id="google_cloud_default",
    )
# Verify lineage was emitted: query the Marquez API
curl -s "http://localhost:5000/api/v1/namespaces/airflow-production/jobs/daily_revenue_pipeline.transform_revenue/runs?limit=1" | jq '.runs[0] | {id, state, startedAt, endedAt}'
# Get the lineage graph around the output table (BigQuery datasets use namespace "bigquery" and name "project.dataset.table")
curl -s "http://localhost:5000/api/v1/lineage?nodeId=dataset:bigquery:my-project.analytics.revenue_daily&depth=5" | jq '.graph[] | {id, type}'
dbt Integration — Model-Level and Column-Level Lineage
The openlineage-dbt integration parses dbt's manifest.json and run_results.json artifacts after a dbt command completes and emits OpenLineage events for each model. Because dbt has already resolved the DAG of model dependencies, the integration can emit accurate input-output relationships (for example, that a model reads from raw.orders and writes to analytics.orders_cleaned) without parsing SQL. The schema facet is populated from the column metadata in dbt's artifacts, and the SQL job facet includes the compiled SQL for each model.
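To make the mechanism concrete, here is a simplified sketch of deriving model-level input/output pairs from a parsed manifest.json. It relies only on the `nodes`, `sources`, `depends_on.nodes`, `database`, `schema`, `alias`, and `identifier` keys that dbt writes. It is not the openlineage-dbt implementation, which also handles seeds, snapshots, tests, and adapter-specific dataset naming, and the tiny manifest in the usage example is hypothetical.

```python
def model_lineage(manifest):
    """Map each dbt model's relation name to the relation names it reads from."""

    def relation(node):
        # Models use "alias"; sources use "identifier"; fall back to "name".
        table = node.get("alias") or node.get("identifier") or node["name"]
        return f'{node["database"]}.{node["schema"]}.{table}'

    # Models, seeds, and snapshots live under "nodes"; sources under "sources".
    all_nodes = {**manifest.get("nodes", {}), **manifest.get("sources", {})}
    lineage = {}
    for node in manifest.get("nodes", {}).values():
        if node.get("resource_type") != "model":
            continue
        parents = node.get("depends_on", {}).get("nodes", [])
        lineage[relation(node)] = sorted(
            relation(all_nodes[p]) for p in parents if p in all_nodes
        )
    return lineage


manifest = {
    "nodes": {
        "model.shop.orders_cleaned": {
            "resource_type": "model",
            "name": "orders_cleaned",
            "alias": "orders_cleaned",
            "database": "my-gcp-project",
            "schema": "analytics",
            "depends_on": {"nodes": ["source.shop.raw.orders"]},
        },
    },
    "sources": {
        "source.shop.raw.orders": {
            "resource_type": "source",
            "name": "orders",
            "identifier": "orders",
            "database": "my-gcp-project",
            "schema": "raw",
        },
    },
}
print(model_lineage(manifest))
```

Each key/value pair corresponds to one model's output dataset and its input datasets, which is the information a RunEvent for that model needs.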
pip install openlineage-dbt
# Set environment variables before running dbt
export OPENLINEAGE_URL=http://marquez:5000
export OPENLINEAGE_NAMESPACE=dbt-production
# Run dbt and emit lineage in one command: dbt-ol wraps the dbt CLI, runs the
# command, then reads target/manifest.json and target/run_results.json
dbt-ol run --target prod
# profiles.yml: a standard dbt target; dbt-ol reads the OpenLineage settings
# from the environment, so nothing OpenLineage-specific goes here
my_project:
  outputs:
    prod:
      type: bigquery
      method: service-account
      project: my-gcp-project
      dataset: analytics
      keyfile: /secrets/bq-service-account.json
      threads: 4
      timeout_seconds: 300
  target: prod
# GitHub Actions CI: emit lineage during the dbt production run
name: dbt Production Run
on:
  schedule:
    - cron: "0 5 * * *"
jobs:
  dbt-run:
    runs-on: ubuntu-latest
    env:
      OPENLINEAGE_URL: ${{ secrets.MARQUEZ_URL }}
      OPENLINEAGE_NAMESPACE: dbt-production
    steps:
      - uses: actions/checkout@v4
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: "3.11"
      - name: Install dependencies
        run: pip install dbt-bigquery openlineage-dbt
      - name: Run dbt and emit lineage
        run: dbt-ol run --target prod --profiles-dir .
      - name: Run dbt tests
        run: dbt test --target prod --profiles-dir .
Note
openlineage-dbt processes artifacts post-run, so it captures actual run outcomes (pass or fail per model) rather than planned lineage. When a model fails in a dbt run, its entry in run_results.json carries an error status, and the integration emits a FAIL RunEvent for that model. Marquez records the failure, making failed model runs queryable in the lineage graph alongside successful ones. This is useful for incident timelines that need to pinpoint which model failure caused downstream staleness.
Spark Integration — Job-Level Lineage from Query Plans
The openlineage-spark integration hooks into Spark's QueryExecutionListener and SparkListener APIs to extract lineage from logical query plans at execution time. It does not require modifying Spark job code: it infers input and output datasets from the resolved logical plan, including reads from S3, GCS, HDFS, Delta Lake tables, and Iceberg tables, and writes to the same targets. Column-level lineage is extracted from the resolved operator tree, showing which input columns map to which output columns through projections, aggregations, and joins.
# Spark submit with openlineage-spark JAR
spark-submit \
  --conf "spark.jars.packages=io.openlineage:openlineage-spark_2.12:1.9.0" \
  --conf "spark.extraListeners=io.openlineage.spark.agent.OpenLineageSparkListener" \
  --conf "spark.openlineage.transport.type=http" \
  --conf "spark.openlineage.transport.url=http://marquez:5000" \
  --conf "spark.openlineage.transport.endpoint=api/v1/lineage" \
  --conf "spark.openlineage.namespace=spark-cluster" \
  --conf "spark.openlineage.parentJobName=nightly-etl" \
  my_spark_job.py
# PySpark job: lineage extracted automatically from the query plan
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("daily-aggregations").getOrCreate()

# OpenLineage captures: reads from s3://my-bucket/raw/events/ as input dataset
raw_events = spark.read.parquet("s3://my-bucket/raw/events/date=2026-07-05/")

# Transformation: column lineage tracked through the aggregation
daily_counts = (
    raw_events
    .filter("event_type = 'purchase'")
    .groupBy("user_id", "product_id")
    .agg({"amount": "sum", "event_id": "count"})
    .withColumnRenamed("sum(amount)", "total_spend")
    .withColumnRenamed("count(event_id)", "purchase_count")
)

# OpenLineage captures: writes to s3://my-bucket/analytics/daily-counts/ as output dataset
daily_counts.write.mode("overwrite").parquet(
    "s3://my-bucket/analytics/daily-counts/date=2026-07-05/"
)

spark.stop()
# For Databricks: add to the cluster's Spark config (or set it from a cluster init script)
# Cluster Spark config:
spark.jars.packages io.openlineage:openlineage-spark_2.12:1.9.0
spark.extraListeners io.openlineage.spark.agent.OpenLineageSparkListener
spark.openlineage.transport.type http
spark.openlineage.transport.url https://marquez.internal.example.com
spark.openlineage.transport.endpoint api/v1/lineage
spark.openlineage.namespace databricks-production
Custom Facets — Extending the Standard for Domain Metadata
Custom facets let you attach domain-specific metadata to any Job, Run, or Dataset in a RunEvent, using the same extensible structure as built-in facets. A data quality facet might carry the row count, null rate, and anomaly score for an output dataset. A compliance facet might carry PII classification and retention policy. A business facet might carry the team owner, cost center, and SLA tier. Any JSON-serializable structure is valid as a custom facet as long as it includes the required _producer and _schemaURL fields, with _schemaURL pointing to a schema you publish.
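Because backends rely on _producer and _schemaURL to interpret a facet, a cheap pre-emit check is to walk an event and flag any facet that lacks either field. The sketch below does that for a RunEvent represented as a plain dictionary, covering run, job, and dataset facets (including the spec's inputFacets and outputFacets). The function name and the sample event are illustrative, not part of any OpenLineage library.

```python
REQUIRED_FACET_KEYS = ("_producer", "_schemaURL")


def missing_facet_fields(event):
    """Return (location, facet_name, missing_keys) for every incomplete facet."""
    locations = [
        ("run", event.get("run", {}).get("facets", {})),
        ("job", event.get("job", {}).get("facets", {})),
    ]
    for side, extra_key in (("inputs", "inputFacets"), ("outputs", "outputFacets")):
        for ds in event.get(side, []):
            where = f'{side}:{ds.get("namespace")}/{ds.get("name")}'
            locations.append((where, ds.get("facets", {})))
            locations.append((where, ds.get(extra_key, {})))

    problems = []
    for where, facets in locations:
        for name, facet in facets.items():
            missing = [k for k in REQUIRED_FACET_KEYS if k not in facet]
            if missing:
                problems.append((where, name, missing))
    return problems


event = {
    "run": {"runId": "r1", "facets": {"pipelineContext": {"team": "data-platform"}}},
    "job": {"namespace": "my-pipeline", "name": "etl.daily_revenue", "facets": {}},
    "outputs": [{
        "namespace": "bigquery://my-project.analytics",
        "name": "revenue_daily",
        "facets": {"dataQuality": {
            "_producer": "https://github.com/my-org/data-platform/tree/main/lineage",
            "_schemaURL": "https://schemas.my-org.com/openlineage/data-quality-facet.json",
            "rowCount": 1482301,
        }},
    }],
}
for problem in missing_facet_fields(event):
    print(problem)
```

Running a check like this in tests catches hand-built facets that would otherwise reach the backend without provenance.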
import uuid
from datetime import datetime, timezone
from typing import Any, Dict, List

import attr
from openlineage.client import OpenLineageClient
from openlineage.client.event_v2 import Job, OutputDataset, Run, RunEvent, RunState
from openlineage.client.generated.base import DatasetFacet, RunFacet

PRODUCER = "https://github.com/my-org/data-platform/tree/main/lineage"


# Custom DatasetFacet for data quality metrics.
# The v2 client's facets are attrs classes: after __init__, the base class fills
# in _producer and sets _schemaURL from the class's _get_schema().
@attr.define
class DataQualityFacet(DatasetFacet):
    """Carries per-column quality metrics for a dataset."""
    rowCount: int
    nullCounts: Dict[str, int]           # {"column_name": null_count, ...}
    anomalyScore: float                  # 0.0 (no anomaly) to 1.0 (severe)
    qualityChecks: List[Dict[str, Any]]  # [{"name": "...", "passed": bool, "value": ...}]

    def __attrs_post_init__(self) -> None:
        super().__attrs_post_init__()
        self._producer = PRODUCER  # identify our platform code as the producer

    @staticmethod
    def _get_schema() -> str:
        return "https://schemas.my-org.com/openlineage/data-quality-facet.json"


# Custom RunFacet for pipeline environment context
@attr.define
class PipelineContextFacet(RunFacet):
    """Records infrastructure context for cost attribution."""
    team: str
    costCenter: str
    environment: str   # prod, staging, dev
    clusterSize: str   # small, medium, large
    estimatedCostUsd: float

    def __attrs_post_init__(self) -> None:
        super().__attrs_post_init__()
        self._producer = PRODUCER

    @staticmethod
    def _get_schema() -> str:
        return "https://schemas.my-org.com/openlineage/pipeline-context-facet.json"


# Use the custom facets when emitting events
client = OpenLineageClient(url="http://marquez:5000")
run_id = str(uuid.uuid4())

# After pipeline completes: attach quality and context facets to the COMPLETE event
quality = DataQualityFacet(
    rowCount=1_482_301,
    nullCounts={"customer_id": 0, "amount": 12, "product_id": 3},
    anomalyScore=0.04,
    qualityChecks=[
        {"name": "row_count_within_range", "passed": True, "value": 1482301},
        {"name": "null_rate_amount_below_threshold", "passed": True, "value": 0.0000081},
        {"name": "revenue_total_matches_source", "passed": True, "value": 4839201.50},
    ],
)
context = PipelineContextFacet(
    team="data-platform",
    costCenter="CC-1042",
    environment="prod",
    clusterSize="large",
    estimatedCostUsd=1.82,
)
client.emit(
    RunEvent(
        eventType=RunState.COMPLETE,
        eventTime=datetime.now(timezone.utc).isoformat(),
        run=Run(
            runId=run_id,
            facets={"pipelineContext": context},
        ),
        job=Job(namespace="my-pipeline", name="etl.daily_revenue"),
        inputs=[],
        outputs=[
            OutputDataset(
                namespace="bigquery://my-project.analytics",
                name="revenue_daily",
                facets={"dataQuality": quality},
            )
        ],
    )
)
Querying the Lineage Graph
Marquez exposes a graph traversal API at /api/v1/lineage. Given a node ID of the form dataset:<namespace>:<name> or job:<namespace>:<name>, it returns the surrounding graph, upstream and downstream together, up to a specified depth. Each node in the response lists its inEdges and outEdges, so you can follow edge direction to separate what feeds a dataset from what depends on it. This is the API that powers impact analysis: when a source table schema changes, query the graph around that dataset with depth 5 and follow the outgoing edges to discover every downstream dataset and job affected before the change reaches production. The response can be rendered, alerted on, or fed into a dependency validator in CI.
# All nodes within 5 hops of revenue_daily; the response mixes upstream and
# downstream neighbours, each with its type and ID
curl -s "http://localhost:5000/api/v1/lineage?nodeId=dataset:bigquery://my-project.analytics:revenue_daily&depth=5" \
  | jq -r '.graph[] | "[\(.type)] \(.id)"'
# Direct consumers of raw.orders: follow the dataset node's outgoing edges
curl -s "http://localhost:5000/api/v1/lineage?nodeId=dataset:bigquery://my-project.raw:orders&depth=5" \
  | jq -r '.graph[] | select(.id == "dataset:bigquery://my-project.raw:orders") | .outEdges[].destination'
# List the 10 most recent runs for a job
curl -s "http://localhost:5000/api/v1/namespaces/airflow-production/jobs/daily_revenue_pipeline.transform_revenue/runs?limit=10" \
  | jq -r '.runs[] | "\(.id) state=\(.state) start=\(.startedAt // "-")"'
# Search for datasets matching a pattern
curl -s "http://localhost:5000/api/v1/search?q=orders&filter=DATASET&limit=20" \
  | jq -r '.results[] | "\(.name) (\(.namespace))"'
# Python helper for impact analysis: run before schema changes
from collections import deque

import requests

MARQUEZ_URL = "http://localhost:5000"


def get_downstream_impact(namespace: str, dataset_name: str, depth: int = 5) -> dict:
    """Returns the downstream datasets and jobs for a given dataset.

    The /lineage response holds upstream and downstream neighbours together,
    so walk outEdges from the starting node to keep only the downstream side.
    """
    node_id = f"dataset:{namespace}:{dataset_name}"
    resp = requests.get(
        f"{MARQUEZ_URL}/api/v1/lineage",
        params={"nodeId": node_id, "depth": depth},
        timeout=10,
    )
    resp.raise_for_status()
    nodes = {n["id"]: n for n in resp.json().get("graph", [])}

    # Breadth-first walk along outgoing edges only
    seen, queue = {node_id}, deque([node_id])
    while queue:
        for edge in nodes.get(queue.popleft(), {}).get("outEdges", []):
            if edge["destination"] not in seen:
                seen.add(edge["destination"])
                queue.append(edge["destination"])
    seen.discard(node_id)
    return {
        "datasets": sorted(i for i in seen if nodes.get(i, {}).get("type") == "DATASET"),
        "jobs": sorted(i for i in seen if nodes.get(i, {}).get("type") == "JOB"),
    }


# Before dropping or altering raw.orders:
impact = get_downstream_impact(
    namespace="bigquery://my-project.raw",
    dataset_name="orders",
)
print("Dropping raw.orders would affect:")
print(f"  {len(impact['datasets'])} downstream datasets")
print(f"  {len(impact['jobs'])} downstream jobs")
for d in impact["datasets"]:
    print(f"    dataset: {d}")
for j in impact["jobs"]:
    print(f"    job: {j}")
Production Checklist
Use consistent dataset naming across all integrations. A dataset's namespace and name together are the key that links it across tools: a BigQuery table emitted from Airflow must carry exactly the same namespace and name when dbt or Spark emits it, or the graph splits into disconnected copies of the same table. The built-in integrations follow the OpenLineage dataset naming specification (for example, namespace bigquery with name project.dataset.table for BigQuery, namespace postgres://host:5432 with name database.schema.table for Postgres, and namespace s3://bucket with the object path as the name for S3), so make hand-instrumented pipelines follow the same rules and record them in a namespace registry. Document the convention before connecting the first integration, because renaming namespaces later requires re-emitting historical events.
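If you adopt the OpenLineage naming specification, which the built-in integrations follow, one way to enforce it in hand-written emitters is to make a small module the only place dataset identifiers get built. The sketch below covers BigQuery, Postgres, and S3; the function names are hypothetical, stripping a leading slash from S3 keys is an assumption about how your other tools report paths, and you should check the naming specification for any store you add.

```python
from typing import Tuple

DatasetId = Tuple[str, str]  # (namespace, name)


def bigquery_dataset(project: str, dataset: str, table: str) -> DatasetId:
    return ("bigquery", f"{project}.{dataset}.{table}")


def postgres_dataset(host: str, port: int, database: str, schema: str, table: str) -> DatasetId:
    return (f"postgres://{host}:{port}", f"{database}.{schema}.{table}")


def s3_dataset(bucket: str, key: str) -> DatasetId:
    # Assumption: object paths are stored without a leading slash.
    return (f"s3://{bucket}", key.lstrip("/"))


def node_id(ds: DatasetId) -> str:
    """Marquez lineage node ID for a dataset, as used by /api/v1/lineage."""
    namespace, name = ds
    return f"dataset:{namespace}:{name}"


print(node_id(bigquery_dataset("my-project", "analytics", "revenue_daily")))
```

Routing every emitter through helpers like these turns a naming convention into something code review and tests can check.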
Run Marquez against a production-grade PostgreSQL instance, not the throwaway Postgres container from a local Docker Compose setup. Marquez stores every RunEvent, every dataset version, and every facet in Postgres. For a moderately active data platform emitting around 10,000 events per day, expect the database to grow by several gigabytes per month, depending largely on facet size; measure your own growth rate in the first weeks. Use a managed Postgres service (Cloud SQL, RDS, or Aurora) with automated backups, point-in-time recovery, and connection pooling via PgBouncer. Configure Marquez's data retention setting to purge run history older than, for example, 90 days to bound storage growth, and confirm what your Marquez version's retention job deletes before relying on it to keep dataset and job metadata indefinitely.
Use the Kafka transport for high-throughput pipelines. The Python client's HTTP transport sends each event synchronously, so the pipeline waits for the backend to acknowledge it; under high concurrency or a slow backend, that wait adds latency to every task. The Kafka transport instead hands events to a Kafka producer for a topic. Marquez itself ingests events over HTTP, so a separate consumer service that you run reads from the topic and forwards events to Marquez at its own pace. This decouples pipeline throughput from lineage storage capacity and makes lineage emission resilient to Marquez downtime: events queue in Kafka until the backend recovers.
Emit parent run facets to preserve hierarchical lineage. When Airflow triggers a dbt run or a Spark job as a task, the child run's RunEvent should include a parent run facet pointing to the Airflow task's job and run ID. Without it, Airflow and dbt lineage appear as disconnected subgraphs: you can see that dbt models ran and that Airflow tasks ran, but not that Airflow triggered the dbt run. Recent versions of the Airflow OpenLineage provider can pass this parent information to child jobs for some operators (for example DbtCloudRunJobOperator, and SparkSubmitOperator when parent-job injection is enabled in the provider configuration); check the provider documentation for your version. For custom operators, pass the parent run information from the task instance context, for example through the provider's lineage_run_id and lineage_parent_id macros.
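For a hand-instrumented child job, the parent link is just another run facet. The sketch below builds the parent run facet (keyed "parent") as a plain dictionary, with the parent's run.runId and job.namespace/job.name as in the spec's ParentRunFacet. How the parent IDs reach the child (environment variable, CLI argument, XCom) is left to you; the producer URI, the facet schema version in the URL, and the run_dbt task name are assumptions for illustration.

```python
import uuid

PRODUCER = "https://github.com/my-org/data-platform/tree/main/lineage"  # hypothetical
PARENT_FACET_SCHEMA = (  # assumed facet spec version; match your OpenLineage version
    "https://openlineage.io/spec/facets/1-0-0/ParentRunFacet.json#/$defs/ParentRunFacet"
)


def parent_run_facet(parent_namespace: str, parent_job: str, parent_run_id: str) -> dict:
    """Build the run facet that links a child run to the run that triggered it."""
    uuid.UUID(parent_run_id)  # raises ValueError if the run ID is not a UUID
    return {
        "parent": {
            "_producer": PRODUCER,
            "_schemaURL": PARENT_FACET_SCHEMA,
            "run": {"runId": parent_run_id},
            "job": {"namespace": parent_namespace, "name": parent_job},
        }
    }


# Child dbt run triggered by a hypothetical Airflow task daily_revenue_pipeline.run_dbt
airflow_run_id = str(uuid.uuid4())
child_run = {
    "runId": str(uuid.uuid4()),
    "facets": parent_run_facet(
        "airflow-production", "daily_revenue_pipeline.run_dbt", airflow_run_id
    ),
}
```

Validating the run ID up front matters because a malformed parent ID produces an event that backends accept but cannot link to anything.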
Index Marquez's database for your most common queries. The default Marquez schema indexes the lookups it performs by name and ID, but lineage traversal by depth gets expensive on large graphs. Marquez derives lineage edges from the tables that map job versions to their input and output datasets rather than from a dedicated edges table, so profile before adding anything: find the slow lineage queries with pg_stat_statements and add indexes that match their actual access paths before the graph grows too large to traverse efficiently.
Validate lineage coverage regularly with a completeness audit. Write a script that queries Marquez for all datasets that are outputs of any job run in the last 24 hours and cross-references them against your data catalog's list of expected production tables. Datasets in the catalog with no lineage entry indicate a missing integration — a Spark job running without the OpenLineage listener, a dbt project not uploading artifacts, or a custom ETL script that hasn't been instrumented. Run this audit daily and alert on coverage regressions.
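The comparison at the heart of that audit is a set difference. A minimal sketch, assuming you have already fetched two collections and normalised both to (namespace, name) pairs: the datasets that appeared as outputs in Marquez over the last 24 hours, and the expected production tables from your catalog. The function name, report fields, and sample tables are illustrative.

```python
from typing import Iterable, Set, Tuple

DatasetKey = Tuple[str, str]  # (namespace, name)


def coverage_report(expected: Iterable[DatasetKey], observed: Iterable[DatasetKey]) -> dict:
    """Compare catalog tables against datasets seen as lineage outputs."""
    expected_set: Set[DatasetKey] = set(expected)
    observed_set: Set[DatasetKey] = set(observed)
    missing = sorted(expected_set - observed_set)  # in catalog, no lineage
    covered = len(expected_set) - len(missing)
    return {
        "missing": missing,
        "unexpected": sorted(observed_set - expected_set),  # lineage, not in catalog
        "coverage": covered / len(expected_set) if expected_set else 1.0,
    }


catalog = [
    ("bigquery", "my-project.analytics.revenue_daily"),
    ("bigquery", "my-project.analytics.orders_daily"),
    ("bigquery", "my-project.analytics.sessions_daily"),
    ("bigquery", "my-project.analytics.customers"),
]
lineage_outputs = [
    ("bigquery", "my-project.analytics.revenue_daily"),
    ("bigquery", "my-project.analytics.orders_daily"),
    ("bigquery", "my-project.analytics.customers"),
    ("bigquery", "my-project.scratch.tmp_orders"),
]
report = coverage_report(catalog, lineage_outputs)
print(report["missing"], f'{report["coverage"]:.0%}')
```

Alerting on a drop in the coverage ratio, rather than on any single missing table, keeps the daily audit from paging on tables that legitimately skip a day.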
Use the column lineage facet for fine-grained impact analysis of schema changes. The basic RunEvent records which datasets are inputs and outputs of a job. The column lineage facet records which specific columns in the output derive from which columns in the inputs. With column lineage, a schema change impact analysis can answer not just 'which tables depend on orders?' but 'which tables use the customer_id column from orders?'. The openlineage-spark integration extracts column lineage from Spark's logical plan automatically. For SQL-based pipelines, the openlineage-sql library parses SQL strings to extract column-level dependencies without executing the query.
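To see how that question gets answered, here is a sketch over the columnLineage dataset facet represented as plain dictionaries: each output column lists the inputFields (namespace, name, field) it derives from. The helper finds every output column that cites a given input column. The facet content is a hypothetical example, and real facets also carry _producer, _schemaURL, and optional transformation details that the sketch ignores.

```python
def columns_using(outputs, namespace, name, field):
    """Return (dataset, column) pairs whose columnLineage cites the given input column."""
    target = (namespace, name, field)
    hits = []
    for ds in outputs:
        fields = ds.get("facets", {}).get("columnLineage", {}).get("fields", {})
        for out_col, spec in fields.items():
            for src in spec.get("inputFields", []):
                if (src["namespace"], src["name"], src["field"]) == target:
                    hits.append((f'{ds["namespace"]}/{ds["name"]}', out_col))
    return sorted(hits)


ORDERS = {"namespace": "bigquery", "name": "my-project.raw.orders"}
outputs = [
    {
        "namespace": "bigquery",
        "name": "my-project.analytics.customer_revenue",
        "facets": {"columnLineage": {"fields": {
            "customer_id": {"inputFields": [{**ORDERS, "field": "customer_id"}]},
            "revenue": {"inputFields": [{**ORDERS, "field": "total"}]},
        }}},
    },
    {
        "namespace": "bigquery",
        "name": "my-project.analytics.orders_daily",
        "facets": {"columnLineage": {"fields": {
            "total_orders": {"inputFields": [{**ORDERS, "field": "order_id"}]},
        }}},
    },
]
print(columns_using(outputs, "bigquery", "my-project.raw.orders", "customer_id"))
```

Applied to the output datasets found by a downstream graph walk, this narrows a table-level impact list to the tables that actually read the changed column.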
Test lineage emission in your CI pipeline before merging pipeline changes. Add a lineage validation step to your CI workflow: after running the pipeline in a test environment pointed at a test Marquez instance, query Marquez to assert that the expected datasets appear as inputs and outputs, that schema facets contain the expected columns, and that no FAIL events were emitted for any run. This catches integration regressions — a refactored Spark job that drops the OpenLineage listener jar, a dbt model renamed without updating the corresponding Airflow task — before they reach production and create gaps in the lineage graph.
Use OpenLineage events as the data source for SLA monitoring. Every COMPLETE event carries a timestamp, and a FAIL event can carry an error message facet. A monitoring script that polls Marquez for jobs that have not emitted a COMPLETE event within their expected window can serve as a freshness SLA monitor without any additional instrumentation in pipeline code. This gives you SLA monitoring for all pipelines that emit OpenLineage events, including third-party tools and custom scripts, using the lineage infrastructure you already have rather than a separate monitoring system.
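The core of such a monitor is a comparison between each job's most recent COMPLETE time and its allowed age. A minimal sketch, assuming you have already pulled the latest completed-run timestamp per job from Marquez (for example from its runs endpoint) and that each job has a maximum allowed age; the job names and windows are hypothetical.

```python
from datetime import datetime, timedelta, timezone
from typing import Dict, List, Optional


def stale_jobs(
    last_complete: Dict[str, Optional[datetime]],
    max_age: Dict[str, timedelta],
    now: datetime,
) -> List[str]:
    """Jobs whose latest COMPLETE is older than allowed, or that never completed."""
    stale = []
    for job, window in max_age.items():
        finished = last_complete.get(job)
        if finished is None or now - finished > window:
            stale.append(job)
    return sorted(stale)


now = datetime(2026, 7, 5, 9, 0, tzinfo=timezone.utc)
last_complete = {
    "daily_revenue_pipeline.transform_revenue": datetime(2026, 7, 5, 6, 20, tzinfo=timezone.utc),
    "etl.transform_sessions": datetime(2026, 7, 3, 8, 5, tzinfo=timezone.utc),
    "dbt-production.orders_cleaned": None,
}
windows = {job: timedelta(hours=26) for job in last_complete}
print(stale_jobs(last_complete, windows, now))
```

Keying the windows by job rather than using one global threshold lets hourly and daily jobs share the same monitor.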
Document custom facet schemas in a shared registry. When teams add custom facets for cost attribution, PII classification, or data quality, they need to publish the JSON Schema for those facets at the _schemaURL they include in events. Create a shared schema registry — a Git repository or an internal web server — that hosts all custom facet schemas with versioned URLs. When a custom facet schema changes, increment the version in the URL rather than modifying the existing schema, so that events emitted with the old schema remain valid and old consumers continue to parse them correctly.
Do your data incidents require hours of manual tracing through Airflow logs, dbt manifest files, and warehouse query history to identify which downstream tables a source schema change affects? Can your compliance team only answer which pipelines touch PII columns by reading every ETL script? Does your data catalog show dataset names but not what process produced each dataset or what depends on it?
We design and implement OpenLineage programs — from namespace convention design and Marquez production deployment on managed PostgreSQL with retention policies and PgBouncer connection pooling, through apache-airflow-providers-openlineage installation and transport configuration for HTTP and Kafka backends, openlineage-dbt artifact upload pipeline integration in GitHub Actions CI after dbt production runs, openlineage-spark listener JAR configuration for Databricks clusters and on-premise Spark submit jobs with S3 and Iceberg dataset tracking, custom facet design for data quality metrics, cost attribution, and PII classification with JSON Schema registry setup, graph traversal API integration for automated schema change impact analysis before migrations, parent run facet wiring for hierarchical Airflow-to-dbt and Airflow-to-Spark lineage, lineage coverage audit scripts that cross-reference Marquez against your data catalog and alert on gaps, and CI lineage validation steps that assert expected input and output datasets appear in Marquez after every pipeline deployment. Let’s talk.