Why Spark Jobs Underperform — and How to Diagnose Them
Apache Spark is one of the most powerful distributed compute engines available, yet it is also one of the easiest to misconfigure. A job that runs in 10 minutes on a well-tuned cluster can take 2 hours on an identically sized cluster with default settings. The culprits are almost always the same: too many tiny tasks, a handful of skewed partitions monopolizing executors, an avoidable shuffle triggered by an un-tuned join, or an executor running out of memory and spilling gigabytes to disk.
The diagnostic starting point is always the Spark Web UI. The Stages tab exposes per-task duration distributions: if the 75th percentile task takes 30 seconds but the maximum task takes 18 minutes, you have data skew. The same tab reports shuffle spill per stage. The SQL tab shows the physical query plan with per-operator row counts, exchange (shuffle) operators, and sort nodes. The Storage tab shows cached datasets and how much of each sits in memory versus on disk. Before touching a single config parameter, spend five minutes in the UI understanding where time is actually going. Teams running Spark on Kubernetes will find the companion guide on Spark Operators and resource management useful for cluster-level sizing before diving into job-level tuning.
# PySpark — read the physical plan before tuning anything
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("perf-audit").getOrCreate()
df = spark.read.parquet("s3://my-bucket/events/year=2026/")
df_filtered = df.filter("event_type = 'purchase'").groupBy("user_id").count()
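# Print the physical plan (the default mode shows only the physical plan)
df_filtered.explain()
# Print the physical plan as an operator outline plus per-node details;
# use explain("extended") for the logical plans and explain("cost") for statistics
df_filtered.explain("formatted")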
# Output sections to look for:
# == Physical Plan ==
# AdaptiveSparkPlan isFinalPlan=false
# +- HashAggregate (keys=[user_id], functions=[count(1)])
#    +- Exchange hashpartitioning(user_id, 200), ENSURE_REQUIREMENTS, [id=#12]
#       +- HashAggregate (keys=[user_id], functions=[partial_count(1)])
#          +- Filter (event_type = purchase)
#             +- Scan parquet [user_id, event_type]
#                  Batched: true, DataFilters: [isnotnull(event_type), (event_type = purchase)]
#                  PartitionFilters: [], PushedFilters: [IsNotNull(event_type), EqualTo(event_type,purchase)]
#                  ReadSchema: struct<user_id:string,event_type:string>
# Key things to check in the plan:
# 1. Exchange nodes — each Exchange is a shuffle (expensive)
# 2. Sort nodes before joins — may indicate a sort-merge join where broadcast is possible
# 3. BroadcastHashJoin vs SortMergeJoin — the former is much faster for small tables
# 4. Filter pushdown — filters should appear inside Scan, not above it
# 5. Partition pruning — PartitionFilters should not be empty if you filter on partition columns
The Catalyst Optimizer — What It Does and When It Fails You
Spark's Catalyst optimizer transforms your DataFrame or SQL query through four phases: analysis (resolve column names and types), logical optimization (predicate pushdown, constant folding, subquery elimination), physical planning (select join strategy, choose aggregation algorithm), and code generation (generate JVM bytecode via Janino). Most of the time Catalyst does the right thing automatically, but it makes suboptimal decisions when statistics are missing or stale, when the data distribution is unknown, or when you hand it a UDF that it cannot introspect.
# Common Catalyst anti-patterns and how to fix them
# ── Anti-pattern 1: UDFs break predicate pushdown ────────────────────────────
# BAD: Wrapping a filter in a UDF prevents Catalyst from pushing it to the scan
from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType
is_purchase = udf(lambda x: x == "purchase", BooleanType())
df.filter(is_purchase("event_type")) # Catalyst cannot push this down
# GOOD: Use native SQL functions — Catalyst understands these
df.filter(df.event_type == "purchase") # pushed into Scan node
# ── Anti-pattern 2: .count() inside a loop recomputes the full plan ───────────
# BAD: triggers a full scan + shuffle on every iteration
for threshold in [100, 200, 500]:
    n = df.filter(df.value > threshold).count()  # 3 full scans
# GOOD: Persist the base once, then run each count against the cached data
df_base = df.persist()
counts = {t: df_base.filter(df_base.value > t).count() for t in [100, 200, 500]}
df_base.unpersist()
# ── Anti-pattern 3: Calling withColumn in a loop ───────────────────────────────
# BAD: Each withColumn adds a Project node, so 50 columns means 50 nested projections
for col_name in column_list:
    df = df.withColumn(col_name, ...)  # plan grows on every iteration and slows analysis
# GOOD: Build all expressions in one select()
from pyspark.sql.functions import col, when, lit
exprs = [col(c) for c in df.columns] + [
    when(col("status") == s, lit(1)).otherwise(lit(0)).alias(f"is_{s}")
    for s in status_list
]
df = df.select(*exprs)  # single Project node
# ── Anti-pattern 4: Stale table statistics causing bad join plans ──────────────
# After a large write, update stats so Catalyst can pick the right join strategy
spark.sql("ANALYZE TABLE my_schema.events COMPUTE STATISTICS FOR ALL COLUMNS")
spark.sql("ANALYZE TABLE my_schema.users COMPUTE STATISTICS FOR ALL COLUMNS")
Partitioning — Right-Sizing Tasks and Fixing Data Skew
Spark processes data in partitions, and each partition maps to one task on one executor core. The ideal partition size for in-memory operations is 100–200 MB of uncompressed data. Too small and you waste task scheduling overhead (millions of 1 KB tasks is never fast). Too large and you risk out-of-memory errors and GC pressure in each task. After any wide transformation (groupBy, join, distinct), Spark defaults to 200 output partitions regardless of data volume — spark.sql.shuffle.partitions = 200 — which is usually wrong for both very small and very large jobs.
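To make the sizing argument concrete, here is a minimal sketch of the arithmetic in plain Python. The helper names `suggest_partition_count` and `partition_size_mb` are hypothetical, and the 128 MB target is simply a value inside the 100–200 MB range mentioned above, not a Spark default for shuffles.

```python
import math

MB = 1024 * 1024
GB = 1024 * MB


def suggest_partition_count(total_bytes: int, target_partition_bytes: int = 128 * MB) -> int:
    """Number of partitions needed so each holds roughly the target size."""
    if total_bytes <= 0:
        return 1
    return max(1, math.ceil(total_bytes / target_partition_bytes))


def partition_size_mb(total_bytes: int, num_partitions: int) -> float:
    """Average partition size in MB for a given partition count."""
    return total_bytes / num_partitions / MB


# Why a fixed 200 rarely fits: compare a small and a large shuffle.
for label, size in [("2 GB", 2 * GB), ("500 GB", 500 * GB)]:
    at_default = partition_size_mb(size, 200)
    suggested = suggest_partition_count(size)
    print(f"{label}: {at_default:.1f} MB/partition at 200, suggested count {suggested}")
```

With 200 partitions, the 2 GB shuffle ends up with partitions of about 10 MB (scheduling overhead dominates), while the 500 GB shuffle gets partitions of about 2.5 GB (spill and OOM risk).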
# ── Diagnosing partition problems ────────────────────────────────────────────
# Check current partition count and approximate sizes
df = spark.read.parquet("s3://my-bucket/sales/")
print(f"Partitions: {df.rdd.getNumPartitions()}")
# Sample task sizes (proxy for partition sizes)
sizes = df.rdd.mapPartitions(lambda it: [sum(1 for _ in it)]).collect()
import statistics
print(f"Min rows/partition: {min(sizes)}")
print(f"Max rows/partition: {max(sizes)}")
print(f"Median: {statistics.median(sizes)}")
print(f"Skew ratio (max/median): {max(sizes)/statistics.median(sizes):.1f}x")
# ── Fixing too many small partitions (input stage) ───────────────────────────
# coalesce() — narrow transformation, no shuffle, reduces partition count only
df_compacted = df.coalesce(50) # merge small partitions without shuffle
# repartition() — wide transformation, full shuffle, can increase or decrease
df_repartitioned = df.repartition(400) # use when you need balanced distribution
# ── Fixing shuffle partition count after groupBy/join ────────────────────────
# Rule of thumb: total shuffle data size (GB) × 10 for the partition count
# For a 500 GB shuffle: 500 × 10 = 5000 partitions → ~100 MB each
spark.conf.set("spark.sql.shuffle.partitions", "5000")
# ── AQE auto-coalesce (Spark 3.0+) ───────────────────────────────────────────
# With AQE enabled, Spark merges small shuffle partitions automatically
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "128mb")
# ── Data skew — detecting and fixing hot partitions ──────────────────────────
# Skew detection: a few tasks take 10–100× longer than the rest
# In Spark UI → Stages → click a stage → Task Metrics → Duration column
# Fix 1: Enable AQE skew join (Spark 3.0+) — automatically splits skewed partitions
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256mb")
# Fix 2: Salting — manually distribute skewed keys across N sub-partitions
import pyspark.sql.functions as F
SALT_BUCKETS = 10
# On the large table: add a random salt suffix to skewed keys
df_large = df_large.withColumn(
    "user_id_salted",
    F.when(
        F.col("user_id").isin(skewed_user_ids),
        F.concat(F.col("user_id"), F.lit("_"), (F.rand() * SALT_BUCKETS).cast("int").cast("string")),
    ).otherwise(F.col("user_id")),
)
# On the small table: replicate each skewed key once per salt value.
# explode() is a generator and cannot be nested inside when(), so choose the
# salt array conditionally and explode it at the top level instead.
all_salts = F.array(*[F.lit(i) for i in range(SALT_BUCKETS)])
df_small_exploded = (
    df_small
    .withColumn(
        "salt",
        F.explode(
            F.when(F.col("user_id").isin(skewed_user_ids), all_salts)
            .otherwise(F.array(F.lit(-1)))  # -1 marks a key that is not salted
        ),
    )
    .withColumn(
        "user_id_salted",
        F.when(
            F.col("salt") >= 0,
            F.concat(F.col("user_id"), F.lit("_"), F.col("salt").cast("string")),
        ).otherwise(F.col("user_id")),
    )
    .drop("salt", "user_id")  # the large side keeps user_id, avoiding a duplicate column
)
# Join on the salted key, then drop the salt column
result = df_large.join(df_small_exploded, "user_id_salted").drop("user_id_salted")
Note
AQE skew join (spark.sql.adaptive.skewJoin.enabled) works transparently at runtime without code changes, so enable it first before resorting to manual salting. AQE detects skewed partitions mid-shuffle and splits them into sub-tasks while replicating the corresponding build-side data. Manual salting is only needed when the skew is in a non-join aggregation or when you need consistent behavior across repeated runs.
Join Strategies — Broadcast, Sort-Merge, and Bucketing
Joins are the most common cause of unnecessary shuffles in Spark jobs. A sort-merge join on two large tables shuffles both sides across the network by join key, then sorts each partition before merging. The alternatives are broadcast hash joins (the small table is copied to every executor, so the large table is never shuffled) and bucketed joins (pre-partitioned, pre-sorted tables that skip the shuffle entirely). Choosing the right strategy can turn a 20-minute join into a 2-minute one.
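A rough back-of-envelope model shows why the broadcast threshold matters. The sketch below is plain Python with hypothetical function names, and it makes loud simplifying assumptions: a shuffle join moves roughly both inputs over the network once, a broadcast join ships one copy of the small table to each executor, and compression, data locality, and memory limits are ignored.

```python
MB = 1024 ** 2
GB = 1024 ** 3


def shuffle_join_network_bytes(left_bytes: int, right_bytes: int) -> int:
    """Assumption: both sides are repartitioned by join key, so both cross the network."""
    return left_bytes + right_bytes


def broadcast_join_network_bytes(small_bytes: int, num_executors: int) -> int:
    """Assumption: every executor receives one full copy of the small side."""
    return small_bytes * num_executors


def cheaper_strategy(large_bytes: int, small_bytes: int, num_executors: int) -> str:
    """Pick the strategy that moves fewer bytes under the assumptions above."""
    broadcast_cost = broadcast_join_network_bytes(small_bytes, num_executors)
    shuffle_cost = shuffle_join_network_bytes(large_bytes, small_bytes)
    return "broadcast" if broadcast_cost < shuffle_cost else "shuffle"


# 500 GB fact table joined to a 2 MB dimension on 100 executors
print(cheaper_strategy(500 * GB, 2 * MB, 100))
# The same fact table joined to a 20 GB table: broadcasting would move 2000 GB
print(cheaper_strategy(500 * GB, 20 * GB, 100))
```

The model only compares bytes moved. In practice the broadcast side must also fit in driver and executor memory, which is why the threshold is kept far below the point where the two costs cross.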
from pyspark.sql.functions import broadcast
import pyspark.sql.functions as F
# ── Strategy 1: Broadcast Hash Join ──────────────────────────────────────────
# Spark broadcasts the smaller table to every executor — no shuffle on either side
# Default threshold: 10 MB (autoBroadcastJoinThreshold)
# Increase to 200 MB for large dimension tables if memory allows
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", str(200 * 1024 * 1024))
# Auto-broadcast: Spark uses table statistics to decide
orders = spark.table("orders") # 500 GB
countries = spark.table("dim_countries") # 2 MB — will auto-broadcast
result = orders.join(countries, "country_code")
# Explicit broadcast hint: force broadcast even without statistics
result = orders.join(broadcast(countries), "country_code")
# Spark SQL hint
spark.sql("""
SELECT /*+ BROADCAST(c) */ o.*, c.country_name
FROM orders o
JOIN dim_countries c ON o.country_code = c.country_code
""")
# ── Strategy 2: Sort-Merge Join with pre-bucketed tables ────────────────────
# Bucketing writes data pre-sorted and pre-partitioned by the join key
# When both sides are bucketed on the same key with the same bucket count,
# Spark skips the shuffle entirely
# Write tables with bucketing (one-time cost)
(orders.write
    .bucketBy(256, "customer_id")
    .sortBy("customer_id")
    .saveAsTable("orders_bucketed"))
(customers.write
    .bucketBy(256, "customer_id")
    .sortBy("customer_id")
    .saveAsTable("customers_bucketed"))
# Join on bucketed tables — no shuffle in the query plan
orders_b = spark.table("orders_bucketed")
customers_b = spark.table("customers_bucketed")
result = orders_b.join(customers_b, "customer_id")
result.explain()
# Expect: SortMergeJoin with NO Exchange (shuffle) operators above it
# ── Strategy 3: Shuffle Hash Join for medium tables ──────────────────────────
# Better than sort-merge when one side fits in a hash table but is too large to broadcast
spark.conf.set("spark.sql.join.preferSortMergeJoin", "false")
# Explicit hint
spark.sql("""
SELECT /*+ SHUFFLE_HASH(orders) */ *
FROM orders
JOIN returns USING (order_id)
""")
# ── Detecting which join strategy was chosen ─────────────────────────────────
# Look for these nodes in the physical plan:
# BroadcastHashJoin — best when one side is small; neither side is shuffled
# ShuffledHashJoin — both sides shuffled, hash table built on the smaller side
# SortMergeJoin — both sides shuffled and sorted, then merged (slowest, but handles arbitrary sizes)
# BroadcastNestedLoopJoin — no equi-join condition, often a missing join key
Caching and Persistence — When to Materialize Intermediate Results
Spark recomputes the full lineage from source on every action unless you explicitly persist intermediate DataFrames. Caching is the right tool when the same DataFrame is consumed by multiple downstream operations (multiple actions, iterative algorithms, multiple branches in a pipeline). Over-caching is also a problem — a 200 GB DataFrame cached in memory evicts other data and forces spill-to-disk, often making things worse.
from pyspark import StorageLevel
# ── Storage levels — choose based on memory pressure ─────────────────────────
# MEMORY_ONLY — fastest, OOM risk on large data
# MEMORY_AND_DISK — safest default, spills to disk if memory is full
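# MEMORY_ONLY_SER — Scala/Java API: serialized, more compact, slightly slower to read
# MEMORY_AND_DISK_SER — Scala/Java API: serialized with disk fallback
# (PySpark 3.x does not expose the _SER constants; its MEMORY_ONLY and MEMORY_AND_DISK are already serialized)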
# DISK_ONLY — slowest, use only for checkpointing large lineages
# OFF_HEAP — bypasses JVM GC, best for very large DataFrames (Spark 3+)
# Default .cache() uses MEMORY_AND_DISK
df_enriched = (
raw_events
.join(broadcast(dim_users), "user_id")
.join(broadcast(dim_products), "product_id")
.withColumn("revenue_usd", F.col("quantity") * F.col("unit_price"))
)
df_enriched.cache()
# Force materialization (otherwise Spark caches lazily on first action)
count = df_enriched.count() # triggers the cache fill
# Multiple downstream consumers — Spark reads from cache, not from source
by_user = df_enriched.groupBy("user_id").agg(F.sum("revenue_usd"))
by_product = df_enriched.groupBy("product_id").agg(F.count("*"))
by_day = df_enriched.groupBy(F.to_date("event_ts")).agg(F.sum("revenue_usd"))
# Always unpersist when done — frees executor memory for subsequent stages
df_enriched.unpersist()
# ── Explicit storage level for large DataFrames ───────────────────────────────
# In PySpark 3.x, StorageLevel.MEMORY_AND_DISK is the serialized level with disk fallback
df_large.persist(StorageLevel.MEMORY_AND_DISK)
# ── Off-heap cache (Spark 3.0+ with Tungsten off-heap enabled) ───────────────
# spark.conf.set("spark.memory.offHeap.enabled", "true")
# spark.conf.set("spark.memory.offHeap.size", "4g")
df_large.persist(StorageLevel.OFF_HEAP)
# ── Checkpointing — breaks long lineage chains in iterative algorithms ────────
# Without checkpointing, 100 iterations = 100-level deep lineage = stack overflow
spark.sparkContext.setCheckpointDir("s3://my-bucket/checkpoints/")
for iteration in range(100):
    df = compute_iteration(df)
    if iteration % 10 == 0:
        df = df.checkpoint()  # materializes and truncates lineage
# ── Cache-aware pipeline structure ───────────────────────────────────────────
# Pattern: read → filter → cache → fan out → aggregate → unpersist
base = (
    spark.read.parquet("s3://my-bucket/events/")
    .filter("event_date >= '2026-01-01'")
    .select("user_id", "product_id", "revenue_usd", "event_ts")
    .persist(StorageLevel.MEMORY_AND_DISK)  # serialized in PySpark; the _SER constants are Scala/Java only
)
base.count()  # warm up the cache
results = {
    "daily_revenue": base.groupBy(F.to_date("event_ts")).agg(F.sum("revenue_usd")),
    "user_metrics": base.groupBy("user_id").agg(F.count("*"), F.sum("revenue_usd")),
    "product_metrics": base.groupBy("product_id").agg(F.count("*")),
}
for name, agg_df in results.items():
    agg_df.write.parquet(f"s3://my-bucket/aggregates/{name}/")
base.unpersist()
Memory Configuration — Executor and Driver Sizing
Getting Spark memory configuration wrong is the fastest path to OOM kills and excessive GC pauses. Spark divides executor JVM heap into execution memory (used for shuffles, joins, aggregations, sorts) and storage memory (used for caching). With unified memory management (default since Spark 1.6), these two pools share a single region and borrow from each other. The remaining heap is reserved for user code, internal Spark objects, and GC overhead.
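The split between these regions is easy to compute by hand. The following sketch is plain Python with a hypothetical `executor_memory_breakdown` helper. It assumes the standard unified-memory formula (300 MB reserved, then `spark.memory.fraction` and `spark.memory.storageFraction` applied in turn) with Spark's defaults of 0.6 and 0.5. The heap the JVM actually reports is usually a little below the configured value, so treat the numbers as approximate.

```python
RESERVED_MB = 300  # fixed reservation for Spark internals


def executor_memory_breakdown(heap_mb: float,
                              memory_fraction: float = 0.6,
                              storage_fraction: float = 0.5) -> dict:
    """Approximate sizes (in MB) of the on-heap regions for one executor."""
    usable = heap_mb - RESERVED_MB
    if usable <= 0:
        raise ValueError("heap must be larger than the 300 MB reservation")
    spark_mb = usable * memory_fraction      # unified execution + storage region
    user_mb = usable - spark_mb              # user code and data structures
    storage_mb = spark_mb * storage_fraction  # portion of cache protected from eviction
    execution_mb = spark_mb - storage_mb     # initial execution share (can borrow)
    return {
        "reserved_mb": RESERVED_MB,
        "user_mb": user_mb,
        "spark_mb": spark_mb,
        "storage_mb": storage_mb,
        "execution_mb": execution_mb,
    }


# An 8 GB heap with defaults, then with fraction=0.8 and storageFraction=0.3
for kwargs in ({}, {"memory_fraction": 0.8, "storage_fraction": 0.3}):
    parts = executor_memory_breakdown(8 * 1024, **kwargs)
    print({name: round(value) for name, value in parts.items()})
```

Raising `spark.memory.fraction` to 0.8 grows the unified region at the expense of user memory, which is a reasonable trade only when the job runs little custom JVM-side code.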
# ── Executor memory anatomy ─────────────────────────────────────────────────
#
# Total executor memory = spark.executor.memory + spark.executor.memoryOverhead
#
# spark.executor.memory (e.g. "8g")
# ├── Reserved memory: 300 MB (fixed — internal Spark metadata)
# ├── User memory: (1 - spark.memory.fraction) × (heap - 300 MB)
# │   └── default: 40% — UDFs, deserialized RDDs, user data structures
# └── Spark memory: spark.memory.fraction × (heap - 300 MB)
#     ├── Storage pool: spark.memory.storageFraction × Spark memory
#     │   └── default: 50% — .cache(), .persist()
#     └── Execution pool: remaining Spark memory (dynamic, can borrow from storage)
#         └── shuffles, sorts, joins, aggregations
#
# spark.executor.memoryOverhead (e.g. "1g"; defaults to max(384 MiB, 10% of spark.executor.memory))
# └── Off-heap: JVM overhead, NIO buffers, Python worker (PySpark!), native libs
# ── Recommended configuration for PySpark ETL jobs ───────────────────────────
spark = (
    SparkSession.builder
    .config("spark.executor.memory", "8g")
    .config("spark.executor.memoryOverhead", "2g")
    .config("spark.executor.cores", "4")
    .config("spark.memory.fraction", "0.8")
    .config("spark.memory.storageFraction", "0.3")
    .config("spark.python.worker.memory", "1g")  # needs a unit suffix such as m or g
    .getOrCreate()
)
# ── Diagnosing memory pressure ────────────────────────────────────────────────
# Spill to disk — Spark UI → Stage → click stage → Shuffle Spill (Memory/Disk)
# If spill > 0: execution memory is too small; increase executor.memory
# or raise spark.sql.shuffle.partitions (more, smaller partitions need less RAM each)
# GC pressure — Spark UI → Stage → GC Time column
# If GC > 10% of task time: too many live objects; switch to Kryo serializer
# spark.serializer cannot be changed on a running session, so set it at launch
# (spark-submit --conf or SparkSession.builder.config):
#   --conf spark.serializer=org.apache.spark.serializer.KryoSerializer
#   --conf spark.kryo.registrationRequired=false
# OOM kills — executor lost messages in the driver log
# Fix 1: Increase executor.memoryOverhead (especially for PySpark / pandas UDFs)
# Fix 2: Reduce executor.cores (fewer concurrent tasks per executor = less peak memory)
# Fix 3: Increase number of shuffle partitions (smaller data per partition task)
# ── Driver memory — often overlooked ─────────────────────────────────────────
# The driver collects results with .collect(), .toPandas(), and broadcast variables
# A 10 GB broadcast variable + a .collect() on a 5 GB result = 15 GB driver peak memory
spark = (
    SparkSession.builder
    .config("spark.driver.memory", "4g")
    .config("spark.driver.memoryOverhead", "1g")
    .config("spark.driver.maxResultSize", "2g")
    .getOrCreate()
)
# ── Dynamic allocation — right-size the cluster at runtime ────────────────────
# Dynamic allocation is fixed when the application starts: pass these at launch
# (spark-submit --conf, spark-defaults.conf, or SparkSession.builder.config)
spark = (
    SparkSession.builder
    .config("spark.dynamicAllocation.enabled", "true")
    .config("spark.dynamicAllocation.minExecutors", "2")
    .config("spark.dynamicAllocation.maxExecutors", "100")
    .config("spark.dynamicAllocation.initialExecutors", "10")
    .config("spark.dynamicAllocation.executorIdleTimeout", "60s")
    .getOrCreate()
)
Note
Set spark.executor.memoryOverhead to at least max(384m, 10% of executor.memory) for pure JVM jobs, and 20–30% for PySpark jobs that use pandas UDFs or Arrow conversions. spark.python.worker.memory is the per-worker threshold above which Python aggregation spills to disk; it does not cap the JVM heap.
Adaptive Query Execution — Let Spark Tune Itself at Runtime
Adaptive Query Execution (AQE), introduced in Spark 3.0 and enabled by default in Spark 3.2+, re-optimizes the query plan at runtime using actual shuffle statistics rather than the pre-execution estimates that Catalyst is forced to use. AQE covers three major optimizations automatically: coalescing small shuffle partitions into larger ones, converting sort-merge joins to broadcast hash joins when a table turns out smaller than expected, and splitting skewed partitions mid-join. Enabling AQE is the single highest-leverage configuration change for most production jobs.
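Two of these decisions follow simple rules that can be sketched in plain Python. The code below is a simplified illustration, not Spark's implementation: `coalesce_partitions` greedily merges adjacent shuffle partitions up to an advisory size, and `is_skewed` applies the two-part test (larger than a factor times the median, and larger than an absolute threshold). Both function names are hypothetical; the default values mirror the settings used in this article.

```python
import statistics
from typing import List

MB = 1024 * 1024


def coalesce_partitions(sizes: List[int], advisory_bytes: int = 128 * MB) -> List[List[int]]:
    """Group adjacent partition indices so each group stays at or under the advisory size."""
    groups: List[List[int]] = []
    current: List[int] = []
    current_size = 0
    for index, size in enumerate(sizes):
        # Close the current group if adding this partition would overshoot the target
        if current and current_size + size > advisory_bytes:
            groups.append(current)
            current, current_size = [], 0
        current.append(index)
        current_size += size
    if current:
        groups.append(current)
    return groups


def is_skewed(size: int, all_sizes: List[int],
              factor: float = 5.0, threshold_bytes: int = 256 * MB) -> bool:
    """A partition counts as skewed only if it passes both the relative and absolute test."""
    return size > factor * statistics.median(all_sizes) and size > threshold_bytes


# 5000 shuffle partitions of 20 MB each collapse into far fewer ~120 MB groups
print(len(coalesce_partitions([20 * MB] * 5000)))
# One 2 GB partition among 100 MB partitions is flagged as skewed
sizes = [100 * MB] * 9 + [2048 * MB]
print(is_skewed(2048 * MB, sizes))
```

The absolute threshold matters: a 200 MB partition among 10 MB neighbours is 20× the median but is still too small to be worth splitting.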
# ── AQE configuration (Spark 3.0+) ──────────────────────────────────────────
spark.conf.set("spark.sql.adaptive.enabled", "true")
# 1. Auto-coalesce shuffle partitions
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.minPartitionNum", "1")
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "128mb")
# AQE merges adjacent shuffle partitions until each reaches ~128 MB
# Result: instead of 5000 × 20 MB partitions, you get ~800 × 128 MB partitions
# 2. Runtime join conversion (sort-merge → broadcast)
spark.conf.set("spark.sql.adaptive.autoBroadcastJoinThreshold", "30mb")
# If a table materializes smaller than 30 MB after filtering, AQE converts the join
# to a broadcast hash join — even if the pre-execution estimate was 500 MB
# 3. Skew join splitting
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
# A partition is considered skewed if its size > 5× the median partition size
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256mb")
# AND if its size > 256 MB
# ── AQE + DPP (Dynamic Partition Pruning) — eliminating partition scans ───────
# DPP pushes the result of a dimension filter (e.g. country='US') into the fact table
# scan, skipping all partitions that don't match — even across a shuffle boundary
spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.enabled", "true")
spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.useStats", "true")
# This query scans ONLY the partitions matching country='US' in the fact table
# even though the fact table is not filtered directly on country_code
spark.sql("""
SELECT f.order_id, f.revenue_usd, c.country_name
FROM fact_orders f
JOIN dim_countries c ON f.country_code = c.country_code
WHERE c.country_name = 'United States'
""")
# ── Verifying AQE decisions in the query plan ─────────────────────────────────
# After execution, re-run .explain("formatted") on the completed plan
# Look for:
# AdaptiveSparkPlan isFinalPlan=true — the plan shown is the final, runtime-adjusted one
# AQEShuffleRead (CustomShuffleReader before Spark 3.2) — coalesced partitions after shuffle
# BroadcastHashJoin (converted at runtime) — AQE chose broadcast over sort-merge
File Formats, Compression, and Delta Lake Optimization
The file format and layout of your source data has an enormous impact on Spark read performance — often more than any runtime configuration change. Parquet with Snappy or Zstd compression, properly sized files, and partition pruning can reduce scan times by 10–50× compared to reading CSV or JSON. When writing to Delta Lake or Apache Iceberg tables, additional optimizations like Z-ordering, liquid clustering, and file compaction keep read performance from degrading as the table grows. The article on Delta Lake vs Iceberg table formats covers the format-level trade-offs; here we focus on the Spark-side tuning.
# ── Parquet write optimization ───────────────────────────────────────────────
(df.write
    .option("compression", "zstd")
    .option("parquet.block.size", str(128*1024*1024))
    .option("parquet.page.size", str(1*1024*1024))
    .option("parquet.enable.dictionary", "true")
    .partitionBy("event_date", "event_type")
    .parquet("s3://my-bucket/events/"))
# ── Avoid the small file problem ──────────────────────────────────────────────
# Each Spark task writes one file — if you have 5000 shuffle partitions, you get
# 5000 files. Subsequent reads create 5000 tasks, each reading a tiny file.
# Fix: repartition before writing to control file count
TARGET_FILE_SIZE_MB = 128
estimated_size_gb = 50
num_files = int(estimated_size_gb * 1024 / TARGET_FILE_SIZE_MB)
df.repartition(num_files).write.parquet("s3://my-bucket/output/")
# Better with AQE: let Spark coalesce automatically
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "128mb")
# ── Delta Lake: OPTIMIZE and Z-ordering ──────────────────────────────────────
from delta.tables import DeltaTable
dt = DeltaTable.forName(spark, "my_catalog.events")
# Compact small files into 1 GB files (configurable via spark.databricks.delta.optimize.maxFileSize)
dt.optimize().executeCompaction()
# Z-ordering: co-locate rows with similar values for high-cardinality filter columns
# Best columns for Z-ordering: columns you frequently filter on that are NOT partition columns
dt.optimize().executeZOrderBy("user_id", "product_id")
# ── Auto-optimize for streaming / incremental writes (Databricks) ─────────────
spark.conf.set("spark.databricks.delta.optimizeWrite.enabled", "true")
spark.conf.set("spark.databricks.delta.autoCompact.enabled", "true")
spark.conf.set("spark.databricks.delta.optimizeWrite.binSize", "512") # MB
# ── Column pruning and predicate pushdown — rely on Parquet statistics ────────
# Write sorted data within each file to maximize min/max statistics effectiveness
df_sorted = df.sortWithinPartitions("event_date", "user_id")
df_sorted.write.parquet("s3://my-bucket/events_sorted/")
# After writing sorted data, queries with event_date or user_id filters
# can skip entire row groups based on min/max statistics — even without partitioning
Production Configuration Reference — EMR, Databricks, and On-Premises
Translating the individual tuning knobs into a coherent cluster configuration requires knowing your job profile: data volume, shuffle intensity, join pattern, and workload type (ETL batch, interactive query, streaming). The configurations below are battle-tested starting points for three common deployment environments. Pair them with the observability practices from OpenTelemetry for data pipelines to instrument Spark jobs with distributed tracing and SLO alerting.
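The per-node arithmetic behind such configurations can be captured in a small helper. This is a sketch in plain Python under stated assumptions: the function name `size_executors`, the 10% RAM reservation, and the 10% overhead share are illustrative choices, and the result is an upper bound on what fits, not a recommendation. Rounding the heap down from this bound leaves headroom for the OS and node daemons.

```python
def size_executors(node_ram_gb: float, node_cores: int,
                   cores_per_executor: int = 5, reserved_cores: int = 1,
                   ram_reserve_fraction: float = 0.10,
                   overhead_fraction: float = 0.10) -> dict:
    """Estimate executors per node and the heap/overhead split for each one."""
    usable_cores = node_cores - reserved_cores
    executors = usable_cores // cores_per_executor
    if executors < 1:
        raise ValueError("node is too small for one executor of this size")
    usable_ram_gb = node_ram_gb * (1 - ram_reserve_fraction)
    container_gb = usable_ram_gb / executors          # heap + overhead per executor
    heap_gb = container_gb / (1 + overhead_fraction)  # spark.executor.memory upper bound
    overhead_gb = container_gb - heap_gb              # spark.executor.memoryOverhead
    return {
        "executors_per_node": executors,
        "container_gb": container_gb,
        "heap_gb": heap_gb,
        "overhead_gb": overhead_gb,
    }


# 64 GB / 8 vCPU worker, one core reserved
print(size_executors(64, 8))
# 256 GB / 32 core worker, three cores reserved (29 usable)
print(size_executors(256, 32, reserved_cores=3))
```

For the 256 GB node this yields 5 executors with about 46 GB each, matching the worked numbers in the on-premises comments.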
# ── AWS EMR — r6g.2xlarge workers (64 GB RAM, 8 vCPU) ───────────────────────
# Rule of 1 core for overhead: 7 usable cores per node
# Executors: 5 cores each → 1 executor per node + 1 driver node
# Memory: 64 GB × 0.9 overhead = 57 GB usable; 57 GB / 1 executor = 57 GB
# Practical: executor.memory = 48g, overhead = 6g → 54g / 64g = 84%
[spark-defaults]
spark.executor.cores = 5
spark.executor.memory = 48g
spark.executor.memoryOverhead = 6g
spark.driver.memory = 8g
spark.driver.memoryOverhead = 2g
spark.memory.fraction = 0.8
spark.memory.storageFraction = 0.3
spark.sql.adaptive.enabled = true
spark.sql.shuffle.partitions = 2000
spark.serializer = org.apache.spark.serializer.KryoSerializer
spark.sql.parquet.compression.codec = zstd
# 128 MB (keep comments on their own line; properties files do not support trailing comments)
spark.sql.files.maxPartitionBytes = 134217728
# ── Databricks — photon-accelerated cluster ───────────────────────────────────
# Databricks Runtime 13+ with Photon vectorized execution
# Many params auto-tuned by Databricks; focus on data-layout optimization
[Databricks cluster config — Advanced Spark Config]
spark.databricks.delta.optimizeWrite.enabled true
spark.databricks.delta.autoCompact.enabled true
spark.sql.adaptive.enabled true
spark.sql.adaptive.skewJoin.enabled true
spark.databricks.adaptive.autoBroadcastJoinThreshold 50m
spark.sql.shuffle.partitions auto
# ── On-premises YARN cluster (Hadoop 3 + Spark 3.4) ──────────────────────────
# Worker nodes: 256 GB RAM, 32 cores each
# YARN reserves 10% for overhead: 230 GB usable, 29 usable cores
# Executors: 5 cores each → 5 executors per node
# Memory per executor: 230 GB / 5 = 46 GB
# With overhead: executor.memory = 38g, overhead = 4g
[spark-defaults.conf]
spark.executor.cores = 5
spark.executor.memory = 38g
spark.executor.memoryOverhead = 4g
spark.driver.memory = 12g
spark.driver.memoryOverhead = 2g
spark.dynamicAllocation.enabled = true
spark.dynamicAllocation.minExecutors = 5
spark.dynamicAllocation.maxExecutors = 80
spark.shuffle.service.enabled = true
spark.sql.adaptive.enabled = true
spark.sql.adaptive.coalescePartitions.enabled = true
spark.sql.adaptive.advisoryPartitionSizeInBytes = 128m
spark.sql.adaptive.skewJoin.enabled = true
spark.serializer = org.apache.spark.serializer.KryoSerializer
spark.kryoserializer.buffer.max = 512m
# 100 MB (keep comments on their own line; properties files do not support trailing comments)
spark.sql.autoBroadcastJoinThreshold = 104857600
spark.eventLog.enabled = true
spark.eventLog.dir = hdfs:///spark-history/
Spark UI Triage — Reading the Metrics That Matter
The Spark UI is the source of truth for job performance. Most engineers read it for error messages but ignore the metrics that reveal the actual bottleneck. Here is a systematic triage workflow for a slow job.
- Jobs tab — find the longest-running job. A job with many short completed stages but one very long active stage points to skew or a blocking shuffle.
- Stages tab → Task Metrics — check the duration distribution. If max task duration is >5× the median, you have skew. Check "Shuffle Spill (Memory)" and "Shuffle Spill (Disk)" — any non-zero values indicate memory pressure.
- SQL tab → query details — look for Exchange nodes in the DAG (each is a shuffle), SortMergeJoin nodes (can you broadcast one side?), and the row counts at each operator. Large fan-outs at a join suggest a cross join or missing join condition.
- Storage tab — shows cached RDDs and DataFrames. Look for "Fraction Cached" below 100% — the dataset did not fit in memory and partially spilled to disk.
- Executors tab — compare "GC Time" across executors. GC time above 10% of task time signals heap pressure. Check "Peak JVM Memory Usage" to see if executors are near their memory ceiling.
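The thresholds in this checklist can be applied mechanically once the numbers are copied out of the UI or the event log. Below is a minimal sketch in plain Python. The `StageMetrics` container and `triage` function are hypothetical names, and the 5× and 10% cut-offs are the rules of thumb from the list above, not values Spark itself enforces.

```python
import statistics
from dataclasses import dataclass
from typing import List


@dataclass
class StageMetrics:
    task_durations_s: List[float]  # one entry per task in the stage
    spill_memory_bytes: int        # "Shuffle Spill (Memory)"
    spill_disk_bytes: int          # "Shuffle Spill (Disk)"
    gc_time_s: float               # GC time summed over the stage's tasks


def triage(stage: StageMetrics, skew_ratio: float = 5.0,
           gc_fraction: float = 0.10) -> List[str]:
    """Return the checklist findings that apply to one stage."""
    findings: List[str] = []
    median = statistics.median(stage.task_durations_s)
    if median > 0 and max(stage.task_durations_s) > skew_ratio * median:
        findings.append("skew")
    if stage.spill_memory_bytes > 0 or stage.spill_disk_bytes > 0:
        findings.append("memory-pressure")
    total_task_time = sum(stage.task_durations_s)
    if total_task_time > 0 and stage.gc_time_s / total_task_time > gc_fraction:
        findings.append("gc-pressure")
    return findings


# Nine 30-second tasks and one 18-minute straggler, with 2 GB spilled to disk
slow_stage = StageMetrics([30.0] * 9 + [1080.0], 0, 2 * 1024 ** 3, 50.0)
print(triage(slow_stage))
```

A check like this fits naturally into a CI job that replays a fixed workload and fails the build when a new finding appears.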
# ── Programmatic metrics access via SparkListener ────────────────────────────
# Useful for automated performance regression testing in CI
from pyspark import SparkContext
from pyspark.sql import SparkSession
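class JobMetricsListener:
    """Registers Spark's built-in StatsReportListener, which logs task metric summaries per stage."""
    def __init__(self, sc: SparkContext):
        self._metrics = {}
        # Reaches the JVM SparkContext through py4j internals (not a public PySpark API)
        sc._jvm.org.apache.spark.SparkContext.getOrCreate().addSparkListener(
            sc._jvm.org.apache.spark.scheduler.StatsReportListener()
        )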
# ── Prometheus JMX exporter for production monitoring ────────────────────────
# Add to spark-submit: --packages io.prometheus.jmx:jmx_prometheus_javaagent:0.20.0
# --conf "spark.driver.extraJavaOptions=-javaagent:/opt/jmx_exporter.jar=9090:/opt/spark.yaml"
# --conf "spark.executor.extraJavaOptions=-javaagent:/opt/jmx_exporter.jar=9091:/opt/spark.yaml"
# Key Prometheus metrics to alert on:
# metrics_executor_threadpool_activeTasks — running tasks per executor
# metrics_executor_shuffle_bytesWritten_total — total shuffle write volume
# metrics_executor_memoryUsed_bytes — heap usage (alert > 85%)
# metrics_driver_BlockManager_memory_memUsed — driver block manager memory
# jvm_gc_collection_seconds_sum{gc="G1 Old Generation"} — major GC time
# Grafana alert rule (PromQL) — executor nearing OOM:
# (metrics_executor_memoryUsed_bytes / spark_executor_memory_bytes) > 0.9