Event Sourcing · CQRS · Architecture · Microservices · DDD · Kafka

Event Sourcing and CQRS — When to Split Read and Write Models

A practical guide to Event Sourcing and CQRS in production: event store design on PostgreSQL, aggregate and domain event patterns with optimistic concurrency, CQRS read model projections with checkpoint-based replay, Kafka-based event publishing with the Transactional Outbox pattern, TypeScript command handlers, event versioning with upcasters, snapshotting for long-lived aggregates, and a decision framework for when these patterns are the right choice.

2026-05-13

The core problem: state is a lossy compression of history

A conventional CRUD database stores the current state of an entity — the latest snapshot. When you update a bank account balance, the previous balance is gone. When you cancel an order, the sequence of status transitions that led to the cancellation is lost. The audit log, if it exists at all, is a second-class citizen bolted on after the fact.

Event Sourcing inverts this model. Instead of storing current state, you store the sequence of events that caused it. The bank account does not have a balance column — it has a stream of AccountOpened, MoneyDeposited, and MoneyWithdrawn events. The current balance is derived by replaying those events. State becomes a projection of history, not the source of truth itself.

This unlocks capabilities that are impossible or expensive with state-based persistence: complete audit trails with zero extra effort, temporal queries ("what was the balance on March 3rd?"), event-driven integration without polling, and the ability to build new read models retroactively from the existing event log. The cost is increased complexity — and understanding that cost is the central skill in applying these patterns well. Tools like EventStoreDB and Axon Framework provide purpose-built infrastructure, though many teams implement event stores directly on PostgreSQL or Kafka.
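
Before any infrastructure enters the picture, the core mechanic fits in a few lines of Python — state as a left fold over the event history. A minimal sketch (the event shapes here are illustrative, not the ones used later in this post):

# replay_sketch.py — deriving current state by folding over the event history

from decimal import Decimal

events = [
    {"type": "AccountOpened", "account_id": "acc-1"},
    {"type": "MoneyDeposited", "amount": Decimal("120.00")},
    {"type": "MoneyWithdrawn", "amount": Decimal("45.50")},
]

def current_balance(history: list[dict]) -> Decimal:
    """Replay the stream: every event moves the derived state forward."""
    balance = Decimal("0")
    for event in history:
        if event["type"] == "MoneyDeposited":
            balance += event["amount"]
        elif event["type"] == "MoneyWithdrawn":
            balance -= event["amount"]
    return balance

assert current_balance(events) == Decimal("74.50")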

Core Concepts — Events, Aggregates, and the Event Store

Domain Event

An immutable fact that something happened in the past, expressed in the ubiquitous language of the domain. Events are named in the past tense: OrderPlaced, PaymentProcessed, ShipmentDispatched. They carry all the data needed to understand what happened — not just foreign keys, but denormalised values. An event written to the store is never mutated or deleted.

Aggregate

A cluster of domain objects treated as a single unit for writes. The aggregate root is the only entry point for commands. The aggregate enforces invariants — business rules that must hold true across the entire cluster. In Event Sourcing, the aggregate's state is rebuilt by replaying its event stream from the beginning (or from a snapshot).

Event Stream

An ordered, append-only log of events for a single aggregate instance, identified by a stream ID (e.g., order-abc123). Events in the stream are numbered sequentially from version 0. Optimistic concurrency control uses the expected version: a writer specifies the version it read, and the store rejects the append if a concurrent write has already advanced the version.

Event Store

The persistence layer for event streams. Supports two primary operations: AppendToStream (with an expected version for optimistic locking) and ReadStreamForward (to replay events and reconstitute state). The event store is an append-only log — not a mutable record store. EventStoreDB is purpose-built; PostgreSQL and DynamoDB are common DIY alternatives.

Projection

A read model built by subscribing to the event stream and maintaining a denormalised view optimised for queries. Projections are derived data — they can always be rebuilt from scratch by replaying all events. This separates read and write concerns and allows multiple independent read models optimised for different access patterns (CQRS).

Snapshot

A checkpoint that stores the aggregate state at a known event version. When loading an aggregate, the event store loads the most recent snapshot, then replays only the events after that snapshot version. Snapshots reduce reconstitution time for aggregates with long event histories without breaking the audit trail.

Implementing Event Sourcing in Python — Aggregate, Store, and Repository

The following implementation uses PostgreSQL as the event store. Each event stream is stored as a sequence of rows in an events table. Optimistic concurrency is enforced with a unique constraint on (stream_id, version). The aggregate rebuilds its state by replaying events from the stream — no ORM, no mutable state columns.

# event_sourcing/domain.py — Domain events and Order aggregate

from __future__ import annotations
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from decimal import Decimal
from typing import Any


# ---------------------------------------------------------------------------
# Base event
# ---------------------------------------------------------------------------

@dataclass(frozen=True)
class DomainEvent:
    event_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    occurred_at: str = field(
        default_factory=lambda: datetime.now(timezone.utc).isoformat()
    )

    def to_dict(self) -> dict[str, Any]:
        raise NotImplementedError


# ---------------------------------------------------------------------------
# Order domain events
# ---------------------------------------------------------------------------

@dataclass(frozen=True)
class OrderPlaced(DomainEvent):
    order_id: str = ""
    customer_id: str = ""
    items: list[dict] = field(default_factory=list)
    total_amount: str = "0"            # Decimal serialised as string

    def to_dict(self) -> dict[str, Any]:
        return {
            "event_type": "OrderPlaced",
            "event_id": self.event_id,
            "occurred_at": self.occurred_at,
            "order_id": self.order_id,
            "customer_id": self.customer_id,
            "items": self.items,
            "total_amount": self.total_amount,
        }


@dataclass(frozen=True)
class PaymentConfirmed(DomainEvent):
    order_id: str = ""
    payment_reference: str = ""
    amount: str = "0"

    def to_dict(self) -> dict[str, Any]:
        return {
            "event_type": "PaymentConfirmed",
            "event_id": self.event_id,
            "occurred_at": self.occurred_at,
            "order_id": self.order_id,
            "payment_reference": self.payment_reference,
            "amount": self.amount,
        }


@dataclass(frozen=True)
class OrderCancelled(DomainEvent):
    order_id: str = ""
    reason: str = ""

    def to_dict(self) -> dict[str, Any]:
        return {
            "event_type": "OrderCancelled",
            "event_id": self.event_id,
            "occurred_at": self.occurred_at,
            "order_id": self.order_id,
            "reason": self.reason,
        }


# ---------------------------------------------------------------------------
# Order aggregate
# ---------------------------------------------------------------------------

class OrderStatus:
    PENDING = "PENDING"
    PAID = "PAID"
    CANCELLED = "CANCELLED"


class OrderAggregate:
    """
    Aggregate root for the Order bounded context.
    State is reconstituted exclusively by applying events — never set directly.
    """

    def __init__(self) -> None:
        self.order_id: str | None = None
        self.customer_id: str | None = None
        self.status: str | None = None
        self.total_amount: Decimal = Decimal("0")
        self.version: int = -1              # -1 = stream does not exist yet
        self._pending_events: list[DomainEvent] = []

    # ------------------------------------------------------------------
    # Command handlers (enforce invariants, raise events)
    # ------------------------------------------------------------------

    def place_order(
        self,
        order_id: str,
        customer_id: str,
        items: list[dict],
        total_amount: Decimal,
    ) -> None:
        if self.status is not None:
            raise ValueError("Order has already been placed")
        self._raise(
            OrderPlaced(
                order_id=order_id,
                customer_id=customer_id,
                items=items,
                total_amount=str(total_amount),
            )
        )

    def confirm_payment(self, payment_reference: str, amount: Decimal) -> None:
        if self.status != OrderStatus.PENDING:
            raise ValueError(f"Cannot confirm payment for order in status {self.status}")
        self._raise(
            PaymentConfirmed(
                order_id=self.order_id,
                payment_reference=payment_reference,
                amount=str(amount),
            )
        )

    def cancel(self, reason: str) -> None:
        if self.status == OrderStatus.CANCELLED:
            raise ValueError("Order is already cancelled")
        if self.status == OrderStatus.PAID:
            raise ValueError("Cannot cancel a paid order without a refund")
        self._raise(OrderCancelled(order_id=self.order_id, reason=reason))

    # ------------------------------------------------------------------
    # Event application (mutate state — no validation here)
    # ------------------------------------------------------------------

    def _apply(self, event: DomainEvent) -> None:
        if isinstance(event, OrderPlaced):
            self.order_id = event.order_id
            self.customer_id = event.customer_id
            self.status = OrderStatus.PENDING
            self.total_amount = Decimal(event.total_amount)
        elif isinstance(event, PaymentConfirmed):
            self.status = OrderStatus.PAID
        elif isinstance(event, OrderCancelled):
            self.status = OrderStatus.CANCELLED

    def _raise(self, event: DomainEvent) -> None:
        self._apply(event)
        self._pending_events.append(event)

    # ------------------------------------------------------------------
    # Reconstitution from stored events
    # ------------------------------------------------------------------

    @classmethod
    def reconstitute(cls, events: list[DomainEvent]) -> "OrderAggregate":
        aggregate = cls()
        for event in events:
            aggregate._apply(event)
            aggregate.version += 1
        return aggregate

    def pop_pending_events(self) -> list[DomainEvent]:
        events, self._pending_events = self._pending_events, []
        return events

# event_sourcing/store.py — PostgreSQL-backed event store

import json
import psycopg2
from psycopg2.extras import RealDictCursor
from typing import Any

# DDL — run once on database initialisation
CREATE_EVENTS_TABLE = """
CREATE TABLE IF NOT EXISTS events (
    id          BIGSERIAL PRIMARY KEY,
    stream_id   TEXT        NOT NULL,
    version     INTEGER     NOT NULL,
    event_type  TEXT        NOT NULL,
    payload     JSONB       NOT NULL,
    recorded_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    CONSTRAINT uq_stream_version UNIQUE (stream_id, version)
);
CREATE INDEX IF NOT EXISTS idx_events_stream_id ON events (stream_id);
"""


class OptimisticConcurrencyError(Exception):
    """Raised when an append fails because the stream version has advanced."""


class PostgresEventStore:
    def __init__(self, dsn: str) -> None:
        self._dsn = dsn

    def _connect(self):
        return psycopg2.connect(self._dsn)

    def append_to_stream(
        self,
        stream_id: str,
        events: list[dict[str, Any]],
        expected_version: int,
    ) -> None:
        """
        Append events to a stream, enforcing optimistic concurrency.
        expected_version=-1 means the stream must not exist yet.
        """
        with self._connect() as conn:
            with conn.cursor() as cur:
                # Verify current version
                cur.execute(
                    "SELECT COALESCE(MAX(version), -1) FROM events WHERE stream_id = %s",
                    (stream_id,),
                )
                current_version: int = cur.fetchone()[0]
                if current_version != expected_version:
                    raise OptimisticConcurrencyError(
                        f"Expected version {expected_version}, got {current_version} "
                        f"for stream {stream_id}"
                    )
                try:
                    for i, event in enumerate(events):
                        cur.execute(
                            """
                            INSERT INTO events (stream_id, version, event_type, payload)
                            VALUES (%s, %s, %s, %s)
                            """,
                            (
                                stream_id,
                                expected_version + 1 + i,
                                event["event_type"],
                                json.dumps(event),
                            ),
                        )
                except psycopg2.errors.UniqueViolation as exc:
                    # The version check above races with concurrent writers;
                    # the unique constraint on (stream_id, version) is the backstop.
                    raise OptimisticConcurrencyError(
                        f"Concurrent append detected on stream {stream_id}"
                    ) from exc

    def read_stream(self, stream_id: str) -> list[dict[str, Any]]:
        with self._connect() as conn:
            with conn.cursor(cursor_factory=RealDictCursor) as cur:
                cur.execute(
                    "SELECT payload FROM events WHERE stream_id = %s ORDER BY version",
                    (stream_id,),
                )
                return [row["payload"] for row in cur.fetchall()]

    def read_stream_after(
        self, stream_id: str, after_version: int
    ) -> list[dict[str, Any]]:
        """Read only events newer than a version — used when loading from a snapshot."""
        with self._connect() as conn:
            with conn.cursor(cursor_factory=RealDictCursor) as cur:
                cur.execute(
                    "SELECT payload FROM events "
                    "WHERE stream_id = %s AND version > %s ORDER BY version",
                    (stream_id, after_version),
                )
                return [row["payload"] for row in cur.fetchall()]

    def read_all_events(self, after_position: int = 0) -> list[dict[str, Any]]:
        """Read the global event log — used by projection rebuilders and subscriptions."""
        with self._connect() as conn:
            with conn.cursor(cursor_factory=RealDictCursor) as cur:
                cur.execute(
                    "SELECT id, payload FROM events WHERE id > %s ORDER BY id",
                    (after_position,),
                )
                return [{"position": row["id"], **row["payload"]} for row in cur.fetchall()]

# event_sourcing/repository.py — Order repository wiring aggregate to store

from .domain import OrderAggregate, DomainEvent, OrderPlaced, PaymentConfirmed, OrderCancelled
from .store import PostgresEventStore


def _deserialise(payload: dict) -> DomainEvent:
    et = payload["event_type"]
    if et == "OrderPlaced":
        return OrderPlaced(**{k: v for k, v in payload.items() if k != "event_type"})
    if et == "PaymentConfirmed":
        return PaymentConfirmed(**{k: v for k, v in payload.items() if k != "event_type"})
    if et == "OrderCancelled":
        return OrderCancelled(**{k: v for k, v in payload.items() if k != "event_type"})
    raise ValueError(f"Unknown event type: {et}")


class OrderRepository:
    def __init__(self, store: PostgresEventStore) -> None:
        self._store = store

    def load(self, order_id: str) -> OrderAggregate:
        payloads = self._store.read_stream(f"order-{order_id}")
        if not payloads:
            raise KeyError(f"Order {order_id} not found")
        events = [_deserialise(p) for p in payloads]
        return OrderAggregate.reconstitute(events)

    def save(self, aggregate: OrderAggregate) -> None:
        pending = aggregate.pop_pending_events()
        if not pending:
            return
        self._store.append_to_stream(
            stream_id=f"order-{aggregate.order_id}",
            events=[e.to_dict() for e in pending],
            expected_version=aggregate.version,
        )

Note

The repository pattern keeps the event store API out of the domain layer. The aggregate knows nothing about PostgreSQL — it only raises and applies events. This makes the domain fully testable without any I/O: create an aggregate, call commands, inspect pending events. The repository is the only boundary between the domain model and the persistence layer.
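
As a sketch of what that buys — the invariants in OrderAggregate can be exercised with a plain unit test (pytest assumed here), no database required:

# tests/test_order_aggregate.py — pure domain tests, zero I/O (a sketch)

from decimal import Decimal

import pytest

from event_sourcing.domain import OrderAggregate, OrderPlaced, PaymentConfirmed


def test_place_then_pay_raises_two_events():
    order = OrderAggregate()
    order.place_order("o-1", "c-1", items=[], total_amount=Decimal("10.00"))
    order.confirm_payment("pay-123", Decimal("10.00"))

    events = order.pop_pending_events()
    assert isinstance(events[0], OrderPlaced)
    assert isinstance(events[1], PaymentConfirmed)


def test_cannot_cancel_a_paid_order():
    order = OrderAggregate()
    order.place_order("o-1", "c-1", items=[], total_amount=Decimal("10.00"))
    order.confirm_payment("pay-123", Decimal("10.00"))

    with pytest.raises(ValueError):
        order.cancel("changed my mind")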

CQRS — Splitting Read and Write Models

CQRS (Command Query Responsibility Segregation) is the pattern of using separate models for writes (commands) and reads (queries). In an event-sourced system, the write model is the aggregate — it enforces invariants and produces events. The read models are projections: denormalised views materialised from the event stream, each optimised for a specific query pattern.

A single aggregate might feed multiple independent projections: an orders_summary table for list views, a customer_order_history table for customer dashboards, and a revenue_by_day table for analytics — all powered by the same OrderPlaced and PaymentConfirmed events. Projections are eventually consistent — they lag slightly behind the command side — but they can be rebuilt from scratch at any time by replaying the event log.

# event_sourcing/projections.py — Projecting events into read models (PostgreSQL)

import psycopg2
from .store import PostgresEventStore

CREATE_PROJECTION_TABLES = """
CREATE TABLE IF NOT EXISTS orders_summary (
    order_id     TEXT PRIMARY KEY,
    customer_id  TEXT NOT NULL,
    status       TEXT NOT NULL,
    total_amount NUMERIC(12, 2) NOT NULL,
    placed_at    TIMESTAMPTZ,
    paid_at      TIMESTAMPTZ,
    cancelled_at TIMESTAMPTZ
);

CREATE TABLE IF NOT EXISTS projection_checkpoints (
    projection_name TEXT PRIMARY KEY,
    last_position   BIGINT NOT NULL DEFAULT 0
);
"""


class OrderSummaryProjection:
    """
    Maintains a denormalised orders_summary table.
    Tracks a checkpoint so it can resume after a restart without replaying all events.
    """

    NAME = "orders_summary"

    def __init__(self, dsn: str, store: PostgresEventStore) -> None:
        self._dsn = dsn
        self._store = store

    def _connect(self):
        return psycopg2.connect(self._dsn)

    def _get_checkpoint(self, cur) -> int:
        cur.execute(
            "SELECT last_position FROM projection_checkpoints WHERE projection_name = %s",
            (self.NAME,),
        )
        row = cur.fetchone()
        return row[0] if row else 0

    def _save_checkpoint(self, cur, position: int) -> None:
        cur.execute(
            """
            INSERT INTO projection_checkpoints (projection_name, last_position)
            VALUES (%s, %s)
            ON CONFLICT (projection_name) DO UPDATE SET last_position = EXCLUDED.last_position
            """,
            (self.NAME, position),
        )

    def run(self) -> None:
        """Process all unhandled events since the last checkpoint."""
        with self._connect() as conn:
            with conn.cursor() as cur:
                checkpoint = self._get_checkpoint(cur)
                events = self._store.read_all_events(after_position=checkpoint)
                for event in events:
                    self._handle(cur, event)
                    self._save_checkpoint(cur, event["position"])

    def rebuild(self) -> None:
        """Drop and rebuild the projection from the beginning of the event log."""
        with self._connect() as conn:
            with conn.cursor() as cur:
                cur.execute("TRUNCATE orders_summary")
                cur.execute(
                    "DELETE FROM projection_checkpoints WHERE projection_name = %s",
                    (self.NAME,),
                )
        self.run()

    def _handle(self, cur, event: dict) -> None:
        et = event.get("event_type")

        if et == "OrderPlaced":
            cur.execute(
                """
                INSERT INTO orders_summary
                    (order_id, customer_id, status, total_amount, placed_at)
                VALUES (%s, %s, 'PENDING', %s, %s)
                ON CONFLICT (order_id) DO NOTHING
                """,
                (event["order_id"], event["customer_id"],
                 event["total_amount"], event["occurred_at"]),
            )

        elif et == "PaymentConfirmed":
            cur.execute(
                """
                UPDATE orders_summary
                SET status = 'PAID', paid_at = %s
                WHERE order_id = %s
                """,
                (event["occurred_at"], event["order_id"]),
            )

        elif et == "OrderCancelled":
            cur.execute(
                """
                UPDATE orders_summary
                SET status = 'CANCELLED', cancelled_at = %s
                WHERE order_id = %s
                """,
                (event["occurred_at"], event["order_id"]),
            )
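
Running the projection is then a matter of invoking run() on a schedule. A minimal sketch of a polling projector loop — the DSN and interval below are illustrative, and LISTEN/NOTIFY or CDC are common upgrades over plain polling:

# run_projections.py — sketch of a simple polling projector

import time

from event_sourcing.projections import OrderSummaryProjection
from event_sourcing.store import PostgresEventStore

DSN = "postgresql://app:secret@localhost/orders"   # illustrative

store = PostgresEventStore(DSN)
projection = OrderSummaryProjection(DSN, store)

while True:
    projection.run()    # process everything past the checkpoint, then commit
    time.sleep(0.5)     # polling interval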

TypeScript Implementation — Command Handling and Kafka Event Bus

In a microservices architecture, the event store is often complemented by an event bus: events are appended to the store and published to Kafka for consumption by downstream services. Done naively, this is a dual write. The Transactional Outbox pattern avoids that by making the durable append the only write the command handler performs and letting a separate relay publish from the store — the event store itself becomes the outbox. The handler below takes the simpler append-then-publish approach; the note after the code covers the trade-off.

// order-service/src/commands.ts — Command handlers and event publishing

import { randomUUID } from "node:crypto";
import { Producer } from "kafkajs";
import { Pool } from "pg";

interface OrderItem {
  productId: string;
  quantity: number;
  unitPrice: number;
}

// ---------------------------------------------------------------------------
// Lightweight event store on PostgreSQL
// ---------------------------------------------------------------------------

async function appendToStream(
  pool: Pool,
  streamId: string,
  events: object[],
  expectedVersion: number
): Promise<void> {
  const client = await pool.connect();
  try {
    await client.query("BEGIN");

    const { rows } = await client.query<{ version: number }>(
      "SELECT COALESCE(MAX(version), -1) AS version FROM events WHERE stream_id = $1",
      [streamId]
    );
    const currentVersion = rows[0].version;

    if (currentVersion !== expectedVersion) {
      // No explicit ROLLBACK here — the catch block below issues it exactly once.
      throw new Error(
        `Optimistic concurrency conflict on ${streamId}: expected ${expectedVersion}, got ${currentVersion}`
      );
    }

    // The unique (stream_id, version) constraint backstops this check under races.

    for (let i = 0; i < events.length; i++) {
      const event = events[i] as { event_type: string };
      await client.query(
        `INSERT INTO events (stream_id, version, event_type, payload)
         VALUES ($1, $2, $3, $4)`,
        [streamId, expectedVersion + 1 + i, event.event_type, JSON.stringify(event)]
      );
    }

    await client.query("COMMIT");
  } catch (err) {
    await client.query("ROLLBACK");
    throw err;
  } finally {
    client.release();
  }
}

// ---------------------------------------------------------------------------
// PlaceOrder command handler
// ---------------------------------------------------------------------------

export class PlaceOrderHandler {
  constructor(
    private readonly pool: Pool,
    private readonly producer: Producer
  ) {}

  async handle(command: {
    orderId: string;
    customerId: string;
    items: OrderItem[];
  }): Promise<void> {
    const total = command.items.reduce(
      (sum, item) => sum + item.unitPrice * item.quantity,
      0
    );

    const event = {
      event_type: "OrderPlaced",
      event_id: randomUUID(),
      occurred_at: new Date().toISOString(),
      order_id: command.orderId,
      customer_id: command.customerId,
      items: command.items,
      total_amount: total.toFixed(2),
    };

    // 1. Append to the event store (source of truth)
    await appendToStream(this.pool, `order-${command.orderId}`, [event], -1);

    // 2. Publish to Kafka for downstream projections and other services
    await this.producer.send({
      topic: "domain-events.orders",
      messages: [
        {
          key: command.orderId,
          value: JSON.stringify(event),
          headers: { "event-type": "OrderPlaced" },
        },
      ],
    });
  }
}

// order-service/src/projection-consumer.ts — Kafka consumer updating read models

import { Kafka, Consumer, EachMessagePayload } from "kafkajs";
import { Pool } from "pg";

export class OrderProjectionConsumer {
  private consumer: Consumer;

  constructor(
    private readonly pool: Pool,
    kafka: Kafka
  ) {
    this.consumer = kafka.consumer({ groupId: "orders-read-model-projector" });
  }

  async start(): Promise<void> {
    await this.consumer.connect();
    await this.consumer.subscribe({
      topic: "domain-events.orders",
      fromBeginning: false,    // use fromBeginning: true when rebuilding
    });

    await this.consumer.run({
      eachMessage: async (payload: EachMessagePayload) => {
        const event = JSON.parse(payload.message.value!.toString());
        await this.project(event);
      },
    });
  }

  private async project(event: Record<string, string>): Promise<void> {
    const { event_type } = event;

    switch (event_type) {
      case "OrderPlaced":
        await this.pool.query(
          `INSERT INTO orders_summary
             (order_id, customer_id, status, total_amount, placed_at)
           VALUES ($1, $2, 'PENDING', $3, $4)
           ON CONFLICT (order_id) DO NOTHING`,
          [event.order_id, event.customer_id, event.total_amount, event.occurred_at]
        );
        break;

      case "PaymentConfirmed":
        await this.pool.query(
          `UPDATE orders_summary
           SET status = 'PAID', paid_at = $1
           WHERE order_id = $2`,
          [event.occurred_at, event.order_id]
        );
        break;

      case "OrderCancelled":
        await this.pool.query(
          `UPDATE orders_summary
           SET status = 'CANCELLED', cancelled_at = $1
           WHERE order_id = $2`,
          [event.occurred_at, event.order_id]
        );
        break;

      default:
        // Unknown event type — log and skip (forward compatibility)
        console.warn(`[projection] Unknown event type: ${event_type}`);
    }
  }

  async stop(): Promise<void> {
    await this.consumer.disconnect();
  }
}

Note

Publishing to Kafka after the database transaction commits introduces a window where the event is stored but not yet published — and if the process crashes inside that window, the event is never published at all. For strict at-least-once delivery, use the Transactional Outbox pattern: write events to an outbox table in the same transaction, then use a separate relay process (or Debezium CDC) to publish from the outbox to Kafka. Because the event store here is an append-only log with a global position, it can serve as the outbox directly.
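
Sticking with the Python store from earlier, a relay over the events table is a short polling loop. A sketch — the publish step is abstracted behind a callable so no particular Kafka client API is assumed:

# outbox_relay.py — sketch of a relay publishing stored events to the bus

import json
from typing import Callable

from event_sourcing.store import PostgresEventStore


def relay_once(
    store: PostgresEventStore,
    publish: Callable[[str, bytes], None],   # e.g. wrapping a Kafka producer
    last_position: int,
) -> int:
    """Publish every event after last_position; return the new checkpoint.

    The caller must persist the checkpoint durably. Redelivery after a crash
    is expected — which is exactly why projections must be idempotent.
    """
    for event in store.read_all_events(after_position=last_position):
        position = event.pop("position")
        publish(event.get("order_id", ""), json.dumps(event).encode("utf-8"))
        last_position = position
    return last_position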

Event Versioning — Upcasters and Schema Evolution

Events are immutable and permanent — you cannot change an event that has been stored. When the domain model evolves, old events must still be readable. The standard approach is upcasting: a function that transforms an older event version to the current shape before it reaches the aggregate. Upcasters are applied in sequence during deserialisation, keeping old event payloads in the store unchanged while allowing the aggregate to work against a single, current event schema.

# event_sourcing/upcasters.py — Event version migration without touching stored data

from typing import Any, Callable


def upcast_order_placed_v1_to_v2(payload: dict[str, Any]) -> dict[str, Any]:
    """
    OrderPlaced v1 stored items as a flat list of product IDs.
    v2 stores items as objects with productId, quantity, unitPrice.
    Upcaster reconstructs the v2 shape from v1 data.
    """
    return {
        **payload,
        "schema_version": 2,
        "items": [
            {"productId": pid, "quantity": 1, "unitPrice": "0.00"}
            for pid in payload.get("product_ids", [])
        ],
    }


def upcast_order_placed_v2_to_v3(payload: dict[str, Any]) -> dict[str, Any]:
    """
    v3 adds a currency field (default USD for legacy events).
    """
    return {**payload, "schema_version": 3, "currency": "USD"}


# Upcaster chain — applied in ascending version order during deserialisation
ORDER_PLACED_UPCASTERS: dict[int, Callable[[dict[str, Any]], dict[str, Any]]] = {
    1: upcast_order_placed_v1_to_v2,
    2: upcast_order_placed_v2_to_v3,
}

CURRENT_SCHEMA_VERSION = 3


def upcast(payload: dict[str, Any]) -> dict[str, Any]:
    """Apply every upcaster from the payload's version up to the current schema."""
    version = payload.get("schema_version", 1)
    while version < CURRENT_SCHEMA_VERSION:
        payload = ORDER_PLACED_UPCASTERS[version](payload)
        version = payload["schema_version"]
    return payload
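
Wiring the chain into deserialisation is a one-line hook in the repository. A sketch — it assumes the OrderPlaced dataclass has been extended with the v3 fields (schema_version, currency) so the upcasted payload maps onto it:

# Sketch: apply the upcaster chain before dispatching to the dataclasses

from .upcasters import upcast


def _deserialise_versioned(payload: dict) -> "DomainEvent":
    if payload["event_type"] == "OrderPlaced":
        payload = upcast(payload)   # v1/v2 payloads arrive in the current shape
    return _deserialise(payload)    # dispatch exactly as before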

Snapshotting — Avoiding Full Replay for Long-Lived Aggregates

An aggregate with 10,000 events takes meaningful time to reconstitute on every command. Snapshots solve this by persisting the aggregate state at a known version. On load, the repository reads the latest snapshot (if one exists) and then replays only the events after that snapshot version. The event log itself is never modified — the snapshot is supplementary, not a replacement for the event history.

# event_sourcing/snapshot_store.py — Snapshot table and repository mixin

import json
import psycopg2
from psycopg2.extras import RealDictCursor

CREATE_SNAPSHOT_TABLE = """
CREATE TABLE IF NOT EXISTS snapshots (
    stream_id   TEXT    PRIMARY KEY,
    version     INTEGER NOT NULL,
    state       JSONB   NOT NULL,
    created_at  TIMESTAMPTZ NOT NULL DEFAULT now()
);
"""

SNAPSHOT_THRESHOLD = 100   # take a snapshot every N events


class SnapshotStore:
    def __init__(self, dsn: str) -> None:
        self._dsn = dsn

    def save_snapshot(self, stream_id: str, version: int, state: dict) -> None:
        with psycopg2.connect(self._dsn) as conn:
            with conn.cursor() as cur:
                cur.execute(
                    """
                    INSERT INTO snapshots (stream_id, version, state)
                    VALUES (%s, %s, %s)
                    ON CONFLICT (stream_id) DO UPDATE
                        SET version = EXCLUDED.version,
                            state   = EXCLUDED.state,
                            created_at = now()
                    """,
                    (stream_id, version, json.dumps(state)),
                )

    def load_snapshot(self, stream_id: str) -> dict | None:
        with psycopg2.connect(self._dsn) as conn:
            with conn.cursor(cursor_factory=RealDictCursor) as cur:
                cur.execute(
                    "SELECT version, state FROM snapshots WHERE stream_id = %s",
                    (stream_id,),
                )
                row = cur.fetchone()
                return dict(row) if row else None


class SnapshottingOrderRepository:
    def __init__(self, event_store, snapshot_store: SnapshotStore) -> None:
        self._events = event_store
        self._snapshots = snapshot_store

    def load(self, order_id: str):
        from decimal import Decimal

        from .domain import OrderAggregate
        from .repository import _deserialise

        stream_id = f"order-{order_id}"
        snap = self._snapshots.load_snapshot(stream_id)

        if snap:
            # Restore state from snapshot, then replay only newer events.
            # total_amount was serialised as a string — coerce it back.
            aggregate = OrderAggregate()
            aggregate.__dict__.update(snap["state"])
            aggregate.total_amount = Decimal(str(aggregate.total_amount))
            aggregate.version = snap["version"]
            payloads = self._events.read_stream_after(stream_id, snap["version"])
        else:
            aggregate = OrderAggregate()
            payloads = self._events.read_stream(stream_id)

        for payload in payloads:
            aggregate._apply(_deserialise(payload))
            aggregate.version += 1

        return aggregate

    def save(self, aggregate) -> None:
        pending = aggregate.pop_pending_events()
        if not pending:
            return
        self._events.append_to_stream(
            stream_id=f"order-{aggregate.order_id}",
            events=[e.to_dict() for e in pending],
            expected_version=aggregate.version,
        )
        new_version = aggregate.version + len(pending)
        if new_version % SNAPSHOT_THRESHOLD == 0:
            state = {
                k: v for k, v in aggregate.__dict__.items()
                if not k.startswith("_")
            }
            self._snapshots.save_snapshot(
                f"order-{aggregate.order_id}", new_version, state
            )

When to Use Event Sourcing and CQRS — and When Not To

Use when: audit trail is a first-class requirement

Financial systems, healthcare records, compliance-regulated workflows, and supply chains all need a complete, tamper-evident history of every state change. Event Sourcing provides this without a separate audit log service — the event store is the audit log. GDPR right-to-erasure is handled by event encryption and key deletion, not by modifying stored events.

Use when: temporal queries are needed

"What was the state of this entity on a specific date?" is trivially answered by replaying events up to that point. In a state-based system, point-in-time queries require either event sourcing or a parallel history table. Event Sourcing makes this free.

Use when: multiple independent read models are needed

If different teams or services need fundamentally different views of the same data — one service needs a real-time order count by region, another needs a customer lifetime value calculation, a third needs a fraud risk score — CQRS projections let each team build and own their read model independently, without coupling to the write model schema.

Use when: the domain is genuinely event-driven

Domains with rich lifecycle transitions — e-commerce orders, loan applications, inventory reservations, insurance claims — map naturally to event streams. The events are the domain, not an implementation detail. When the domain experts talk about things that happened, not current states, Event Sourcing matches the mental model.

Avoid when: the domain is CRUD-shaped

User profile settings, configuration records, reference data tables — these are genuinely state-based. There is no meaningful event history; each update is just the new state. Forcing Event Sourcing onto CRUD operations adds infrastructure complexity with no benefit. Use a regular database and save Event Sourcing for the core domain.

Avoid when: the team is new to DDD

Event Sourcing and CQRS require well-defined aggregate boundaries, a ubiquitous language, and a domain model that accurately reflects business operations. Without Domain-Driven Design as a foundation, event streams become a dumping ground for arbitrary state changes and projections accumulate inconsistencies. Build the domain model first; add Event Sourcing only when the aggregates are stable.

Avoid for simple reporting requirements

If the primary driver for splitting read and write models is query performance on a single PostgreSQL table, CQRS is the wrong tool. Add an index, a materialised view, or a read replica. CQRS introduces eventual consistency and projection management overhead — justify that cost with a genuine need for independent write and read model evolution, not slow queries.

Event Sourcing Production Checklist

Events are always named in the past tense and contain full context

An event like MoneyWithdrawn(accountId, amount, currency, balance_after, initiated_by, timestamp) is self-describing. An event like StateChanged(accountId, newState) is useless to a projection that needs to know how the state changed. Events must carry all the data needed to reconstruct what happened, independently of the current system state.

Optimistic concurrency is enforced on every stream append

Without optimistic locking, two concurrent commands on the same aggregate can both read version 5 and both append at version 6, corrupting the stream. The event store must enforce the expected version constraint. Catch OptimisticConcurrencyError in command handlers and retry with the latest aggregate state — most conflicts in well-designed aggregates are transient.
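
A minimal retry wrapper around the repository from earlier (the attempt count is illustrative):

# Sketch: retry a command when the stream version has moved underneath us

from event_sourcing.store import OptimisticConcurrencyError


def confirm_payment_with_retry(repo, order_id, payment_ref, amount, attempts=3):
    for attempt in range(attempts):
        try:
            order = repo.load(order_id)     # re-read the latest state each try
            order.confirm_payment(payment_ref, amount)
            repo.save(order)
            return
        except OptimisticConcurrencyError:
            if attempt == attempts - 1:
                raise                       # give up after the final attempt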

Projections are idempotent

At-least-once delivery from Kafka means a projection may receive the same event twice. Every projection handler must be idempotent: INSERT ... ON CONFLICT DO NOTHING, UPDATE with a WHERE clause that checks the current state, or tracking processed event IDs in a deduplication table. A non-idempotent projection will produce incorrect read models on consumer restart.
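
For handlers with no naturally idempotent SQL shape, the deduplication-table variant is a few lines. A sketch, assuming a processed_events(event_id TEXT PRIMARY KEY) table maintained in the same transaction as the read model update:

# Sketch: event-ID deduplication guarding a non-idempotent handler

def handle_once(cur, event: dict, handler) -> None:
    cur.execute(
        "INSERT INTO processed_events (event_id) VALUES (%s) ON CONFLICT DO NOTHING",
        (event["event_id"],),
    )
    if cur.rowcount == 0:
        return              # duplicate delivery — already handled, skip
    handler(cur, event)     # same transaction as the dedup insert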

Projection rebuilds are tested and documented

The whole point of Event Sourcing is that projections can be rebuilt from the event log. Test this regularly. A projection rebuild that has never been run in production will fail when you need it most — after a bug corrupts the read model. Document the rebuild command and the expected SLA for completion (usually based on event log size and projection throughput).

Schema versioning is designed before the first event is written

Add a schema_version field to every event from day one. When the schema evolves, increment the version and write an upcaster. Never mutate stored events. If you ship without schema versioning and then need to change an event shape, you face retrofitting upcasters to all stored events — painful work that could have been avoided with one extra field.

Aggregate boundaries are sized to the transaction, not the data model

An aggregate boundary defines the consistency boundary — everything inside must be consistent at the end of a command. Aggregates that are too large (an entire Order including all order line history, customer data, and payment records) have high contention and frequent optimistic concurrency conflicts. Aggregates that are too small break invariants across boundaries. Design aggregates around the invariants the domain requires, not the data model.

Work with us

Designing an event-sourced system or evaluating CQRS for your microservices architecture?

We design and implement Event Sourcing and CQRS architectures — from aggregate boundary design and event store selection to projection infrastructure, Kafka event bus integration, event versioning strategies, and migration from state-based persistence. Let’s talk.

Get in touch
