Apache Flink, Streaming, Kafka, Java, Python, Data Engineering, Stateful Processing, Exactly-Once

Apache Flink for Streaming Analytics — Stateful Processing, Windowing, and Exactly-Once Semantics

A practical guide to Apache Flink in production: DataStream API architecture with operators and parallelism model, keyed streams and managed state backends (HashMap vs RocksDB) with ValueState, MapState, and ListState, tumbling and sliding window functions with event-time watermarks and allowedLateness for late data handling, exactly-once semantics with distributed checkpointing (Chandy-Lamport algorithm) and two-phase commit KafkaSink, Flink SQL and Table API for declarative stream-table joins with CREATE TABLE Kafka connector, KafkaSource with WatermarkStrategy for event-time processing, Flink Kubernetes Operator with FlinkDeployment CRD for production-grade cluster management, backpressure detection and checkpoint monitoring with Flink Web UI and Prometheus metrics, and a decision framework for choosing between Apache Flink, Spark Structured Streaming, and Kafka Streams.

2026-05-31

Why Apache Flink? A Comparison with Spark Structured Streaming and Kafka Streams

Apache Flink is a distributed stream processing engine built around a unified batch-and-streaming model, but optimised from the ground up for low-latency stateful streaming. Spark Structured Streaming, in its default mode, processes a stream as a series of micro-batches, running a small batch job over the records that arrived since the previous trigger. Flink is a continuous streaming engine: each record is processed as it arrives, so end-to-end latency can be in the millisecond range rather than the hundreds of milliseconds to seconds typical of micro-batching.

The three main contenders in production stream processing each have a clear sweet spot:

  • Apache Flink. Best for stateful event-time processing, complex windowing, exactly-once end-to-end guarantees, and CEP (complex event processing). Of the three, it offers the most complete built-in handling of out-of-order events, late data, and long-lived keyed state at scale. Used in production by Alibaba, Uber, Netflix, and LinkedIn for demanding streaming workloads.
  • Spark Structured Streaming. Best when your team already runs Spark for batch, when processing latency of 1–30 seconds is acceptable, or when you need tight Delta Lake / Iceberg integration via the Lakehouse pattern. Watermark support exists but is less expressive than Flink.
  • Kafka Streams. Best for lightweight stateful processing embedded in a Java microservice, with no separate cluster required. It is a library, so it scales by running more instances of the application rather than through a cluster scheduler. Restricted to Kafka sources and sinks, with simpler windowing semantics. Ideal for enrichment, aggregation, and filtering within a Kafka-native architecture.

The most important distinction in stream processing is event time vs processing time. Processing time uses the wall clock when a record arrives at the operator — simple but wrong for out-of-order data. Event time uses the timestamp embedded in the record itself, allowing Flink to correctly aggregate events that arrive late due to network delays, mobile offline buffering, or upstream retries. Flink's watermark mechanism is the industry standard for managing event-time with late data tolerance.
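
The difference is easiest to see with a delayed record. The following is a minimal plain-Java sketch (it does not use Flink's API) that counts records per one-minute bucket, once by event time and once by arrival time. The class name, method names, and sample timestamps are all made up for illustration.

```java
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java illustration (no Flink APIs): count records per 1-minute bucket. */
final class EventTimeVsProcessingTime {

    static final long MINUTE_MS = 60_000L;

    /** Start of the 1-minute bucket that contains the given timestamp. */
    static long bucketStart(long timestampMs) {
        return timestampMs - (timestampMs % MINUTE_MS);
    }

    /**
     * Counts records per bucket. Each record is {eventTimeMs, arrivalTimeMs};
     * useEventTime selects which of the two timestamps decides the bucket.
     */
    static Map<Long, Integer> countPerBucket(long[][] records, boolean useEventTime) {
        Map<Long, Integer> counts = new TreeMap<>();
        for (long[] r : records) {
            long ts = useEventTime ? r[0] : r[1];
            counts.merge(bucketStart(ts), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        long[][] records = {
            {10_000L, 11_000L},   // on time
            {50_000L, 52_000L},   // on time
            {55_000L, 125_000L},  // created in minute 0, delivered in minute 2
        };
        System.out.println("event time:      " + countPerBucket(records, true));
        System.out.println("processing time: " + countPerBucket(records, false));
    }
}
```

With event time all three records land in the bucket starting at 0; with processing time the delayed record is counted in the bucket starting at 120000, which is the error the paragraph above describes.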

Note

The Apache Flink 1.20 documentation is the authoritative reference. Flink follows a roughly 4-month release cadence; always pin your dependencies to a specific minor version (e.g., 1.20.0) and validate connector compatibility — the Kafka, JDBC, and Iceberg connectors have independent release cycles from the Flink core.

Flink Execution Model — JobGraph, Operators, and Parallelism

A Flink application is compiled into a JobGraph — a directed acyclic graph of operators connected by data streams. The JobManager (master) accepts the JobGraph, schedules tasks onto TaskManagers (workers), and coordinates checkpoints. Each operator runs as one or more parallel subtasks across TaskManagers; the degree of parallelism is set globally or per operator.

Flink chains adjacent operators where possible, fusing them into a single task that runs in one thread (an operator chain). Chaining eliminates serialisation and network overhead between those operators. When operators cannot be chained (because of different parallelism, a keyBy shuffle, or an explicit chain break), Flink exchanges records through network buffers between the subtasks.

# Flink cluster topology

JobManager (1 or 3 for HA)
  ├── Resource Manager   → allocates TaskManager slots
  ├── Dispatcher         → receives job submissions
  └── JobMaster (per job) → schedules and coordinates

TaskManager (N workers, M slots each)
  ├── Slot 0 → subtask of operator A + subtask of operator B (chained)
  ├── Slot 1 → subtask of operator A + subtask of operator B (chained)
  └── Slot 2 → subtask of operator C (different parallelism, separate slot)

# Recommended slot count per TaskManager: 1–4
# Available parallelism (total slots) = taskmanager.numberOfTaskSlots * number of TaskManagers
# Rule of thumb: start with parallelism = number of Kafka partitions

The network buffer system is central to Flink's backpressure mechanism. When a downstream operator is slow, it stops consuming network buffers; upstream operators fill their send buffers and eventually slow their own processing, so backpressure propagates through the graph without a separate coordination layer. This is visible in the Flink Web UI as a backpressure indicator on each task.
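
The effect of a bounded buffer pool can be sketched in a few lines of plain Java. This is a toy simulation, not Flink's network stack: the class, the tick-based loop, and the capacity and rate values are assumptions chosen only to show that a slow consumer forces the producer to wait.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Plain-Java illustration (no Flink APIs) of backpressure through a bounded buffer. */
final class BackpressureSketch {

    /**
     * Simulates `ticks` steps. Each tick the upstream task tries to emit
     * `produceRate` records into a buffer of size `capacity`, then the
     * downstream task drains up to `consumeRate` records.
     * Returns the number of emit attempts that found the buffer full.
     */
    static int blockedEmits(int ticks, int capacity, int produceRate, int consumeRate) {
        Deque<Integer> buffer = new ArrayDeque<>();
        int blocked = 0;
        for (int t = 0; t < ticks; t++) {
            for (int i = 0; i < produceRate; i++) {
                if (buffer.size() < capacity) {
                    buffer.addLast(t);
                } else {
                    blocked++; // upstream would have to wait here: this is backpressure
                }
            }
            for (int i = 0; i < consumeRate && !buffer.isEmpty(); i++) {
                buffer.removeFirst();
            }
        }
        return blocked;
    }
}
```

When the consumer keeps up (for example `blockedEmits(10, 4, 1, 2)`), no emit is ever blocked; once the producer outpaces it, the buffer fills within a couple of ticks and most emit attempts stall.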

DataStream API — Keyed State and Managed State with RocksDB

Flink's managed state is the foundation of stateful stream processing. Unlike ad-hoc in-memory maps, managed state is automatically snapshotted during checkpoints and restored on failure. State is always accessed via descriptors registered in the operator's open() method — never as raw instance variables.

The three most commonly used keyed-state primitives are ValueState (single value per key), MapState (key-value map per key), and ListState (appendable list per key). Keyed state is partitioned by the stream key set via keyBy(), ensuring that all records for a given key are processed by the same operator subtask. (Flink also offers non-keyed operator state, used mainly by sources and sinks, which is not covered here.)
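
How a key ends up on one particular subtask can be sketched as a two-step mapping: key to key group, then key group to subtask. The plain-Java sketch below is a simplification under stated assumptions: Flink additionally scrambles the key's hashCode() with a murmur hash before taking the modulus, and the class and method names here are illustrative rather than Flink API.

```java
/** Plain-Java illustration of routing keys to parallel subtasks via key groups. */
final class KeyGroupSketch {

    /** Simplified: the real implementation also applies a murmur hash to hashCode(). */
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    /** Each subtask owns a contiguous range of key groups. */
    static int subtaskFor(int keyGroup, int parallelism, int maxParallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    /** The same key always maps to the same subtask for a fixed parallelism. */
    static int subtaskForKey(Object key, int parallelism, int maxParallelism) {
        return subtaskFor(keyGroupFor(key, maxParallelism), parallelism, maxParallelism);
    }
}
```

Because state is stored per key group, changing the parallelism only moves whole key groups between subtasks, which is why the maximum parallelism bounds how far a job can be scaled out.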

// Java — Stateful fraud detection operator with ValueState and MapState
import org.apache.flink.api.common.state.*;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class FraudDetector extends RichFlatMapFunction<Transaction, Alert> {

    // State descriptors — registered in open(), accessed via RuntimeContext
    private transient ValueState<Double> runningTotalState;
    private transient ValueState<Long>   lastTxnTimestampState;
    private transient MapState<String, Integer> merchantCountState;

    private static final double FRAUD_THRESHOLD = 10_000.0;
    private static final int    MAX_MERCHANTS    = 5;

    @Override
    public void open(Configuration parameters) throws Exception {
        // ValueState: single Double per userId key
        ValueStateDescriptor<Double> totalDesc = new ValueStateDescriptor<>(
            "running-total", Double.class, 0.0
        );
        // Set TTL: clear state after 24 hours of inactivity
        StateTtlConfig ttlConfig = StateTtlConfig
            .newBuilder(org.apache.flink.api.common.time.Time.hours(24))
            .setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite)
            .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
            .build();
        totalDesc.enableTimeToLive(ttlConfig);
        runningTotalState = getRuntimeContext().getState(totalDesc);

        ValueStateDescriptor<Long> tsDesc = new ValueStateDescriptor<>(
            "last-txn-ts", Long.class
        );
        lastTxnTimestampState = getRuntimeContext().getState(tsDesc);

        // MapState: merchant → count per userId key
        MapStateDescriptor<String, Integer> merchantDesc =
            new MapStateDescriptor<>("merchant-counts", String.class, Integer.class);
        merchantCountState = getRuntimeContext().getMapState(merchantDesc);
    }

    @Override
    public void flatMap(Transaction txn, Collector<Alert> out) throws Exception {
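        // value() returns null when nothing is stored yet or the entry has expired via TTL
        Double previousTotal = runningTotalState.value();
        double total = (previousTotal == null ? 0.0 : previousTotal) + txn.getAmount();
        runningTotalState.update(total);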

        // Track distinct merchants
        merchantCountState.put(
            txn.getMerchantId(),
            merchantCountState.getOrDefault(txn.getMerchantId(), 0) + 1
        );

        long distinctMerchants = 0;
        for (String ignored : merchantCountState.keys()) distinctMerchants++;

        if (total > FRAUD_THRESHOLD || distinctMerchants > MAX_MERCHANTS) {
            out.collect(new Alert(
                txn.getUserId(),
                "Threshold breach: total=" + total + " merchants=" + distinctMerchants
            ));
        }
        lastTxnTimestampState.update(txn.getTimestamp());
    }
}

// Wire into a DataStream pipeline
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(16);

DataStream<Transaction> txnStream = env
    .fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source");

txnStream
    .keyBy(Transaction::getUserId)   // partition state by userId
    .flatMap(new FraudDetector())
    .sinkTo(alertSink);

env.execute("Fraud Detection Job");

For production workloads with large state (gigabytes to terabytes per TaskManager), switch from the default HashMapStateBackend (in-heap, fast, limited by JVM heap) to the EmbeddedRocksDBStateBackend (off-heap, SST files on local disk, incremental checkpoints).

// RocksDB state backend configuration — flink-conf.yaml or programmatic

// Programmatic (overrides cluster config for this job):
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

EmbeddedRocksDBStateBackend rocksDB = new EmbeddedRocksDBStateBackend(
    true // incremental checkpoints — only send changed SST files, not full snapshot
);
env.setStateBackend(rocksDB);

// Checkpoint storage: write snapshots to S3 or HDFS
env.getCheckpointConfig().setCheckpointStorage(
    "s3://my-bucket/flink-checkpoints/fraud-detector"
);

# flink-conf.yaml (cluster-wide defaults):
state.backend: rocksdb
state.backend.incremental: true
state.checkpoints.dir: s3://my-bucket/flink-checkpoints
state.savepoints.dir: s3://my-bucket/flink-savepoints

# RocksDB tuning for high-throughput writes:
state.backend.rocksdb.block.cache-size: 256mb
state.backend.rocksdb.writebuffer.size: 64mb
state.backend.rocksdb.writebuffer.count: 4
state.backend.rocksdb.compaction.style: LEVEL

Note

RocksDB incremental checkpoints dramatically reduce checkpoint duration and S3/HDFS I/O for large state — only the SST files that changed since the last checkpoint are uploaded. The tradeoff is slightly slower recovery (more files to fetch). For state smaller than ~1 GB per subtask, the HashMapStateBackend is faster due to zero serialisation overhead during normal operation.
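
The idea behind incremental checkpoints reduces to a set difference over immutable files. A minimal plain-Java sketch, assuming hypothetical SST file names and ignoring compaction and reference counting, which the real backend also has to handle:

```java
import java.util.Set;
import java.util.TreeSet;

/** Plain-Java illustration: which files does an incremental checkpoint need to upload? */
final class IncrementalCheckpointSketch {

    /** Files present now that the previous checkpoint did not already upload. */
    static Set<String> filesToUpload(Set<String> previousCheckpoint, Set<String> currentFiles) {
        Set<String> delta = new TreeSet<>(currentFiles);
        delta.removeAll(previousCheckpoint);
        return delta;
    }
}
```

If the previous checkpoint contained `000001.sst` and `000002.sst` and the local directory now holds `000002.sst` and `000003.sst`, only `000003.sst` is uploaded; the new checkpoint references the already-stored copy of `000002.sst`.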

Windowing — Tumbling, Sliding, and Session Windows

Flink's window API operates on keyed streams and assigns each record to one or more windows. A ReduceFunction or AggregateFunction folds each record into the window's accumulator as it arrives, whereas a ProcessWindowFunction buffers the records and sees them all at once. In both cases the result is emitted when the window fires (the watermark passes the window's end timestamp), producing one output record per window per key.

// Java — Three window types on the same click stream

DataStream<ClickEvent> clicks = env.fromSource(source, watermarkStrategy, "Clicks");
KeyedStream<ClickEvent, String> keyedClicks = clicks.keyBy(ClickEvent::getUserId);

// 1. TUMBLING window: non-overlapping, fixed-size windows
//    e.g., clicks per user per 5-minute bucket
DataStream<PageViewCount> tumblingCounts = keyedClicks
    .window(TumblingEventTimeWindows.of(Duration.ofMinutes(5)))
    .aggregate(new ClickCountAggregator(), new WindowMetadataEnricher());

// 2. SLIDING window: overlapping windows, emits more frequently
//    e.g., 10-minute window sliding every 2 minutes (5x overlap)
DataStream<Long> slidingCounts = keyedClicks  // ClickCountAggregator alone emits a Long count
    .window(SlidingEventTimeWindows.of(
        Duration.ofMinutes(10),  // window size
        Duration.ofMinutes(2)    // slide interval
    ))
    .aggregate(new ClickCountAggregator());

// 3. SESSION window: gap-based, closes after inactivity
//    e.g., group clicks until 30 minutes of silence = end of session
DataStream<SessionSummary> sessions = keyedClicks
    .window(EventTimeSessionWindows.withGap(Duration.ofMinutes(30)))
    .process(new SessionWindowFunction());

// AggregateFunction — stateful, incremental, low memory overhead
public class ClickCountAggregator
    implements AggregateFunction<ClickEvent, Long, Long> {
    @Override public Long createAccumulator() { return 0L; }
    @Override public Long add(ClickEvent e, Long acc) { return acc + 1; }
    @Override public Long getResult(Long acc) { return acc; }
    @Override public Long merge(Long a, Long b) { return a + b; }
}

// ProcessWindowFunction — full window access, higher memory cost
public class SessionWindowFunction
    extends ProcessWindowFunction<ClickEvent, SessionSummary, String, TimeWindow> {
    @Override
    public void process(String userId, Context ctx,
                        Iterable<ClickEvent> events,
                        Collector<SessionSummary> out) {
        long count = 0; long firstTs = Long.MAX_VALUE; long lastTs = 0;
        for (ClickEvent e : events) {
            count++;
            firstTs = Math.min(firstTs, e.getTimestamp());
            lastTs  = Math.max(lastTs,  e.getTimestamp());
        }
        out.collect(new SessionSummary(userId, count, firstTs, lastTs,
            ctx.window().getStart(), ctx.window().getEnd()));
    }
}
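
To make "assigns each record to one or more windows" concrete, here is a minimal plain-Java sketch of the assignment arithmetic for tumbling and sliding windows. It is simplified to a zero offset and non-negative timestamps, and the class and method names are illustrative, not Flink API.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java illustration of tumbling and sliding window assignment. */
final class WindowAssignmentSketch {

    /** Start of the tumbling window [start, start + size) that contains ts. */
    static long tumblingStart(long ts, long size) {
        return ts - (ts % size);
    }

    /** Starts of every sliding window of length `size`, advancing by `slide`, that contains ts. */
    static List<Long> slidingStarts(long ts, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long lastStart = ts - (ts % slide); // latest window that still starts at or before ts
        for (long start = lastStart; start > ts - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }
}
```

A click at minute 11 falls into exactly one 5-minute tumbling window (starting at minute 10) but into five 10-minute windows that slide every 2 minutes, which is why sliding windows multiply state and output by size / slide.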

Watermarks drive event-time progress. With the bounded-out-of-orderness strategy, Flink periodically emits a watermark equal to the maximum observed event timestamp minus a configurable out-of-orderness bound. A record is late when the watermark has already passed the end of the window it belongs to; such records are handled by the allowedLateness and side-output mechanisms:

// Watermark strategy — bounded out-of-orderness for event-time processing
WatermarkStrategy<ClickEvent> watermarkStrategy = WatermarkStrategy
    .<ClickEvent>forBoundedOutOfOrderness(Duration.ofSeconds(30))
    .withTimestampAssigner((event, recordTimestamp) -> event.getTimestamp())
    .withIdleness(Duration.ofMinutes(5)); // advance watermark if partition is idle

// allowedLateness + side output for late records
OutputTag<ClickEvent> lateClicksTag = new OutputTag<ClickEvent>("late-clicks") {};

SingleOutputStreamOperator<PageViewCount> result = keyedClicks
    .window(TumblingEventTimeWindows.of(Duration.ofMinutes(5)))
    .allowedLateness(Duration.ofMinutes(1))  // keep window state 1 extra minute for late updates
    .sideOutputLateData(lateClicksTag)        // route records later than that elsewhere
    // WindowMetadataEnricher turns the Long count into a PageViewCount, as in the tumbling example
    .aggregate(new ClickCountAggregator(), new WindowMetadataEnricher());

// Handle late records separately (e.g., reprocess or dead-letter queue)
DataStream<ClickEvent> lateClicks = result.getSideOutput(lateClicksTag);
lateClicks.sinkTo(deadLetterSink);
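
The interplay of watermark, window firing, and allowed lateness comes down to three comparisons. The plain-Java sketch below models them under the usual convention that a watermark t asserts that no further records with timestamp <= t are expected (hence the extra -1). It is an illustration with made-up names, not Flink's implementation.

```java
/** Plain-Java illustration of bounded-out-of-orderness watermarks and lateness checks. */
final class WatermarkSketch {
    private final long maxOutOfOrdernessMs;
    private long maxSeenTimestamp;

    WatermarkSketch(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        // Start low enough that currentWatermark() cannot underflow before the first record
        this.maxSeenTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMs + 1;
    }

    /** Called for every record; only the maximum timestamp matters. */
    void onEvent(long eventTimestampMs) {
        maxSeenTimestamp = Math.max(maxSeenTimestamp, eventTimestampMs);
    }

    /** Watermark = highest timestamp seen, minus the tolerated disorder, minus 1. */
    long currentWatermark() {
        return maxSeenTimestamp - maxOutOfOrdernessMs - 1;
    }

    /** A window [start, end) fires once the watermark reaches its last timestamp, end - 1. */
    static boolean windowFires(long windowEnd, long watermark) {
        return watermark >= windowEnd - 1;
    }

    /** After the lateness grace period, records for the window go to the side output. */
    static boolean isTooLate(long windowEnd, long allowedLatenessMs, long watermark) {
        return (windowEnd - 1) + allowedLatenessMs <= watermark;
    }
}
```

With a 30-second bound, a record stamped 300000 moves the watermark to 269999, so the window ending at 300000 does not fire until a record stamped 330000 or later arrives. With one minute of allowed lateness, records for that window are still merged in until the watermark reaches 359999.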

Exactly-Once Semantics — Checkpointing and Two-Phase Commit

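Flink's fault tolerance is built on asynchronous barrier snapshotting, a variant of the Chandy-Lamport distributed snapshot algorithm. The checkpoint coordinator in the JobManager periodically triggers a checkpoint, and the sources inject checkpoint barriers into their output streams. Each operator, upon receiving the barrier on all of its inputs, snapshots its state and forwards the barrier downstream. When every operator has acknowledged, the checkpoint is complete: a consistent global snapshot of the job's state and source positions. On its own this gives exactly-once state consistency; end-to-end exactly-once additionally needs a transactional or idempotent sink.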
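
Barrier alignment on a multi-input operator can be sketched as follows. This plain-Java toy keeps a running sum as its "state" and is only meant to show the rule that a channel is paused after its barrier arrives until every other input has delivered the same barrier. The class is hypothetical, and unaligned checkpoints, buffering, and asynchronous state upload are deliberately left out.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Plain-Java illustration of aligned checkpoint barriers on a multi-input operator. */
final class BarrierAlignmentSketch {
    private final int numInputs;
    private final Set<Integer> barrierSeenOn = new HashSet<>();
    private final List<Long> snapshots = new ArrayList<>();
    private long runningSum = 0; // the operator's state

    BarrierAlignmentSketch(int numInputs) {
        this.numInputs = numInputs;
    }

    /** Processes a record; returns false if the channel is paused for alignment. */
    boolean onRecord(int channel, long value) {
        if (barrierSeenOn.contains(channel)) {
            return false; // belongs to the next checkpoint: the caller must hold it back
        }
        runningSum += value;
        return true;
    }

    /** Handles a barrier; snapshots state once barriers have arrived on all inputs. */
    void onBarrier(int channel) {
        barrierSeenOn.add(channel);
        if (barrierSeenOn.size() == numInputs) {
            snapshots.add(runningSum); // consistent snapshot; the barrier is forwarded now
            barrierSeenOn.clear();     // resume all channels
        }
    }

    List<Long> snapshots() { return snapshots; }

    long state() { return runningSum; }
}
```

The snapshot therefore contains exactly the records that preceded the barrier on every input, which is what makes the global checkpoint consistent.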

For end-to-end exactly-once with Kafka sinks, Flink implements the two-phase commit (2PC) protocol. Records are written to Kafka inside an open transaction during the checkpoint interval; the sink flushes (pre-commits) the transaction when the checkpoint barrier arrives and commits it once the JobManager confirms that the checkpoint completed. If the job fails mid-interval, the open transaction is aborted and the records are replayed from the last completed checkpoint.
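
The transaction lifecycle can be modelled with three lists. This is a deliberately small plain-Java sketch with hypothetical names: it stands in for a transactional sink in general, not for the KafkaSink implementation, and it ignores recovery of transactions that were pre-committed but not yet committed.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java illustration of a two-phase-commit sink tied to checkpoints. */
final class TwoPhaseCommitSketch {
    private List<String> openTxn = new ArrayList<>();       // records since the last barrier
    private List<String> preCommitted = new ArrayList<>();  // flushed, waiting for the commit
    private final List<String> visible = new ArrayList<>(); // what read_committed readers see

    void write(String record) {
        openTxn.add(record);
    }

    /** Phase 1: on the checkpoint barrier, flush the current transaction and open a new one. */
    void preCommit() {
        preCommitted.addAll(openTxn);
        openTxn = new ArrayList<>();
    }

    /** Phase 2: the checkpoint completed globally, so publish the pre-committed records. */
    void commit() {
        visible.addAll(preCommitted);
        preCommitted = new ArrayList<>();
    }

    /** Failure mid-interval: discard the open transaction; the source replays those records. */
    void abortOpen() {
        openTxn = new ArrayList<>();
    }

    List<String> visible() { return visible; }
}
```

Nothing becomes visible between `write` and `commit`, so the output latency of an exactly-once sink is bounded below by the checkpoint interval.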

// Checkpoint configuration — production settings
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Enable checkpointing every 60 seconds
env.enableCheckpointing(60_000L, CheckpointingMode.EXACTLY_ONCE);

CheckpointConfig ckptConfig = env.getCheckpointConfig();
ckptConfig.setCheckpointTimeout(120_000L);        // abort if > 2 min
ckptConfig.setMinPauseBetweenCheckpoints(30_000L); // min gap between checkpoints
ckptConfig.setMaxConcurrentCheckpoints(1);         // only one in-flight at a time
ckptConfig.setTolerableCheckpointFailureNumber(3); // allow 3 failures before failing job

// Retain checkpoints on cancellation (enables manual recovery)
ckptConfig.setExternalizedCheckpointCleanup(
    ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
);

// KafkaSink with EXACTLY_ONCE semantic (two-phase commit)
KafkaSink<PageViewCount> kafkaSink = KafkaSink.<PageViewCount>builder()
    .setBootstrapServers("kafka-broker-1:9092,kafka-broker-2:9092")
    .setRecordSerializer(KafkaRecordSerializationSchema.builder()
        .setTopic("page-view-counts")
        .setValueSerializationSchema(new PageViewCountSerializer())
        .build())
    // EXACTLY_ONCE uses Kafka transactions and requires checkpointing to be enabled
    .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
    // Required for EXACTLY_ONCE; must be unique per job writing to the same cluster
    .setTransactionalIdPrefix("flink-pageview-job")
    // Must exceed checkpoint interval (60 s) + checkpoint timeout (120 s) and stay
    // within the broker's transaction.max.timeout.ms (15 minutes by default)
    .setProperty("transaction.timeout.ms", "300000")
    .setProperty("acks", "all")
    .setProperty("enable.idempotence", "true")
    .build();

Note

Exactly-once to Kafka requires that the Kafka transaction.timeout.ms is greater than your checkpoint interval plus checkpoint timeout. Downstream consumers of the exactly-once topic must set isolation.level=read_committed to avoid reading uncommitted (aborted) transactional records. If you can tolerate at-least-once with deduplication downstream, DeliveryGuarantee.AT_LEAST_ONCE has lower overhead and no transaction coordination latency.
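
The timeout rule in this note is easy to get wrong when the three values live in different places, so it can be worth asserting at job start-up. A minimal sketch, assuming a hypothetical helper class that you would call with your own configuration values:

```java
import java.time.Duration;

/** Plain-Java helper: is the Kafka transaction timeout long enough for the checkpoint settings? */
final class TransactionTimeoutCheck {

    /** True when the timeout strictly exceeds checkpoint interval + checkpoint timeout. */
    static boolean isSafe(Duration transactionTimeout,
                          Duration checkpointInterval,
                          Duration checkpointTimeout) {
        Duration worstCase = checkpointInterval.plus(checkpointTimeout);
        return transactionTimeout.compareTo(worstCase) > 0;
    }
}
```

With the 60-second interval and 120-second checkpoint timeout configured above, any transaction timeout of 180 seconds or less fails this check, while five minutes passes.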

Flink SQL and Table API — Declarative Stream Processing

Flink SQL and the Table API provide a declarative layer over the DataStream API, allowing engineers to write familiar SQL against unbounded streams. Under the hood, Flink compiles SQL into a DataStream program with the same operators, state backends, and checkpointing semantics. Flink SQL is particularly powerful for stream-table joins (enriching events with slowly-changing reference data) and aggregations with GROUP BY over tumbling windows.

-- Flink SQL: define Kafka source table with event-time and watermark

CREATE TABLE clicks (
    user_id     STRING,
    page_id     STRING,
    click_ts    TIMESTAMP(3),
    country     STRING,
    -- Processing-time attribute, required by the lookup join below
    proc_time   AS PROCTIME(),
    -- Declare event-time attribute and watermark strategy
    WATERMARK FOR click_ts AS click_ts - INTERVAL '30' SECOND
) WITH (
    'connector'            = 'kafka',
    'topic'                = 'click-events',
    'properties.bootstrap.servers' = 'kafka:9092',
    'properties.group.id'  = 'flink-sql-consumer',
    'scan.startup.mode'    = 'earliest-offset',
    'format'               = 'json',
    'json.timestamp-format.standard' = 'ISO-8601'
);

-- Define a slowly-changing dimension table (bounded lookup)
CREATE TABLE user_profiles (
    user_id     STRING,
    tier        STRING,
    signup_date DATE,
    PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
    'connector'  = 'jdbc',
    'url'        = 'jdbc:postgresql://postgres:5432/profiles',
    'table-name' = 'user_profiles',
    'lookup.cache.max-rows'   = '10000',
    'lookup.cache.ttl'        = '60s'
);

-- Define output sink table
-- No PRIMARY KEY here: the plain 'kafka' connector is append-only
-- (use 'upsert-kafka' if the sink needs keyed upserts)
CREATE TABLE click_aggregates (
    window_start  TIMESTAMP(3),
    window_end    TIMESTAMP(3),
    page_id       STRING,
    tier          STRING,
    click_count   BIGINT
) WITH (
    'connector' = 'kafka',
    'topic'     = 'click-aggregates',
    'properties.bootstrap.servers' = 'kafka:9092',
    'format'    = 'json'
);

-- Stream-table join + GROUP BY tumbling window
INSERT INTO click_aggregates
SELECT
    TUMBLE_START(c.click_ts, INTERVAL '5' MINUTE) AS window_start,
    TUMBLE_END(c.click_ts,   INTERVAL '5' MINUTE) AS window_end,
    c.page_id,
    p.tier,
    COUNT(*) AS click_count
FROM clicks AS c
LEFT JOIN user_profiles FOR SYSTEM_TIME AS OF c.proc_time AS p
    ON c.user_id = p.user_id
GROUP BY
    TUMBLE(c.click_ts, INTERVAL '5' MINUTE),
    c.page_id,
    p.tier;

The FOR SYSTEM_TIME AS OF clause turns the join into a lookup join: for each click, Flink queries user_profiles (through the cache configured above) at the moment the record is processed, so each event is enriched exactly once and later changes to the reference data do not retract or duplicate earlier results. Because the JDBC table is a lookup source, the clause must reference a processing-time attribute (proc_time); joining against the version that was valid at event time requires a versioned table instead, for example a changelog stream with a primary key and watermark. Spark Structured Streaming and Kafka Streams offer stream-static and stream-table joins respectively, but Flink SQL expresses both the processing-time and the event-time variant declaratively with the same syntax.
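
The two lookup options in the user_profiles definition, lookup.cache.max-rows and lookup.cache.ttl, behave like a bounded cache with per-entry expiry in front of the database. The plain-Java sketch below illustrates that behaviour; the class is hypothetical, the `loader` function stands in for the JDBC query, and the connector's actual eviction policy may differ in detail.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

/** Plain-Java illustration of a size-bounded lookup cache with time-to-live. */
final class LookupCacheSketch<K, V> {

    private static final class CachedRow<V> {
        final V value;
        final long loadedAtMs;
        CachedRow(V value, long loadedAtMs) {
            this.value = value;
            this.loadedAtMs = loadedAtMs;
        }
    }

    private final long ttlMs;
    private final Function<K, V> loader; // stands in for the database query
    private final Map<K, CachedRow<V>> cache;
    private int loads = 0;

    LookupCacheSketch(final int maxRows, long ttlMs, Function<K, V> loader) {
        this.ttlMs = ttlMs;
        this.loader = loader;
        // Access-ordered map that evicts the least recently used row beyond maxRows
        this.cache = new LinkedHashMap<K, CachedRow<V>>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, CachedRow<V>> eldest) {
                return size() > maxRows;
            }
        };
    }

    /** Returns the cached value, reloading it when missing or older than the TTL. */
    V lookup(K key, long nowMs) {
        CachedRow<V> row = cache.get(key);
        if (row == null || nowMs - row.loadedAtMs >= ttlMs) {
            row = new CachedRow<>(loader.apply(key), nowMs);
            loads++;
            cache.put(key, row);
        }
        return row.value;
    }

    /** Number of times the loader (the database) was actually hit. */
    int loads() { return loads; }
}
```

The TTL is the staleness you accept: with a 60-second TTL, a tier change in the database can take up to a minute to show up in the enriched stream.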

Kafka Integration: KafkaSource and KafkaSink

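The Flink Kafka connector (flink-connector-kafka) provides KafkaSource, built on the FLIP-27 Source API, and KafkaSink, which supports three delivery guarantees (NONE, AT_LEAST_ONCE, EXACTLY_ONCE). Flink manages source offsets itself: they are stored in checkpoint state and restored on recovery, which keeps offsets consistent with operator state. Offsets are also committed back to Kafka when a checkpoint completes, but only so that external tools can monitor consumer lag.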

// Kafka source with event-time watermarks and Avro deserialization
// Maven: org.apache.flink:flink-connector-kafka:3.4.0-1.20

KafkaSource<ClickEvent> kafkaSource = KafkaSource.<ClickEvent>builder()
    .setBootstrapServers("kafka-broker-1:9092,kafka-broker-2:9092")
    .setTopics("click-events")
    .setGroupId("flink-click-processor")
    .setStartingOffsets(OffsetsInitializer.committedOffsets(
        OffsetResetStrategy.EARLIEST  // fall back if no committed offset
    ))
    // Assumes ClickEvent is an Avro-generated SpecificRecord class
    .setValueOnlyDeserializer(AvroDeserializationSchema.forSpecific(ClickEvent.class))
    // Consumer properties
    .setProperty("security.protocol", "SASL_SSL")
    .setProperty("sasl.mechanism", "SCRAM-SHA-256")
    .setProperty("sasl.jaas.config",
        "org.apache.kafka.common.security.scram.ScramLoginModule required " +
        "username=\"flink-consumer\" password=\"secret\";")
    .setProperty("fetch.min.bytes", "65536")         // reduce fetch requests
    .setProperty("fetch.max.wait.ms", "500")
    .build();

// WatermarkStrategy: bounded out-of-orderness with per-partition tracking
WatermarkStrategy<ClickEvent> watermarkStrategy = WatermarkStrategy
    .<ClickEvent>forBoundedOutOfOrderness(Duration.ofSeconds(30))
    .withTimestampAssigner(
        (event, ts) -> event.getTimestamp()  // same accessor as in the windowing examples
    )
    .withIdleness(Duration.ofMinutes(2)); // unblock watermark on idle partitions

DataStreamSource<ClickEvent> clickStream = env.fromSource(
    kafkaSource,
    watermarkStrategy,
    "Kafka Click Source"
);
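
Per-partition watermark tracking and the idleness timeout can be sketched as "take the minimum over the partitions that are still active". The plain-Java class below is an illustration with made-up names and explicit clock arguments; it is not how KafkaSource is implemented internally.

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-Java illustration: combine per-partition watermarks, skipping idle partitions. */
final class PartitionWatermarkSketch {
    private final long idleTimeoutMs;
    private final Map<Integer, Long> watermarkByPartition = new HashMap<>();
    private final Map<Integer, Long> lastActivityByPartition = new HashMap<>();
    private long lastEmitted = Long.MIN_VALUE;

    PartitionWatermarkSketch(long idleTimeoutMs) {
        this.idleTimeoutMs = idleTimeoutMs;
    }

    /** Registers a partition so that it holds back the watermark until it reports or goes idle. */
    void register(int partition, long nowMs) {
        watermarkByPartition.put(partition, Long.MIN_VALUE);
        lastActivityByPartition.put(partition, nowMs);
    }

    void onPartitionWatermark(int partition, long watermark, long nowMs) {
        watermarkByPartition.merge(partition, watermark, Long::max);
        lastActivityByPartition.put(partition, nowMs);
    }

    /** Minimum watermark over active partitions; never moves backwards. */
    long combinedWatermark(long nowMs) {
        long min = Long.MAX_VALUE;
        for (Map.Entry<Integer, Long> e : watermarkByPartition.entrySet()) {
            long idleFor = nowMs - lastActivityByPartition.get(e.getKey());
            if (idleFor < idleTimeoutMs) {
                min = Math.min(min, e.getValue());
            }
        }
        if (min != Long.MAX_VALUE) {
            lastEmitted = Math.max(lastEmitted, min);
        }
        return lastEmitted;
    }
}
```

Without the idleness timeout, a single partition that receives no traffic would pin the combined watermark at its initial value and no event-time window would ever fire. The flip side is that records arriving on a partition after it was declared idle may already be late.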

Production Deployment with the Flink Kubernetes Operator

The Flink Kubernetes Operator manages the full lifecycle of Flink deployments as a Kubernetes custom resource (CRD: FlinkDeployment). It handles job submission, scaling, checkpointing, savepoints on upgrade, and automatic recovery — replacing the manual process of constructing JobManager/TaskManager Deployments and Services.

# FlinkDeployment — production Flink job on Kubernetes
# Install operator: helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: click-aggregation-job
  namespace: flink-prod
spec:
  image: my-registry/flink-click-aggregation:1.20.0-v2.3.1
  flinkVersion: v1_20
  flinkConfiguration:
    # Checkpointing
    execution.checkpointing.interval: "60000"
    execution.checkpointing.mode: EXACTLY_ONCE
    state.backend: rocksdb
    state.backend.incremental: "true"
    state.checkpoints.dir: s3://my-bucket/flink-checkpoints
    state.savepoints.dir: s3://my-bucket/flink-savepoints
    # Restart strategy
    restart-strategy: exponential-delay
    restart-strategy.exponential-delay.initial-backoff: 1s
    restart-strategy.exponential-delay.max-backoff: 5min
    restart-strategy.exponential-delay.reset-backoff-threshold: 10min
    # Web UI and metrics
    metrics.reporters: prometheus
    metrics.reporter.prometheus.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
    metrics.reporter.prometheus.port: "9249"
    # Slots, network and managed memory tuning
    taskmanager.numberOfTaskSlots: "2"
    taskmanager.memory.network.fraction: "0.2"
    taskmanager.memory.managed.fraction: "0.4"

  serviceAccount: flink-service-account

  podTemplate:
    spec:
      containers:
        - name: flink-main-container
          env:
            - name: KAFKA_BOOTSTRAP_SERVERS
              valueFrom:
                secretKeyRef:
                  name: kafka-credentials
                  key: bootstrap-servers
          resources: {}  # overridden per component below
          volumeMounts:
            - name: flink-config-vol
              # Hypothetical path; avoid /opt/flink/conf, where Flink mounts its own generated config
              mountPath: /opt/flink/job-config
      volumes:
        - name: flink-config-vol
          configMap:
            name: flink-job-config

  jobManager:
    resource:
      memory: "2048m"
      cpu: 0.5
    replicas: 1  # for HA, set high-availability.type: kubernetes and high-availability.storageDir; standby replicas are optional

  taskManager:
    resource:
      memory: "4096m"
      cpu: 2.0
    replicas: 4  # 4 TMs * 2 slots = 8 total parallelism

  job:
    jarURI: local:///opt/flink/usrlib/click-aggregation.jar
    entryClass: com.datasops.ClickAggregationJob
    args: ["--env", "production", "--parallelism", "8"]
    parallelism: 8
    upgradeMode: savepoint   # take savepoint before replacing deployment
    savepointTriggerNonce: 0 # increment to trigger manual savepoint

# Scale out by changing the TaskManager replica count.
# Whether the operator applies this in place or through a savepoint-and-restart
# depends on the deployment mode and scheduler; verify against your operator version.
kubectl patch flinkdeployment click-aggregation-job -n flink-prod \
  --type=merge \
  -p '{"spec":{"taskManager":{"replicas":8}}}'

# Trigger a savepoint manually
kubectl patch flinkdeployment click-aggregation-job -n flink-prod \
  --type=merge \
  -p '{"spec":{"job":{"savepointTriggerNonce":1}}}'

# Monitor savepoint status
kubectl get flinkdeployment click-aggregation-job -n flink-prod -o jsonpath='{.status.jobStatus}'

Monitoring — Backpressure, Checkpoints, and Prometheus Metrics

Flink exposes hundreds of metrics via its metrics system. The Prometheus reporter serves them over HTTP on each JobManager and TaskManager (port 9249 in our config above), where Prometheus scrapes them. The most critical metrics for production operations fall into four categories:

  • Checkpoint metrics. lastCheckpointDuration (alert well before it reaches the checkpoint timeout, for example at 1.5× the checkpoint interval), lastCheckpointSize (alert on sudden growth), numberOfFailedCheckpoints.
  • Backpressure. backPressuredTimeMsPerSecond per operator subtask: alert if it stays above roughly 200 ms/s. Also visible as coloured nodes in the Flink Web UI job graph.
  • Lag. KafkaSourceReader.KafkaConsumer.records-lag-max: the largest number of records by which the Flink consumer trails the head of any partition it reads.
  • Watermark lag. Wall-clock time minus currentOutputWatermark, which reveals event-time processing lag independently of checkpoint health.

# Prometheus alert rules for Flink: prometheus-flink-rules.yaml
groups:
  - name: flink.checkpoints
    rules:
      - alert: FlinkCheckpointDurationHigh
        expr: |
          flink_jobmanager_job_lastCheckpointDuration > 90000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Flink checkpoint duration {{ $value }}ms exceeds 90s"
          runbook: "https://wiki.internal/flink-checkpoint-runbook"

      - alert: FlinkCheckpointFailed
        expr: |
          increase(flink_jobmanager_job_numberOfFailedCheckpoints[10m]) > 2
        for: 0m
        labels:
          severity: critical
        annotations:
          summary: "Flink job {{ $labels.job_name }} has {{ $value }} failed checkpoints"

  - name: flink.backpressure
    rules:
      - alert: FlinkBackpressureHigh
        expr: |
          flink_taskmanager_job_task_backPressuredTimeMsPerSecond > 200
        for: 3m
        labels:
          severity: warning
        annotations:
          summary: "Flink task {{ $labels.task_name }} backpressured {{ $value }}ms/s"

  - name: flink.kafka.lag
    rules:
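      # Metric name assumes the default operator scope format; confirm it against your /metrics output
      - alert: FlinkKafkaLagHigh
        expr: |
          flink_taskmanager_job_task_operator_KafkaSourceReader_KafkaConsumer_records_lag_max > 100000
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "Flink Kafka consumer lag {{ $value }} records on {{ $labels.operator_name }}"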

---
# Grafana dashboard provisioning — key panels
# Panel 1: Checkpoint duration heatmap
# metric: flink_jobmanager_job_lastCheckpointDuration{job_name="$job"}
# Panel 2: Backpressure ratio per task
# metric: flink_taskmanager_job_task_backPressuredTimeMsPerSecond
# Panel 3: Kafka consumer lag (maximum across partitions)
# metric: flink_taskmanager_job_task_operator_KafkaSourceReader_KafkaConsumer_records_lag_max
# Panel 4: Watermark lag (ms); the watermark is already in epoch milliseconds
# metric: time() * 1000 - flink_taskmanager_job_task_operator_currentOutputWatermark

When backpressure is detected, the diagnostic process follows a consistent pattern. First identify the bottleneck: backpressured tasks sit upstream of it, so look for the first task downstream of the backpressured ones that is itself busy rather than backpressured. Then check whether it is CPU-bound (increase parallelism), I/O-bound (optimise the sink or external call), or state-bound (tune RocksDB compaction, enable incremental checkpoints, reduce state TTL). The Flink Web UI's "Flame Graph" tab (enable via rest.flamegraph.enabled: true) samples TaskManager thread stacks on demand without a profiler agent.
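
The first diagnostic step can be automated from the backPressuredTimeMsPerSecond metric. A common reading is that backpressured tasks sit upstream of the bottleneck, so the culprit is the first task that follows them and is not itself backpressured. The plain-Java sketch below applies that rule to a linear pipeline; the class, the task names, and the threshold are assumptions for illustration, and branching job graphs would need a graph traversal instead of a list.

```java
import java.util.List;

/** Plain-Java illustration: locate the bottleneck task in a linear pipeline. */
final class BottleneckSketch {

    static final class TaskMetrics {
        final String name;
        final double backPressuredMsPerSec;
        TaskMetrics(String name, double backPressuredMsPerSec) {
            this.name = name;
            this.backPressuredMsPerSec = backPressuredMsPerSec;
        }
    }

    /**
     * Tasks must be ordered from source to sink. Returns the name of the first task
     * that follows a backpressured task but is not backpressured itself,
     * or null when no such task exists.
     */
    static String findBottleneck(List<TaskMetrics> tasksInOrder, double thresholdMsPerSec) {
        boolean upstreamBackpressured = false;
        for (TaskMetrics task : tasksInOrder) {
            boolean backpressured = task.backPressuredMsPerSec > thresholdMsPerSec;
            if (upstreamBackpressured && !backpressured) {
                return task.name;
            }
            upstreamBackpressured = backpressured;
        }
        return null;
    }
}
```

For a pipeline where the source and a map task report around 800 ms/s of backpressure and the window task after them reports none, the function returns the window task, which is where profiling should start.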

Decision Framework — Flink vs Spark Structured Streaming vs Kafka Streams

Choosing a stream processing engine is an architectural decision that is expensive to reverse. The right choice depends on latency requirements, state complexity, team expertise, and operational overhead tolerance.

# Decision matrix: stream processing engine selection

CHOOSE Apache Flink when:
  ✓ Sub-second latency is required (true continuous streaming, not micro-batch)
  ✓ Complex event-time windowing with out-of-order data and late arrivals
  ✓ Large, long-lived keyed state (terabytes in total across many keys, multi-week retention)
  ✓ Exactly-once end-to-end guarantees to Kafka/JDBC/Iceberg
  ✓ Complex event processing (CEP patterns, temporal joins)
  ✓ High-throughput stateful joins between multiple streams
  ✓ Flink SQL temporal table joins for enrichment at scale
  Examples: fraud detection, real-time ML feature computation,
            IoT telemetry aggregation, financial risk calculations

CHOOSE Spark Structured Streaming when:
  ✓ Team has existing Spark expertise (PySpark, Delta Lake, MLflow)
  ✓ Latency of 1–60 seconds is acceptable (micro-batch)
  ✓ Deep Lakehouse integration (Delta Lake CDC, Iceberg write via Spark)
  ✓ Unified batch + streaming code sharing the same DataFrame API
  ✓ Need to run complex ML inference inside the pipeline (Pandas UDFs)
  Examples: ETL to data warehouse, CDC ingestion to lakehouse,
            batch + streaming shared pipelines

CHOOSE Kafka Streams when:
  ✓ Processing logic embedded in a Java/Kotlin microservice
  ✓ No separate streaming cluster to operate
  ✓ Source and sink are always Kafka topics
  ✓ State requirements are modest (gigabytes in total rather than terabytes)
  ✓ Team is more familiar with Java microservice patterns than distributed systems
  Examples: event enrichment, stateful deduplication within a service,
            topic-to-topic transformations, aggregation per session window

AVOID Flink when:
  ✗ Team has no JVM expertise (Python-first teams should evaluate PyFlink
    or use Flink SQL via a managed service such as Confluent Cloud for Apache Flink)
  ✗ Operational maturity for a separate cluster is not available
  ✗ Workload is purely batch (use Spark or dbt instead)
  ✗ Budget constraints rule out a dedicated Flink cluster
     (consider Kafka Streams embedded in the application)

Note

If your team is Python-first, evaluate PyFlink, the Python API for Flink that covers both the DataStream API and Flink SQL. PyFlink executes Python operators in a co-located Python process alongside the JVM, with data serialisation overhead between the two runtimes. For workloads written purely in SQL or Table API expressions without Python UDFs, there is no per-record Python overhead: the query is compiled to JVM operators directly. Alternatively, managed Flink services (Amazon Managed Service for Apache Flink, Confluent Cloud for Apache Flink) eliminate cluster operational burden at the cost of flexibility.

