Why Apache Flink? A Comparison with Spark Structured Streaming and Kafka Streams
Apache Flink is a distributed stream processing engine built around a unified batch-and-streaming model, but optimised from the ground up for low-latency stateful streaming. Where Spark Structured Streaming by default processes streams as micro-batches, running a small batch job over the records that arrived since the previous trigger, Flink is a continuous streaming engine. Records are processed one at a time as they arrive, so latency can be measured in milliseconds rather than seconds.
The three main contenders in production stream processing each have a clear sweet spot:
- Apache Flink. Best for stateful event-time processing, complex windowing, exactly-once end-to-end guarantees, and CEP (complex event processing). Of the three, it offers the most complete built-in handling of out-of-order events, late data, and long-lived keyed state at scale. Chosen by Alibaba, Uber, Netflix, and LinkedIn for their most demanding streaming workloads.
- Spark Structured Streaming. Best when your team already runs Spark for batch, when processing latency of 1–30 seconds is acceptable, or when you need tight Delta Lake / Iceberg integration via the Lakehouse pattern. Watermark support exists but is less expressive than Flink's.
- Kafka Streams. Best for lightweight stateful processing embedded in a Java microservice, with no separate cluster required. It reads from and writes to Kafka only, runs as a library inside your application instances rather than on a dedicated cluster, and has simpler windowing semantics. Ideal for enrichment, aggregation, and filtering within a Kafka-native architecture.
The most important distinction in stream processing is event time vs processing time. Processing time uses the wall clock when a record arrives at the operator, which is simple but gives wrong results when data arrives out of order. Event time uses the timestamp embedded in the record itself, allowing Flink to correctly aggregate events that arrive late due to network delays, mobile offline buffering, or upstream retries. Flink's watermark mechanism is a widely adopted approach to managing event time with tolerance for late data.
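The difference is easiest to see by counting the same events twice. The following is a minimal plain-Java sketch, not Flink code: the `EventTimeVsProcessingTime` class, its `Event` type, and the one-minute bucket size are all hypothetical names chosen for illustration.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch: count the same events per minute by event time and by arrival time. */
final class EventTimeVsProcessingTime {

    /** An event carries its own timestamp; arrivalMs is when the operator actually saw it. */
    static final class Event {
        final long eventTimeMs;
        final long arrivalMs;

        Event(long eventTimeMs, long arrivalMs) {
            this.eventTimeMs = eventTimeMs;
            this.arrivalMs = arrivalMs;
        }
    }

    static final long MINUTE_MS = 60_000L;

    /** Counts events per one-minute bucket; the map key is the bucket start in milliseconds. */
    static Map<Long, Integer> countPerMinute(List<Event> events, boolean useEventTime) {
        Map<Long, Integer> counts = new TreeMap<>();
        for (Event e : events) {
            long ts = useEventTime ? e.eventTimeMs : e.arrivalMs;
            long bucketStart = ts - Math.floorMod(ts, MINUTE_MS);
            counts.merge(bucketStart, 1, Integer::sum);
        }
        return counts;
    }
}
```

An event created at second 50 but delivered at second 65 lands in the first minute under event time and in the second minute under processing time, so the two counts disagree for both buckets.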
Note
1.20.0) and validate connector compatibility: the Kafka, JDBC, and Iceberg connectors have independent release cycles from the Flink core.
Flink Execution Model — JobGraph, Operators, and Parallelism
A Flink application is compiled into a JobGraph — a directed acyclic graph of operators connected by data streams. The JobManager (master) accepts the JobGraph, schedules tasks onto TaskManagers (workers), and coordinates checkpoints. Each operator runs as one or more parallel subtasks across TaskManagers; the degree of parallelism is set globally or per operator.
Flink chains adjacent operators that can be fused into a single thread (an operator chain), which eliminates serialisation and network overhead between them. When operators cannot be chained, because of different parallelism, keyBy shuffles, or explicit chain breaks, Flink transmits records through network buffers between the subtasks.
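The chaining conditions named above can be written as a small predicate. This is a simplified plain-Java model, not Flink's implementation: the `ChainingSketch` class and its `Edge` type are hypothetical, and the real decision also considers factors such as slot sharing groups and per-operator chaining strategies.

```java
/** Hypothetical, simplified model of the operator-chaining decision. */
final class ChainingSketch {

    enum Partitioning { FORWARD, KEY_BY, REBALANCE }

    /** A connection between an upstream and a downstream operator. */
    static final class Edge {
        final int upstreamParallelism;
        final int downstreamParallelism;
        final Partitioning partitioning;
        final boolean chainBreakRequested; // models an explicit chain break

        Edge(int upstreamParallelism, int downstreamParallelism,
             Partitioning partitioning, boolean chainBreakRequested) {
            this.upstreamParallelism = upstreamParallelism;
            this.downstreamParallelism = downstreamParallelism;
            this.partitioning = partitioning;
            this.chainBreakRequested = chainBreakRequested;
        }
    }

    /** Two operators can share a thread only if records flow 1:1 between equal-sized stages. */
    static boolean canChain(Edge e) {
        return !e.chainBreakRequested
                && e.partitioning == Partitioning.FORWARD
                && e.upstreamParallelism == e.downstreamParallelism;
    }
}
```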
# Flink cluster topology
JobManager (1 or 3 for HA)
├── Resource Manager → allocates TaskManager slots
├── Dispatcher → receives job submissions
└── JobMaster (per job) → schedules and coordinates
TaskManager (N workers, M slots each)
├── Slot 0 → subtask of operator A + subtask of operator B (chained)
├── Slot 1 → subtask of operator A + subtask of operator B (chained)
└── Slot 2 → subtask of operator C (different parallelism, separate slot)
# Recommended slot count per TaskManager: 1–4
# Total parallelism = taskmanager.numberOfTaskSlots * number of TaskManagers
# Rule of thumb: start with parallelism = number of Kafka partitions
The network buffer system is central to Flink's backpressure mechanism. When a downstream operator is slow, it stops consuming network buffers; upstream operators fill their send buffers and eventually slow their own processing, naturally propagating backpressure through the graph without a separate coordination layer. This is detectable in the Flink Web UI as a "backpressure ratio" on each task.
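The mechanism boils down to a bounded buffer between producer and consumer. The sketch below models it with a `java.util.concurrent` queue; the `BackpressureSketch` class and its `emit` helper are hypothetical, and the non-blocking `offer` stands in for the point at which a real upstream task would block and wait.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Hypothetical sketch: a bounded buffer limits how far a producer can run ahead of a consumer. */
final class BackpressureSketch {

    /** Creates a buffer that holds at most {@code capacity} records. */
    static BlockingQueue<Integer> newBuffer(int capacity) {
        return new ArrayBlockingQueue<>(capacity);
    }

    /**
     * Tries to emit n records and returns how many the buffer accepted.
     * A full buffer is where a real upstream task would stall.
     */
    static int emit(BlockingQueue<Integer> buffer, int n) {
        int accepted = 0;
        for (int i = 0; i < n; i++) {
            if (!buffer.offer(i)) {
                break; // buffer full: the producer cannot make progress
            }
            accepted++;
        }
        return accepted;
    }
}
```

With a capacity of 4, a producer that wants to emit 10 records gets only 4 through; it can continue only after the consumer drains some of them, which is how the slow consumer's pace propagates upstream.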
DataStream API — Keyed State and Managed State with RocksDB
Flink's managed state is the foundation of stateful stream processing. Unlike ad-hoc in-memory maps, managed state is automatically snapshotted during checkpoints and restored on failure. State is always accessed via descriptors registered in the operator's open() method — never as raw instance variables.
The three most commonly used state primitives are ValueState (single value per key), MapState (key-value map per key), and ListState (appendable list per key). All three are keyed state: they are partitioned by the stream key set via keyBy(), which ensures that all records for a given key are processed by the same operator subtask.
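How a key ends up on one particular subtask can be sketched in two steps: hash the key into a key group, then map the key group onto a subtask. The `KeyRoutingSketch` class below is hypothetical, and it uses a plain `hashCode` where Flink applies an additional murmur hash, so the concrete numbers will differ from a real job.

```java
/** Hypothetical sketch of key -> key group -> subtask routing for keyed state. */
final class KeyRoutingSketch {

    /** Step 1: hash the key into one of maxParallelism key groups (simplified hash). */
    static int keyGroup(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    /** Step 2: map a key group onto one of the currently running subtasks. */
    static int subtaskIndex(int keyGroup, int parallelism, int maxParallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    /** The same key always yields the same subtask for a fixed configuration. */
    static int route(Object key, int parallelism, int maxParallelism) {
        return subtaskIndex(keyGroup(key, maxParallelism), parallelism, maxParallelism);
    }
}
```

Because whole key groups move between subtasks when parallelism changes, state can be redistributed on rescale without rehashing individual keys.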
// Java — Stateful fraud detection operator with ValueState and MapState
import org.apache.flink.api.common.state.*;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
public class FraudDetector extends RichFlatMapFunction<Transaction, Alert> {
// State descriptors — registered in open(), accessed via RuntimeContext
private transient ValueState<Double> runningTotalState;
private transient ValueState<Long> lastTxnTimestampState;
private transient MapState<String, Integer> merchantCountState;
private static final double FRAUD_THRESHOLD = 10_000.0;
private static final int MAX_MERCHANTS = 5;
@Override
public void open(Configuration parameters) throws Exception {
// ValueState: single Double per userId key
ValueStateDescriptor<Double> totalDesc = new ValueStateDescriptor<>(
"running-total", Double.class, 0.0
);
// Set TTL: clear state after 24 hours of inactivity
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(org.apache.flink.api.common.time.Time.hours(24))
.setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
totalDesc.enableTimeToLive(ttlConfig);
runningTotalState = getRuntimeContext().getState(totalDesc);
ValueStateDescriptor<Long> tsDesc = new ValueStateDescriptor<>(
"last-txn-ts", Long.class
);
lastTxnTimestampState = getRuntimeContext().getState(tsDesc);
// MapState: merchant → count per userId key
MapStateDescriptor<String, Integer> merchantDesc =
new MapStateDescriptor<>("merchant-counts", String.class, Integer.class);
merchantCountState = getRuntimeContext().getMapState(merchantDesc);
}
@Override
public void flatMap(Transaction txn, Collector<Alert> out) throws Exception {
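// value() can return null (for example once the TTL has expired the entry), so fall back to 0.0
Double previousTotal = runningTotalState.value();
double total = (previousTotal == null ? 0.0 : previousTotal) + txn.getAmount();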
runningTotalState.update(total);
// Track distinct merchants
merchantCountState.put(
txn.getMerchantId(),
merchantCountState.getOrDefault(txn.getMerchantId(), 0) + 1
);
long distinctMerchants = 0;
for (String ignored : merchantCountState.keys()) distinctMerchants++;
if (total > FRAUD_THRESHOLD || distinctMerchants > MAX_MERCHANTS) {
out.collect(new Alert(
txn.getUserId(),
"Threshold breach: total=" + total + " merchants=" + distinctMerchants
));
}
lastTxnTimestampState.update(txn.getTimestamp());
}
}
// Wire into a DataStream pipeline
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(16);
DataStream<Transaction> txnStream = env
.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source");
txnStream
.keyBy(Transaction::getUserId) // partition state by userId
.flatMap(new FraudDetector())
.sinkTo(alertSink);
env.execute("Fraud Detection Job");
For production workloads with large state (gigabytes to terabytes per TaskManager), switch from the default HashMapStateBackend (in-heap, fast, limited by JVM heap) to the EmbeddedRocksDBStateBackend (off-heap, SST files on local disk, incremental checkpoints).
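The idea behind incremental checkpoints is a set difference: SST files are immutable once written, so a checkpoint only needs to upload files that earlier checkpoints have not already stored. A minimal plain-Java sketch, with a hypothetical `IncrementalCheckpointSketch` class and made-up file names:

```java
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical sketch: an incremental checkpoint uploads only files not seen before. */
final class IncrementalCheckpointSketch {

    /** Returns the files present now that no earlier checkpoint has uploaded. */
    static Set<String> filesToUpload(Set<String> previouslyUploaded, Set<String> currentFiles) {
        Set<String> delta = new TreeSet<>(currentFiles);
        delta.removeAll(previouslyUploaded);
        return delta;
    }
}
```

If the previous checkpoint stored `000001.sst` and `000002.sst` and the local directory now holds `000002.sst` and `000003.sst`, only `000003.sst` is uploaded; the new checkpoint references the already-stored file.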
// RocksDB state backend configuration — flink-conf.yaml or programmatic
// Programmatic (overrides cluster config for this job):
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EmbeddedRocksDBStateBackend rocksDB = new EmbeddedRocksDBStateBackend(
true // incremental checkpoints — only send changed SST files, not full snapshot
);
env.setStateBackend(rocksDB);
// Checkpoint storage: write snapshots to S3 or HDFS
env.getCheckpointConfig().setCheckpointStorage(
"s3://my-bucket/flink-checkpoints/fraud-detector"
);
# flink-conf.yaml (cluster-wide defaults):
state.backend: rocksdb
state.backend.incremental: true
state.checkpoints.dir: s3://my-bucket/flink-checkpoints
state.savepoints.dir: s3://my-bucket/flink-savepoints
# RocksDB tuning for high-throughput writes:
state.backend.rocksdb.block.cache-size: 256mb
state.backend.rocksdb.writebuffer.size: 64mb
state.backend.rocksdb.writebuffer.count: 4
state.backend.rocksdb.compaction.style: LEVEL
Windowing — Tumbling, Sliding, and Session Windows
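Flink's window API operates on keyed streams and assigns each record to one or more windows. A window function (ReduceFunction, AggregateFunction, or ProcessWindowFunction) computes the result for each window. ReduceFunction and AggregateFunction fold records in incrementally as they arrive, whereas ProcessWindowFunction receives all buffered records at once. By default the result is emitted when the window fires, that is, when the watermark passes the window's end timestamp, producing one output record per window per key.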
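Window assignment itself is simple arithmetic on the record timestamp. The sketch below shows it for epoch-aligned tumbling and sliding windows in plain Java; the `WindowAssignmentSketch` class is hypothetical and ignores window offsets.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of how a timestamp maps to tumbling and sliding windows. */
final class WindowAssignmentSketch {

    /** Start of the single tumbling window [start, start + size) that contains ts. */
    static long tumblingStart(long ts, long size) {
        return ts - Math.floorMod(ts, size);
    }

    /** Starts of every sliding window [start, start + size) that contains ts, newest first. */
    static List<Long> slidingStarts(long ts, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long lastStart = ts - Math.floorMod(ts, slide);
        for (long start = lastStart; start > ts - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }
}
```

A record at minute 7 falls into exactly one 5-minute tumbling window (starting at minute 5), but into five 10-minute windows when the slide is 2 minutes, which is why sliding windows multiply state and output volume by size / slide.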
// Java — Three window types on the same click stream
DataStream<ClickEvent> clicks = env.fromSource(source, watermarkStrategy, "Clicks");
KeyedStream<ClickEvent, String> keyedClicks = clicks.keyBy(ClickEvent::getUserId);
// 1. TUMBLING window: non-overlapping, fixed-size windows
// e.g., clicks per user per 5-minute bucket
DataStream<PageViewCount> tumblingCounts = keyedClicks
.window(TumblingEventTimeWindows.of(Duration.ofMinutes(5)))
.aggregate(new ClickCountAggregator(), new WindowMetadataEnricher());
// 2. SLIDING window: overlapping windows, emits more frequently
// e.g., 10-minute window sliding every 2 minutes (5x overlap)
DataStream<PageViewCount> slidingCounts = keyedClicks
.window(SlidingEventTimeWindows.of(
Duration.ofMinutes(10), // window size
Duration.ofMinutes(2) // slide interval
))
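// Pair the aggregator with the window function so the result type is PageViewCount, not Long
.aggregate(new ClickCountAggregator(), new WindowMetadataEnricher());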
// 3. SESSION window: gap-based, closes after inactivity
// e.g., group clicks until 30 minutes of silence = end of session
DataStream<SessionSummary> sessions = keyedClicks
.window(EventTimeSessionWindows.withGap(Duration.ofMinutes(30)))
.process(new SessionWindowFunction());
// AggregateFunction — stateful, incremental, low memory overhead
public class ClickCountAggregator
implements AggregateFunction<ClickEvent, Long, Long> {
@Override public Long createAccumulator() { return 0L; }
@Override public Long add(ClickEvent e, Long acc) { return acc + 1; }
@Override public Long getResult(Long acc) { return acc; }
@Override public Long merge(Long a, Long b) { return a + b; }
}
// ProcessWindowFunction — full window access, higher memory cost
public class SessionWindowFunction
extends ProcessWindowFunction<ClickEvent, SessionSummary, String, TimeWindow> {
@Override
public void process(String userId, Context ctx,
Iterable<ClickEvent> events,
Collector<SessionSummary> out) {
long count = 0; long firstTs = Long.MAX_VALUE; long lastTs = 0;
for (ClickEvent e : events) {
count++;
firstTs = Math.min(firstTs, e.getTimestamp());
lastTs = Math.max(lastTs, e.getTimestamp());
}
out.collect(new SessionSummary(userId, count, firstTs, lastTs,
ctx.window().getStart(), ctx.window().getEnd()));
}
}
Watermarks drive event-time progress. With a bounded-out-of-orderness strategy, Flink periodically advances the watermark to the maximum observed event timestamp minus a configurable out-of-orderness bound. A record is late when the watermark has already passed the end of the window it belongs to; such records are handled by the allowedLateness and side-output mechanisms.
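The watermark arithmetic can be sketched without any Flink dependency. The `BoundedOutOfOrdernessSketch` class below is a hypothetical stand-in that tracks the maximum timestamp seen and derives the watermark and a lateness check from it; treat the exact off-by-one conventions as an assumption modelled on windows being half-open intervals.

```java
/** Hypothetical sketch of a bounded-out-of-orderness watermark and a lateness check. */
final class BoundedOutOfOrdernessSketch {

    private final long boundMs;
    private long maxTimestampMs = Long.MIN_VALUE;

    BoundedOutOfOrdernessSketch(long boundMs) {
        this.boundMs = boundMs;
    }

    /** Called for every record; only the largest timestamp seen so far matters. */
    void onEvent(long eventTimestampMs) {
        maxTimestampMs = Math.max(maxTimestampMs, eventTimestampMs);
    }

    /** No record with a timestamp at or below this value is expected any more. */
    long currentWatermark() {
        return maxTimestampMs == Long.MIN_VALUE
                ? Long.MIN_VALUE
                : maxTimestampMs - boundMs - 1;
    }

    /** A record for window [start, end) is dropped once the watermark passes end - 1 + allowedLateness. */
    static boolean isTooLate(long windowEndMs, long allowedLatenessMs, long watermarkMs) {
        return windowEndMs - 1 + allowedLatenessMs <= watermarkMs;
    }
}
```

Out-of-order records never move the watermark backwards, because only the maximum timestamp is tracked.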
// Watermark strategy — bounded out-of-orderness for event-time processing
WatermarkStrategy<ClickEvent> watermarkStrategy = WatermarkStrategy
.<ClickEvent>forBoundedOutOfOrderness(Duration.ofSeconds(30))
.withTimestampAssigner((event, recordTimestamp) -> event.getTimestamp())
.withIdleness(Duration.ofMinutes(5)); // advance watermark if partition is idle
// allowedLateness + side output for late records
OutputTag<ClickEvent> lateClicksTag = new OutputTag<ClickEvent>("late-clicks") {};
SingleOutputStreamOperator<PageViewCount> result = keyedClicks
.window(TumblingEventTimeWindows.of(Duration.ofMinutes(5)))
.allowedLateness(Duration.ofMinutes(1)) // keep window open 1 extra minute
.sideOutputLateData(lateClicksTag) // route truly late records elsewhere
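// Pair the aggregator with the window function so the result type is PageViewCount, not Long
.aggregate(new ClickCountAggregator(), new WindowMetadataEnricher());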
// Handle late records separately (e.g., reprocess or dead-letter queue)
DataStream<ClickEvent> lateClicks = result.getSideOutput(lateClicksTag);
lateClicks.sinkTo(deadLetterSink);
Exactly-Once Semantics — Checkpointing and Two-Phase Commit
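Flink's checkpointing is based on asynchronous barrier snapshotting, a variant of the Chandy-Lamport distributed snapshot algorithm. The JobManager's checkpoint coordinator periodically triggers the sources to inject checkpoint barriers into their streams. Each operator, upon receiving barriers on all inputs, snapshots its state and forwards the barrier downstream. When all operators have acknowledged, the checkpoint is complete: a consistent global snapshot of the entire job state. On its own this gives exactly-once state consistency inside Flink; end-to-end exactly-once also needs replayable sources and transactional or idempotent sinks.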
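Barrier alignment at a single operator can be illustrated in a few lines. In the hypothetical `BarrierAlignmentSketch` below, a running sum stands in for operator state; records that arrive on a channel after that channel's barrier are held back so they do not leak into the snapshot. This models aligned checkpoints only and is not Flink's implementation.

```java
import java.util.ArrayDeque;
import java.util.BitSet;
import java.util.Deque;

/** Hypothetical sketch of aligned checkpoint barriers at one operator with several inputs. */
final class BarrierAlignmentSketch {

    private final int numInputs;
    private final BitSet barrierSeen = new BitSet();
    private final Deque<Long> heldBack = new ArrayDeque<>();
    private long sum;            // stands in for the operator's state
    private long snapshot = -1;  // state captured by the last completed alignment

    BarrierAlignmentSketch(int numInputs) {
        this.numInputs = numInputs;
    }

    void onRecord(int channel, long value) {
        if (barrierSeen.get(channel)) {
            heldBack.add(value); // belongs to the next checkpoint, so do not apply it yet
        } else {
            sum += value;
        }
    }

    /** Returns true once the barrier has arrived on every input and the snapshot was taken. */
    boolean onBarrier(int channel) {
        barrierSeen.set(channel);
        if (barrierSeen.cardinality() < numInputs) {
            return false;
        }
        snapshot = sum; // consistent cut: exactly the pre-barrier records
        barrierSeen.clear();
        while (!heldBack.isEmpty()) {
            sum += heldBack.poll();
        }
        return true; // a real operator would now forward the barrier downstream
    }

    long sum() { return sum; }

    long snapshot() { return snapshot; }
}
```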
For end-to-end exactly-once with Kafka sinks, Flink implements the two-phase commit (2PC) protocol: records are written to Kafka within an open transaction during the checkpoint interval; the transaction is committed atomically when the checkpoint completes. If the job fails mid-interval, the open transaction is aborted and records are replayed from the last completed checkpoint.
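The protocol can be reduced to three transitions on an in-memory sink. The `TwoPhaseCommitSinkSketch` class below is a hypothetical model, not the Kafka sink: `preCommit` corresponds to the checkpoint barrier reaching the sink, `commit` to the checkpoint-complete notification, and `abort` to a failure before the checkpoint completed.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical in-memory model of a two-phase-commit sink. */
final class TwoPhaseCommitSinkSketch {

    private final List<String> committed = new ArrayList<>(); // what read_committed consumers see
    private List<String> openTransaction = new ArrayList<>();
    private List<String> preCommitted = null;

    /** Records written between checkpoints go into the open transaction. */
    void write(String record) {
        openTransaction.add(record);
    }

    /** Phase 1, at the checkpoint barrier: seal the current transaction and open a new one. */
    void preCommit() {
        preCommitted = openTransaction;
        openTransaction = new ArrayList<>();
    }

    /** Phase 2, when the checkpoint is confirmed complete: make the sealed records visible. */
    void commit() {
        if (preCommitted != null) {
            committed.addAll(preCommitted);
            preCommitted = null;
        }
    }

    /** Failure before the checkpoint completed: discard everything that was not committed. */
    void abort() {
        openTransaction = new ArrayList<>();
        preCommitted = null;
    }

    List<String> visible() {
        return List.copyOf(committed);
    }
}
```

Nothing becomes visible until `commit`, which is why output latency under exactly-once delivery is bounded below by the checkpoint interval.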
// Checkpoint configuration — production settings
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Enable checkpointing every 60 seconds
env.enableCheckpointing(60_000L, CheckpointingMode.EXACTLY_ONCE);
CheckpointConfig ckptConfig = env.getCheckpointConfig();
ckptConfig.setCheckpointTimeout(120_000L); // abort if > 2 min
ckptConfig.setMinPauseBetweenCheckpoints(30_000L); // min gap between checkpoints
ckptConfig.setMaxConcurrentCheckpoints(1); // only one in-flight at a time
ckptConfig.setTolerableCheckpointFailureNumber(3); // allow 3 failures before failing job
// Retain checkpoints on cancellation (enables manual recovery)
ckptConfig.setExternalizedCheckpointCleanup(
ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
);
// KafkaSink with EXACTLY_ONCE semantic (two-phase commit)
KafkaSink<PageViewCount> kafkaSink = KafkaSink.<PageViewCount>builder()
.setBootstrapServers("kafka-broker-1:9092,kafka-broker-2:9092")
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic("page-view-counts")
.setValueSerializationSchema(new PageViewCountSerializer())
.build())
// EXACTLY_ONCE requires Kafka transactions + matching checkpoint interval
.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
.setTransactionalIdPrefix("flink-pageview-job")
// transaction.timeout.ms must exceed checkpoint interval + checkpoint timeout (60 s + 120 s here)
// and must not exceed the broker's transaction.max.timeout.ms
.setProperty("transaction.timeout.ms", "300000")
.setProperty("acks", "all")
.setProperty("enable.idempotence", "true")
.build();
Note
Ensure transaction.timeout.ms is greater than your checkpoint interval plus checkpoint timeout. Downstream consumers of the exactly-once topic must set isolation.level=read_committed to avoid reading uncommitted or aborted transactional records. If you can tolerate at-least-once with deduplication downstream, DeliveryGuarantee.AT_LEAST_ONCE has lower overhead and no transaction coordination latency.
Flink SQL and Table API — Declarative Stream Processing
Flink SQL and the Table API provide a declarative layer over the DataStream API, allowing engineers to write familiar SQL against unbounded streams. Under the hood, Flink compiles SQL into a DataStream program with the same operators, state backends, and checkpointing semantics. Flink SQL is particularly powerful for stream-table joins (enriching events with slowly-changing reference data) and aggregations with GROUP BY over tumbling windows.
-- Flink SQL: define Kafka source table with event-time and watermark
CREATE TABLE clicks (
user_id STRING,
page_id STRING,
click_ts TIMESTAMP(3),
country STRING,
-- Declare event-time attribute and watermark strategy
WATERMARK FOR click_ts AS click_ts - INTERVAL '30' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'click-events',
'properties.bootstrap.servers' = 'kafka:9092',
'properties.group.id' = 'flink-sql-consumer',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json',
'json.timestamp-format.standard' = 'ISO-8601'
);
-- Define a slowly-changing dimension table (bounded lookup)
CREATE TABLE user_profiles (
user_id STRING,
tier STRING,
signup_date DATE,
PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://postgres:5432/profiles',
'table-name' = 'user_profiles',
'lookup.cache.max-rows' = '10000',
'lookup.cache.ttl' = '60s'
);
-- Define output sink table
CREATE TABLE click_aggregates (
window_start TIMESTAMP(3),
window_end TIMESTAMP(3),
page_id STRING,
tier STRING,
-- No PRIMARY KEY here: the plain kafka connector is append-only; use upsert-kafka for keyed upserts
click_count BIGINT
) WITH (
'connector' = 'kafka',
'topic' = 'click-aggregates',
'properties.bootstrap.servers' = 'kafka:9092',
'format' = 'json'
);
-- Stream-table join + GROUP BY tumbling window
INSERT INTO click_aggregates
SELECT
TUMBLE_START(c.click_ts, INTERVAL '5' MINUTE) AS window_start,
TUMBLE_END(c.click_ts, INTERVAL '5' MINUTE) AS window_end,
c.page_id,
p.tier,
COUNT(*) AS click_count
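FROM (
-- A JDBC lookup join needs a processing-time attribute on the probe side
SELECT *, PROCTIME() AS proc_time FROM clicks
) AS c
LEFT JOIN user_profiles FOR SYSTEM_TIME AS OF c.proc_time AS p
ON c.user_id = p.user_id
GROUP BY
TUMBLE(c.click_ts, INTERVAL '5' MINUTE),
c.page_id,
p.tier;
The FOR SYSTEM_TIME AS OF clause implements a temporal join: each click is enriched with the profile row that is valid at the given point in time, so slowly-changing dimensions are handled without duplicating records when the reference data changes. Against a JDBC table this is a lookup join, which requires a processing-time attribute (proc_time above) and reads the current row through the lookup cache. Joining as of the event time (c.click_ts) instead requires a versioned table, for example one backed by a changelog stream with a primary key and a watermark. This is one of Flink SQL's most powerful features for enrichment pipelines, and it is harder to express in Spark Structured Streaming or Kafka Streams.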
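The "value as of a point in time" semantics can be sketched with a sorted map of versions. The `VersionedLookupSketch` class below is hypothetical and holds the history for a single user; it illustrates the point-in-time lookup against a versioned table, not the JDBC lookup cache.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch: point-in-time lookup against the version history of one key. */
final class VersionedLookupSketch {

    // validFromMs -> tier; each entry is valid until the next entry starts
    private final TreeMap<Long, String> versions = new TreeMap<>();

    void upsert(long validFromMs, String tier) {
        versions.put(validFromMs, tier);
    }

    /** Returns the tier valid at asOfMs, or null if no version existed yet (LEFT JOIN yields NULL). */
    String asOf(long asOfMs) {
        Map.Entry<Long, String> entry = versions.floorEntry(asOfMs);
        return entry == null ? null : entry.getValue();
    }
}
```

A click stamped before a tier upgrade is joined with the old tier even if it is processed after the upgrade, which is the behaviour that keeps historical aggregates stable.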
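Kafka Integration — KafkaSource and KafkaSink
The Flink Kafka connector (flink-connector-kafka) implements the FLIP-27 Source API on the read side, and its KafkaSink supports all three delivery guarantees (none, at-least-once, exactly-once). Flink tracks source offsets in its own checkpointed state and restores from them on recovery. Offsets are also committed back to Kafka when a checkpoint completes, which keeps consumer-group lag monitoring accurate, but the exactly-once guarantee relies on the checkpointed offsets rather than on those commits.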
// Kafka source with event-time watermarks and Avro deserialization
// Maven: org.apache.flink:flink-connector-kafka:3.4.0-1.20
KafkaSource<ClickEvent> kafkaSource = KafkaSource.<ClickEvent>builder()
.setBootstrapServers("kafka-broker-1:9092,kafka-broker-2:9092")
.setTopics("click-events")
.setGroupId("flink-click-processor")
.setStartingOffsets(OffsetsInitializer.committedOffsets(
OffsetResetStrategy.EARLIEST // fall back if no committed offset
))
.setDeserializer(
// AvroDeserializationSchema is created through its static factory, not a public constructor
KafkaRecordDeserializationSchema.valueOnly(AvroDeserializationSchema.forSpecific(ClickEvent.class))
)
// Consumer properties
.setProperty("security.protocol", "SASL_SSL")
.setProperty("sasl.mechanism", "SCRAM-SHA-256")
.setProperty("sasl.jaas.config",
"org.apache.kafka.common.security.scram.ScramLoginModule required " +
"username=\"flink-consumer\" password=\"secret\";")
.setProperty("fetch.min.bytes", "65536") // reduce fetch requests
.setProperty("fetch.max.wait.ms", "500")
.build();
// WatermarkStrategy: bounded out-of-orderness with per-partition tracking
WatermarkStrategy<ClickEvent> watermarkStrategy = WatermarkStrategy
.<ClickEvent>forBoundedOutOfOrderness(Duration.ofSeconds(30))
.withTimestampAssigner(
(event, ts) -> event.getClickTimestampMs()
)
.withIdleness(Duration.ofMinutes(2)); // unblock watermark on idle partitions
DataStreamSource<ClickEvent> clickStream = env.fromSource(
kafkaSource,
watermarkStrategy,
"Kafka Click Source"
);
Production Deployment with the Flink Kubernetes Operator
The Flink Kubernetes Operator manages the full lifecycle of Flink deployments as a Kubernetes custom resource (CRD: FlinkDeployment). It handles job submission, scaling, checkpointing, savepoints on upgrade, and automatic recovery — replacing the manual process of constructing JobManager/TaskManager Deployments and Services.
# FlinkDeployment — production Flink job on Kubernetes
# Install operator: helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: click-aggregation-job
  namespace: flink-prod
spec:
  image: my-registry/flink-click-aggregation:1.20.0-v2.3.1
  flinkVersion: v1_20
  flinkConfiguration:
    # Slots per TaskManager (matches the "4 TMs * 2 slots" sizing below)
    taskmanager.numberOfTaskSlots: "2"
    # Checkpointing
    execution.checkpointing.interval: "60000"
    execution.checkpointing.mode: EXACTLY_ONCE
    state.backend: rocksdb
    state.backend.incremental: "true"
    state.checkpoints.dir: s3://my-bucket/flink-checkpoints
    state.savepoints.dir: s3://my-bucket/flink-savepoints
    # Restart strategy
    restart-strategy: exponential-delay
    restart-strategy.exponential-delay.initial-backoff: 1s
    restart-strategy.exponential-delay.max-backoff: 5min
    restart-strategy.exponential-delay.reset-backoff-threshold: 10min
    # Web UI and metrics (recent Flink versions configure reporters via factory.class)
    metrics.reporters: prometheus
    metrics.reporter.prometheus.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
    metrics.reporter.prometheus.port: "9249"
    # Network and buffer tuning
    taskmanager.memory.network.fraction: "0.2"
    taskmanager.memory.managed.fraction: "0.4"
  serviceAccount: flink-service-account
  podTemplate:
    spec:
      containers:
        - name: flink-main-container
          env:
            - name: KAFKA_BOOTSTRAP_SERVERS
              valueFrom:
                secretKeyRef:
                  name: kafka-credentials
                  key: bootstrap-servers
          resources: {}   # overridden per component below
          volumeMounts:
            - name: flink-config-vol
              mountPath: /opt/flink/conf
      volumes:
        - name: flink-config-vol
          configMap:
            name: flink-job-config
  jobManager:
    resource:
      memory: "2048m"
      cpu: 0.5
    replicas: 1   # for HA, raise replicas and set high-availability.type: kubernetes
  taskManager:
    resource:
      memory: "4096m"
      cpu: 2.0
    replicas: 4   # 4 TMs * 2 slots = 8 total parallelism
  job:
    jarURI: local:///opt/flink/usrlib/click-aggregation.jar
    entryClass: com.datasops.ClickAggregationJob
    args: ["--env", "production", "--parallelism", "8"]
    parallelism: 8
    upgradeMode: savepoint       # take savepoint before replacing deployment
    savepointTriggerNonce: 0     # increment to trigger manual savepoint
# Horizontal scaling: update replicas without job restart (since Operator 1.7)
kubectl patch flinkdeployment click-aggregation-job -n flink-prod --type=merge -p '{"spec":{"taskManager":{"replicas":8}}}'
# Trigger a savepoint manually
kubectl patch flinkdeployment click-aggregation-job -n flink-prod --type=merge -p '{"spec":{"job":{"savepointTriggerNonce":1}}}'
# Monitor savepoint status
kubectl get flinkdeployment click-aggregation-job -n flink-prod -o jsonpath='{.status.jobStatus}'
Monitoring — Backpressure, Checkpoints, and Prometheus Metrics
Flink exposes hundreds of metrics through its metrics system. The Prometheus reporter serves them over HTTP on each JobManager and TaskManager (port 9249 in our config above), where Prometheus scrapes them. The most critical metrics for production operations fall into four categories:
- Checkpoint metrics. lastCheckpointDuration (alert if > 2× checkpoint interval), lastCheckpointSize (alert on sudden growth), numberOfFailedCheckpoints.
- Backpressure. backPressuredTimeMsPerSecond per operator subtask; alert if > 100 ms/s sustained. Also visible as coloured nodes in the Flink Web UI job graph.
- Lag. KafkaSourceReader.KafkaConsumer.records-lag-max measures how far the Flink consumer is behind the head of each Kafka partition.
- Watermark lag. currentOutputWatermark subtracted from wall-clock time reveals event-time processing lag independently of checkpoint health.
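Two of these checks are simple enough to sketch directly: watermark lag is a subtraction, and "sustained" means the threshold is exceeded for several consecutive samples. The `AlertSketch` class and its method names below are hypothetical; in practice the monitoring system evaluates these rules, not application code.

```java
/** Hypothetical sketch of two alert conditions evaluated over scraped metric samples. */
final class AlertSketch {

    /** Event-time lag: how far the watermark trails the wall clock, both in epoch milliseconds. */
    static long watermarkLagMs(long wallClockMs, long currentOutputWatermarkMs) {
        return wallClockMs - currentOutputWatermarkMs;
    }

    /** True if at least minConsecutive samples in a row exceed the threshold. */
    static boolean sustainedAbove(long[] samples, long threshold, int minConsecutive) {
        int run = 0;
        for (long sample : samples) {
            run = sample > threshold ? run + 1 : 0;
            if (run >= minConsecutive) {
                return true;
            }
        }
        return false;
    }
}
```

Requiring a run of samples avoids paging on a single spike, for instance a brief backpressure burst during a checkpoint.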
# Prometheus alert rules for Flink — prometheus-flink-rules.yaml
groups:
  - name: flink.checkpoints
    rules:
      - alert: FlinkCheckpointDurationHigh
        expr: |
          flink_jobmanager_job_lastCheckpointDuration > 90000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Flink checkpoint duration {{ $value }}ms exceeds 90s"
          runbook: "https://wiki.internal/flink-checkpoint-runbook"
      - alert: FlinkCheckpointFailed
        expr: |
          increase(flink_jobmanager_job_numberOfFailedCheckpoints[10m]) > 2
        for: 0m
        labels:
          severity: critical
        annotations:
          summary: "Flink job {{ $labels.job_name }} has {{ $value }} failed checkpoints"
  - name: flink.backpressure
    rules:
      - alert: FlinkBackpressureHigh
        expr: |
          flink_taskmanager_job_task_backPressuredTimeMsPerSecond > 200
        for: 3m
        labels:
          severity: warning
        annotations:
          summary: "Flink task {{ $labels.task_name }} backpressured {{ $value }}ms/s"
  - name: flink.kafka.lag
    rules:
      - alert: FlinkKafkaLagHigh
        # Kafka source metrics are reported in the operator scope; verify the exact name on your /metrics endpoint
        expr: |
          flink_taskmanager_job_task_operator_KafkaSourceReader_KafkaConsumer_records_lag_max > 100000
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "Flink Kafka consumer lag {{ $value }} records on {{ $labels.topic }}"
---
# Grafana dashboard provisioning — key panels
# Panel 1: Checkpoint duration heatmap
# metric: flink_jobmanager_job_lastCheckpointDuration{job_name="$job"}
# Panel 2: Backpressure ratio per task
# metric: flink_taskmanager_job_task_backPressuredTimeMsPerSecond
# Panel 3: Kafka consumer lag per partition
# metric: flink_taskmanager_job_task_operator_KafkaSourceReader_KafkaConsumer_records_lag_max
# Panel 4: Watermark lag (ms)
# metric: time() * 1000 - flink_taskmanager_job_task_operator_currentOutputWatermark   (the watermark is already in epoch milliseconds)
When backpressure is detected, the diagnostic process follows a consistent pattern: identify the bottleneck operator (typically the first task downstream of the backpressured ones that is itself busy rather than backpressured in the Web UI), then check whether it is CPU-bound (increase parallelism), I/O-bound (optimise the sink or external call), or state-bound (tune RocksDB compaction, enable incremental checkpoints, reduce state TTL). The Flink Web UI's "Flame Graph" tab (enable via rest.flamegraph.enabled: true) samples TaskManager thread stacks on demand without a profiler agent.
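The first step, locating the bottleneck, can be sketched for a linear pipeline. The `BottleneckSketch` class and its `TaskMetrics` type are hypothetical; the sketch assumes tasks are listed from source to sink and that each carries its backPressuredTimeMsPerSecond reading.

```java
import java.util.List;

/** Hypothetical sketch: find the bottleneck task in a linear pipeline from backpressure readings. */
final class BottleneckSketch {

    static final class TaskMetrics {
        final String name;
        final double backPressuredMsPerSecond;

        TaskMetrics(String name, double backPressuredMsPerSecond) {
            this.name = name;
            this.backPressuredMsPerSecond = backPressuredMsPerSecond;
        }
    }

    /**
     * Tasks are ordered source -> sink. The bottleneck is the first task that is not
     * backpressured itself while the task directly upstream of it is.
     * Returns null if no such boundary exists.
     */
    static String findBottleneck(List<TaskMetrics> tasks, double thresholdMsPerSecond) {
        for (int i = 1; i < tasks.size(); i++) {
            boolean upstreamBackpressured =
                    tasks.get(i - 1).backPressuredMsPerSecond > thresholdMsPerSecond;
            boolean selfBackpressured =
                    tasks.get(i).backPressuredMsPerSecond > thresholdMsPerSecond;
            if (upstreamBackpressured && !selfBackpressured) {
                return tasks.get(i).name;
            }
        }
        return null;
    }
}
```

Backpressured tasks are victims, not causes: they are waiting on someone downstream, so the search walks downstream until the waiting stops.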
Decision Framework — Flink vs Spark Structured Streaming vs Kafka Streams
Choosing a stream processing engine is an architectural decision that is expensive to reverse. The right choice depends on latency requirements, state complexity, team expertise, and operational overhead tolerance.
# Decision matrix: stream processing engine selection
CHOOSE Apache Flink when:
✓ Sub-second latency is required (true continuous streaming, not micro-batch)
✓ Complex event-time windowing with out-of-order data and late arrivals
✓ Large, long-lived keyed state (GBs to TBs in total, multi-week retention)
✓ Exactly-once end-to-end guarantees to Kafka/JDBC/Iceberg
✓ Complex event processing (CEP patterns, temporal joins)
✓ High-throughput stateful joins between multiple streams
✓ Flink SQL temporal table joins for enrichment at scale
Examples: fraud detection, real-time ML feature computation,
IoT telemetry aggregation, financial risk calculations
CHOOSE Spark Structured Streaming when:
✓ Team has existing Spark expertise (PySpark, Delta Lake, MLflow)
✓ Latency of 1–60 seconds is acceptable (micro-batch)
✓ Deep Lakehouse integration (Delta Lake CDC, Iceberg write via Spark)
✓ Unified batch + streaming code sharing the same DataFrame API
✓ Need to run complex ML inference inside the pipeline (Pandas UDFs)
Examples: ETL to data warehouse, CDC ingestion to lakehouse,
batch + streaming shared pipelines
CHOOSE Kafka Streams when:
✓ Processing logic embedded in a Java/Kotlin microservice
✓ No separate streaming cluster to operate
✓ Source and sink are always Kafka topics
✓ State requirements are modest (GBs total, not per key at scale)
✓ Team is more familiar with Java microservice patterns than distributed systems
Examples: event enrichment, stateful deduplication within a service,
topic-to-topic transformations, aggregation per session window
AVOID Flink when:
✗ Team has no JVM expertise (Python-first teams should evaluate PyFlink
or use Flink SQL via a managed service such as Confluent Cloud)
✗ Operational maturity for a separate cluster is not available
✗ Workload is purely batch (use Spark or dbt instead)
✗ Budget constraints rule out a dedicated Flink cluster
(consider Kafka Streams embedded in the application)