Why Polars — The pandas Problem at Scale
pandas has been the lingua franca of Python data manipulation for over a decade. It is expressive, well-documented, and supported by virtually every Python data library. But pandas was designed for single-threaded, in-memory processing on datasets that fit comfortably on one machine. Three structural weaknesses surface when you push it to production scale.
First, pandas is built on the NumPy memory model: strings and mixed-type columns are stored as object arrays of Python objects, and missing values have traditionally been handled with NaN or upcasting. That layout is cache-unfriendly and forces Python-level dispatch on many operations. Second, most pandas operations are single-threaded, so a 64-core server uses roughly 1/64 of its compute for them. Third, pandas materialises every intermediate result in memory, so a pipeline of five transformations on a 10 GB DataFrame can spike to 40 GB+ of resident memory before the final output is produced.
Polars addresses all three weaknesses simultaneously. Written in Rust and built on the Apache Arrow columnar memory format, Polars is multi-threaded by default, exposes a lazy query planner that eliminates redundant computation, and supports a streaming mode for processing datasets larger than RAM. The result: on single-node benchmarks Polars is commonly reported to run 5–50× faster than pandas, and it can beat Spark on datasets under ~1 TB that fit within a single powerful machine.
Polars Architecture — Arrow Memory, the Query Optimizer, and the Expression Engine
Understanding why Polars is fast requires understanding its three architectural layers. At the bottom sits the Apache Arrow columnar memory format. Arrow stores each column as a contiguous buffer of fixed-width values (or fixed-width offsets for strings), allowing SIMD vectorisation on modern CPUs. Column-oriented storage also means that a query touching only two of twenty columns reads roughly 10% of the data from disk and RAM, assuming the columns are of similar size. Reading only the needed columns follows from the physical layout itself, which is what makes projection pushdown so cheap.
Above Arrow sits the Polars query optimizer, a Rust query planner with separate logical and physical plans. When you call .lazy() on a DataFrame and chain transformations, Polars builds a logical plan tree. Calling .collect() triggers optimisation passes: predicate pushdown (move filters as close to the source as possible), projection pushdown (eliminate unused columns early), common subexpression elimination (compute shared sub-expressions once), and join reordering. The physical plan is then executed by the multi-threaded expression engine, which partitions data across CPU cores and merges the partial results.
# Install Polars (with all optional extras for Parquet, Delta, S3)
pip install "polars[all]"
# Or minimal install
pip install polars
Lazy vs Eager Evaluation — When to Use Each
Polars exposes two execution modes. Eager mode (default when you call methods on a DataFrame) executes each operation immediately and returns a new DataFrame. It is convenient for interactive exploration — every intermediate result is available for inspection. Lazy mode (via .lazy()) defers execution until .collect() is called, allowing the optimizer to rewrite the plan. Use lazy mode for all production pipelines.
import polars as pl
from pathlib import Path
# ── Eager mode (fine for small datasets / exploration) ──
df = pl.read_parquet("events.parquet")
result = (
df
.filter(pl.col("event_type") == "purchase")
.with_columns(
(pl.col("amount") * 1.23).alias("amount_with_tax"),
pl.col("ts").dt.date().alias("event_date"),
)
.group_by("event_date")
.agg(
pl.col("amount_with_tax").sum().alias("daily_revenue"),
pl.col("user_id").n_unique().alias("unique_buyers"),
)
.sort("event_date")
)
print(result)
# ── Lazy mode (recommended for production) ──
# The query plan is built without reading any data
lf = pl.scan_parquet("events/*.parquet") # scan_* returns a LazyFrame
result = (
lf
.filter(pl.col("event_type") == "purchase")
.with_columns(
(pl.col("amount") * 1.23).alias("amount_with_tax"),
pl.col("ts").dt.date().alias("event_date"),
)
.group_by("event_date")
.agg(
pl.col("amount_with_tax").sum().alias("daily_revenue"),
pl.col("user_id").n_unique().alias("unique_buyers"),
)
.sort("event_date")
.collect() # <— triggers planning + execution
)
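# Inspect the optimized plan (explain() returns a string, so print it).
# Here lf is only the scan; call explain() on the full chain, before .collect(),
# to see how filters and projections are pushed down.
print(lf.explain(optimized=True))
A common pattern is to read raw data with scan_parquet, scan_csv, or scan_ipc (Arrow IPC / Feather), build the full transformation pipeline as a LazyFrame, and then either call .collect() and write the result with .write_parquet(), or stream it straight to disk with .sink_parquet(). The sink_* family of methods runs on the streaming engine, so it does not need to materialise the full dataset in RAM.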
The Expression API — Composable, Type-Safe Transformations
The expression API is what separates Polars from pandas most concretely at the developer level. In pandas, many operations require either apply() (row-by-row Python loop, slow) or NumPy ufuncs (powerful but low-level). In Polars, expressions are first-class objects that describe a computation without executing it. They compose naturally, can be passed as arguments, and are executed in parallel across columns automatically.
import polars as pl
df = pl.read_parquet("orders.parquet")
# ── Column selection and renaming ──
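result = df.select([
    pl.col("order_id"),
    pl.col("amount").alias("revenue"),
    pl.col("user_id").cast(pl.Utf8),
    pl.lit("production").alias("env"),  # literal column
    # all remaining columns except internal_flag; order_id and user_id are
    # excluded as well because duplicate output names raise an error
    pl.all().exclude(["order_id", "user_id", "internal_flag"]),
])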
# ── String expressions ──
cleaned = df.with_columns([
pl.col("email").str.to_lowercase().str.strip_chars(),
pl.col("phone").str.replace_all(r"[^0-9]", ""),
pl.col("product_name").str.split(",").list.len().alias("variant_count"),
pl.col("sku").str.contains(r"^SKU-\d{6}$").alias("sku_valid"),
])
# ── Date/time expressions ──
with_time = df.with_columns([
pl.col("created_at").dt.convert_time_zone("UTC").alias("created_utc"),
pl.col("created_at").dt.truncate("1d").alias("created_date"),
pl.col("created_at").dt.offset_by("7d").alias("follow_up_date"),
pl.col("created_at").dt.month().alias("month"),
])
# ── Conditional expressions (when/then/otherwise = vectorised if-else) ──
categorised = df.with_columns(
pl.when(pl.col("amount") >= 1000)
.then(pl.lit("high"))
.when(pl.col("amount") >= 100)
.then(pl.lit("medium"))
.otherwise(pl.lit("low"))
.alias("value_tier")
)
# ── List and struct expressions ──
# Explode list column to rows
exploded = df.explode("tags")
# Struct (record) access
nested = df.with_columns(
pl.col("metadata").struct.field("source").alias("source"),
pl.col("metadata").struct.field("campaign_id").alias("campaign_id"),
)
Polars represents missing values as null for every dtype, whereas pandas has traditionally relied on NaN. Use pl.col("x").is_null(), .fill_null(strategy="forward"), and .drop_nulls() instead of pandas' isna() family. The distinction between null and NaN matters especially in float columns, where both can coexist.
GroupBy Aggregations and Window Expressions
Polars distinguishes between group_by().agg() (reduces rows — output has one row per group) and .over() (window function — output has the same number of rows as input, with aggregate value broadcast back). This maps directly to SQL's GROUP BY vs window functions and eliminates the need for merge-on-index patterns that are common — and slow — in pandas.
import polars as pl
orders = pl.read_parquet("orders.parquet")
# ── GroupBy: one row per group ──
daily_stats = (
orders
.group_by(["user_id", pl.col("created_at").dt.date().alias("date")])
.agg([
pl.col("amount").sum().alias("daily_spend"),
pl.col("amount").mean().alias("avg_order_value"),
pl.col("order_id").count().alias("num_orders"),
pl.col("amount").max().alias("max_order"),
pl.col("product_id").n_unique().alias("unique_products"),
# top three amounts per group, collected into a list column
pl.col("amount").sort(descending=True).head(3).alias("top3_amounts"),
])
)
# ── Window expressions: aggregate without losing rows ──
# Per-user total, rank within user, percentage of user total
enriched = orders.with_columns([
pl.col("amount").sum().over("user_id").alias("user_total_spend"),
pl.col("amount").rank(descending=True).over("user_id").alias("user_order_rank"),
(pl.col("amount") / pl.col("amount").sum().over("user_id")).alias("pct_of_user_total"),
pl.col("created_at").min().over("user_id").alias("user_first_order_date"),
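# rolling sum over 7 consecutive rows per store, ordered by created_at;
# this equals 7 days only with exactly one row per store per day
# (for a true time-based window, look at rolling_sum_by)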
pl.col("amount")
.sort_by("created_at")
.rolling_sum(window_size=7)
.over("store_id")
.alias("rolling_7d_revenue"),
])
# ── Dynamic group_by for time-series aggregations ──
# Note: sort the LazyFrame by time first for deterministic windows
hourly = (
orders.lazy()
.sort("created_at")
.group_by_dynamic("created_at", every="1h", closed="left")
.agg([
pl.col("amount").sum().alias("hourly_revenue"),
pl.col("order_id").count().alias("order_count"),
])
.collect()
)
Join Strategies — Hash, Sort-Merge, Cross, and Semi/Anti Joins
Polars supports all standard SQL join types. Equi-joins run as hash joins by default, which works well when one side fits comfortably in memory; the physical strategy is chosen by the engine rather than passed as an argument to join(). For nearest-key matching on sorted data, join_asof covers the common time-series case. Polars also supports semi and anti joins natively, eliminating the common pandas pattern of merge followed by a boolean mask on an indicator column.
import polars as pl
orders = pl.scan_parquet("orders.parquet")
users = pl.scan_parquet("users.parquet")
products = pl.scan_parquet("products.parquet")
# ── Inner join (default: hash strategy) ──
joined = (
orders
.join(users, on="user_id", how="inner")
.join(products, left_on="product_id", right_on="id", how="left")
.collect()
)
# ── Semi join: orders where user exists in premium_users ──
premium_users = pl.scan_parquet("premium_users.parquet")
premium_orders = (
orders
.join(premium_users, on="user_id", how="semi")
.collect()
)
# ── Anti join: orders with NO matching user record (data quality check) ──
orphaned_orders = (
orders
.join(users, on="user_id", how="anti")
.collect()
)
# ── Cross join (all combinations — use with care) ──
# Useful for generating feature interaction matrices on small tables
small_a = pl.DataFrame({"x": [1, 2, 3]})
small_b = pl.DataFrame({"y": ["a", "b"]})
cross = small_a.join(small_b, how="cross")
# ── As-of join (nearest-key match on sorted data) ──
# Join each event to the closest previous snapshot (by timestamp)
events = pl.scan_parquet("events.parquet").collect().sort("ts")
snapshots = pl.scan_parquet("snapshots.parquet").collect().sort("snapshot_ts")
asof = events.join_asof(
snapshots,
left_on="ts",
right_on="snapshot_ts",
strategy="backward", # nearest snapshot BEFORE the event
by="store_id", # partition key — join within store
)
When both inputs are already sorted on the join key, a sort-merge approach can avoid rehashing. The join() signature does not document a join_strategy argument, so check the API reference for your Polars version before relying on join_strategy="sort_merge". Joins against a small right-hand table need no special configuration.
Streaming Mode — Processing Data Larger Than RAM
Polars streaming mode processes data in batches called morsels rather than loading the full dataset into memory at once. It is activated by replacing .collect() with .collect(streaming=True) (recent releases spell this .collect(engine="streaming") and deprecate the older flag) or by using the sink_* family of methods. Not every operation is supported by the streaming engine, and coverage differs between releases; when .collect() meets an unsupported operation, that part of the query falls back to the in-memory engine. Filters, projections, aggregations, and joins against a small table are the typical streaming workloads.
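The idea behind batch-wise execution can be illustrated without Polars at all. The following is a conceptual pure-Python sketch, not a description of Polars internals: rows arrive in fixed-size batches, and only the running per-key totals are kept between batches. The function names and the batch size are invented for the example.

```python
from collections import defaultdict
from typing import Dict, Iterable, Iterator, List, Tuple

Row = Tuple[str, float]  # (group key, amount)


def batches(rows: Iterable[Row], batch_size: int) -> Iterator[List[Row]]:
    """Yield rows in fixed-size batches so only one batch is held at a time."""
    batch: List[Row] = []
    for row in rows:
        batch.append(row)
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:
        yield batch


def streaming_sum(rows: Iterable[Row], batch_size: int = 2) -> Dict[str, float]:
    """Aggregate batch by batch, keeping only the running per-key totals."""
    totals: Dict[str, float] = defaultdict(float)
    for batch in batches(rows, batch_size):
        for key, amount in batch:
            totals[key] += amount
    return dict(totals)


rows = [("DE", 10.0), ("FR", 5.0), ("DE", 2.5), ("DE", 1.0), ("FR", 4.0)]
totals = streaming_sum(iter(rows))
print(totals)
```

Memory use is bounded by the batch size plus the number of distinct keys, which is why aggregations with few groups suit streaming execution well.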
import polars as pl
# ── Streaming collect: processes in batches, result fits in RAM ──
result = (
pl.scan_parquet("large_events/*.parquet") # 200 GB dataset
.filter(pl.col("country") == "DE")
.with_columns(
pl.col("amount").cast(pl.Float64),
pl.col("ts").dt.truncate("1h").alias("hour"),
)
.group_by(["hour", "product_id"])
.agg(pl.col("amount").sum(), pl.col("user_id").n_unique())
.collect(streaming=True)  # newer releases: .collect(engine="streaming")
)
# ── sink_parquet: result is written to disk without materialising in RAM ──
(
pl.scan_parquet("raw_logs/*.parquet")
.filter(pl.col("level") == "ERROR")
.select(["ts", "service", "message", "trace_id"])
.sort("ts")
.sink_parquet(
"filtered_errors/errors.parquet",
compression="zstd",
row_group_size=100_000,
)
)
# ── sink_csv: for downstream tools that don't speak Parquet ──
(
pl.scan_csv("input/*.csv")
.with_columns(pl.col("price").str.replace(",", ".").cast(pl.Float64))
.filter(pl.col("price") > 0)
.sink_csv("cleaned_prices.csv")
)
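# ── Streaming with Parquet partitioning ──
# Write one file per partition key without loading all partitions into RAM.
# Caution: partitioned sinks are an unstable, version-dependent feature. The
# partition_by keyword below is illustrative; some releases instead take a
# partitioning object such as pl.PartitionByKey as the sink target. Check the
# sink_parquet documentation for your version.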
(
pl.scan_parquet("orders/*.parquet")
.with_columns(pl.col("created_at").dt.year().alias("year"))
.sink_parquet(
"partitioned_orders/",
partition_by=["year"],
compression="snappy",
)
)
Parquet, Delta Lake, and Cloud Storage Integration
Polars natively reads and writes Apache Parquet through its own Rust Parquet implementation, with predicate and projection pushdown at the file level. For S3, GCS, and Azure Blob Storage, scan_parquet accepts s3://, gs://, and az:// URIs through the object_store crate, so no Hadoop or JVM is required. delta-rs provides Delta Lake reads and writes from pure Python/Rust, and it interoperates with Polars via DeltaTable.to_pyarrow_table() and the write_deltalake helper.
import polars as pl
from deltalake import DeltaTable, write_deltalake
# ── Reading from S3 with Polars ──
# Credentials via environment variables or IAM role
df = pl.scan_parquet(
"s3://my-data-lake/events/year=2026/month=06/*.parquet",
storage_options={
"aws_region": "eu-west-1",
"aws_access_key_id": "...", # prefer IAM roles in production
"aws_secret_access_key": "...",
},
).filter(
pl.col("amount") > 0
).collect()
# ── Writing Parquet with optimal settings ──
df.write_parquet(
"output/events.parquet",
compression="zstd", # ~30% smaller than snappy, slightly slower to write
compression_level=3, # 1-22, 3 is a good balance
statistics=True, # embed min/max per row group for pushdown
row_group_size=100_000, # 50k–200k rows per row group is typical
use_pyarrow=False, # use native Polars writer (faster)
)
# ── Delta Lake: incremental upsert with delta-rs ──
table_path = "s3://my-lake/orders_delta"
# Initial write (creates Delta table with schema + transaction log)
write_deltalake(
table_path,
df.to_arrow(),
mode="overwrite",
storage_options={"AWS_REGION": "eu-west-1"},
)
# Subsequent merge: upsert by order_id
# (new_orders is assumed to be a Polars DataFrame of incoming rows; it is not defined here)
dt = DeltaTable(table_path)
dt.merge(
source=new_orders.to_arrow(),
predicate="target.order_id = source.order_id",
source_alias="source",
target_alias="target",
).when_matched_update_all().when_not_matched_insert_all().execute()
# Read back as a Polars LazyFrame via Arrow
lf = pl.from_arrow(
    DeltaTable(table_path).to_pyarrow_table()
).lazy()
Performance Tuning — Schema Types, Parallelism, and Memory Layout
Polars is fast by default, but several decisions at the data model level compound over millions of rows. The most impactful are schema type selection, categorical encoding for low-cardinality strings, and partition count tuning for wide parallelism.
import polars as pl
# ── 1. Use specific dtypes at scan time — avoid object columns ──
schema_override = {
    "user_id": pl.UInt32,  # 4 bytes vs Int64's 8 bytes
    "product_id": pl.UInt32,
    "amount": pl.Float32,  # ~7 significant digits; use Float64 if that is not enough
    "status": pl.Categorical,  # replaces Utf8 with integer codes
    "country": pl.Categorical,
    "created_at": pl.Datetime("us", "UTC"),
}
df = pl.scan_csv(
"events.csv",
schema_overrides=schema_override,
try_parse_dates=True,
).collect()
# ── 2. Categorical encoding: faster group_by and joins on low-cardinality string columns ──
# Cast at read time (as above) or later; shown here for frames that were read as strings
df = df.with_columns([
pl.col("status").cast(pl.Categorical),
pl.col("country").cast(pl.Categorical),
])
# ── 3. Rechunk: consolidate fragmented Arrow chunks for sequential scans ──
# Relevant after many appends or vstack operations
df = df.rechunk()
# ── 4. Profile slow queries ──
lf = pl.scan_parquet("large_dataset/*.parquet")
query = (
lf
.filter(pl.col("status") == "active")
.group_by("region")
.agg(pl.col("revenue").sum())
)
# Collect with profiling
result, profile = query.profile()
print(profile) # shows node timings in microseconds
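# ── 5. Control thread count ──
# POLARS_MAX_THREADS is read when polars is first imported, so set it in the
# shell or at the very top of the entry point, before any `import polars`:
#   import os
#   os.environ["POLARS_MAX_THREADS"] = "16"
# At runtime the pool size can be inspected but not changed:
print(pl.thread_pool_size())
pl.Config.set_tbl_rows(20)  # unrelated to threading: rows shown when printing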
# ── 6. Avoid Python UDFs — use native expressions where possible ──
# SLOW: applies Python function row-by-row
slow_result = df.with_columns(
pl.col("name").map_elements(lambda x: x.upper(), return_dtype=pl.Utf8)
)
# FAST: Polars native string expression (vectorized in Rust)
fast_result = df.with_columns(
pl.col("name").str.to_uppercase()
)
# If you MUST use a Python UDF, use map_batches (passes entire Series)
# your_numpy_operation is a placeholder for your own vectorised NumPy function
def custom_logic(s: pl.Series) -> pl.Series:
    return pl.Series(your_numpy_operation(s.to_numpy()))
df.with_columns(
    pl.col("value").map_batches(custom_logic, return_dtype=pl.Float64)
)
pl.Categorical is unordered by default. For ordinal categories (e.g., “low” < “medium” < “high”), use pl.Enum(["low", "medium", "high"]) instead. It enforces a fixed ordering, enables order-based comparisons, and prevents new values from silently being added at runtime.
Migrating from pandas — Common Patterns Side by Side
Most pandas workflows map directly to Polars, but the idioms differ. The key shifts are: replace df["col"] column access with pl.col("col") expressions; replace apply() with native Polars expressions or map_batches(); replace index-based merges with expression-based joins; and replace mutation (df["col"] = ...) with .with_columns().
# ┌──────────────────────────────────────────────────────────────┐
# │ pandas → Polars migration cheat sheet │
# └──────────────────────────────────────────────────────────────┘
import pandas as pd
import polars as pl
# ── Read CSV ──
# pandas: pd.read_csv("f.csv")
# polars: pl.read_csv("f.csv") or pl.scan_csv("f.csv") [lazy]
# ── Filter ──
# pandas: df[df["amount"] > 100]
# polars: df.filter(pl.col("amount") > 100)
# ── New column ──
# pandas: df["tax"] = df["amount"] * 0.23
# polars: df.with_columns((pl.col("amount") * 0.23).alias("tax"))
# ── Multiple new columns at once ──
# pandas: df["a"] = ...; df["b"] = ... (two copies)
# polars: df.with_columns([expr_a, expr_b]) (one pass)
# ── GroupBy ──
# pandas: df.groupby("region")["amount"].sum()
# polars: df.group_by("region").agg(pl.col("amount").sum())
# ── Merge ──
# pandas: pd.merge(orders, users, on="user_id", how="left")
# polars: orders.join(users, on="user_id", how="left")
# ── Apply (row-wise) ──
# pandas: df["clean"] = df["name"].apply(lambda x: x.strip().lower())
# polars: df.with_columns(pl.col("name").str.strip_chars().str.to_lowercase())
# ── Pivot ──
# pandas: df.pivot_table(values="amount", index="date", columns="region", aggfunc="sum")
# polars: df.pivot(values="amount", index="date", on="region", aggregate_function="sum")
# ── Convert between pandas and Polars ──
pandas_df = df.to_pandas() # Polars → pandas (needs pyarrow; pass use_pyarrow_extension_array=True to avoid copies)
polars_df = pl.from_pandas(pandas_df) # pandas → Polars
# ── Bridge for libraries that require pandas (matplotlib, sklearn, etc.) ──
import matplotlib.pyplot as plt
df.to_pandas().set_index("date")["revenue"].plot()
plt.show()
Polars in Production Data Pipelines — Dagster, Prefect, and dbt Integration
Polars integrates naturally into modern Python pipeline orchestrators. In Dagster it can be used inside @asset functions as a drop-in replacement for pandas — Dagster's IO Managers handle serialisation. In Prefect, Polars DataFrames pass between tasks with no special handling needed. For dbt Python models running on Databricks or Snowflake, Polars is available via a simple import polars as pl once the package is installed in the runtime environment.
# ── Dagster asset using Polars ──
# pipelines/assets/order_metrics.py
import polars as pl
from dagster import asset, MetadataValue
from dagster_aws.s3 import S3Resource
@asset(
    group_name="order_analytics",
    description="Daily order revenue and buyer count by region",
)
def daily_order_metrics(context, s3: S3Resource) -> pl.DataFrame:
    lf = pl.scan_parquet(
        "s3://data-lake/orders/year=2026/month=06/*.parquet",
        # storage_options() is assumed here and may not exist on S3Resource;
        # replace it with however you build the credentials dict for Polars
        storage_options=s3.storage_options(),
    )
    result = (
        lf
        .filter(pl.col("status") == "completed")
        .with_columns(pl.col("created_at").dt.date().alias("date"))
        .group_by(["date", "region"])
        .agg([
            pl.col("amount").sum().alias("revenue"),
            pl.col("user_id").n_unique().alias("buyers"),
            pl.col("order_id").count().alias("orders"),
        ])
        .sort(["date", "region"])
        .collect(streaming=True)  # newer releases: .collect(engine="streaming")
    )
    context.add_output_metadata({
        "num_rows": MetadataValue.int(len(result)),
        "date_range": MetadataValue.text(
            f"{result['date'].min()} to {result['date'].max()}"
        ),
        "total_revenue": MetadataValue.float(result["revenue"].sum()),
    })
    return result
# ── Custom Dagster IO Manager for Polars ──
# io_managers/polars_parquet_io_manager.py
from dagster import IOManager, InputContext, OutputContext
import polars as pl
class PolarsParquetIOManager(IOManager):
    def __init__(self, base_path: str):
        self.base_path = base_path
    def handle_output(self, context: OutputContext, obj: pl.DataFrame):
        path = f"{self.base_path}/{context.asset_key.path[-1]}.parquet"
        obj.write_parquet(path, compression="zstd")
        context.log.info(f"Wrote {len(obj)} rows to {path}")
    def load_input(self, context: InputContext) -> pl.LazyFrame:
        path = f"{self.base_path}/{context.asset_key.path[-1]}.parquet"
        return pl.scan_parquet(path)
Polars + DuckDB — The In-Process Analytics Power Pair
DuckDB and Polars share the Apache Arrow in-process memory model, which enables zero-copy data exchange between them via the Arrow PyCapsule Interface. A practical pattern is to use Polars for raw data cleaning and transformations (where its expression API is more ergonomic) and DuckDB for complex SQL analytics, geospatial queries with the spatial extension, or ad-hoc exploration. Both operate in-process with no serialisation overhead between them.
import polars as pl
import duckdb
# ── Pass a Polars DataFrame to DuckDB — zero-copy via Arrow ──
orders = pl.read_parquet("orders.parquet")
con = duckdb.connect()
# DuckDB can query Polars DataFrames directly (they share Arrow memory)
result = con.execute("""
SELECT
region,
DATE_TRUNC('month', created_at) AS month,
SUM(amount) AS revenue,
COUNT(DISTINCT user_id) AS buyers,
revenue / buyers AS arpu
FROM orders
GROUP BY 1, 2
ORDER BY month, revenue DESC
""").pl() # .pl() returns a Polars DataFrame
# ── DuckDB → Polars pipeline ──
# Use DuckDB for SQL-heavy transforms, Polars for post-processing
raw = con.execute("""
SELECT *
FROM read_parquet('s3://bucket/events/*.parquet')
WHERE event_date >= '2026-01-01'
""").pl()
# Now apply Polars expressions on the result
clean = (
raw
.with_columns([
pl.col("user_id").cast(pl.UInt32),
pl.col("amount").fill_null(0.0),
])
.filter(pl.col("amount") >= 0)
)
Polars vs pandas vs Spark — When to Choose What
The H2O.ai database benchmark (group-by and join on 0.5 GB and 5 GB datasets) has repeatedly shown Polars finishing 5–20× faster than pandas and within 2× of DuckDB. On a 50 GB groupby, Polars in streaming mode can beat Spark on a single 32-core node because Spark's task serialisation and shuffle overhead dominates at that scale. As a rough guide, distributed Spark becomes worth its overhead at around 500 GB and above, or for any job that requires true multi-node parallelism.
Decision heuristic: use Polars for datasets up to roughly 10× your RAM (with streaming mode covering the part that does not fit in memory), use Spark or Trino when data genuinely requires multi-node distribution, use DuckDB when the primary interface is SQL, and keep pandas only where ecosystem compatibility (scikit-learn, matplotlib, statsmodels) requires it, bridging via .to_pandas() at the last step.