The core problem: state is a lossy compression of history
A conventional CRUD database stores the current state of an entity — the latest snapshot. When you update a bank account balance, the previous balance is gone. When you cancel an order, the sequence of status transitions that led to the cancellation is lost. The audit log, if it exists at all, is a second-class citizen bolted on after the fact.
Event Sourcing inverts this model. Instead of storing current state, you store the sequence of events that caused it. The bank account does not have a balance column — it has a stream of AccountOpened, MoneyDeposited, and MoneyWithdrawn events. The current balance is derived by replaying those events. State becomes a projection of history, not the source of truth itself.
This unlocks capabilities that are impossible or expensive with state-based persistence: complete audit trails with zero extra effort, temporal queries ("what was the balance on March 3rd?"), event-driven integration without polling, and the ability to build new read models retroactively from the existing event log. The cost is increased complexity — and understanding that cost is the central skill in applying these patterns well. Tools like EventStoreDB and Axon Framework provide purpose-built infrastructure, though many teams implement event stores directly on PostgreSQL or Kafka.
Core Concepts — Events, Aggregates, and the Event Store
Domain Event
An immutable fact that something happened in the past, expressed in the ubiquitous language of the domain. Events are named in the past tense: OrderPlaced, PaymentProcessed, ShipmentDispatched. They carry all the data needed to understand what happened — not just foreign keys, but denormalised values. An event written to the store is never mutated or deleted.
Aggregate
A cluster of domain objects treated as a single unit for writes. The aggregate root is the only entry point for commands. The aggregate enforces invariants — business rules that must hold true across the entire cluster. In Event Sourcing, the aggregate's state is rebuilt by replaying its event stream from the beginning (or from a snapshot).
Event Stream
An ordered, append-only log of events for a single aggregate instance, identified by a stream ID (e.g., order-abc123). Events in the stream are numbered sequentially from version 0. Optimistic concurrency control uses the expected version: a writer specifies the version it read, and the store rejects the append if a concurrent write has already advanced the version.
Event Store
The persistence layer for event streams. Supports two primary operations: AppendToStream (with an expected version for optimistic locking) and ReadStreamForward (to replay events and reconstitute state). The event store is an append-only log — not a mutable record store. EventStoreDB is purpose-built; PostgreSQL and DynamoDB are common DIY alternatives.
Projection
A read model built by subscribing to the event stream and maintaining a denormalised view optimised for queries. Projections are derived data — they can always be rebuilt from scratch by replaying all events. This separates read and write concerns and allows multiple independent read models optimised for different access patterns (CQRS).
Snapshot
A checkpoint that stores the aggregate state at a known event version. When loading an aggregate, the event store loads the most recent snapshot, then replays only the events after that snapshot version. Snapshots reduce reconstitution time for aggregates with long event histories without breaking the audit trail.
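Before the full implementation below, the mechanics of these concepts can be sketched in a few lines: an in-memory stream with expected-version appends, and state derived purely by replay. Every name here is illustrative, not part of the implementation that follows.

```python
# Minimal in-memory sketch: stream, expected-version append, state by replay.
class ConcurrencyError(Exception):
    pass

class InMemoryEventStore:
    def __init__(self):
        self._streams: dict[str, list[dict]] = {}

    def append(self, stream_id: str, events: list[dict], expected_version: int) -> None:
        stream = self._streams.setdefault(stream_id, [])
        current = len(stream) - 1  # -1 means the stream does not exist yet
        if current != expected_version:
            raise ConcurrencyError(f"expected {expected_version}, stream is at {current}")
        stream.extend(events)

    def read(self, stream_id: str) -> list[dict]:
        return list(self._streams.get(stream_id, []))

def balance(events: list[dict]) -> int:
    """State is a projection of history: fold the events into a number."""
    total = 0
    for e in events:
        total += e["amount"] if e["type"] == "MoneyDeposited" else -e["amount"]
    return total

store = InMemoryEventStore()
store.append("account-1", [{"type": "MoneyDeposited", "amount": 100}], expected_version=-1)
store.append("account-1", [{"type": "MoneyWithdrawn", "amount": 30}], expected_version=0)
print(balance(store.read("account-1")))  # 70
```

There is no balance column anywhere: the number 70 exists only as a fold over the stream, which is exactly the inversion the introduction describes.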
Implementing Event Sourcing in Python — Aggregate, Store, and Repository
The following implementation uses PostgreSQL as the event store. Each event stream is stored as a sequence of rows in an events table. Optimistic concurrency is enforced with a unique constraint on (stream_id, version). The aggregate rebuilds its state by replaying events from the stream — no ORM, no mutable state columns.
# event_sourcing/domain.py — Domain events and Order aggregate
from __future__ import annotations
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from decimal import Decimal
from typing import Any
# ---------------------------------------------------------------------------
# Base event
# ---------------------------------------------------------------------------
@dataclass(frozen=True)
class DomainEvent:
event_id: str = field(default_factory=lambda: str(uuid.uuid4()))
occurred_at: str = field(
default_factory=lambda: datetime.now(timezone.utc).isoformat()
)
def to_dict(self) -> dict[str, Any]:
raise NotImplementedError
# ---------------------------------------------------------------------------
# Order domain events
# ---------------------------------------------------------------------------
@dataclass(frozen=True)
class OrderPlaced(DomainEvent):
order_id: str = ""
customer_id: str = ""
items: list[dict] = field(default_factory=list)
total_amount: str = "0" # Decimal serialised as string
def to_dict(self) -> dict[str, Any]:
return {
"event_type": "OrderPlaced",
"event_id": self.event_id,
"occurred_at": self.occurred_at,
"order_id": self.order_id,
"customer_id": self.customer_id,
"items": self.items,
"total_amount": self.total_amount,
}
@dataclass(frozen=True)
class PaymentConfirmed(DomainEvent):
order_id: str = ""
payment_reference: str = ""
amount: str = "0"
def to_dict(self) -> dict[str, Any]:
return {
"event_type": "PaymentConfirmed",
"event_id": self.event_id,
"occurred_at": self.occurred_at,
"order_id": self.order_id,
"payment_reference": self.payment_reference,
"amount": self.amount,
}
@dataclass(frozen=True)
class OrderCancelled(DomainEvent):
order_id: str = ""
reason: str = ""
def to_dict(self) -> dict[str, Any]:
return {
"event_type": "OrderCancelled",
"event_id": self.event_id,
"occurred_at": self.occurred_at,
"order_id": self.order_id,
"reason": self.reason,
}
# ---------------------------------------------------------------------------
# Order aggregate
# ---------------------------------------------------------------------------
class OrderStatus:
PENDING = "PENDING"
PAID = "PAID"
CANCELLED = "CANCELLED"
class OrderAggregate:
"""
Aggregate root for the Order bounded context.
State is reconstituted exclusively by applying events — never set directly.
"""
def __init__(self) -> None:
self.order_id: str | None = None
self.customer_id: str | None = None
self.status: str | None = None
self.total_amount: Decimal = Decimal("0")
self.version: int = -1 # -1 = stream does not exist yet
self._pending_events: list[DomainEvent] = []
# ------------------------------------------------------------------
# Command handlers (enforce invariants, raise events)
# ------------------------------------------------------------------
def place_order(
self,
order_id: str,
customer_id: str,
items: list[dict],
total_amount: Decimal,
) -> None:
if self.status is not None:
raise ValueError("Order has already been placed")
self._raise(
OrderPlaced(
order_id=order_id,
customer_id=customer_id,
items=items,
total_amount=str(total_amount),
)
)
def confirm_payment(self, payment_reference: str, amount: Decimal) -> None:
if self.status != OrderStatus.PENDING:
raise ValueError(f"Cannot confirm payment for order in status {self.status}")
self._raise(
PaymentConfirmed(
order_id=self.order_id,
payment_reference=payment_reference,
amount=str(amount),
)
)
def cancel(self, reason: str) -> None:
if self.status == OrderStatus.CANCELLED:
raise ValueError("Order is already cancelled")
if self.status == OrderStatus.PAID:
raise ValueError("Cannot cancel a paid order without a refund")
self._raise(OrderCancelled(order_id=self.order_id, reason=reason))
# ------------------------------------------------------------------
# Event application (mutate state — no validation here)
# ------------------------------------------------------------------
def _apply(self, event: DomainEvent) -> None:
if isinstance(event, OrderPlaced):
self.order_id = event.order_id
self.customer_id = event.customer_id
self.status = OrderStatus.PENDING
self.total_amount = Decimal(event.total_amount)
elif isinstance(event, PaymentConfirmed):
self.status = OrderStatus.PAID
elif isinstance(event, OrderCancelled):
self.status = OrderStatus.CANCELLED
def _raise(self, event: DomainEvent) -> None:
self._apply(event)
self._pending_events.append(event)
# ------------------------------------------------------------------
# Reconstitution from stored events
# ------------------------------------------------------------------
@classmethod
def reconstitute(cls, events: list[DomainEvent]) -> "OrderAggregate":
aggregate = cls()
for event in events:
aggregate._apply(event)
aggregate.version += 1
return aggregate
def pop_pending_events(self) -> list[DomainEvent]:
events, self._pending_events = self._pending_events, []
        return events

# event_sourcing/store.py — PostgreSQL-backed event store
import json
import psycopg2
from psycopg2.extras import RealDictCursor
from typing import Any
# DDL — run once on database initialisation
CREATE_EVENTS_TABLE = """
CREATE TABLE IF NOT EXISTS events (
id BIGSERIAL PRIMARY KEY,
stream_id TEXT NOT NULL,
version INTEGER NOT NULL,
event_type TEXT NOT NULL,
payload JSONB NOT NULL,
recorded_at TIMESTAMPTZ NOT NULL DEFAULT now(),
CONSTRAINT uq_stream_version UNIQUE (stream_id, version)
);
CREATE INDEX IF NOT EXISTS idx_events_stream_id ON events (stream_id);
"""
class OptimisticConcurrencyError(Exception):
"""Raised when an append fails because the stream version has advanced."""
class PostgresEventStore:
def __init__(self, dsn: str) -> None:
self._dsn = dsn
def _connect(self):
return psycopg2.connect(self._dsn)
def append_to_stream(
self,
stream_id: str,
events: list[dict[str, Any]],
expected_version: int,
) -> None:
"""
Append events to a stream, enforcing optimistic concurrency.
expected_version=-1 means the stream must not exist yet.
"""
with self._connect() as conn:
with conn.cursor() as cur:
# Verify current version
cur.execute(
"SELECT COALESCE(MAX(version), -1) FROM events WHERE stream_id = %s",
(stream_id,),
)
current_version: int = cur.fetchone()[0]
if current_version != expected_version:
raise OptimisticConcurrencyError(
f"Expected version {expected_version}, got {current_version} "
f"for stream {stream_id}"
)
                for i, event in enumerate(events):
                    try:
                        cur.execute(
                            """
                            INSERT INTO events (stream_id, version, event_type, payload)
                            VALUES (%s, %s, %s, %s)
                            """,
                            (
                                stream_id,
                                expected_version + 1 + i,
                                event["event_type"],
                                json.dumps(event),
                            ),
                        )
                    except psycopg2.IntegrityError as exc:
                        # The version check above is not atomic with the insert,
                        # so a concurrent writer can slip in between. The unique
                        # (stream_id, version) constraint catches the race.
                        raise OptimisticConcurrencyError(
                            f"Concurrent append to stream {stream_id}"
                        ) from exc
def read_stream(self, stream_id: str) -> list[dict[str, Any]]:
with self._connect() as conn:
with conn.cursor(cursor_factory=RealDictCursor) as cur:
cur.execute(
"SELECT payload FROM events WHERE stream_id = %s ORDER BY version",
(stream_id,),
)
                return [row["payload"] for row in cur.fetchall()]

    def read_stream_after(self, stream_id: str, after_version: int) -> list[dict[str, Any]]:
        """Read only events with version > after_version (used when loading from a snapshot)."""
        with self._connect() as conn:
            with conn.cursor(cursor_factory=RealDictCursor) as cur:
                cur.execute(
                    "SELECT payload FROM events WHERE stream_id = %s AND version > %s ORDER BY version",
                    (stream_id, after_version),
                )
                return [row["payload"] for row in cur.fetchall()]
def read_all_events(self, after_position: int = 0) -> list[dict[str, Any]]:
"""Read the global event log — used by projection rebuilders and subscriptions."""
with self._connect() as conn:
with conn.cursor(cursor_factory=RealDictCursor) as cur:
cur.execute(
"SELECT id, payload FROM events WHERE id > %s ORDER BY id",
(after_position,),
)
                return [{"position": row["id"], **row["payload"]} for row in cur.fetchall()]

# event_sourcing/repository.py — Order repository wiring aggregate to store
from .domain import OrderAggregate, DomainEvent, OrderPlaced, PaymentConfirmed, OrderCancelled
from .store import PostgresEventStore
def _deserialise(payload: dict) -> DomainEvent:
et = payload["event_type"]
if et == "OrderPlaced":
return OrderPlaced(**{k: v for k, v in payload.items() if k != "event_type"})
if et == "PaymentConfirmed":
return PaymentConfirmed(**{k: v for k, v in payload.items() if k != "event_type"})
if et == "OrderCancelled":
return OrderCancelled(**{k: v for k, v in payload.items() if k != "event_type"})
raise ValueError(f"Unknown event type: {et}")
class OrderRepository:
def __init__(self, store: PostgresEventStore) -> None:
self._store = store
def load(self, order_id: str) -> OrderAggregate:
payloads = self._store.read_stream(f"order-{order_id}")
if not payloads:
raise KeyError(f"Order {order_id} not found")
events = [_deserialise(p) for p in payloads]
return OrderAggregate.reconstitute(events)
def save(self, aggregate: OrderAggregate) -> None:
pending = aggregate.pop_pending_events()
if not pending:
return
self._store.append_to_stream(
stream_id=f"order-{aggregate.order_id}",
events=[e.to_dict() for e in pending],
expected_version=aggregate.version,
        )
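Wired together, the command side makes a full round trip: load by replay, execute a command, append the pending events with the version read at load time. The sketch below mirrors those interfaces with in-memory stand-ins, condensed so the version bookkeeping is visible; every name in it is illustrative rather than part of the modules above. Note in particular that `version` tracks the last stored event only, which is why `expected_version=aggregate.version` is correct for both new and existing streams.

```python
# End-to-end sketch of the command side: load -> command -> save.
class ConcurrencyConflict(Exception):
    pass

class InMemoryStore:
    def __init__(self):
        self.streams: dict[str, list[dict]] = {}

    def append_to_stream(self, stream_id, events, expected_version):
        stream = self.streams.setdefault(stream_id, [])
        if len(stream) - 1 != expected_version:  # -1 means "stream must not exist"
            raise ConcurrencyConflict(stream_id)
        stream.extend(events)

    def read_stream(self, stream_id):
        return list(self.streams.get(stream_id, []))

class MiniOrder:
    """Condensed aggregate: version tracks the last *stored* event only."""
    def __init__(self):
        self.status = None
        self.version = -1
        self._pending = []

    def place(self, order_id):
        if self.status is not None:
            raise ValueError("already placed")
        self._raise({"event_type": "OrderPlaced", "order_id": order_id})

    def cancel(self, reason):
        if self.status != "PENDING":
            raise ValueError(f"cannot cancel in status {self.status}")
        self._raise({"event_type": "OrderCancelled", "reason": reason})

    def _raise(self, event):
        self._apply(event)
        self._pending.append(event)  # pending events do NOT bump version

    def _apply(self, event):
        self.status = {"OrderPlaced": "PENDING", "OrderCancelled": "CANCELLED"}[event["event_type"]]

class MiniRepository:
    def __init__(self, store):
        self.store = store

    def load(self, order_id):
        agg = MiniOrder()
        for payload in self.store.read_stream(f"order-{order_id}"):
            agg._apply(payload)
            agg.version += 1  # version advances only for stored events
        return agg

    def save(self, agg, order_id):
        pending, agg._pending = agg._pending, []
        if pending:
            self.store.append_to_stream(f"order-{order_id}", pending, agg.version)

repo = MiniRepository(InMemoryStore())
order = MiniOrder()
order.place("abc123")
repo.save(order, "abc123")          # appends at version 0 (expected_version=-1)
reloaded = repo.load("abc123")      # replay -> status PENDING, version 0
reloaded.cancel("customer request")
repo.save(reloaded, "abc123")       # appends at version 1 (expected_version=0)
print(repo.load("abc123").status)   # CANCELLED
```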
CQRS — Splitting Read and Write Models
CQRS (Command Query Responsibility Segregation) is the pattern of using separate models for writes (commands) and reads (queries). In an event-sourced system, the write model is the aggregate — it enforces invariants and produces events. The read models are projections: denormalised views materialised from the event stream, each optimised for a specific query pattern.
A single aggregate might feed multiple independent projections: an orders_summary table for list views, a customer_order_history table for customer dashboards, and a revenue_by_day table for analytics — all powered by the same OrderPlaced and PaymentConfirmed events. Projections are eventually consistent — they lag slightly behind the command side — but they can be rebuilt from scratch at any time by replaying the event log.
# event_sourcing/projections.py — Projecting events into read models (PostgreSQL)
import json
import psycopg2
from .store import PostgresEventStore
CREATE_PROJECTION_TABLES = """
CREATE TABLE IF NOT EXISTS orders_summary (
order_id TEXT PRIMARY KEY,
customer_id TEXT NOT NULL,
status TEXT NOT NULL,
total_amount NUMERIC(12, 2) NOT NULL,
placed_at TIMESTAMPTZ,
paid_at TIMESTAMPTZ,
cancelled_at TIMESTAMPTZ
);
CREATE TABLE IF NOT EXISTS projection_checkpoints (
projection_name TEXT PRIMARY KEY,
last_position BIGINT NOT NULL DEFAULT 0
);
"""
class OrderSummaryProjection:
"""
Maintains a denormalised orders_summary table.
Tracks a checkpoint so it can resume after a restart without replaying all events.
"""
NAME = "orders_summary"
def __init__(self, dsn: str, store: PostgresEventStore) -> None:
self._dsn = dsn
self._store = store
def _connect(self):
return psycopg2.connect(self._dsn)
def _get_checkpoint(self, cur) -> int:
cur.execute(
"SELECT last_position FROM projection_checkpoints WHERE projection_name = %s",
(self.NAME,),
)
row = cur.fetchone()
return row[0] if row else 0
def _save_checkpoint(self, cur, position: int) -> None:
cur.execute(
"""
INSERT INTO projection_checkpoints (projection_name, last_position)
VALUES (%s, %s)
ON CONFLICT (projection_name) DO UPDATE SET last_position = EXCLUDED.last_position
""",
(self.NAME, position),
)
def run(self) -> None:
"""Process all unhandled events since the last checkpoint."""
with self._connect() as conn:
with conn.cursor() as cur:
checkpoint = self._get_checkpoint(cur)
events = self._store.read_all_events(after_position=checkpoint)
for event in events:
self._handle(cur, event)
self._save_checkpoint(cur, event["position"])
def rebuild(self) -> None:
"""Drop and rebuild the projection from the beginning of the event log."""
with self._connect() as conn:
with conn.cursor() as cur:
cur.execute("TRUNCATE orders_summary")
cur.execute(
"DELETE FROM projection_checkpoints WHERE projection_name = %s",
(self.NAME,),
)
self.run()
def _handle(self, cur, event: dict) -> None:
et = event.get("event_type")
if et == "OrderPlaced":
cur.execute(
"""
INSERT INTO orders_summary
(order_id, customer_id, status, total_amount, placed_at)
VALUES (%s, %s, 'PENDING', %s, %s)
ON CONFLICT (order_id) DO NOTHING
""",
(event["order_id"], event["customer_id"],
event["total_amount"], event["occurred_at"]),
)
elif et == "PaymentConfirmed":
cur.execute(
"""
UPDATE orders_summary
SET status = 'PAID', paid_at = %s
WHERE order_id = %s
""",
(event["occurred_at"], event["order_id"]),
)
elif et == "OrderCancelled":
cur.execute(
"""
UPDATE orders_summary
SET status = 'CANCELLED', cancelled_at = %s
WHERE order_id = %s
""",
(event["occurred_at"], event["order_id"]),
            )

TypeScript Implementation — Command Handling and Kafka Event Bus
In a microservices architecture, the event store is often complemented by an event bus: events are appended to the store, then published to Kafka for consumption by downstream services. This is a relaxed form of the Transactional Outbox pattern: the event is durably recorded before it is published, and the event store itself plays the role of the outbox. The publish step is still a second, separate write, though; the note after the code covers closing that gap.
// order-service/src/commands.ts — Command handlers and event publishing
import { Kafka, Producer } from "kafkajs";
import { Pool } from "pg";
interface OrderItem {
productId: string;
quantity: number;
unitPrice: number;
}
// ---------------------------------------------------------------------------
// Lightweight event store on PostgreSQL
// ---------------------------------------------------------------------------
async function appendToStream(
pool: Pool,
streamId: string,
events: object[],
expectedVersion: number
): Promise<void> {
const client = await pool.connect();
try {
await client.query("BEGIN");
const { rows } = await client.query<{ version: number }>(
"SELECT COALESCE(MAX(version), -1) AS version FROM events WHERE stream_id = $1",
[streamId]
);
const currentVersion = rows[0].version;
if (currentVersion !== expectedVersion) {
await client.query("ROLLBACK");
throw new Error(
`Optimistic concurrency conflict on ${streamId}: expected ${expectedVersion}, got ${currentVersion}`
);
}
for (let i = 0; i < events.length; i++) {
const event = events[i] as { event_type: string };
await client.query(
`INSERT INTO events (stream_id, version, event_type, payload)
VALUES ($1, $2, $3, $4)`,
[streamId, expectedVersion + 1 + i, event.event_type, JSON.stringify(event)]
);
}
await client.query("COMMIT");
} catch (err) {
await client.query("ROLLBACK");
throw err;
} finally {
client.release();
}
}
// ---------------------------------------------------------------------------
// PlaceOrder command handler
// ---------------------------------------------------------------------------
export class PlaceOrderHandler {
constructor(
private readonly pool: Pool,
private readonly producer: Producer
) {}
async handle(command: {
orderId: string;
customerId: string;
items: OrderItem[];
}): Promise<void> {
const total = command.items.reduce(
(sum, item) => sum + item.unitPrice * item.quantity,
0
);
const event = {
event_type: "OrderPlaced",
event_id: crypto.randomUUID(),
occurred_at: new Date().toISOString(),
order_id: command.orderId,
customer_id: command.customerId,
items: command.items,
total_amount: total.toFixed(2),
};
// 1. Append to the event store (source of truth)
await appendToStream(this.pool, `order-${command.orderId}`, [event], -1);
// 2. Publish to Kafka for downstream projections and other services
await this.producer.send({
topic: "domain-events.orders",
messages: [
{
key: command.orderId,
value: JSON.stringify(event),
headers: { "event-type": "OrderPlaced" },
},
],
});
}
}

// order-service/src/projection-consumer.ts — Kafka consumer updating read models
import { Kafka, Consumer, EachMessagePayload } from "kafkajs";
import { Pool } from "pg";
export class OrderProjectionConsumer {
private consumer: Consumer;
constructor(
private readonly pool: Pool,
kafka: Kafka
) {
this.consumer = kafka.consumer({ groupId: "orders-read-model-projector" });
}
async start(): Promise<void> {
await this.consumer.connect();
await this.consumer.subscribe({
topic: "domain-events.orders",
fromBeginning: false, // use fromBeginning: true when rebuilding
});
await this.consumer.run({
eachMessage: async (payload: EachMessagePayload) => {
const event = JSON.parse(payload.message.value!.toString());
await this.project(event);
},
});
}
private async project(event: Record<string, string>): Promise<void> {
const { event_type } = event;
switch (event_type) {
case "OrderPlaced":
await this.pool.query(
`INSERT INTO orders_summary
(order_id, customer_id, status, total_amount, placed_at)
VALUES ($1, $2, 'PENDING', $3, $4)
ON CONFLICT (order_id) DO NOTHING`,
[event.order_id, event.customer_id, event.total_amount, event.occurred_at]
);
break;
case "PaymentConfirmed":
await this.pool.query(
`UPDATE orders_summary
SET status = 'PAID', paid_at = $1
WHERE order_id = $2`,
[event.occurred_at, event.order_id]
);
break;
case "OrderCancelled":
await this.pool.query(
`UPDATE orders_summary
SET status = 'CANCELLED', cancelled_at = $1
WHERE order_id = $2`,
[event.occurred_at, event.order_id]
);
break;
default:
// Unknown event type — log and skip (forward compatibility)
console.warn(`[projection] Unknown event type: ${event_type}`);
}
}
async stop(): Promise<void> {
await this.consumer.disconnect();
}
}

Note
The append-then-publish sequence above leaves a small dual-write gap: if the process crashes between the store append and the Kafka publish, the event is recorded but never published. The strict Transactional Outbox pattern closes it: write the events to an outbox table in the same transaction, then use a separate relay process (or Debezium CDC) to publish from the outbox to Kafka. This eliminates the dual-write inconsistency window entirely.

Event Versioning — Upcasters and Schema Evolution
Events are immutable and permanent — you cannot change an event that has been stored. When the domain model evolves, old events must still be readable. The standard approach is upcasting: a function that transforms an older event version to the current shape before it reaches the aggregate. Upcasters are applied in sequence during deserialisation, keeping old event payloads in the store unchanged while allowing the aggregate to work against a single, current event schema.
# event_sourcing/upcasters.py — Event version migration without touching stored data
from typing import Any
def upcast_order_placed_v1_to_v2(payload: dict[str, Any]) -> dict[str, Any]:
"""
OrderPlaced v1 stored items as a flat list of product IDs.
v2 stores items as objects with productId, quantity, unitPrice.
Upcaster reconstructs the v2 shape from v1 data.
"""
return {
**payload,
"schema_version": 2,
"items": [
{"productId": pid, "quantity": 1, "unitPrice": "0.00"}
for pid in payload.get("product_ids", [])
],
}
def upcast_order_placed_v2_to_v3(payload: dict[str, Any]) -> dict[str, Any]:
"""
v3 adds a currency field (default USD for legacy events).
"""
return {**payload, "schema_version": 3, "currency": "USD"}
# Upcaster chain — applied left to right during deserialisation
ORDER_PLACED_UPCASTERS: dict[int, callable] = {
1: upcast_order_placed_v1_to_v2,
2: upcast_order_placed_v2_to_v3,
}
CURRENT_SCHEMA_VERSION = 3
def upcast(payload: dict[str, Any]) -> dict[str, Any]:
    """Apply upcasters in sequence until the payload reaches the current schema version."""
    version = payload.get("schema_version", 1)
    while version < CURRENT_SCHEMA_VERSION:
        payload = ORDER_PLACED_UPCASTERS[version](payload)
        version = payload["schema_version"]
    return payload

Snapshotting — Avoiding Full Replay for Long-Lived Aggregates
An aggregate with 10,000 events takes meaningful time to reconstitute on every command. Snapshots solve this by persisting the aggregate state at a known version. On load, the repository reads the latest snapshot (if one exists) and then replays only the events after that snapshot version. The event log itself is never modified — the snapshot is supplementary, not a replacement for the event history.
# event_sourcing/snapshot_store.py — Snapshot table and repository mixin
import json
import psycopg2
from psycopg2.extras import RealDictCursor
CREATE_SNAPSHOT_TABLE = """
CREATE TABLE IF NOT EXISTS snapshots (
stream_id TEXT PRIMARY KEY,
version INTEGER NOT NULL,
state JSONB NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
"""
SNAPSHOT_THRESHOLD = 100 # take a snapshot every N events
class SnapshotStore:
def __init__(self, dsn: str) -> None:
self._dsn = dsn
def save_snapshot(self, stream_id: str, version: int, state: dict) -> None:
with psycopg2.connect(self._dsn) as conn:
with conn.cursor() as cur:
cur.execute(
"""
INSERT INTO snapshots (stream_id, version, state)
VALUES (%s, %s, %s)
ON CONFLICT (stream_id) DO UPDATE
SET version = EXCLUDED.version,
state = EXCLUDED.state,
created_at = now()
""",
(stream_id, version, json.dumps(state)),
)
def load_snapshot(self, stream_id: str) -> dict | None:
with psycopg2.connect(self._dsn) as conn:
with conn.cursor(cursor_factory=RealDictCursor) as cur:
cur.execute(
"SELECT version, state FROM snapshots WHERE stream_id = %s",
(stream_id,),
)
row = cur.fetchone()
return dict(row) if row else None
class SnapshottingOrderRepository:
def __init__(self, event_store, snapshot_store: SnapshotStore) -> None:
self._events = event_store
self._snapshots = snapshot_store
def load(self, order_id: str):
from .domain import OrderAggregate
from .repository import _deserialise
stream_id = f"order-{order_id}"
        snap = self._snapshots.load_snapshot(stream_id)
        if snap:
            # Restore state from snapshot, then replay only newer events
            from decimal import Decimal
            aggregate = OrderAggregate()
            aggregate.__dict__.update(snap["state"])
            # JSONB round-trips Decimal as a string; restore the domain type
            aggregate.total_amount = Decimal(str(aggregate.total_amount))
            aggregate.version = snap["version"]
            payloads = self._events.read_stream_after(stream_id, snap["version"])
else:
aggregate = OrderAggregate()
payloads = self._events.read_stream(stream_id)
for payload in payloads:
aggregate._apply(_deserialise(payload))
aggregate.version += 1
return aggregate
    def save(self, aggregate) -> None:
pending = aggregate.pop_pending_events()
if not pending:
return
self._events.append_to_stream(
stream_id=f"order-{aggregate.order_id}",
events=[e.to_dict() for e in pending],
expected_version=aggregate.version,
)
        new_version = aggregate.version + len(pending)
        if new_version > 0 and new_version % SNAPSHOT_THRESHOLD == 0:
            from decimal import Decimal
            state = {
                # JSONB cannot store Decimal; serialise it as a string
                k: (str(v) if isinstance(v, Decimal) else v)
                for k, v in aggregate.__dict__.items()
                if not k.startswith("_")
            }
self._snapshots.save_snapshot(
f"order-{aggregate.order_id}", new_version, state
            )

When to Use Event Sourcing and CQRS — and When Not To
Use when: audit trail is a first-class requirement
Financial systems, healthcare records, compliance-regulated workflows, and supply chains all need a complete, tamper-evident history of every state change. Event Sourcing provides this without a separate audit log service — the event store is the audit log. GDPR right-to-erasure is handled by crypto-shredding (encrypting personal data per data subject and deleting the key), not by modifying stored events.
Use when: temporal queries are needed
"What was the state of this entity on a specific date?" is trivially answered by replaying events up to that point. In a state-based system, the same question requires retrofitting temporal tables or maintaining a parallel history mechanism. Event Sourcing makes point-in-time reconstruction essentially free.
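Replaying up to a cutoff is a one-function affair: fold only the events that occurred on or before the date in question. The event shapes here are illustrative.

```python
# Point-in-time query sketch: derive the balance as of a given instant.
from datetime import datetime, timezone

events = [
    {"type": "MoneyDeposited", "amount": 100, "occurred_at": "2024-03-01T10:00:00+00:00"},
    {"type": "MoneyWithdrawn", "amount": 40, "occurred_at": "2024-03-02T09:00:00+00:00"},
    {"type": "MoneyDeposited", "amount": 25, "occurred_at": "2024-03-05T16:30:00+00:00"},
]

def balance_as_of(events: list[dict], cutoff: datetime) -> int:
    total = 0
    for e in events:
        if datetime.fromisoformat(e["occurred_at"]) <= cutoff:
            total += e["amount"] if e["type"] == "MoneyDeposited" else -e["amount"]
    return total

march_3rd = datetime(2024, 3, 3, tzinfo=timezone.utc)
print(balance_as_of(events, march_3rd))  # 60: the March 5th deposit is excluded
```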
Use when: multiple independent read models are needed
If different teams or services need fundamentally different views of the same data — one service needs a real-time order count by region, another needs a customer lifetime value calculation, a third needs a fraud risk score — CQRS projections let each team build and own their read model independently, without coupling to the write model schema.
Use when: the domain is genuinely event-driven
Domains with rich lifecycle transitions — e-commerce orders, loan applications, inventory reservations, insurance claims — map naturally to event streams. The events are the domain, not an implementation detail. When the domain experts talk about things that happened, not current states, Event Sourcing matches the mental model.
Avoid when: the domain is CRUD-shaped
User profile settings, configuration records, reference data tables — these are genuinely state-based. There is no meaningful event history; each update is just the new state. Forcing Event Sourcing onto CRUD operations adds infrastructure complexity with no benefit. Use a regular database and save Event Sourcing for the core domain.
Avoid when: the team is new to DDD
Event Sourcing and CQRS require well-defined aggregate boundaries, a ubiquitous language, and a domain model that accurately reflects business operations. Without Domain-Driven Design as a foundation, event streams become a dumping ground for arbitrary state changes and projections accumulate inconsistencies. Build the domain model first; add Event Sourcing only when the aggregates are stable.
Avoid for simple reporting requirements
If the primary driver for splitting read and write models is query performance on a single PostgreSQL table, CQRS is the wrong tool. Add an index, a materialised view, or a read replica. CQRS introduces eventual consistency and projection management overhead — justify that cost with a genuine need for independent write and read model evolution, not slow queries.
Event Sourcing Production Checklist
Events are always named in the past tense and contain full context
An event like MoneyWithdrawn(accountId, amount, currency, balance_after, initiated_by, timestamp) is self-describing. An event like StateChanged(accountId, newState) is useless to a projection that needs to know how the state changed. Events must carry all the data needed to reconstruct what happened, independently of the current system state.
Optimistic concurrency is enforced on every stream append
Without optimistic locking, two concurrent commands on the same aggregate can both read version 5 and both append at version 6, corrupting the stream. The event store must enforce the expected version constraint. Catch OptimisticConcurrencyError in command handlers and retry with the latest aggregate state — most conflicts in well-designed aggregates are transient.
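The retry loop is worth spelling out, because the retry must re-run the whole load/execute/save cycle against fresh state, not just re-issue the append. A minimal sketch, with local stand-ins for the store types (all names here are illustrative):

```python
# Retry-on-conflict sketch for a command handler.
class OptimisticConcurrencyError(Exception):
    pass

def execute_with_retry(load, command, save, max_attempts: int = 3):
    """Reload the aggregate and re-run the command after each version conflict."""
    for attempt in range(1, max_attempts + 1):
        aggregate = load()        # fresh state, fresh version
        command(aggregate)        # domain errors propagate; do NOT retry those
        try:
            save(aggregate)
            return aggregate
        except OptimisticConcurrencyError:
            if attempt == max_attempts:
                raise             # persistent contention: surface to the caller

# Demo: a save that conflicts twice, then succeeds on the third attempt
attempts = {"n": 0}

def flaky_save(aggregate):
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise OptimisticConcurrencyError("stream advanced concurrently")

result = execute_with_retry(
    load=lambda: {"status": "PENDING"},
    command=lambda agg: agg.update(status="PAID"),
    save=flaky_save,
)
print(attempts["n"], result["status"])  # 3 PAID
```

Only concurrency conflicts are retried; a command that fails an invariant check should fail loudly, since re-running it against fresh state would either fail again or silently do something different.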
Projections are idempotent
At-least-once delivery from Kafka means a projection may receive the same event twice. Every projection handler must be idempotent: INSERT ... ON CONFLICT DO NOTHING, UPDATE with a WHERE clause that checks the current state, or tracking processed event IDs in a deduplication table. A non-idempotent projection will produce incorrect read models on consumer restart.
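The deduplication-table variant looks like this in miniature, with an in-memory set standing in for the table; the event shape and names are illustrative:

```python
# Idempotent projection sketch: processed event IDs gate every handler.
class OrderCountProjection:
    def __init__(self):
        self.orders_by_customer: dict[str, int] = {}
        self._processed: set[str] = set()  # stands in for a dedup table

    def handle(self, event: dict) -> None:
        if event["event_id"] in self._processed:
            return                          # duplicate delivery: no-op
        if event["event_type"] == "OrderPlaced":
            cid = event["customer_id"]
            self.orders_by_customer[cid] = self.orders_by_customer.get(cid, 0) + 1
        self._processed.add(event["event_id"])

proj = OrderCountProjection()
event = {"event_id": "e-1", "event_type": "OrderPlaced", "customer_id": "c-42"}
proj.handle(event)
proj.handle(event)  # redelivered by an at-least-once transport: ignored
print(proj.orders_by_customer)  # {'c-42': 1}
```

In a real SQL projection the dedup check and the view update must happen in the same transaction, otherwise a crash between them reintroduces the duplicate.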
Projection rebuilds are tested and documented
The whole point of Event Sourcing is that projections can be rebuilt from the event log. Test this regularly. A projection rebuild that has never been run in production will fail when you need it most — after a bug corrupts the read model. Document the rebuild command and the expected SLA for completion (usually based on event log size and projection throughput).
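The property a rebuild test verifies is small enough to state in code: corrupt the live view, replay the log from position zero, and the rebuilt view must match the log, not the corruption. The fold and event shapes below are illustrative.

```python
# Rebuild sketch: the read model is always recoverable from the log.
def project(log: list[dict]) -> dict[str, str]:
    """Fold the event log into an order_id -> status read model."""
    view: dict[str, str] = {}
    for e in log:
        if e["event_type"] == "OrderPlaced":
            view[e["order_id"]] = "PENDING"
        elif e["event_type"] == "PaymentConfirmed":
            view[e["order_id"]] = "PAID"
    return view

log = [
    {"event_type": "OrderPlaced", "order_id": "o-1"},
    {"event_type": "OrderPlaced", "order_id": "o-2"},
    {"event_type": "PaymentConfirmed", "order_id": "o-1"},
]

live = project(log)
live["o-2"] = "PAID"    # simulate read-model corruption by a bug
rebuilt = project(log)  # rebuild: replay the whole log from position 0
print(rebuilt["o-2"])   # PENDING: the log, not the view, is the truth
```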
Schema versioning is designed before the first event is written
Add a schema_version field to every event from day one. When the schema evolves, increment the version and write an upcaster. Never mutate stored events. If you ship without schema versioning and then need to change an event shape, you face retrofitting upcasters to all stored events — painful work that could have been avoided with one extra field.
Aggregate boundaries are sized to the transaction, not the data model
An aggregate boundary defines the consistency boundary — everything inside must be consistent at the end of a command. Aggregates that are too large (an entire Order including all order line history, customer data, and payment records) have high contention and frequent optimistic concurrency conflicts. Aggregates that are too small break invariants across boundaries. Design aggregates around the invariants the domain requires, not the data model.
Further Reading
- Martin Fowler — Event Sourcing — the canonical introduction to the pattern
- Greg Young — CQRS Documents — the original CQRS paper, still the most thorough reference
- EventStoreDB Blog — Event Sourcing and CQRS — practical implementation guidance from the maintainers of EventStoreDB
- microservices.io — Event Sourcing Pattern — Chris Richardson's concise pattern summary with context in microservices architecture
Work with us
Designing an event-sourced system or evaluating CQRS for your microservices architecture?
We design and implement Event Sourcing and CQRS architectures — from aggregate boundary design and event store selection to projection infrastructure, Kafka event bus integration, event versioning strategies, and migration from state-based persistence. Let’s talk.
Get in touch