Tags: Trino, Distributed SQL, Analytics, Query Federation, Data Lake, Java, Data Engineering, Apache Iceberg, SQL, Big Data

Trino for Distributed SQL Analytics — Architecture, Connectors, and Query Federation

A practical guide to Trino for distributed SQL analytics. It covers the MPP coordinator-worker architecture, in which the coordinator parses, plans, and schedules queries while workers execute pipelined in-memory stages and exchange data over HTTP for sub-second interactive latency; the Connector SPI, with ConnectorMetadata for schema discovery, ConnectorSplitManager for parallel split generation, and ConnectorPageSource for reading columnar data pages; and built-in connectors for Hive/HMS, Iceberg, Delta Lake, PostgreSQL, MySQL, Kafka, and HTTP, configured through catalog properties files. It then walks through federation queries that join Iceberg tables on S3 with live PostgreSQL dimensions and Kafka topics in a single ANSI SQL statement, using broadcast joins for small dimension tables and distributed hash joins for large-to-large joins; cost-based optimizer statistics collected with ANALYZE (row counts, NDV, null fractions, and min/max values) for join reordering and strategy selection; partition pruning and dynamic filtering, which pushes runtime filters from the build side of a join to the Iceberg reader so non-matching S3 files are skipped; and column projection pushdown for Parquet. The operational sections cover TLS and LDAP password authentication with etc/password-authenticator.properties, file-based access control with JSON rules for read-only catalog grants and column masking of PII fields such as email and IP address, the Python trino DB-API 2.0 driver and SQLAlchemy integration for pandas and dbt workflows, EXPLAIN ANALYZE for distributed plan inspection, Kubernetes Helm chart deployment with HPA worker autoscaling targeting 70% CPU utilization, spill-to-disk configuration for sort, aggregation, and join buffers on NVMe-backed volumes, and a 10-point production checklist.

2026-06-18

What is Trino?

Trino (formerly PrestoSQL) is an open-source MPP (Massively Parallel Processing) distributed SQL query engine that separates compute from storage. It reads data directly from wherever it lives — HDFS, S3, Iceberg tables, relational databases, Kafka topics — and executes SQL over all of them without ETL. A single Trino query can join an Apache Iceberg lakehouse table with a live PostgreSQL customer dimension and a Kafka event stream in one statement.

The key differentiator versus Spark SQL or Hive is Trino's pipelined execution model: stages stream data to each other through in-memory buffers served over HTTP instead of writing intermediate results to disk. This delivers sub-second to low-second query latency for interactive analytics where Hive would take minutes.
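The pipelining idea can be illustrated with plain Python generators. This is only a sketch of the concept, not Trino's implementation: the stage names, the page size, and the sample rows are assumptions made for the example. Each stage consumes a page as soon as the upstream stage produces it, so nothing is materialized between stages.

```python
from typing import Iterable, Iterator

Page = list[dict]  # a small batch of rows, standing in for a columnar page


def scan(rows: list[dict], page_size: int) -> Iterator[Page]:
    """Source stage: emit rows in fixed-size pages."""
    for i in range(0, len(rows), page_size):
        yield rows[i:i + page_size]


def filter_stage(pages: Iterable[Page], min_revenue: float) -> Iterator[Page]:
    """Intermediate stage: filter each page as soon as it arrives."""
    for page in pages:
        kept = [r for r in page if r["revenue"] >= min_revenue]
        if kept:
            yield kept


def sum_stage(pages: Iterable[Page]) -> float:
    """Final stage: aggregate incrementally while pages stream in."""
    total = 0.0
    for page in pages:
        total += sum(r["revenue"] for r in page)
    return total


rows = [{"event_id": i, "revenue": float(i)} for i in range(10)]
total = sum_stage(filter_stage(scan(rows, page_size=3), min_revenue=5.0))
print(total)  # 5 + 6 + 7 + 8 + 9 = 35.0
```

A stage-barrier engine would instead write the full output of each stage before the next one starts, which is where the extra latency comes from.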

Interactive Speed

Sub-second to low-second query latency on large datasets. Pipelined in-memory execution eliminates stage barriers that make Hive slow.

True Federation

Query across Iceberg, PostgreSQL, MySQL, Kafka, and HTTP APIs in a single SQL statement. No data movement or custom ETL required.

ANSI SQL

Full ANSI SQL with window functions, CTEs, LATERAL joins, UNNEST, and a rich built-in function library. No vendor dialect lock-in.

Architecture: Coordinator, Workers, and the Connector SPI

A Trino cluster has one coordinator and N workers. The coordinator receives SQL from clients, parses and plans the query, splits it into distributed stages, and schedules tasks across workers. Workers execute tasks and exchange data between stages through in-memory buffers over HTTP, with no disk spill between stages by default.

Data source integration uses the Connector SPI (Service Provider Interface). The core of each connector is a small set of Java interfaces: ConnectorMetadata for schema discovery, ConnectorSplitManager for partitioning work into parallel splits, and ConnectorPageSource for reading columnar data pages. Built-in connectors cover Hive/HMS, Iceberg, Delta Lake, PostgreSQL, MySQL, MariaDB, SQL Server, Redis, Kafka, Elasticsearch, MongoDB, HTTP, and an in-memory connector for small reference tables.
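The division of labor between the three roles can be sketched in Python. The real SPI is Java and considerably richer; the class and method names below (ToyConnector, get_columns, get_splits, read_page) are hypothetical and only mirror the metadata, split, and page-read responsibilities described above.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Split:
    """A unit of parallel work: a row range of one table."""
    table: str
    start: int
    end: int


class ToyConnector:
    """Hypothetical in-memory connector; tables map column names to value lists."""

    def __init__(self, tables: dict[str, dict[str, list]]):
        self._tables = tables

    def get_columns(self, table: str) -> list[str]:
        """Metadata role: describe the table's schema."""
        return list(self._tables[table])

    def get_splits(self, table: str, rows_per_split: int) -> list[Split]:
        """Split-manager role: cut the table into independently readable ranges."""
        row_count = len(next(iter(self._tables[table].values())))
        return [
            Split(table, start, min(start + rows_per_split, row_count))
            for start in range(0, row_count, rows_per_split)
        ]

    def read_page(self, split: Split, columns: list[str]) -> dict[str, list]:
        """Page-source role: return only the requested columns for one split."""
        data = self._tables[split.table]
        return {c: data[c][split.start:split.end] for c in columns}


connector = ToyConnector(
    {"events": {"event_id": [1, 2, 3, 4, 5], "revenue_usd": [10.0, 0.0, 5.5, 7.0, 1.5]}}
)
splits = connector.get_splits("events", rows_per_split=2)
pages = [connector.read_page(s, ["revenue_usd"]) for s in splits]
total = sum(sum(p["revenue_usd"]) for p in pages)
print(len(splits), total)  # 3 24.0
```

In a real cluster each split would be handed to a different worker task, which is what makes the scan parallel.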

# Trino query execution flow
#
# Client ──► Coordinator
#               │  ① Parse SQL → AST
#               │  ② Analyze: resolve catalog.schema.table via ConnectorMetadata
#               │  ③ Plan: logical → distributed physical plan (stages + tasks)
#               │  ④ Schedule: assign tasks to workers via HTTP
#               │
#   ┌───────────┼────────────────────────────────┐
#   ▼           ▼                                ▼
# Worker 1   Worker 2  ...                   Worker N
# (stage 0)  (stage 0)                       (stage 0)
#   │           │                                │
#   └──── exchange buffers (HTTP) ──────────────┘
#              │
#           Worker 1..N
#           (stage 1)  →  final stage output → coordinator → client
#
# Key configuration files:
# etc/config.properties   ← coordinator or worker role, discovery URI
# etc/jvm.config          ← heap size, GC flags
# etc/node.properties     ← node ID, data directory, environment name
# etc/catalog/*.properties ← one file per data source connector

Installing Trino with Docker Compose

The official image trinodb/trino bundles the server and all connectors. Mount config directories for coordinator, worker, and catalog configuration without rebuilding the image. The Docker Compose below runs a coordinator, two workers, and a Hive Metastore for the Iceberg connector.

# docker-compose.yml — Trino coordinator + workers + Hive Metastore
services:
  trino-coordinator:
    image: trinodb/trino:451
    ports:
      - "8080:8080"
    volumes:
      - ./etc/coordinator:/etc/trino
      - ./etc/catalog:/etc/trino/catalog
    environment:
      - JAVA_TOOL_OPTIONS=-Xmx4G
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8080/v1/info"]
      interval: 10s
      retries: 10

  trino-worker:
    image: trinodb/trino:451
    depends_on:
      trino-coordinator:
        condition: service_healthy
    volumes:
      - ./etc/worker:/etc/trino
      - ./etc/catalog:/etc/trino/catalog
    environment:
      - JAVA_TOOL_OPTIONS=-Xmx8G
    deploy:
      replicas: 2

  hive-metastore:
    image: apache/hive:4.0.0
    environment:
      - SERVICE_NAME=metastore
    ports:
      - "9083:9083"

---
# etc/coordinator/config.properties
coordinator=true
node-scheduler.include-coordinator=false
http-server.http.port=8080
discovery.uri=http://trino-coordinator:8080
query.max-memory=20GB
query.max-memory-per-node=2GB

# etc/worker/config.properties
coordinator=false
http-server.http.port=8080
discovery.uri=http://trino-coordinator:8080

# etc/coordinator/jvm.config
-server
-Xmx8G
-XX:+UseG1GC
-XX:G1HeapRegionSize=32M
-XX:+UseGCOverheadLimit
-XX:+ExplicitGCInvokesConcurrent
-XX:+HeapDumpOnOutOfMemoryError
-XX:+ExitOnOutOfMemoryError
-Djdk.attach.allowAttachSelf=true

Note

Always set node-scheduler.include-coordinator=false in production. Scheduling tasks on the coordinator node competes with query planning, causing latency spikes under concurrent query load. The coordinator should be a dedicated planning and scheduling node only.
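The curl healthcheck in the Compose file only confirms that /v1/info answers. A slightly stricter readiness probe can also inspect the response body. The sketch below assumes the payload carries a boolean "starting" field; the helper names and the canned responses are illustrative, and the demonstration runs offline by injecting a fake fetch function.

```python
import json
import time
import urllib.request


def is_ready(info_body: str) -> bool:
    """True once the /v1/info payload reports that startup has finished."""
    info = json.loads(info_body)
    return info.get("starting") is False


def http_get(url: str) -> str:
    with urllib.request.urlopen(url, timeout=5) as resp:
        return resp.read().decode("utf-8")


def wait_until_ready(url: str, fetch=http_get, attempts: int = 10, delay_s: float = 10.0) -> bool:
    """Poll the info endpoint until the server is ready or attempts run out."""
    for _ in range(attempts):
        try:
            if is_ready(fetch(url)):
                return True
        except (OSError, ValueError):
            pass  # not reachable yet, or the body was not valid JSON
        time.sleep(delay_s)
    return False


# Offline demonstration with canned responses instead of a live coordinator
responses = iter(['{"starting": true}', '{"starting": false}'])
ready = wait_until_ready(
    "http://localhost:8080/v1/info",
    fetch=lambda _url: next(responses),
    delay_s=0,
)
print(ready)  # True
```

Against a running cluster, calling wait_until_ready with the default fetch performs real HTTP requests.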

Connector Configuration — Iceberg, PostgreSQL, and Kafka

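Each connector is a single .properties file dropped into etc/catalog/. The filename becomes the catalog name used in SQL. Catalogs load at startup by default; recent Trino releases also support dynamic catalog management (catalog.management=dynamic), which adds and removes catalogs with CREATE CATALOG and DROP CATALOG statements instead of a restart.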
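The filename-to-catalog mapping is simple enough to script. The following sketch generates catalog files from a dictionary and derives the catalog names Trino would expose; the helper names and the temporary directory are assumptions for the example, and the property values are placeholders.

```python
from pathlib import Path
import tempfile


def write_catalogs(catalog_dir: Path, catalogs: dict[str, dict[str, str]]) -> None:
    """Write one <name>.properties file per catalog."""
    catalog_dir.mkdir(parents=True, exist_ok=True)
    for name, props in catalogs.items():
        lines = [f"{key}={value}" for key, value in props.items()]
        (catalog_dir / f"{name}.properties").write_text("\n".join(lines) + "\n")


def catalog_names(catalog_dir: Path) -> list[str]:
    """The catalog name is the file name without the .properties suffix."""
    return sorted(p.stem for p in catalog_dir.glob("*.properties"))


with tempfile.TemporaryDirectory() as tmp:
    catalog_dir = Path(tmp) / "etc" / "catalog"
    write_catalogs(
        catalog_dir,
        {
            "iceberg": {"connector.name": "iceberg"},
            "postgresql": {"connector.name": "postgresql"},
        },
    )
    names = catalog_names(catalog_dir)
    iceberg_text = (catalog_dir / "iceberg.properties").read_text()

print(names)  # ['iceberg', 'postgresql']
```

Renaming iceberg.properties to lake.properties would change every query from iceberg.analytics.events to lake.analytics.events, so file names are worth treating as part of the public interface.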

# etc/catalog/iceberg.properties — Iceberg connector backed by S3 + HMS
connector.name=iceberg
hive.metastore.uri=thrift://hive-metastore:9083
iceberg.file-format=PARQUET
hive.s3.endpoint=http://minio:9000
hive.s3.path-style-access=true
hive.s3.aws-access-key=minioadmin
hive.s3.aws-secret-key=minioadmin
iceberg.max-partitions-per-writer=200
# Dynamic filtering is enabled by default and is switched on or off globally
# (enable-dynamic-filtering in etc/config.properties), not per catalog.
# At the catalog level, tune how long the Iceberg reader waits for filters:
iceberg.dynamic-filtering.wait-timeout=5s

# etc/catalog/postgresql.properties — PostgreSQL JDBC connector
connector.name=postgresql
connection-url=jdbc:postgresql://postgres:5432/analytics
connection-user=trino
connection-password=secret
# Aggregate pushdown (COUNT, SUM, and similar) to PostgreSQL is enabled by
# default in the JDBC connectors; no extra property is required

# etc/catalog/kafka.properties — read Kafka topics as SQL tables
connector.name=kafka
kafka.nodes=kafka:9092
kafka.table-names=events.page_views,events.purchases
kafka.hide-internal-columns=false
kafka.messages-per-split=10000
# Deserialize Avro messages via Confluent Schema Registry
kafka.confluent-schema-registry-url=http://schema-registry:8081

# etc/catalog/delta.properties — Delta Lake connector (S3 + HMS)
connector.name=delta_lake
hive.metastore.uri=thrift://hive-metastore:9083
delta.target-max-file-size=128MB
hive.s3.endpoint=http://minio:9000
hive.s3.path-style-access=true

Teams using DuckDB for local development analytics can point it at the same Parquet files on S3 that the Iceberg connector exposes to Trino in production. The local DuckDB workflow and the production Trino cluster can share the same data lake, with Trino adding distributed scale and multi-source federation.

Query Federation — Joining Across Data Sources

Federation is Trino's headline capability. A catalog-qualified reference (catalog.schema.table) identifies which connector to use. The coordinator collects metadata from all referenced connectors simultaneously, builds a unified distributed plan, and pushes predicates to each connector so only relevant data crosses the network.

-- Federation query: join Iceberg events with PostgreSQL customers
-- Trino reads Iceberg splits from S3 and PostgreSQL rows in parallel
SELECT
    c.customer_id,
    c.country,
    COUNT(e.event_id)                      AS total_events,
    SUM(e.revenue_usd)                     AS total_revenue,
    DATE_TRUNC('month', e.event_ts)        AS month
FROM iceberg.analytics.events e           -- S3-backed Iceberg table
JOIN postgresql.crm.customers c           -- live PostgreSQL table
  ON e.customer_id = c.customer_id
WHERE
    e.event_ts >= DATE '2026-01-01'
    AND c.country IN ('US', 'DE', 'PL')
GROUP BY c.customer_id, c.country, DATE_TRUNC('month', e.event_ts)
ORDER BY total_revenue DESC
LIMIT 100;

-- Cross-catalog CTAS: materialize federation result into Iceberg
CREATE TABLE iceberg.analytics.monthly_revenue
WITH (
    format       = 'PARQUET',
    partitioning = ARRAY['month'],
    sorted_by    = ARRAY['total_revenue DESC']
)
AS
SELECT
    c.country,
    DATE_TRUNC('month', e.event_ts) AS month,
    SUM(e.revenue_usd)              AS total_revenue,
    COUNT(DISTINCT e.customer_id)   AS unique_customers
FROM iceberg.analytics.events e
JOIN postgresql.crm.customers c ON e.customer_id = c.customer_id
GROUP BY c.country, DATE_TRUNC('month', e.event_ts);

-- Reading a Kafka topic as a real-time SQL table
SELECT
    _partition_id,
    _partition_offset,
    JSON_EXTRACT_SCALAR(payload, '$.event_type') AS event_type,
    JSON_EXTRACT_SCALAR(payload, '$.user_id')    AS user_id,
    CAST(JSON_EXTRACT_SCALAR(payload, '$.amount') AS DOUBLE) AS amount
FROM kafka.events.page_views
WHERE _timestamp > NOW() - INTERVAL '5' MINUTE
LIMIT 1000;

Note

When a federation query joins a large Iceberg table with a small PostgreSQL dimension, Trino's cost-based optimizer can select a broadcast join once statistics show that the dimension is small: workers read the PostgreSQL side and replicate it to every worker running the join, avoiding a shuffle of the large S3 dataset. For large-to-large joins, Trino uses a distributed (partitioned) hash join where both sides are repartitioned by the join key.
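The trade-off behind that choice can be approximated with a small cost model. This is a deliberately simplified sketch, not Trino's optimizer: it compares only network bytes, and the 100 MB broadcast cap is an assumed value standing in for a configurable limit.

```python
MB = 1024 * 1024
GB = 1024 * MB


def choose_join_distribution(
    build_bytes: int,
    probe_bytes: int,
    workers: int,
    max_broadcast_bytes: int = 100 * MB,  # assumed cap for this sketch
) -> str:
    """Pick the distribution that moves fewer bytes across the network."""
    # Broadcast: every worker receives a full copy of the build side.
    broadcast_cost = build_bytes * workers
    # Partitioned: both sides are shuffled once by the join key.
    partitioned_cost = build_bytes + probe_bytes
    if build_bytes <= max_broadcast_bytes and broadcast_cost < partitioned_cost:
        return "BROADCAST"
    return "PARTITIONED"


print(choose_join_distribution(5 * MB, 500 * GB, workers=10))    # BROADCAST
print(choose_join_distribution(200 * GB, 500 * GB, workers=10))  # PARTITIONED
```

Without statistics the optimizer cannot estimate build_bytes at all, which is why running ANALYZE on both sides of a federated join matters.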

Cost-Based Optimizer and Predicate Pushdown

Trino's planner includes a cost-based optimizer (CBO) that uses table statistics to choose join strategies, reorder joins, and eliminate redundant work. The CBO is enabled by default but is only as good as its statistics: run ANALYZE on tables to collect row counts, NDV (number of distinct values), null fractions, and min/max values per column, which are stored through the connector's metadata.
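To see why these numbers matter, here is how row counts, NDV, and null fractions turn into cardinality estimates. The formulas are the standard textbook estimates under a uniform-distribution assumption, not necessarily the exact ones Trino applies, and the statistics values are invented for the example.

```python
from dataclasses import dataclass


@dataclass
class ColumnStats:
    distinct_values: float  # NDV
    nulls_fraction: float   # share of rows where the column is NULL


def estimate_equality_rows(row_count: float, stats: ColumnStats) -> float:
    """Rows matching `col = constant`, assuming values are uniformly distributed."""
    if stats.distinct_values <= 0:
        return 0.0
    return row_count * (1 - stats.nulls_fraction) / stats.distinct_values


def estimate_join_rows(
    left_rows: float, left: ColumnStats, right_rows: float, right: ColumnStats
) -> float:
    """Inner equi-join size: non-null row product divided by the larger NDV."""
    ndv = max(left.distinct_values, right.distinct_values)
    if ndv <= 0:
        return 0.0
    left_non_null = left_rows * (1 - left.nulls_fraction)
    right_non_null = right_rows * (1 - right.nulls_fraction)
    return left_non_null * right_non_null / ndv


events_customer_id = ColumnStats(distinct_values=10_000, nulls_fraction=0.0)
customers_id = ColumnStats(distinct_values=10_000, nulls_fraction=0.0)

print(estimate_equality_rows(1_000_000, events_customer_id))                    # 100.0
print(estimate_join_rows(1_000_000, events_customer_id, 10_000, customers_id))  # 1000000.0
```

Estimates like these feed directly into join ordering: the optimizer prefers to put the side with the smaller estimated size on the build side.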

-- Collect statistics for the CBO (run after significant data loads)
ANALYZE iceberg.analytics.events;

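-- Scope ANALYZE to specific columns to keep it cheap on wide tables
-- (the Iceberg connector accepts a columns property; partition-scoped
--  ANALYZE is a Hive connector feature)
ANALYZE iceberg.analytics.events
WITH (columns = ARRAY['customer_id', 'event_ts']);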

-- Inspect collected statistics
SHOW STATS FOR iceberg.analytics.events;
-- column_name | data_size | distinct_values_count | nulls_fraction | row_count | low_value | high_value

-- EXPLAIN ANALYZE shows the physical plan with actual runtime stats
EXPLAIN ANALYZE
SELECT customer_id, SUM(revenue_usd)
FROM iceberg.analytics.events
WHERE event_ts >= DATE '2026-06-01'
GROUP BY customer_id;

-- ① Partition pruning: Iceberg connector reads only matching partitions
SELECT * FROM iceberg.analytics.events
WHERE event_date = DATE '2026-06-15';
-- Trino reads ONLY the 2026-06-15 partition files from S3

-- ② Dynamic filtering: Trino collects the join-key values (or their min/max
--    range) from the smaller build side at runtime and sends them to the workers
--    scanning the larger Iceberg table, so non-matching files and row groups are skipped
SELECT e.*, c.segment
FROM iceberg.analytics.events e
JOIN postgresql.crm.customers c ON e.customer_id = c.customer_id
WHERE c.segment = 'enterprise';

-- ③ Column projection pushdown: Parquet row groups are read for requested
--    columns only — other column chunks in the file are not fetched
SELECT event_id, revenue_usd
FROM iceberg.analytics.events;

-- ④ Aggregate pushdown to JDBC: Trino can push COUNT/SUM to PostgreSQL
SELECT country, COUNT(*)
FROM postgresql.crm.customers
GROUP BY country;
-- Executes as: SELECT country, COUNT(*) FROM customers GROUP BY country
-- on the PostgreSQL server, not in Trino workers
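The dynamic filtering step in ② can be sketched as follows. This is a conceptual model for an inner join, not the engine's code: the DataFile class, the file paths, and the per-file min/max key statistics are assumptions standing in for the file-level metadata an Iceberg reader has available.

```python
from dataclasses import dataclass


@dataclass
class DataFile:
    path: str      # hypothetical S3 object
    min_key: int   # smallest customer_id in the file
    max_key: int   # largest customer_id in the file


def collect_dynamic_filter(build_keys) -> set[int]:
    """Build side: gather the distinct join keys that survived its own filters."""
    return set(build_keys)


def prune_files(files: list[DataFile], key_filter: set[int]) -> list[DataFile]:
    """Probe side: keep only files whose key range can contain a build-side key."""
    if not key_filter:
        return []  # inner join with an empty build side matches nothing
    lo, hi = min(key_filter), max(key_filter)
    kept = []
    for f in files:
        if f.max_key < lo or f.min_key > hi:
            continue  # cheap range check rejects the file outright
        if any(f.min_key <= k <= f.max_key for k in key_filter):
            kept.append(f)
    return kept


files = [
    DataFile("s3://lake/events/a.parquet", 1, 100),
    DataFile("s3://lake/events/b.parquet", 101, 200),
    DataFile("s3://lake/events/c.parquet", 201, 300),
]
enterprise_ids = collect_dynamic_filter([150, 160, 150])
kept = prune_files(files, enterprise_ids)
print([f.path for f in kept])  # ['s3://lake/events/b.parquet']
```

Pruning works best when the fact table is sorted or partitioned on the join key, because that keeps each file's key range narrow.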

For teams evaluating query engines: ClickHouse delivers higher single-source analytical throughput with pre-materialized aggregations, but lacks cross-source federation. Trino is the right choice when data lives in heterogeneous systems and joining without ETL is the priority.

Security: TLS, LDAP, and File-Based Access Control

Trino supports password (LDAP/PAM), OAuth 2.0/OIDC, Kerberos, and certificate-based authentication. TLS must be enabled before activating any authentication method. Authorization is handled through file-based rules or Apache Ranger integration for attribute-based access control with column masking.

# etc/config.properties — enable HTTPS and LDAP authentication
http-server.https.enabled=true
http-server.https.port=8443
http-server.https.keystore.path=/etc/trino/certs/keystore.jks
http-server.https.keystore.key=changeit
http-server.authentication.type=PASSWORD

# etc/password-authenticator.properties
password-authenticator.name=ldap
ldap.url=ldaps://ldap.company.com:636
ldap.bind-dn=cn=trino-svc,ou=service-accounts,dc=company,dc=com
ldap.bind-password=${ENV:LDAP_SERVICE_PASSWORD}
ldap.user-base-dn=ou=users,dc=company,dc=com
ldap.user-bind-pattern=uid=${USER},ou=users,dc=company,dc=com

# etc/access-control.properties — file-based authorization
access-control.name=file
security.config-file=/etc/trino/rules.json

// rules.json (catalog, schema, and column-level access control)
{
  "catalogs": [
    { "user": "alice",    "catalog": "iceberg",     "allow": "all"       },
    { "user": "bob",      "catalog": "postgresql",   "allow": "read-only" },
    { "group": "analysts","catalog": ".*",           "allow": "read-only" }
  ],
  "schemas": [
    { "user": "alice",    "schema": "staging",       "owner": true }
  ],
  "tables": [
    {
      "user": "bob",
      "catalog": "iceberg",
      "schema": "analytics",
      "table": "events",
      "privileges": ["SELECT"],
      "columns": [
        { "name": "email",      "mask": "NULL"          },
        { "name": "ip_address", "mask": "'[REDACTED]'" }
      ]
    }
  ]
}

Note

Column masking in Trino's file-based rules applies server-side, independently of the client query. When Bob selects from iceberg.analytics.events, the email column always returns NULL and ip_address returns '[REDACTED]', regardless of the SQL he writes. The engine substitutes the mask expression for the column while analyzing the query, so unmasked values never reach the result set.
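The rule lookup can be mimicked in a few lines of Python. The sketch assumes regex matching on user, catalog, schema, and table with the first matching rule winning, and it models masking as a rewrite of the select list; the function names are hypothetical and this is not how the server is implemented internally.

```python
import re

TABLE_RULES = [
    {
        "user": "bob",
        "catalog": "iceberg",
        "schema": "analytics",
        "table": "events",
        "privileges": ["SELECT"],
        "columns": [
            {"name": "email", "mask": "NULL"},
            {"name": "ip_address", "mask": "'[REDACTED]'"},
        ],
    }
]


def column_masks(rules, user, catalog, schema, table) -> dict[str, str]:
    """Return {column: mask expression} from the first rule matching the request."""
    request = {"user": user, "catalog": catalog, "schema": schema, "table": table}
    for rule in rules:
        # A field missing from the rule matches everything.
        if all(re.fullmatch(rule.get(key, ".*"), value) for key, value in request.items()):
            return {c["name"]: c["mask"] for c in rule.get("columns", []) if "mask" in c}
    return {}


def masked_select_list(columns: list[str], masks: dict[str, str]) -> str:
    """Replace each masked column with its mask expression, keeping the column name."""
    return ", ".join(f"{masks[c]} AS {c}" if c in masks else c for c in columns)


masks = column_masks(TABLE_RULES, "bob", "iceberg", "analytics", "events")
print(masked_select_list(["event_id", "email", "ip_address"], masks))
# event_id, NULL AS email, '[REDACTED]' AS ip_address
```

A user with no matching rule gets an empty mask set here; in a real deployment the table rules would also decide whether that user may read the table at all.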

Python and CLI Integration

The Python trino package provides a DB-API 2.0 interface and a SQLAlchemy dialect, making it compatible with pandas, dbt, and any ORM-based tooling. For interactive queries and plan inspection, the Trino CLI is the go-to tool.

# Install the Python driver
pip install trino sqlalchemy

# ── DB-API 2.0 — direct query execution ──────────────────────────────
import trino

conn = trino.dbapi.connect(
    host="trino-coordinator",
    port=8443,
    user="alice",
    auth=trino.auth.BasicAuthentication("alice", "password"),
    http_scheme="https",
    verify=True,
    catalog="iceberg",
    schema="analytics",
)

cur = conn.cursor()
cur.execute("""
    SELECT country, SUM(revenue_usd) AS revenue
    FROM events
    WHERE event_ts >= DATE '2026-06-01'
    GROUP BY country
    ORDER BY revenue DESC
    LIMIT 20
""")
for row in cur.fetchall():
    print(f"{row[0]:20s}  {row[1]:>15,.0f}")

# ── SQLAlchemy + pandas ───────────────────────────────────────────────
from sqlalchemy import create_engine
import pandas as pd

engine = create_engine(
    "trino://alice@trino-coordinator:8443/iceberg/analytics",
    connect_args={
        "auth": trino.auth.BasicAuthentication("alice", "password"),
        "http_scheme": "https",
    },
)

df = pd.read_sql(
    "SELECT * FROM events WHERE event_date = current_date LIMIT 100000",
    engine,
)
print(df.dtypes)

# ── CLI: interactive query and plan inspection ────────────────────────
# docker run --rm -it trinodb/trino:451 trino --server http://trino-coordinator:8080
#
# trino> SHOW CATALOGS;
# trino> USE iceberg.analytics;
# trino> SHOW TABLES;
# trino> EXPLAIN (TYPE DISTRIBUTED) SELECT ...;
# trino> EXPLAIN ANALYZE SELECT ...;    -- runs the query and shows actual stats
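A small helper that returns rows as dictionaries and binds values with ? placeholders works with any DB-API 2.0 connection that uses the qmark parameter style, which the trino driver does. The sketch below uses the standard-library sqlite3 module as a stand-in so it runs without a cluster; the table contents are invented, and the helper name fetch_dicts is an assumption.

```python
import sqlite3


def fetch_dicts(conn, sql: str, params: tuple = ()) -> list[dict]:
    """Run a parameterized query and return each row as a column-name dict."""
    cur = conn.cursor()
    cur.execute(sql, params)
    names = [col[0] for col in cur.description]
    return [dict(zip(names, row)) for row in cur.fetchall()]


# sqlite3 stands in for a Trino connection so the example is self-contained
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (country TEXT, revenue_usd REAL)")
conn.executemany(
    "INSERT INTO events VALUES (?, ?)",
    [("US", 10.0), ("DE", 5.0), ("US", 2.5)],
)

rows = fetch_dicts(
    conn,
    "SELECT country, SUM(revenue_usd) AS revenue FROM events WHERE country = ? GROUP BY country",
    ("US",),
)
print(rows)  # [{'country': 'US', 'revenue': 12.5}]
```

Binding values through placeholders rather than string formatting keeps user input out of the SQL text, which matters once dashboards or notebooks start passing filters through.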

Kubernetes Deployment with Helm

The official Trino Helm chart deploys a coordinator Deployment and a worker Deployment with configurable replicas. Catalogs are injected as ConfigMaps and mounted into both coordinator and worker pods. Worker HPA scales on CPU utilization or custom metrics like active query queue depth exposed via the Trino REST API.

# Add the Trino Helm repository
helm repo add trino https://trinodb.github.io/charts
helm repo update

# values.yaml — production Trino Helm configuration
image:
  tag: "451"

server:
  workers: 4                        # baseline worker count (used as the autoscaler minimum)
  config:
    query:
      maxMemory: "200GB"            # cluster-wide per-query limit
  autoscaling:
    enabled: true
    maxReplicas: 20
    targetCPUUtilizationPercentage: 70

coordinator:
  resources:
    requests: { cpu: "2",  memory: "10Gi" }
    limits:   { cpu: "4",  memory: "14Gi" }
  jvm:
    maxHeapSize: "8G"
    gcMethod:
      type: UseG1GC
      g1:
        heapRegionSize: "32M"
  config:
    query:
      maxMemoryPerNode: "2GB"       # must fit inside the 8G heap with headroom

worker:
  resources:
    requests: { cpu: "4",  memory: "20Gi" }
    limits:   { cpu: "8",  memory: "28Gi" }
  jvm:
    maxHeapSize: "18G"
  config:
    query:
      maxMemoryPerNode: "8GB"

# Catalogs injected as ConfigMaps — edit + rolling-restart to apply
catalogs:
  iceberg: |
    connector.name=iceberg
    hive.metastore.uri=thrift://hive-metastore:9083
    hive.s3.endpoint=https://s3.amazonaws.com
    hive.s3.region=eu-central-1
    # dynamic filtering is enabled by default and is set in config.properties, not per catalog
  postgresql: |
    connector.name=postgresql
    connection-url=jdbc:postgresql://postgres.prod.svc:5432/analytics
    connection-user=trino
    connection-password=${ENV:POSTGRES_PASSWORD}

# Expose the Secret's keys as environment variables so that
# ${ENV:POSTGRES_PASSWORD} in the catalog above resolves at startup
envFrom:
  - secretRef:
      name: trino-db-credentials
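For capacity planning it helps to know how the autoscaler reacts to a CPU target. The function below applies the scaling rule documented for the Kubernetes Horizontal Pod Autoscaler, desired = ceil(current × currentMetric / targetMetric), clamped to the replica bounds. It ignores the tolerance band and stabilization windows the real controller applies, and the sample numbers are invented.

```python
import math


def desired_replicas(
    current_replicas: int,
    current_cpu_pct: float,
    target_cpu_pct: float,
    min_replicas: int,
    max_replicas: int,
) -> int:
    """Replica count the HPA formula asks for, clamped to the configured bounds."""
    desired = math.ceil(current_replicas * current_cpu_pct / target_cpu_pct)
    return max(min_replicas, min(max_replicas, desired))


# 4 workers at 90% CPU against a 70% target
print(desired_replicas(4, 90, 70, min_replicas=2, max_replicas=20))    # 6
# a heavy burst is capped at the maximum
print(desired_replicas(20, 140, 70, min_replicas=2, max_replicas=20))  # 20
```

Scaling workers down interrupts the tasks running on them, so a conservative scale-down policy and graceful worker shutdown are worth pairing with the CPU target.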

Spill to Disk and Memory Management

Trino runs entirely in-memory by default. When queries exceed available heap, Trino can spill sort buffers, aggregation state, and join build tables to local disk. Spill is disabled by default — enable it to allow larger-than-memory queries to complete at the cost of I/O latency.

# etc/config.properties — memory limits and spill configuration

# Per-node query memory budget
query.max-memory-per-node=12GB

# Cluster-wide query memory limit (sum across all nodes)
query.max-memory=100GB

# Enable spill to disk for sort, hash aggregation, hash joins
spill-enabled=true
# Use fast local NVMe in production (properties files have no trailing comments,
# so keep notes like this on their own line)
spiller-spill-path=/data/trino-spill
spiller-max-used-space-threshold=0.9

# Exchange buffer sizing
exchange.max-buffer-size=32MB
task.max-partial-aggregation-memory=16MB

# Concurrency and queueing limits are configured through resource groups
# (etc/resource-groups.properties), not in config.properties

# Cluster-wide cap on user plus system memory for a single query
query.max-total-memory=120GB

Note

Enable spill only on workers with fast local NVMe disks. Spilling to a network-attached volume (EBS gp2, NFS) is often so slow that spilled queries run into the query timeout anyway, so failing fast on the memory limit can be the better outcome. On Kubernetes, use local PVCs backed by NVMe instance storage (for example, AWS i3en or i4i instances) for spill paths.
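The mechanics of spilling are easiest to see in a sort. The sketch below is a generic external merge sort, not Trino's spiller: when the in-memory buffer reaches a limit, it is sorted and written to a temporary file, and the sorted runs are merged at the end. The buffer limit and the use of pickle for serialization are choices made for the example.

```python
import heapq
import pickle
import tempfile


def external_sort(values, max_in_memory: int):
    """Yield values in sorted order while holding at most max_in_memory in the buffer."""
    runs = []    # temporary files, each holding one sorted run
    buffer = []

    def spill():
        buffer.sort()
        run = tempfile.TemporaryFile()
        for value in buffer:
            pickle.dump(value, run)
        run.seek(0)
        runs.append(run)
        buffer.clear()

    for value in values:
        buffer.append(value)
        if len(buffer) >= max_in_memory:
            spill()  # memory limit reached: move the sorted run to disk

    buffer.sort()  # whatever is left stays in memory as the last run

    def read_run(run):
        try:
            while True:
                yield pickle.load(run)
        except EOFError:
            run.close()

    # Merge the in-memory run with all spilled runs
    yield from heapq.merge(buffer, *(read_run(run) for run in runs))


result = list(external_sort([5, 3, 9, 1, 7, 2, 8], max_in_memory=3))
print(result)  # [1, 2, 3, 5, 7, 8, 9]
```

Every spilled value is written once and read once, which is why the throughput of the spill volume dominates the runtime of queries that spill.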

Production Checklist

1. Pin the exact Trino version in Docker image tags and the Helm chart; new releases can introduce breaking configuration and connector changes.
2. Set node-scheduler.include-coordinator=false so the coordinator stays dedicated to planning and scheduling.
3. Run ANALYZE on Iceberg and Hive tables after major data loads to keep CBO statistics current.
4. Keep enable-dynamic-filtering=true (the default) so runtime filters from join build sides reach the Iceberg and Hive readers and prune S3 reads.
5. Configure spill-to-disk only on workers with fast local NVMe storage; spilling to network-attached volumes is often slower than letting the query fail.
6. Use HPA on workers with a target CPU of 60 to 70%; Trino queries are CPU-intensive.
7. Export JMX metrics via the Prometheus JMX exporter and alert on query failure rate and peak reserved memory.
8. Set connection pool limits on JDBC connectors (PostgreSQL, MySQL) to cap the number of database connections per worker.
9. Apply file-based access control with column masking before granting analyst access, and verify the masks by querying as a restricted user.
10. Run EXPLAIN ANALYZE on federation queries before promoting them to production pipelines to catch plan regressions early.
