What is Trino?
Trino (formerly PrestoSQL) is an open-source MPP (Massively Parallel Processing) distributed SQL query engine that separates compute from storage. It reads data directly from wherever it lives — HDFS, S3, Iceberg tables, relational databases, Kafka topics — and executes SQL over all of them without ETL. A single Trino query can join an Apache Iceberg lakehouse table with a live PostgreSQL customer dimension and a Kafka event stream in one statement.
The key differentiator versus Spark SQL or Hive is Trino's pipelined execution model: stages stream data to each other through in-memory buffers over HTTP instead of writing intermediate results to disk between stages. This delivers sub-second to low-second latency for interactive analytics, where the same query on classic MapReduce-based Hive can take minutes.
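The difference between pipelined execution and stage barriers can be illustrated with a minimal in-process Python sketch. It is not Trino code: generators stand in for stages, the event log is an illustrative device, and real Trino stages run on different workers and exchange pages over the network. In the pipelined version each page flows to the next stage as soon as it is produced; in the barrier version the whole upstream output is materialized first, as a disk-based engine does between stages.

```python
from typing import Iterator, List, Tuple

Page = List[int]


def scan(rows: List[int], page_size: int, log: List[str]) -> Iterator[Page]:
    """Upstream stage: emits the input as fixed-size pages."""
    for start in range(0, len(rows), page_size):
        log.append(f"scan page {start // page_size}")
        yield rows[start:start + page_size]


def keep_even(pages: Iterator[Page], log: List[str]) -> Iterator[Page]:
    """Downstream stage: processes each page as soon as it arrives."""
    for i, page in enumerate(pages):
        log.append(f"filter page {i}")
        yield [r for r in page if r % 2 == 0]


def run_pipelined(rows: List[int], page_size: int = 2) -> Tuple[List[int], List[str]]:
    log: List[str] = []
    result = [r for page in keep_even(scan(rows, page_size, log), log) for r in page]
    return result, log


def run_with_barrier(rows: List[int], page_size: int = 2) -> Tuple[List[int], List[str]]:
    log: List[str] = []
    # Stage barrier: the whole upstream output is materialized before the next stage starts
    materialized = list(scan(rows, page_size, log))
    result = [r for page in keep_even(iter(materialized), log) for r in page]
    return result, log


print(run_pipelined([1, 2, 3, 4, 5, 6])[1])
print(run_with_barrier([1, 2, 3, 4, 5, 6])[1])
```

Both variants return [2, 4, 6] for this input, but the pipelined log interleaves scan and filter steps while the barrier log shows every scan before any filtering starts.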
Interactive Speed
Sub-second to low-second query latency on large datasets. Pipelined in-memory execution eliminates stage barriers that make Hive slow.
True Federation
Query across Iceberg, PostgreSQL, MySQL, Kafka, and other sources in a single SQL statement. No data movement or custom ETL required.
ANSI SQL
ANSI SQL with window functions, CTEs, LATERAL joins, UNNEST, and a rich built-in function library. No vendor dialect lock-in.
Architecture: Coordinator, Workers, and the Connector SPI
A Trino cluster has one coordinator and N workers. The coordinator receives SQL from clients, parses and plans the query, splits it into distributed stages, and schedules tasks across workers. Workers execute tasks and exchange data between stages through in-memory buffers over HTTP; by default nothing is written to disk between stages.
Data source integration uses the Connector SPI (Service Provider Interface). For reading data, a connector supplies three core components: ConnectorMetadata for schema discovery, ConnectorSplitManager for partitioning work into parallel splits, and a ConnectorPageSourceProvider that creates ConnectorPageSource readers returning columnar data pages. Built-in connectors cover Hive/HMS, Iceberg, Delta Lake, PostgreSQL, MySQL, MariaDB, SQL Server, Redis, Kafka, Elasticsearch, MongoDB, and an in-memory connector for small reference tables.
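The flow through those three roles can be illustrated with a toy Python model. It is a hypothetical stand-in, not the Java SPI: `ToyMetadata`, `ToySplitManager`, `ToyPageSource`, and the round-robin `assign_round_robin` scheduler are invented names, and Trino's real scheduler is considerably more sophisticated. The sketch resolves a table's columns, cuts the table into splits, assigns the splits to workers, and reads each split as columnar pages containing only the requested columns.

```python
from dataclasses import dataclass
from typing import Dict, Iterator, List

# Hypothetical in-memory "data source": table name -> rows
TABLES: Dict[str, List[dict]] = {
    "customers": [{"customer_id": i, "country": c}
                  for i, c in enumerate(["US", "DE", "PL", "US", "FR", "DE", "US"], start=1)],
}


@dataclass(frozen=True)
class Split:
    """A unit of parallel work: a contiguous slice of one table."""
    table: str
    start: int  # first row index (inclusive)
    end: int    # last row index (exclusive)


class ToyMetadata:
    """Plays the role of ConnectorMetadata: schema discovery."""

    def list_columns(self, table: str) -> List[str]:
        if table not in TABLES:
            raise KeyError(f"table not found: {table}")
        return list(TABLES[table][0].keys()) if TABLES[table] else []


class ToySplitManager:
    """Plays the role of ConnectorSplitManager: cuts a table into splits."""

    def __init__(self, rows_per_split: int) -> None:
        self.rows_per_split = rows_per_split

    def get_splits(self, table: str) -> List[Split]:
        n = len(TABLES[table])
        return [Split(table, s, min(s + self.rows_per_split, n))
                for s in range(0, n, self.rows_per_split)]


class ToyPageSource:
    """Plays the role of ConnectorPageSource: reads one split as columnar pages."""

    def __init__(self, split: Split, columns: List[str], page_size: int) -> None:
        self.split, self.columns, self.page_size = split, columns, page_size

    def pages(self) -> Iterator[Dict[str, list]]:
        rows = TABLES[self.split.table][self.split.start:self.split.end]
        for i in range(0, len(rows), self.page_size):
            chunk = rows[i:i + self.page_size]
            # Columnar layout: one list of values per requested column only
            yield {col: [row[col] for row in chunk] for col in self.columns}


def assign_round_robin(splits: List[Split], workers: List[str]) -> Dict[str, List[Split]]:
    """Simplified scheduling: hand splits to workers in turn."""
    plan: Dict[str, List[Split]] = {w: [] for w in workers}
    for i, split in enumerate(splits):
        plan[workers[i % len(workers)]].append(split)
    return plan


splits = ToySplitManager(rows_per_split=3).get_splits("customers")
for worker, assigned in assign_round_robin(splits, ["worker-1", "worker-2"]).items():
    for split in assigned:
        for page in ToyPageSource(split, ["country"], page_size=2).pages():
            print(worker, split.start, page)
```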
# Trino query execution flow
#
# Client ──► Coordinator
#              │ ① Parse SQL → AST
#              │ ② Analyze: resolve catalog.schema.table via ConnectorMetadata
#              │ ③ Plan: logical → distributed physical plan (stages + tasks)
#              │ ④ Schedule: assign tasks to workers via HTTP
#              │
#  ┌───────────┼────────────────────────────────┐
#  ▼           ▼                                ▼
# Worker 1    Worker 2          ...          Worker N
# (stage 1)   (stage 1)                     (stage 1)   ← leaf stage: reads splits
#  │           │                                │
#  └────────── exchange buffers (HTTP) ─────────┘
#              │
#          Worker k
#          (stage 0: root stage) → final output → coordinator → client
#
# Key configuration files:
# etc/config.properties     ← coordinator or worker role, discovery URI
# etc/jvm.config            ← heap size, GC flags
# etc/node.properties       ← node ID, data directory, environment name
# etc/catalog/*.properties  ← one file per data source connector
Installing Trino with Docker Compose
The official image trinodb/trino bundles the server and all connectors. Mount config directories for coordinator, worker, and catalog configuration without rebuilding the image. The Docker Compose below runs a coordinator, two workers, and a Hive Metastore for the Iceberg connector.
# docker-compose.yml: Trino coordinator + workers + Hive Metastore
services:
  trino-coordinator:
    image: trinodb/trino:451
    ports:
      - "8080:8080"
    volumes:
      - ./etc/coordinator:/etc/trino      # config.properties, jvm.config, node.properties
      - ./etc/catalog:/etc/trino/catalog
    # Heap size comes from -Xmx in etc/coordinator/jvm.config; a second -Xmx in
    # JAVA_TOOL_OPTIONS would only conflict with it
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8080/v1/info"]
      interval: 10s
      retries: 10
  trino-worker:
    image: trinodb/trino:451
    depends_on:
      trino-coordinator:
        condition: service_healthy
    volumes:
      - ./etc/worker:/etc/trino           # needs its own jvm.config and node.properties
      - ./etc/catalog:/etc/trino/catalog
    deploy:
      replicas: 2
  hive-metastore:
    image: apache/hive:4.0.0
    environment:
      - SERVICE_NAME=metastore
    ports:
      - "9083:9083"
---
# etc/coordinator/config.properties
coordinator=true
node-scheduler.include-coordinator=false
http-server.http.port=8080
discovery.uri=http://trino-coordinator:8080
query.max-memory=20GB
query.max-memory-per-node=2GB
# etc/worker/config.properties
coordinator=false
http-server.http.port=8080
discovery.uri=http://trino-coordinator:8080
# etc/coordinator/jvm.config
-server
-Xmx8G
-XX:+UseG1GC
-XX:G1HeapRegionSize=32M
-XX:+UseGCOverheadLimit
-XX:+ExplicitGCInvokesConcurrent
-XX:+HeapDumpOnOutOfMemoryError
-XX:+ExitOnOutOfMemoryError
-Djdk.attach.allowAttachSelf=true
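Scripts that bring the stack up usually need to wait until the coordinator has finished starting before sending queries. A minimal standard-library Python sketch of that wait, polling the same /v1/info endpoint the compose healthcheck uses; it assumes the coordinator is reachable through the published port at http://localhost:8080 and relies on the `starting` flag in the /v1/info JSON response. The timeout and polling interval are arbitrary defaults.

```python
import json
import time
import urllib.request


def wait_for_trino(base_url: str = "http://localhost:8080",
                   timeout_s: float = 120.0,
                   interval_s: float = 2.0) -> bool:
    """Poll {base_url}/v1/info until the server reports it has finished starting.

    Returns True once /v1/info answers with "starting": false, or False if the
    timeout expires first.
    """
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        try:
            with urllib.request.urlopen(f"{base_url}/v1/info", timeout=5) as resp:
                info = json.load(resp)
            if info.get("starting") is False:
                return True
        except (OSError, ValueError):
            # Connection refused, HTTP error, timeout, or a non-JSON body:
            # the coordinator is not ready yet
            pass
        time.sleep(interval_s)
    return False
```

Calling `wait_for_trino()` right after `docker compose up -d` blocks until the coordinator reports it is no longer starting, or returns False after two minutes.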
Always set node-scheduler.include-coordinator=false in production. Scheduling tasks on the coordinator node competes with query planning, causing latency spikes under concurrent query load. The coordinator should be a dedicated planning and scheduling node only.
Connector Configuration: Iceberg, PostgreSQL, and Kafka
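Each connector is a single .properties file dropped into etc/catalog/. The filename becomes the catalog name used in SQL (iceberg.properties defines the catalog iceberg). Catalogs load at startup; with catalog.management=dynamic, recent Trino releases can also add and drop catalogs at runtime with CREATE CATALOG and DROP CATALOG, without a rolling restart.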
# etc/catalog/iceberg.properties — Iceberg connector backed by S3 + HMS
connector.name=iceberg
hive.metastore.uri=thrift://hive-metastore:9083
iceberg.file-format=PARQUET
hive.s3.endpoint=http://minio:9000
hive.s3.path-style-access=true
hive.s3.aws-access-key=minioadmin
hive.s3.aws-secret-key=minioadmin
iceberg.max-partitions-per-writer=200
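# Dynamic filtering is a cluster-wide setting (enable-dynamic-filtering in
# etc/config.properties, on by default), not a catalog property; Trino rejects
# unused catalog properties at startup. The catalog only controls how long
# split generation waits for dynamic filters to arrive:
iceberg.dynamic-filtering.wait-timeout=1s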
# etc/catalog/postgresql.properties — PostgreSQL JDBC connector
connector.name=postgresql
connection-url=jdbc:postgresql://postgres:5432/analytics
connection-user=trino
connection-password=secret
# Aggregation pushdown (COUNT, SUM, MIN, MAX, AVG, ...) to PostgreSQL is
# enabled by default; no extra property is needed
# etc/catalog/kafka.properties — read Kafka topics as SQL tables
connector.name=kafka
kafka.nodes=kafka:9092
kafka.table-names=events.page_views,events.purchases
kafka.hide-internal-columns=false
kafka.messages-per-split=10000
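# For Avro topics, resolve table schemas from Confluent Schema Registry
# instead of kafka.table-names by using these two lines:
# kafka.table-description-supplier=CONFLUENT
# kafka.confluent-schema-registry-url=http://schema-registry:8081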
# etc/catalog/delta.properties — Delta Lake connector (S3 + HMS)
connector.name=delta_lake
hive.metastore.uri=thrift://hive-metastore:9083
delta.target-max-file-size=128MB
hive.s3.endpoint=http://minio:9000
hive.s3.path-style-access=true
Teams that use DuckDB for local development analytics can point it at the same Parquet files on S3 that the Iceberg connector exposes to Trino in production. The local DuckDB workflow and the production Trino cluster then share one data lake, with Trino adding distributed scale and multi-source federation.
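Because the catalog name comes from the filename, a short script can list the catalogs a node will expose before it starts. The Python sketch below assumes the etc/catalog layout used in this section and parses only the simple key=value subset of the Java .properties format (no line continuations or escape sequences). It also flags values containing " #": Java properties files treat # as a comment only at the start of a line, so a trailing comment silently becomes part of the value.

```python
from pathlib import Path
from typing import Dict, List


def load_properties(path: Path) -> Dict[str, str]:
    """Parse simple key=value lines; full-line # or ! comments are skipped."""
    props: Dict[str, str] = {}
    for raw in path.read_text(encoding="utf-8").splitlines():
        line = raw.strip()
        if not line or line[0] in "#!":
            continue
        key, sep, value = line.partition("=")
        if not sep:
            raise ValueError(f"{path.name}: expected key=value, got {raw!r}")
        props[key.strip()] = value.strip()
    return props


def discover_catalogs(catalog_dir: Path) -> Dict[str, str]:
    """Map catalog name (file stem) -> connector.name for every catalog file."""
    catalogs: Dict[str, str] = {}
    for f in sorted(catalog_dir.glob("*.properties")):
        props = load_properties(f)
        if "connector.name" not in props:
            raise ValueError(f"{f.name}: missing connector.name")
        catalogs[f.stem] = props["connector.name"]
    return catalogs


def suspicious_inline_comments(path: Path) -> List[str]:
    """Keys whose values look like they contain a trailing '# comment'."""
    return [k for k, v in load_properties(path).items() if " #" in v]
```

For the four catalog files in this section, `discover_catalogs(Path("etc/catalog"))` would return `{"delta": "delta_lake", "iceberg": "iceberg", "kafka": "kafka", "postgresql": "postgresql"}`.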
Query Federation — Joining Across Data Sources
Federation is Trino's headline capability. A catalog-qualified reference (catalog.schema.table) identifies which connector to use. The coordinator collects metadata from every referenced connector, builds a single distributed plan, and pushes predicates (and, where the connector supports it, projections, aggregations, and limits) down to each connector so only relevant data crosses the network.
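The division of work can be mimicked in a few lines of Python: push each predicate to the source that owns the column, pull back only the surviving rows, and join them in the engine. In this sketch two in-memory SQLite databases are hypothetical stand-ins for the Iceberg and PostgreSQL catalogs, the sample rows are invented, and the hash join runs in one process, whereas Trino distributes it across workers.

```python
import sqlite3
from collections import defaultdict
from typing import Dict, List, Tuple


def make_sources() -> Tuple[sqlite3.Connection, sqlite3.Connection]:
    lake = sqlite3.connect(":memory:")  # stand-in for iceberg.analytics.events
    lake.execute("CREATE TABLE events (event_id INTEGER, customer_id INTEGER, "
                 "event_ts TEXT, revenue_usd REAL)")
    lake.executemany("INSERT INTO events VALUES (?, ?, ?, ?)", [
        (1, 10, "2025-12-30", 5.0),    # removed by the date predicate
        (2, 10, "2026-01-05", 20.0),
        (3, 11, "2026-02-01", 7.5),
        (4, 12, "2026-02-10", 99.0),   # customer 12 is in FR: removed by the join
    ])
    crm = sqlite3.connect(":memory:")   # stand-in for postgresql.crm.customers
    crm.execute("CREATE TABLE customers (customer_id INTEGER, country TEXT)")
    crm.executemany("INSERT INTO customers VALUES (?, ?)",
                    [(10, "US"), (11, "DE"), (12, "FR")])
    return lake, crm


def federated_revenue(lake: sqlite3.Connection, crm: sqlite3.Connection,
                      since: str, countries: List[str]) -> Dict[Tuple[int, str], float]:
    # 1. Predicate pushdown to the "lake": only rows on or after `since` leave the source
    events = lake.execute(
        "SELECT customer_id, revenue_usd FROM events WHERE event_ts >= ?", (since,)
    ).fetchall()
    # 2. Predicate pushdown to the "CRM": only the requested countries leave the source
    placeholders = ",".join("?" for _ in countries)
    customers = dict(crm.execute(
        f"SELECT customer_id, country FROM customers WHERE country IN ({placeholders})",
        countries,
    ).fetchall())
    # 3. Hash join in the engine: build on customers, probe with events, aggregate
    totals: Dict[Tuple[int, str], float] = defaultdict(float)
    for customer_id, revenue in events:
        country = customers.get(customer_id)
        if country is not None:
            totals[(customer_id, country)] += revenue
    return dict(totals)


lake, crm = make_sources()
print(federated_revenue(lake, crm, "2026-01-01", ["US", "DE", "PL"]))
```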
-- Federation query: join Iceberg events with PostgreSQL customers
-- Trino reads Iceberg splits from S3 and PostgreSQL rows in parallel
SELECT
c.customer_id,
c.country,
COUNT(e.event_id) AS total_events,
SUM(e.revenue_usd) AS total_revenue,
DATE_TRUNC('month', e.event_ts) AS month
FROM iceberg.analytics.events e -- S3-backed Iceberg table
JOIN postgresql.crm.customers c -- live PostgreSQL table
ON e.customer_id = c.customer_id
WHERE
e.event_ts >= DATE '2026-01-01'
AND c.country IN ('US', 'DE', 'PL')
GROUP BY c.customer_id, c.country, DATE_TRUNC('month', e.event_ts)
ORDER BY total_revenue DESC
LIMIT 100;
-- Cross-catalog CTAS: materialize federation result into Iceberg
CREATE TABLE iceberg.analytics.monthly_revenue
WITH (
format = 'PARQUET',
partitioning = ARRAY['month'],
sorted_by = ARRAY['total_revenue DESC']
)
AS
SELECT
c.country,
DATE_TRUNC('month', e.event_ts) AS month,
SUM(e.revenue_usd) AS total_revenue,
COUNT(DISTINCT e.customer_id) AS unique_customers
FROM iceberg.analytics.events e
JOIN postgresql.crm.customers c ON e.customer_id = c.customer_id
GROUP BY c.country, DATE_TRUNC('month', e.event_ts);
-- Reading a Kafka topic as a near-real-time SQL table. Without a table
-- description file, each raw JSON message is in the internal _message column
SELECT
_partition_id,
_partition_offset,
JSON_EXTRACT_SCALAR(_message, '$.event_type') AS event_type,
JSON_EXTRACT_SCALAR(_message, '$.user_id') AS user_id,
CAST(JSON_EXTRACT_SCALAR(_message, '$.amount') AS DOUBLE) AS amount
FROM kafka.events.page_views
WHERE _timestamp > NOW() - INTERVAL '5' MINUTE
LIMIT 1000;
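JSON_EXTRACT_SCALAR returns a varchar, or NULL when the path is missing or points to an object, array, or JSON null, which is why the amount field needs an explicit CAST. A rough Python model of those semantics for simple `$.a.b` paths only; it is not Trino's JSON path implementation, its number formatting may differ from Trino's, and the sample message is invented.

```python
import json
from typing import Optional


def json_extract_scalar(message: str, path: str) -> Optional[str]:
    """Approximate JSON_EXTRACT_SCALAR for '$.a.b'-style paths only."""
    if not path.startswith("$."):
        raise ValueError("this sketch only supports '$.field' paths")
    try:
        node = json.loads(message)
    except json.JSONDecodeError:
        return None  # malformed JSON is treated as NULL in this sketch
    for key in path[2:].split("."):
        if not isinstance(node, dict) or key not in node:
            return None  # missing path -> NULL
        node = node[key]
    if isinstance(node, bool):  # check bool before int: bool is an int subclass
        return "true" if node else "false"
    if isinstance(node, (int, float)):
        return str(node)
    if isinstance(node, str):
        return node
    return None  # objects, arrays, and JSON null are not scalars


msg = '{"event_type": "purchase", "user_id": "u-42", "amount": 19.99, "tags": ["a"]}'
amount = json_extract_scalar(msg, "$.amount")  # a string, like varchar in Trino
print(amount, float(amount))                   # 19.99 19.99
```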
Cost-Based Optimizer and Predicate Pushdown
Trino's planner includes a cost-based optimizer (CBO) that uses table statistics to choose join distribution strategies (broadcast or partitioned), reorder joins, and eliminate redundant work. The CBO is active by default, but it can only make informed decisions when statistics exist. ANALYZE collects them (depending on the connector: row counts, number of distinct values (NDV), null fractions, and min/max values per column) and stores them through the connector's metadata.
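To show how statistics feed a planning decision, here is a simplified Python sketch: it computes SHOW STATS-style values for one column, then picks a join distribution by comparing the estimated build-side size with a broadcast limit. The 100 MB default is meant to mirror Trino's join-max-broadcast-table-size property, but Trino's cost model also weighs CPU, memory, and network costs for both options, so this rule is only an approximation, and the average row size is an assumed input.

```python
from dataclasses import dataclass
from typing import Any, Optional, Sequence


@dataclass
class ColumnStats:
    row_count: int
    distinct_values_count: int
    nulls_fraction: float
    low_value: Optional[Any]
    high_value: Optional[Any]


def column_stats(values: Sequence[Optional[Any]]) -> ColumnStats:
    """Compute SHOW STATS-style statistics for one in-memory column."""
    non_null = [v for v in values if v is not None]
    n = len(values)
    return ColumnStats(
        row_count=n,
        distinct_values_count=len(set(non_null)),
        nulls_fraction=(n - len(non_null)) / n if n else 0.0,
        low_value=min(non_null) if non_null else None,
        high_value=max(non_null) if non_null else None,
    )


def choose_join_distribution(build_rows: int, avg_row_bytes: int,
                             max_broadcast_bytes: int = 100 * 1024 * 1024) -> str:
    """Broadcast a small build side to every worker; otherwise hash-partition both sides."""
    estimated_bytes = build_rows * avg_row_bytes
    return "BROADCAST" if estimated_bytes <= max_broadcast_bytes else "PARTITIONED"


print(column_stats([3, 1, None, 3]))
print(choose_join_distribution(50_000, 100))      # BROADCAST (about 5 MB)
print(choose_join_distribution(10_000_000, 100))  # PARTITIONED (about 1 GB)
```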
-- Collect statistics for the CBO (run after significant data loads)
ANALYZE iceberg.analytics.events;
-- Limit ANALYZE to the columns used in joins and filters to reduce its cost
-- (Iceberg accepts a columns property; the partitions property is Hive-only)
ANALYZE iceberg.analytics.events
WITH (columns = ARRAY['customer_id', 'event_ts']);
-- Inspect collected statistics
SHOW STATS FOR iceberg.analytics.events;
-- column_name | data_size | distinct_values_count | nulls_fraction | row_count | low_value | high_value
-- EXPLAIN ANALYZE shows the physical plan with actual runtime stats
EXPLAIN ANALYZE
SELECT customer_id, SUM(revenue_usd)
FROM iceberg.analytics.events
WHERE event_ts >= DATE '2026-06-01'
GROUP BY customer_id;
-- ① Partition pruning: assuming the table is partitioned by event_date,
--    the Iceberg connector reads only matching partitions
SELECT * FROM iceberg.analytics.events
WHERE event_date = DATE '2026-06-15';
-- Trino reads ONLY the 2026-06-15 partition files from S3
-- ② Dynamic filtering: Trino collects the join keys from the smaller (build)
--    side, as a set of distinct values or a min/max range, and sends them to
--    the workers scanning the larger Iceberg table; partitions and row groups
--    that cannot match are skipped before their data is read
SELECT e.*, c.segment
FROM iceberg.analytics.events e
JOIN postgresql.crm.customers c ON e.customer_id = c.customer_id
WHERE c.segment = 'enterprise';
-- ③ Column projection pushdown: from each Parquet row group, only the column
--    chunks of the requested columns are read; other columns are never fetched
SELECT event_id, revenue_usd
FROM iceberg.analytics.events;
-- ④ Aggregate pushdown to JDBC: Trino can push COUNT/SUM to PostgreSQL
SELECT country, COUNT(*)
FROM postgresql.crm.customers
GROUP BY country;
-- Executes as: SELECT country, COUNT(*) FROM customers GROUP BY country
-- on the PostgreSQL server, not in Trino workers
For teams evaluating query engines: ClickHouse delivers higher single-source analytical throughput, helped by pre-materialized aggregations, but its cross-source federation is far more limited. Trino is the right choice when data lives in heterogeneous systems and joining without ETL is the priority.
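The dynamic filtering step (②) can be modeled in a few lines of Python. In this simplified sketch the build side's join keys become a filter: an exact value set, collapsing to a min/max range once there are more than `max_distinct` keys (loosely like Trino's own size limits). The probe-side reader then skips any row group whose min/max statistics rule out every key. The row-group layout and the `max_distinct` threshold are illustrative assumptions; Trino applies the same idea at partition, split, and row-group level.

```python
from dataclasses import dataclass
from typing import Iterable, List, Optional, Set, Tuple

# (min_key, max_key, keys) for one Parquet-like row group
RowGroup = Tuple[int, int, List[int]]


@dataclass(frozen=True)
class DynamicFilter:
    values: Optional[Set[int]]  # exact build-side keys; None once collapsed to a range
    low: int
    high: int
    empty: bool = False         # build side produced no rows: nothing can match

    def may_match_range(self, rg_min: int, rg_max: int) -> bool:
        """Can a row group with these min/max statistics contain a matching key?"""
        if self.empty or rg_max < self.low or rg_min > self.high:
            return False
        if self.values is None:
            return True  # range-only filter: overlap is all we can tell
        return any(rg_min <= v <= rg_max for v in self.values)

    def may_match(self, key: int) -> bool:
        if self.empty:
            return False
        if self.values is not None:
            return key in self.values
        return self.low <= key <= self.high


def collect_dynamic_filter(build_keys: Iterable[int], max_distinct: int = 1000) -> DynamicFilter:
    keys = set(build_keys)
    if not keys:
        return DynamicFilter(values=set(), low=0, high=0, empty=True)
    exact = keys if len(keys) <= max_distinct else None
    return DynamicFilter(values=exact, low=min(keys), high=max(keys))


def scan_with_filter(row_groups: List[RowGroup], df: DynamicFilter) -> Tuple[List[int], int]:
    """Return probe-side keys that pass the filter and the number of skipped row groups."""
    passed: List[int] = []
    skipped = 0
    for rg_min, rg_max, keys in row_groups:
        if not df.may_match_range(rg_min, rg_max):
            skipped += 1  # this row group is never read
            continue
        passed.extend(k for k in keys if df.may_match(k))
    return passed, skipped


groups = [(1, 10, [1, 5, 7, 9]), (11, 30, [11, 20, 30]), (31, 50, [31, 42, 50])]
print(scan_with_filter(groups, collect_dynamic_filter([42, 7])))                  # ([7, 42], 1)
print(scan_with_filter(groups, collect_dynamic_filter([42, 7], max_distinct=1)))  # ([7, 9, 11, 20, 30, 31, 42], 0)
```

The exact filter skips the middle row group entirely. The collapsed range reads all three and lets extra keys through, which is safe because the join itself still discards rows without a match.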
Security: TLS, LDAP, and File-Based Access Control
Trino supports password (file-based or LDAP), OAuth 2.0/OIDC, Kerberos, and certificate-based authentication. TLS must be enabled before activating any authentication method. Authorization is handled through file-based rules or Apache Ranger integration for centrally managed policies with column masking.
# etc/config.properties — enable HTTPS and LDAP authentication
http-server.https.enabled=true
http-server.https.port=8443
http-server.https.keystore.path=/etc/trino/certs/keystore.jks
http-server.https.keystore.key=changeit
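http-server.authentication.type=PASSWORD
# Required once authentication is enabled: a secret shared by all nodes
internal-communication.shared-secret=${ENV:TRINO_SHARED_SECRET}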
# etc/password-authenticator.properties
password-authenticator.name=ldap
ldap.url=ldaps://ldap.company.com:636
ldap.bind-dn=cn=trino-svc,ou=service-accounts,dc=company,dc=com
ldap.bind-password=${ENV:LDAP_SERVICE_PASSWORD}
ldap.user-base-dn=ou=users,dc=company,dc=com
ldap.user-bind-pattern=uid=${USER},ou=users,dc=company,dc=com
# etc/access-control.properties — file-based authorization
access-control.name=file
security.config-file=/etc/trino/rules.json
// rules.json: catalog, schema, and column-level access control
{
"catalogs": [
{ "user": "alice", "catalog": "iceberg", "allow": "all" },
{ "user": "bob", "catalog": "postgresql", "allow": "read-only" },
{ "group": "analysts","catalog": ".*", "allow": "read-only" }
],
"schemas": [
{ "user": "alice", "schema": "staging", "owner": true }
],
"tables": [
{
"user": "bob",
"catalog": "iceberg",
"schema": "analytics",
"table": "events",
"privileges": ["SELECT"],
"columns": [
{ "name": "email", "mask": "NULL" },
{ "name": "ip_address", "mask": "'[REDACTED]'" }
]
}
]
}
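File-based catalog rules are evaluated in order: the first rule whose user, group, and catalog patterns all match decides the access level, and a request that matches no rule is denied. The Python sketch below models that check with the catalog rules from the file above and applies the two column masks to a plain dictionary. It is a simplified illustration (patterns are treated as full-match regular expressions, as in Trino's rule files; schema and table rules are left out), not Trino's implementation.

```python
import re
from typing import Dict, List, Optional, Sequence

CATALOG_RULES: List[Dict[str, str]] = [
    {"user": "alice", "catalog": "iceberg", "allow": "all"},
    {"user": "bob", "catalog": "postgresql", "allow": "read-only"},
    {"group": "analysts", "catalog": ".*", "allow": "read-only"},
]


def _full_match(pattern: Optional[str], value: str) -> bool:
    # A missing pattern behaves like ".*" (matches anything)
    return re.fullmatch(pattern or ".*", value) is not None


def catalog_access(user: str, groups: Sequence[str], catalog: str,
                   rules: List[Dict[str, str]] = CATALOG_RULES) -> str:
    """First matching rule wins; no matching rule means no access."""
    for rule in rules:
        user_ok = _full_match(rule.get("user"), user)
        group_ok = "group" not in rule or any(_full_match(rule["group"], g) for g in groups)
        if user_ok and group_ok and _full_match(rule.get("catalog"), catalog):
            return rule["allow"]
    return "none"


def apply_masks(row: Dict[str, object], masks: Dict[str, object]) -> Dict[str, object]:
    """Replace masked columns with their mask value; other columns pass through."""
    return {col: (masks[col] if col in masks else val) for col, val in row.items()}


print(catalog_access("bob", [], "iceberg"))            # none
print(catalog_access("bob", ["analysts"], "iceberg"))  # read-only
```

Under these catalog rules bob reaches the iceberg catalog only through the analysts group rule, so his table grant and column masks on iceberg.analytics.events take effect only if he belongs to that group.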
When bob queries iceberg.analytics.events, the email column always returns NULL and ip_address returns '[REDACTED]', regardless of the SQL he writes. Trino enforces this during query analysis by substituting the masking expression for the column in the plan, so rewriting the query cannot reach the raw values.
Python and CLI Integration
The Python trino package provides a DB-API 2.0 interface and a SQLAlchemy dialect, making it compatible with pandas, dbt, and any ORM-based tooling. For interactive queries and plan inspection, the Trino CLI is the go-to tool.
# Install the Python driver with its SQLAlchemy extra, plus pandas
pip install 'trino[sqlalchemy]' pandas
# ── DB-API 2.0: direct query execution ───────────────────────────────
import os

import trino
from trino.auth import BasicAuthentication

# Read the password from the environment instead of hardcoding it
password = os.environ["TRINO_PASSWORD"]

conn = trino.dbapi.connect(
    host="trino-coordinator",
    port=8443,
    user="alice",
    auth=BasicAuthentication("alice", password),
    http_scheme="https",
    verify=True,  # or a path to the CA bundle that signed the coordinator's certificate
    catalog="iceberg",
    schema="analytics",
)
cur = conn.cursor()
cur.execute("""
    SELECT country, SUM(revenue_usd) AS revenue
    FROM events
    WHERE event_ts >= DATE '2026-06-01'
    GROUP BY country
    ORDER BY revenue DESC
    LIMIT 20
""")
for country, revenue in cur.fetchall():
    print(f"{country:20s} {revenue:>15,.0f}")
# ── SQLAlchemy + pandas ───────────────────────────────────────────────
import pandas as pd
from sqlalchemy import create_engine

engine = create_engine(
    "trino://alice@trino-coordinator:8443/iceberg/analytics",
    connect_args={
        "auth": BasicAuthentication("alice", password),
        "http_scheme": "https",
    },
)
df = pd.read_sql(
    "SELECT * FROM events WHERE event_date = current_date LIMIT 100000",
    engine,
)
print(df.dtypes)
# ── CLI: interactive query and plan inspection ────────────────────────
# The trinodb/trino image ships the CLI; run it inside the coordinator container:
# docker compose exec trino-coordinator trino
#
# trino> SHOW CATALOGS;
# trino> USE iceberg.analytics;
# trino> SHOW TABLES;
# trino> EXPLAIN (TYPE DISTRIBUTED) SELECT ...;
# trino> EXPLAIN ANALYZE SELECT ...; -- runs the query and shows actual stats
Kubernetes Deployment with Helm
The official Trino Helm chart deploys a coordinator Deployment and a worker Deployment with configurable replicas. Catalogs are injected as ConfigMaps and mounted into both coordinator and worker pods. The chart can create a HorizontalPodAutoscaler for workers that scales on CPU utilization; scaling on custom metrics such as query queue depth requires an external metrics pipeline (for example, a Prometheus adapter feeding the Kubernetes custom metrics API).
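The HPA itself uses the standard Kubernetes rule: desiredReplicas = ceil(currentReplicas × currentUtilization / targetUtilization), left unchanged when the ratio is within a tolerance (0.1 by default), and clamped to the configured minimum and maximum. A Python sketch of that calculation with illustrative bounds of 2 to 20 workers and a 70% CPU target; it leaves out the stabilization windows and scaling policies the real controller also applies.

```python
import math


def desired_replicas(current: int, current_util: float, target_util: float,
                     min_replicas: int, max_replicas: int,
                     tolerance: float = 0.1) -> int:
    """Kubernetes HPA scaling rule for a single utilization metric."""
    ratio = current_util / target_util
    if abs(ratio - 1.0) <= tolerance:
        desired = current  # close enough to the target: no change
    else:
        desired = math.ceil(current * ratio)
    return max(min_replicas, min(max_replicas, desired))


print(desired_replicas(4, 140, 70, min_replicas=2, max_replicas=20))  # 8
print(desired_replicas(4, 10, 70, min_replicas=2, max_replicas=20))   # 2
```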
# Add the Trino Helm repository
helm repo add trino https://trinodb.github.io/charts
helm repo update
# values.yaml: production Trino Helm configuration
image:
  tag: "451"
server:
  workers: 2                   # worker count; with autoscaling on, also the HPA minimum
  config:
    query:
      maxMemory: "200GB"       # cluster-wide query.max-memory
  autoscaling:
    enabled: true
    maxReplicas: 20
    targetCPUUtilizationPercentage: 70
coordinator:
  resources:
    requests: { cpu: "2", memory: "10Gi" }
    limits: { cpu: "4", memory: "14Gi" }
  jvm:
    maxHeapSize: "8G"
    gcMethod:
      type: UseG1GC
      g1:
        heapRegionSize: "32M"
  config:
    query:
      # Must fit in the 8G heap together with the default 30% heap headroom
      maxMemoryPerNode: "4GB"
worker:
  resources:
    requests: { cpu: "4", memory: "20Gi" }
    limits: { cpu: "8", memory: "28Gi" }
  jvm:
    maxHeapSize: "18G"
  config:
    query:
      maxMemoryPerNode: "12GB"
# Catalogs injected as ConfigMaps: edit + rolling-restart to apply
catalogs:
  iceberg: |
    connector.name=iceberg
    hive.metastore.uri=thrift://hive-metastore:9083
    hive.s3.endpoint=https://s3.amazonaws.com
    hive.s3.region=eu-central-1
  postgresql: |
    connector.name=postgresql
    connection-url=jdbc:postgresql://postgres.prod.svc:5432/analytics
    connection-user=trino
    connection-password=${ENV:POSTGRES_PASSWORD}
# secretMounts mounts the Secret as files under `path`, not as environment
# variables; ${ENV:POSTGRES_PASSWORD} also needs the password exported as an
# environment variable in the pods
secretMounts:
  - name: trino-secrets
    secretName: trino-db-credentials
    path: /etc/trino/secrets
Spill to Disk and Memory Management
Trino runs entirely in-memory by default. When queries exceed available heap, Trino can spill sort buffers, aggregation state, and join build tables to local disk. Spill is disabled by default — enable it to allow larger-than-memory queries to complete at the cost of I/O latency.
# etc/config.properties: memory limits and spill configuration
# Per-node query memory budget (must fit in the heap alongside the heap headroom)
query.max-memory-per-node=12GB
# Cluster-wide query memory limit (sum across all nodes)
query.max-memory=100GB
# Enable spill to disk for sort, hash aggregation, hash joins
spill-enabled=true
# Use fast local NVMe in production. Keep comments on their own line:
# a trailing "# ..." would become part of the property value
spiller-spill-path=/data/trino-spill
spiller-max-used-space-threshold=0.9
# Exchange buffer sizing
exchange.max-buffer-size=32MB
task.max-partial-aggregation-memory=16MB
# Concurrency limits are enforced with resource groups
# (etc/resource-groups.properties pointing at a JSON file whose groups set
# hardConcurrencyLimit and maxQueued), not with config.properties entries
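At startup Trino checks that query.max-memory-per-node plus the heap headroom reserved for untracked allocations (memory.heap-headroom-per-node, 30% of the heap by default) fits inside the JVM heap, and refuses to start if it does not. The Python sketch below repeats that check so values can be sanity-tested before a rollout. It assumes the default headroom unless one is passed, and treats GB and G as binary units, as Trino data sizes and the JVM's -Xmx do.

```python
import re
from typing import Optional

# Trino data sizes (e.g. "12GB") and JVM -Xmx values both use binary multiples
_SIZE_UNITS = {"B": 1, "kB": 1024, "MB": 1024**2, "GB": 1024**3, "TB": 1024**4}
_XMX_UNITS = {"": 1, "k": 1024, "m": 1024**2, "g": 1024**3, "t": 1024**4}


def parse_data_size(text: str) -> int:
    """Parse a Trino-style data size such as '12GB' into bytes."""
    m = re.fullmatch(r"\s*(\d+(?:\.\d+)?)\s*(B|kB|MB|GB|TB)\s*", text)
    if m is None:
        raise ValueError(f"unrecognized data size: {text!r}")
    return int(float(m.group(1)) * _SIZE_UNITS[m.group(2)])


def parse_xmx(flag: str) -> int:
    """Parse a JVM flag such as '-Xmx18G' into bytes."""
    m = re.fullmatch(r"-Xmx(\d+)([kKmMgGtT]?)", flag.strip())
    if m is None:
        raise ValueError(f"not an -Xmx flag: {flag!r}")
    return int(m.group(1)) * _XMX_UNITS[m.group(2).lower()]


def check_node_memory(xmx_flag: str, max_memory_per_node: str,
                      heap_headroom: Optional[str] = None) -> bool:
    """True if per-node query memory plus heap headroom fits in the heap."""
    heap = parse_xmx(xmx_flag)
    query_memory = parse_data_size(max_memory_per_node)
    headroom = parse_data_size(heap_headroom) if heap_headroom else int(heap * 0.3)
    return query_memory + headroom <= heap


print(check_node_memory("-Xmx18G", "12GB"))  # True
print(check_node_memory("-Xmx8G", "8GB"))    # False
```

With the default headroom, query.max-memory-per-node=12GB needs a heap of at least about 17.1 GB, so an -Xmx18G worker passes and an -Xmx16G worker fails.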
Production Checklist
Pin the exact Trino version in Docker image tags and the Helm chart; Trino ships frequent numbered releases, and any release can include breaking changes to configuration properties or the connector SPI.
Set node-scheduler.include-coordinator=false so the coordinator stays dedicated to planning and scheduling.
Run ANALYZE on Iceberg and Hive tables after major data loads to keep CBO statistics current.
Keep dynamic filtering enabled (enable-dynamic-filtering in etc/config.properties, on by default) so build-side join keys prune Iceberg and Hive scans at runtime.
Configure spill-to-disk only on workers with fast local NVMe storage; spilling to network-attached storage is often so slow that failing the query fast is the better outcome.
Use HPA on workers with a target CPU of 60–70%; Trino workers are typically CPU-bound during scans, decoding, joins, and aggregations.
Export JMX metrics via the Prometheus JMX exporter and alert on query failure rate and peak reserved memory.
Set connection pool limits on JDBC connectors (PostgreSQL, MySQL) to cap the number of database connections per worker.
Apply file-based access control with column masking before granting analyst access, and verify the masks by running test queries as a restricted user.
Run EXPLAIN ANALYZE on federation queries before promoting them to production pipelines to catch plan regressions early.
Running SQL analytics across Hive, Iceberg, PostgreSQL, and Kafka and unable to join data without custom ETL pipelines?
We design and implement Trino query federation platforms — from connector configuration for Hive, Iceberg, Delta Lake, and relational databases to cost-based optimizer statistics collection with ANALYZE, dynamic filtering setup for Iceberg partition pruning, Kubernetes Helm chart deployment with HPA worker autoscaling, TLS and LDAP authentication, file-based access control with column masking, Python trino and SQLAlchemy client integration, and production performance tuning with spill-to-disk configuration and query memory management. Let’s talk.