Himanshu Kukreja

Week 3 — Day 2: Transactional Outbox

System Design Mastery Series


Preface

It's month-end at a fintech startup. The accounting team is reconciling transactions and finds a disturbing pattern: 847 payments were charged to customers, but 23 of those never created an order in the system.

Customers were charged. Orders were never recorded. Refunds are required. Trust is damaged.

The engineering team investigates and finds the culprit in this innocent-looking code:

async def create_order(order_data):
    # Step 1: Save order to database
    order = await db.orders.insert(order_data)
    
    # Step 2: Publish event for downstream services
    await kafka.publish("order.created", order)
    
    return order

What went wrong?

┌────────────────────────────────────────────────────────────────────────┐
│                    THE DUAL-WRITE DISASTER                             │
│                                                                        │
│   Timeline of failure:                                                 │
│                                                                        │
│   T1: Database INSERT succeeds ✓                                       │
│       Order saved to PostgreSQL                                        │
│                                                                        │
│   T2: Application process crashes 💥                                   │
│       - Kubernetes pod killed during rolling deploy                    │
│       - Network partition to Kafka                                     │
│       - Out of memory error                                            │
│       - Unhandled exception                                            │
│                                                                        │
│   T3: Kafka publish NEVER HAPPENS ✗                                    │
│       - Payment service never notified                                 │
│       - Inventory service never notified                               │
│       - Email service never notified                                   │
│                                                                        │
│   Result:                                                              │
│       Order exists in database                                         │
│       But downstream services don't know about it                      │
│       Customer charged, order "lost"                                   │
│                                                                        │
│   Question: What went wrong?                                           │
│   Answer: Two writes (DB + Kafka) without atomicity                    │
│                                                                        │
└────────────────────────────────────────────────────────────────────────┘

Yesterday, we learned about queues and streams—powerful tools for decoupling services. But we glossed over a critical question: How do you reliably get messages INTO the queue in the first place?

This is the problem the Transactional Outbox pattern solves.

Today, we'll learn how to guarantee that every database write is accompanied by a corresponding message publish—atomically, reliably, and without losing data.


Part I: Foundations

Chapter 1: What Is the Dual-Write Problem?

1.1 The Simple Definition

The dual-write problem occurs when an application needs to write to two different systems (e.g., database and message broker) and cannot do so atomically. If the application crashes between the two writes, the systems become inconsistent.
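The failure window is easy to reproduce in miniature. This toy sketch (all names are invented for illustration, not a real API) simulates a crash between the two writes:

```python
# Toy simulation of the dual-write problem (illustrative, not a real API).
class CrashBetweenWrites(Exception):
    pass

database, broker = [], []  # stand-ins for PostgreSQL and Kafka

def create_order(order, crash_before_publish=False):
    database.append(order)            # write 1: commit to the database
    if crash_before_publish:
        raise CrashBetweenWrites()    # process dies before write 2
    broker.append(order)              # write 2: publish the event

try:
    create_order({"id": 1}, crash_before_publish=True)
except CrashBetweenWrites:
    pass

# The two systems now disagree: the order exists, the event does not.
print(len(database), len(broker))  # 1 0
```

No retry logic inside `create_order` can close this window: the process that would have retried is gone.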

┌────────────────────────────────────────────────────────────────────────┐
│                    EVERYDAY ANALOGY                                    │
│                                                                        │
│  Imagine you're a bank teller processing a transfer:                   │
│                                                                        │
│    Step 1: Debit $100 from Account A (write to ledger)                 │
│    Step 2: Credit $100 to Account B (write to different ledger)        │
│                                                                        │
│  What if you complete Step 1, then:                                    │
│    - Fire alarm goes off, you evacuate                                 │
│    - Your computer crashes                                             │
│    - You have a medical emergency                                      │
│                                                                        │
│  Result: $100 debited but never credited. Money "disappeared."         │
│                                                                        │
│  This is why banks use TRANSACTIONS:                                   │
│    Either BOTH happen, or NEITHER happens.                             │
│                                                                        │
│  But what if the two ledgers are in different buildings,               │
│  owned by different companies, with no shared transaction?             │
│                                                                        │
│  That's the dual-write problem in distributed systems.                 │
│                                                                        │
└────────────────────────────────────────────────────────────────────────┘

1.2 Why It Happens in Distributed Systems

In a monolithic application with a single database, you use transactions:

# Single database - transactions work!
async with db.transaction():
    await db.execute("INSERT INTO orders ...")
    await db.execute("INSERT INTO order_items ...")
    await db.execute("UPDATE inventory ...")
# All or nothing - atomic!

But in a distributed system, you write to multiple systems:

# Two different systems - no shared transaction!
await db.execute("INSERT INTO orders ...")  # PostgreSQL
await kafka.publish("order.created", ...)   # Kafka

# These are TWO separate operations
# No way to make them atomic

Why can't we just use a distributed transaction?

Distributed transactions (like 2PC - Two-Phase Commit) exist, but:

  1. Kafka doesn't support them with external databases
  2. They're slow and complex
  3. They reduce availability (blocking)
  4. Most message brokers don't participate in XA transactions

1.3 The Three Failure Scenarios

Let's examine what can go wrong:

┌────────────────────────────────────────────────────────────────────────┐
│                    FAILURE SCENARIOS                                   │
│                                                                        │
│  SCENARIO 1: Database succeeds, Kafka fails                            │
│  ═══════════════════════════════════════════                           │
│                                                                        │
│    db.insert(order)     ──── SUCCESS ✓                                 │
│    kafka.publish(event) ──── FAILURE ✗ (timeout, broker down)          │
│                                                                        │
│    Result: Order in DB, event never published                          │
│    Impact: Downstream services never know about order                  │
│                                                                        │
│  SCENARIO 2: Database succeeds, app crashes before Kafka               │
│  ══════════════════════════════════════════════════════                │
│                                                                        │
│    db.insert(order)     ──── SUCCESS ✓                                 │
│    --- APP CRASHES ---                                                 │
│    kafka.publish(event) ──── NEVER EXECUTED                            │
│                                                                        │
│    Result: Same as Scenario 1                                          │
│    Impact: Silent data loss, very hard to detect                       │
│                                                                        │
│  SCENARIO 3: Kafka succeeds, database fails (if you flip the order)    │
│  ═════════════════════════════════════════════════════════════════     │
│                                                                        │
│    kafka.publish(event) ──── SUCCESS ✓                                 │
│    db.insert(order)     ──── FAILURE ✗ (constraint violation)          │
│                                                                        │
│    Result: Event published for non-existent order                      │
│    Impact: Downstream processes garbage data                           │
│                                                                        │
└────────────────────────────────────────────────────────────────────────┘

The core insight: You cannot make two writes to different systems atomic without special patterns.

1.4 The Formal Definition

Term         Meaning
Dual-write   Writing to two different data stores in a single operation
Atomicity    All operations succeed or all fail—no partial state
Outbox       A database table that stores pending messages
CDC          Change Data Capture—streaming database changes
Idempotency  The same operation can be applied multiple times with the same result

Chapter 2: The Transactional Outbox Pattern

2.1 The Core Idea

Instead of writing to the database AND the message broker, write everything to the database in a single transaction. Then, a separate process reads from the database and publishes to the message broker.

┌────────────────────────────────────────────────────────────────────────┐
│                    TRANSACTIONAL OUTBOX PATTERN                        │
│                                                                        │
│  BEFORE (Dual-Write - Dangerous):                                      │
│                                                                        │
│    Application                                                         │
│        │                                                               │
│        ├───────────────► Database (PostgreSQL)                         │
│        │                                                               │
│        └───────────────► Message Broker (Kafka)                        │
│                                                                        │
│    Two separate writes. Not atomic. Data loss possible.                │
│                                                                        │
│  ════════════════════════════════════════════════════════════════      │
│                                                                        │
│  AFTER (Transactional Outbox - Safe):                                  │
│                                                                        │
│    Application                                                         │
│        │                                                               │
│        │  Single Transaction                                           │
│        ▼                                                               │
│    ┌─────────────────────────────────────────┐                         │
│    │           DATABASE                       │                        │
│    │  ┌─────────────┐    ┌────────────────┐  │                         │
│    │  │   Orders    │    │    Outbox      │  │                         │
│    │  │   Table     │    │    Table       │  │                         │
│    │  │             │    │                │  │                         │
│    │  │ order_id    │    │ id             │  │                         │
│    │  │ user_id     │    │ aggregate_type │  │                         │
│    │  │ amount      │    │ aggregate_id   │  │                         │
│    │  │ status      │    │ event_type     │  │                         │
│    │  │ created_at  │    │ payload        │  │                         │
│    │  └─────────────┘    │ created_at     │  │                         │
│    │                     │ published_at   │  │                         │
│    │                     └────────────────┘  │                         │
│    └─────────────────────────────────────────┘                         │
│                              │                                         │
│                              │ Outbox Publisher                        │
│                              │ (Polling or CDC)                        │
│                              ▼                                         │
│                    ┌──────────────────┐                                │
│                    │   Message Broker │                                │
│                    │     (Kafka)      │                                │
│                    └──────────────────┘                                │
│                                                                        │
│    Both writes in ONE database transaction. Atomic!                    │
│    Separate process handles publishing. Reliable!                      │
│                                                                        │
└────────────────────────────────────────────────────────────────────────┘

2.2 How It Works

Step 1: Application writes to database (single transaction)

async def create_order(order_data):
    async with db.transaction():
        # Write business data
        order = await db.execute("""
            INSERT INTO orders (user_id, amount, status)
            VALUES ($1, $2, 'created')
            RETURNING *
        """, order_data.user_id, order_data.amount)
        
        # Write event to outbox (same transaction!)
        await db.execute("""
            INSERT INTO outbox (aggregate_type, aggregate_id, event_type, payload)
            VALUES ('Order', $1, 'OrderCreated', $2)
        """, order.id, json.dumps({
            'order_id': order.id,
            'user_id': order.user_id,
            'amount': str(order.amount),
            'created_at': order.created_at.isoformat()
        }))
        
        # If the transaction commits, BOTH writes succeed
        # If the transaction fails, NEITHER write happens
        return order

Step 2: Outbox publisher reads and publishes

async def outbox_publisher():
    while True:
        async with db.transaction():
            # Get unpublished events
            events = await db.fetch("""
                SELECT * FROM outbox 
                WHERE published_at IS NULL
                ORDER BY created_at
                LIMIT 100
                FOR UPDATE SKIP LOCKED
            """)
            
            for event in events:
                # Publish to Kafka
                await kafka.publish(
                    topic=f"{event.aggregate_type.lower()}.events",
                    key=event.aggregate_id,
                    value=event.payload
                )
                
                # Mark as published
                await db.execute("""
                    UPDATE outbox SET published_at = NOW()
                    WHERE id = $1
                """, event.id)
        
        await asyncio.sleep(1)  # Poll interval

2.3 The Outbox Table Schema

-- Outbox table schema
CREATE TABLE outbox (
    id              BIGSERIAL PRIMARY KEY,
    aggregate_type  VARCHAR(255) NOT NULL,    -- e.g., 'Order', 'User', 'Payment'
    aggregate_id    VARCHAR(255) NOT NULL,    -- e.g., order ID, user ID
    event_type      VARCHAR(255) NOT NULL,    -- e.g., 'OrderCreated', 'OrderPaid'
    payload         JSONB NOT NULL,           -- Event data
    created_at      TIMESTAMP NOT NULL DEFAULT NOW(),
    published_at    TIMESTAMP NULL            -- NULL = not yet published
);

-- Partial index for efficient polling (PostgreSQL does not allow
-- index definitions inside CREATE TABLE)
CREATE INDEX idx_outbox_unpublished ON outbox (created_at)
WHERE published_at IS NULL;

-- Optional: Partition by time for easier cleanup
-- CREATE TABLE outbox (...) PARTITION BY RANGE (created_at);

Key design decisions:

Column          Purpose
aggregate_type  Groups events by entity type. Used for topic routing.
aggregate_id    The ID of the entity. Used as the partition key in Kafka.
event_type      Specific event name. Consumers filter by this.
payload         Event data as JSON. Self-contained, includes all needed info.
published_at    NULL means not yet published. Enables publish tracking.

2.4 Two Approaches: Polling vs CDC

There are two ways to get events from the outbox to the message broker:

┌────────────────────────────────────────────────────────────────────────┐
│                    POLLING VS CDC                                      │
│                                                                        │
│  APPROACH 1: POLLING                                                   │
│  ════════════════════                                                  │
│                                                                        │
│    ┌─────────────┐         ┌─────────────┐         ┌─────────────┐     │
│    │  Database   │ ◄────── │   Poller    │ ──────► │   Kafka     │     │
│    │   Outbox    │  Query  │   Process   │ Publish │             │     │
│    └─────────────┘         └─────────────┘         └─────────────┘     │
│                                   │                                    │
│                              Every N sec                               │
│                                                                        │
│    How it works:                                                       │
│    1. Poller queries for unpublished events every N seconds            │
│    2. Publishes each event to Kafka                                    │
│    3. Marks events as published in database                            │
│                                                                        │
│    Pros:                                                               │
│    ✓ Simple to implement                                               │
│    ✓ No additional infrastructure                                      │
│    ✓ Works with any database                                           │
│                                                                        │
│    Cons:                                                               │
│    ✗ Latency (up to poll interval)                                     │
│    ✗ Database load from polling                                        │
│    ✗ Scaling requires coordination                                     │
│                                                                        │
│  ════════════════════════════════════════════════════════════════      │
│                                                                        │
│  APPROACH 2: CHANGE DATA CAPTURE (CDC)                                 │
│  ═════════════════════════════════════                                 │
│                                                                        │
│    ┌─────────────┐         ┌─────────────┐         ┌─────────────┐     │
│    │  Database   │ ──────► │  Debezium   │ ──────► │   Kafka     │     │
│    │   Outbox    │   WAL   │  Connector  │ Stream  │             │     │
│    └─────────────┘         └─────────────┘         └─────────────┘     │
│                                   │                                    │
│                           Real-time stream                             │
│                           from write-ahead log                         │
│                                                                        │
│    How it works:                                                       │
│    1. CDC reads database's write-ahead log (WAL/binlog)                │
│    2. Captures INSERT to outbox table in real-time                     │
│    3. Streams directly to Kafka                                        │
│                                                                        │
│    Pros:                                                               │
│    ✓ Near real-time (milliseconds)                                     │
│    ✓ No polling load on database                                       │
│    ✓ Scales automatically                                              │
│                                                                        │
│    Cons:                                                               │
│    ✗ More complex infrastructure                                       │
│    ✗ Requires database WAL access                                      │
│    ✗ Additional component to operate (Debezium)                        │
│                                                                        │
└────────────────────────────────────────────────────────────────────────┘

When to use which:

Scenario                                    Recommendation
Starting out, low volume                    Polling (simpler)
Need low latency (< 1 second)               CDC
High volume (> 10K events/sec)              CDC
Can't access WAL (managed DB restrictions)  Polling
Already using Kafka Connect                 CDC with Debezium
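For the CDC route, Debezium ships an outbox event router transform (SMT) that turns rows inserted into the outbox table into Kafka events. A sketch of a Kafka Connect connector config for the schema above might look like the following; connection values are placeholders, and property names should be verified against your Debezium version:

```json
{
  "name": "outbox-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "change-me",
    "database.dbname": "orders",
    "topic.prefix": "app",
    "table.include.list": "public.outbox",
    "transforms": "outbox",
    "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
    "transforms.outbox.route.by.field": "aggregate_type",
    "transforms.outbox.table.field.event.key": "aggregate_id",
    "transforms.outbox.table.field.event.payload": "payload"
  }
}
```

The `route.by.field`, `table.field.event.key`, and `table.field.event.payload` overrides map the EventRouter's defaults onto the snake_case column names used in our schema.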

Chapter 3: Trade-offs and Considerations

3.1 Trade-off Matrix

┌────────────────────────────────────────────────────────────────────────┐
│                    TRANSACTIONAL OUTBOX TRADE-OFFS                     │
│                                                                        │
│  BENEFITS:                                                             │
│  ├── ✓ Atomic: Both writes succeed or both fail                        │
│  ├── ✓ Reliable: No message loss possible                              │
│  ├── ✓ Ordered: Events published in order they were created            │
│  ├── ✓ Debuggable: Outbox table is audit log                           │
│  └── ✓ Recoverable: Can republish on failure                           │
│                                                                        │
│  COSTS:                                                                │
│  ├── ✗ Latency: Not immediate (polling delay or CDC pipeline)          │
│  ├── ✗ Complexity: Additional table, publisher process                 │
│  ├── ✗ Storage: Outbox table grows, needs cleanup                      │
│  ├── ✗ Database load: Extra writes, extra queries                      │
│  └── ✗ Ordering: Only within same database transaction                 │
│                                                                        │
│  ALTERNATIVES CONSIDERED:                                              │
│                                                                        │
│  1. "Just retry on failure"                                            │
│     Problem: Crash between writes = no retry, silent loss              │
│                                                                        │
│  2. "Publish first, then write to DB"                                  │
│     Problem: Event published for data that might not exist             │
│                                                                        │
│  3. "Distributed transaction (2PC)"                                    │
│     Problem: Slow, complex, Kafka doesn't support XA                   │
│                                                                        │
│  4. "Event sourcing"                                                   │
│     Alternative: Store events as source of truth, derive state         │
│     Trade-off: Major architectural change, not always appropriate      │
│                                                                        │
└────────────────────────────────────────────────────────────────────────┘

3.2 Exactly-Once Publishing

A critical question: What if the publisher crashes after sending to Kafka but before marking as published?

┌────────────────────────────────────────────────────────────────────────┐
│                    THE PUBLISHER CRASH SCENARIO                        │
│                                                                        │
│  Timeline:                                                             │
│                                                                        │
│  T1: Publisher reads event from outbox                                 │
│      SELECT * FROM outbox WHERE published_at IS NULL                   │
│                                                                        │
│  T2: Publisher sends to Kafka                                          │
│      kafka.publish("order.events", event) ──── SUCCESS ✓               │
│                                                                        │
│  T3: Publisher crashes before updating outbox 💥                       │
│      UPDATE outbox SET published_at = NOW() ──── NEVER EXECUTED        │
│                                                                        │
│  T4: Publisher restarts, reads same event again                        │
│      (published_at still NULL)                                         │
│                                                                        │
│  T5: Publisher sends to Kafka AGAIN                                    │
│      kafka.publish("order.events", event) ──── DUPLICATE!              │
│                                                                        │
│  Result: Event published TWICE                                         │
│                                                                        │
└────────────────────────────────────────────────────────────────────────┘

Solutions:

  1. Accept at-least-once, make consumers idempotent (most common)

    • Publisher may send duplicates
    • Consumers deduplicate by event ID
    • This is what we learned in Week 2
  2. Use Kafka transactions (if using Kafka)

    • Kafka supports transactions
    • Publish + offset commit atomically
    • More complex but true exactly-once
  3. Use CDC with Debezium

    • Debezium tracks its position in the WAL
    • Removes the polling publisher entirely
    • Still at-least-once: duplicates remain possible after a connector
      restart, so consumer-side idempotency is still advisable
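Option 1 is usually the pragmatic choice. A minimal consumer-side dedup sketch follows; the in-memory set is for illustration only, and a production consumer would persist seen IDs (e.g., a processed_events table updated in the consumer's own transaction):

```python
# Minimal idempotent consumer: dedup by outbox event id.
# In-memory set for illustration; production would persist the seen ids.

processed_ids = set()
shipped_orders = []

def handle_order_created(event):
    """Process an OrderCreated event at most once, keyed by outbox event id."""
    if event["id"] in processed_ids:
        return  # duplicate delivery: at-least-once made harmless
    shipped_orders.append(event["payload"]["order_id"])
    processed_ids.add(event["id"])

# The publisher crashed and re-sent event 42 -- the consumer absorbs it.
event = {"id": 42, "payload": {"order_id": "ord_1"}}
handle_order_created(event)
handle_order_created(event)  # duplicate delivery
print(shipped_orders)  # ['ord_1']
```

Note the ordering inside the handler: in a real consumer, the side effect and the "mark as processed" write should share one transaction, or you recreate the same crash window on the consumer side.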

3.3 Outbox Table Cleanup

The outbox table grows forever if you don't clean it up:

-- Option 1: Delete after publishing (simple, no history)
DELETE FROM outbox WHERE published_at IS NOT NULL;

-- Option 2: Delete old published events (keep recent for debugging)
DELETE FROM outbox 
WHERE published_at IS NOT NULL 
AND published_at < NOW() - INTERVAL '7 days';

-- Option 3: Partition by time, drop old partitions (efficient at scale)
-- Requires table partitioning setup; in PostgreSQL, detach then drop:
ALTER TABLE outbox DETACH PARTITION outbox_2024_01;
DROP TABLE outbox_2024_01;

Cleanup strategies:

Strategy                 Pros                 Cons
Delete immediately       No storage growth    No history for debugging
Delete after N days      Keep recent history  Cleanup job needed
Partition + drop         Very efficient       Complex setup
Archive to cold storage  Full history         Extra infrastructure
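The "delete after N days" option needs a periodic job. A small sketch, with the retention window and interval as assumed defaults (a single-shot flag makes it easy to exercise in tests):

```python
import asyncio

def cleanup_query(retention_days: int) -> str:
    """Build the retention DELETE for already-published outbox rows."""
    return (
        "DELETE FROM outbox "
        "WHERE published_at IS NOT NULL "
        f"AND published_at < NOW() - INTERVAL '{retention_days} days'"
    )

async def outbox_cleanup(conn, retention_days=7, interval_seconds=3600, once=False):
    """Periodically delete published outbox rows older than the retention window."""
    while True:
        await conn.execute(cleanup_query(retention_days))
        if once:
            return  # single-shot mode, useful for testing
        await asyncio.sleep(interval_seconds)
```

Run it alongside the publisher process; because it only touches rows where `published_at IS NOT NULL`, it never races with the publisher's `FOR UPDATE SKIP LOCKED` scan over unpublished rows.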

3.4 Decision Framework

┌────────────────────────────────────────────────────────────────────────┐
│                    WHEN TO USE TRANSACTIONAL OUTBOX                    │
│                                                                        │
│  USE TRANSACTIONAL OUTBOX WHEN:                                        │
│                                                                        │
│    ✓ You have a database + message broker architecture                 │
│    ✓ Message loss is unacceptable                                      │
│    ✓ You need guaranteed delivery                                      │
│    ✓ Events must reflect committed database state                      │
│                                                                        │
│  CONSIDER ALTERNATIVES WHEN:                                           │
│                                                                        │
│    → Eventual loss is acceptable (metrics, logs)                       │
│      Use: Fire-and-forget publishing                                   │
│                                                                        │
│    → Events ARE your source of truth                                   │
│      Use: Event sourcing (no separate database)                        │
│                                                                        │
│    → You're using a database with built-in streaming                   │
│      Use: Database-native features (e.g., PostgreSQL logical           │
│           replication, MongoDB change streams)                         │
│                                                                        │
│    → Single service, no downstream consumers                           │
│      Use: Nothing—you don't need messaging                             │
│                                                                        │
└────────────────────────────────────────────────────────────────────────┘

Part II: Implementation

Chapter 4: Basic Implementation

4.1 The Simplest Version

Let's start with a basic polling-based outbox. This is educational—we'll improve it for production.

# Basic Transactional Outbox Implementation
# WARNING: Not production-ready - for learning only

import asyncio
import json
from datetime import datetime
from typing import Any, Dict, Optional
from dataclasses import dataclass

import asyncpg
from aiokafka import AIOKafkaProducer


@dataclass
class OutboxEvent:
    """Represents an event in the outbox."""
    id: int
    aggregate_type: str
    aggregate_id: str
    event_type: str
    payload: Dict[str, Any]
    created_at: datetime


class SimpleOutbox:
    """
    Basic transactional outbox implementation.
    
    Usage:
        async with db.transaction():
            order = await create_order(...)
            await outbox.add_event(
                aggregate_type="Order",
                aggregate_id=str(order.id),
                event_type="OrderCreated",
                payload={"order_id": order.id, ...}
            )
    """
    
    def __init__(self, db_pool: asyncpg.Pool):
        self.db = db_pool
    
    async def add_event(
        self,
        aggregate_type: str,
        aggregate_id: str,
        event_type: str,
        payload: Dict[str, Any],
        connection: Optional[asyncpg.Connection] = None
    ) -> int:
        """
        Add an event to the outbox.
        
        IMPORTANT: Call this within the same transaction as your business logic!
        """
        # Without an explicit `connection`, this INSERT runs in its own
        # transaction, outside your business write—defeating the pattern.
        conn = connection or await self.db.acquire()
        try:
            result = await conn.fetchval("""
                INSERT INTO outbox (aggregate_type, aggregate_id, event_type, payload)
                VALUES ($1, $2, $3, $4)
                RETURNING id
            """, aggregate_type, aggregate_id, event_type, json.dumps(payload))
            return result
        finally:
            if connection is None:
                await self.db.release(conn)


class SimpleOutboxPublisher:
    """
    Basic polling-based outbox publisher.
    """
    
    def __init__(
        self,
        db_pool: asyncpg.Pool,
        kafka_producer: AIOKafkaProducer,
        poll_interval: float = 1.0,
        batch_size: int = 100
    ):
        self.db = db_pool
        self.kafka = kafka_producer
        self.poll_interval = poll_interval
        self.batch_size = batch_size
        self._running = False
    
    async def start(self):
        """Start the publisher loop."""
        self._running = True
        while self._running:
            try:
                published = await self._publish_batch()
                if published == 0:
                    # No events, wait before next poll
                    await asyncio.sleep(self.poll_interval)
            except Exception as e:
                print(f"Publisher error: {e}")
                await asyncio.sleep(self.poll_interval)
    
    async def stop(self):
        """Stop the publisher."""
        self._running = False
    
    async def _publish_batch(self) -> int:
        """Publish a batch of events. Returns count published."""
        async with self.db.acquire() as conn:
            async with conn.transaction():
                # Lock and fetch unpublished events
                events = await conn.fetch("""
                    SELECT id, aggregate_type, aggregate_id, event_type, payload, created_at
                    FROM outbox
                    WHERE published_at IS NULL
                    ORDER BY created_at
                    LIMIT $1
                    FOR UPDATE SKIP LOCKED
                """, self.batch_size)
                
                if not events:
                    return 0
                
                # Publish each event
                for event in events:
                    topic = f"{event['aggregate_type'].lower()}.events"
                    await self.kafka.send_and_wait(
                        topic,
                        key=event['aggregate_id'].encode(),
                        value=event['payload'].encode()
                    )
                
                # Mark all as published
                event_ids = [e['id'] for e in events]
                await conn.execute("""
                    UPDATE outbox
                    SET published_at = NOW()
                    WHERE id = ANY($1)
                """, event_ids)
                
                return len(events)


# Usage example
async def create_order_example(db: asyncpg.Pool, outbox: SimpleOutbox):
    """Example of creating an order with outbox event."""
    
    async with db.acquire() as conn:
        async with conn.transaction():
            # Create the order
            order = await conn.fetchrow("""
                INSERT INTO orders (user_id, amount, status)
                VALUES ($1, $2, 'created')
                RETURNING id, user_id, amount, status, created_at
            """, "user_123", 99.99)
            
            # Add event to outbox (SAME TRANSACTION!)
            await outbox.add_event(
                aggregate_type="Order",
                aggregate_id=str(order['id']),
                event_type="OrderCreated",
                payload={
                    "order_id": order['id'],
                    "user_id": order['user_id'],
                    "amount": str(order['amount']),
                    "status": order['status'],
                    "created_at": order['created_at'].isoformat()
                },
                connection=conn  # Use same connection!
            )
            
            return dict(order)

4.2 Understanding the Flow

┌─────────────────────────────────────────────────────────────────────────┐
│                    OUTBOX FLOW DIAGRAM                                  │
│                                                                         │
│  Step 1: Application creates order                                      │
│  ┌──────────────────────────────────────────────────────────────────┐   │
│  │  async with db.transaction():                                    │   │
│  │      order = INSERT INTO orders ...                              │   │
│  │      event = INSERT INTO outbox ...                              │   │
│  │  # COMMIT                                                        │   │
│  └──────────────────────────────────────────────────────────────────┘   │
│                              │                                          │
│                              ▼                                          │
│  Step 2: Transaction commits atomically                                 │
│  ┌──────────────────────────────────────────────────────────────────┐   │
│  │  orders table:  [order_123] ✓                                    │   │
│  │  outbox table:  [event_456, published_at=NULL] ✓                 │   │
│  └──────────────────────────────────────────────────────────────────┘   │
│                              │                                          │
│                              ▼                                          │
│  Step 3: Publisher polls for unpublished events                         │
│  ┌──────────────────────────────────────────────────────────────────┐   │
│  │  SELECT * FROM outbox WHERE published_at IS NULL                 │   │
│  │  FOR UPDATE SKIP LOCKED                                          │   │
│  │  → Returns event_456                                             │   │
│  └──────────────────────────────────────────────────────────────────┘   │
│                              │                                          │
│                              ▼                                          │
│  Step 4: Publisher sends to Kafka                                       │
│  ┌──────────────────────────────────────────────────────────────────┐   │
│  │  kafka.send("order.events", key="order_123", value={...})        │   │
│  │  → Kafka acknowledges                                            │   │
│  └──────────────────────────────────────────────────────────────────┘   │
│                              │                                          │
│                              ▼                                          │
│  Step 5: Publisher marks as published                                   │
│  ┌──────────────────────────────────────────────────────────────────┐   │
│  │  UPDATE outbox SET published_at = NOW() WHERE id = event_456     │   │
│  └──────────────────────────────────────────────────────────────────┘   │
│                              │                                          │
│                              ▼                                          │
│  Step 6: Event delivered to consumers                                   │
│  ┌──────────────────────────────────────────────────────────────────┐   │
│  │  Payment Service: Processes OrderCreated                         │   │
│  │  Notification Service: Sends confirmation email                  │   │
│  │  Analytics Service: Records order metrics                        │   │
│  └──────────────────────────────────────────────────────────────────┘   │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘
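
Because the publisher retries after crashes, delivery into Kafka is at-least-once, so the consumers in Step 6 must deduplicate. A minimal sketch of a consumer-side guard (the `IdempotentConsumer` name and in-memory set are illustrative; production code would check a processed-events table inside the consumer's own transaction):

```python
# Consumer-side deduplication sketch: the outbox guarantees at-least-once
# delivery, so consumers track processed event IDs and skip repeats.

class IdempotentConsumer:
    def __init__(self, handler):
        self.handler = handler
        self._seen: set[str] = set()  # stand-in for a processed_events table

    def handle(self, event: dict) -> bool:
        """Process an event once; return True if it ran, False if skipped."""
        event_id = event["event_id"]
        if event_id in self._seen:
            return False  # duplicate delivery, already processed
        self.handler(event)
        self._seen.add(event_id)
        return True
```

With this guard in place, a redelivered OrderCreated event charges the customer once, no matter how many times the publisher resends it.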

Chapter 5: Production-Ready Implementation

5.1 Requirements for Production

Our production implementation needs:

  1. Effectively exactly-once delivery — survive publisher crashes without losing or duplicating events
  2. High throughput — Batch publishing for efficiency
  3. Low latency — Minimize delay between write and publish
  4. Monitoring — Track lag, errors, throughput
  5. Graceful shutdown — Don't lose in-flight events
  6. Horizontal scaling — Multiple publishers without duplicates

5.2 Full Production Implementation

# Production-Ready Transactional Outbox Implementation

import asyncio
import json
import signal
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Optional, Dict, Any, List, Callable
from enum import Enum
import logging
from contextlib import asynccontextmanager
import hashlib

import asyncpg
from aiokafka import AIOKafkaProducer

logger = logging.getLogger(__name__)


# =============================================================================
# Configuration
# =============================================================================

@dataclass
class OutboxConfig:
    """Configuration for the transactional outbox."""
    
    # Database settings
    db_dsn: str
    db_pool_min: int = 5
    db_pool_max: int = 20
    
    # Kafka settings
    kafka_bootstrap_servers: str = "localhost:9092"
    kafka_acks: str = "all"
    kafka_retries: int = 3
    
    # Publisher settings
    poll_interval_ms: int = 100          # How often to poll (ms)
    batch_size: int = 100                 # Events per batch
    max_publish_attempts: int = 5         # Before giving up on an event
    lock_timeout_seconds: int = 300       # Stale lock detection
    
    # Cleanup settings
    retention_hours: int = 168            # 7 days
    cleanup_batch_size: int = 1000


@dataclass
class OutboxEvent:
    """An event stored in the outbox."""
    id: int
    aggregate_type: str
    aggregate_id: str
    event_type: str
    payload: Dict[str, Any]
    created_at: datetime
    publish_attempts: int = 0
    last_error: Optional[str] = None


@dataclass
class PublishResult:
    """Result of publishing an event."""
    event_id: int
    success: bool
    topic: str
    partition: Optional[int] = None
    offset: Optional[int] = None
    error: Optional[str] = None


# =============================================================================
# Outbox Writer (used by application code)
# =============================================================================

class OutboxWriter:
    """
    Writes events to the outbox table.
    
    Usage:
        async with db.transaction() as conn:
            # Your business logic
            order = await create_order(conn, order_data)
            
            # Write to outbox in same transaction
            await outbox.write(
                conn,
                aggregate_type="Order",
                aggregate_id=str(order.id),
                event_type="OrderCreated",
                payload=order.to_event_payload()
            )
    """
    
    def __init__(self, config: OutboxConfig):
        self.config = config
    
    async def write(
        self,
        connection: asyncpg.Connection,
        aggregate_type: str,
        aggregate_id: str,
        event_type: str,
        payload: Dict[str, Any],
        event_id: Optional[str] = None
    ) -> int:
        """
        Write an event to the outbox.
        
        Args:
            connection: Database connection (must be in a transaction!)
            aggregate_type: Type of entity (e.g., "Order", "User")
            aggregate_id: ID of the entity
            event_type: Type of event (e.g., "OrderCreated")
            payload: Event data
            event_id: Optional idempotency key for the event
            
        Returns:
            The outbox event ID
        """
        # Generate event_id if not provided
        if event_id is None:
            event_id = self._generate_event_id(aggregate_type, aggregate_id, event_type, payload)
        
        result = await connection.fetchval("""
            INSERT INTO outbox (
                event_id, aggregate_type, aggregate_id, event_type, payload
            )
            VALUES ($1, $2, $3, $4, $5)
            ON CONFLICT (event_id) DO NOTHING
            RETURNING id
        """, event_id, aggregate_type, aggregate_id, event_type, json.dumps(payload))
        
        if result is None:
            # Event already exists (idempotency)
            result = await connection.fetchval(
                "SELECT id FROM outbox WHERE event_id = $1", event_id
            )
            logger.debug(f"Event {event_id} already exists, returning existing ID {result}")
        
        return result
    
    async def write_batch(
        self,
        connection: asyncpg.Connection,
        events: List[Dict[str, Any]]
    ) -> List[int]:
        """
        Write multiple events to the outbox.
        
        Args:
            connection: Database connection
            events: List of event dicts with aggregate_type, aggregate_id, event_type, payload
            
        Returns:
            List of outbox event IDs
        """
        ids = []
        for event in events:
            event_id = await self.write(
                connection,
                aggregate_type=event['aggregate_type'],
                aggregate_id=event['aggregate_id'],
                event_type=event['event_type'],
                payload=event['payload'],
                event_id=event.get('event_id')
            )
            ids.append(event_id)
        return ids
    
    def _generate_event_id(
        self,
        aggregate_type: str,
        aggregate_id: str,
        event_type: str,
        payload: Dict[str, Any]
    ) -> str:
        """Generate a deterministic event ID for idempotency."""
        content = f"{aggregate_type}:{aggregate_id}:{event_type}:{json.dumps(payload, sort_keys=True)}"
        return hashlib.sha256(content.encode()).hexdigest()[:32]


# =============================================================================
# Outbox Publisher (background process)
# =============================================================================

class OutboxPublisher:
    """
    Publishes events from the outbox to Kafka.
    
    Features:
    - Batch publishing for throughput
    - At-least-once publishing, made effectively exactly-once via event_id deduplication
    - Automatic retry with backoff
    - Stale lock detection
    - Graceful shutdown
    - Metrics and monitoring
    """
    
    def __init__(
        self,
        config: OutboxConfig,
        topic_resolver: Optional[Callable[[OutboxEvent], str]] = None
    ):
        self.config = config
        self.topic_resolver = topic_resolver or self._default_topic_resolver
        
        self.db_pool: Optional[asyncpg.Pool] = None
        self.kafka: Optional[AIOKafkaProducer] = None
        
        self._running = False
        self._shutdown_event = asyncio.Event()
        
        # Metrics
        self.events_published = 0
        self.events_failed = 0
        self.batches_processed = 0
        self.last_publish_time: Optional[datetime] = None
    
    async def start(self) -> None:
        """Initialize connections and start publishing."""
        logger.info("Starting outbox publisher...")
        
        # Initialize database pool
        self.db_pool = await asyncpg.create_pool(
            self.config.db_dsn,
            min_size=self.config.db_pool_min,
            max_size=self.config.db_pool_max
        )
        
        # Initialize Kafka producer
        self.kafka = AIOKafkaProducer(
            bootstrap_servers=self.config.kafka_bootstrap_servers,
            acks=self.config.kafka_acks,
            enable_idempotence=True  # Kafka-level idempotence; retries are handled internally
        )
        await self.kafka.start()
        
        self._running = True
        logger.info("Outbox publisher started")
    
    async def stop(self) -> None:
        """Gracefully stop the publisher."""
        logger.info("Stopping outbox publisher...")
        self._running = False
        self._shutdown_event.set()
        
        # Give any in-flight batch a moment to finish (a stricter shutdown
        # would await the run() task instead of sleeping)
        await asyncio.sleep(0.5)
        
        if self.kafka:
            await self.kafka.stop()
        
        if self.db_pool:
            await self.db_pool.close()
        
        logger.info(
            f"Outbox publisher stopped. "
            f"Published: {self.events_published}, Failed: {self.events_failed}"
        )
    
    async def run(self) -> None:
        """Main publishing loop."""
        while self._running:
            try:
                published = await self._process_batch()
                
                if published == 0:
                    # No events to process, wait before next poll
                    try:
                        await asyncio.wait_for(
                            self._shutdown_event.wait(),
                            timeout=self.config.poll_interval_ms / 1000
                        )
                    except asyncio.TimeoutError:
                        pass  # Normal timeout, continue polling
                else:
                    self.batches_processed += 1
                    
            except asyncio.CancelledError:
                logger.info("Publisher loop cancelled")
                break
            except Exception as e:
                logger.error(f"Publisher error: {e}", exc_info=True)
                await asyncio.sleep(1)  # Back off on error
    
    async def _process_batch(self) -> int:
        """Process a batch of events. Returns count published."""
        async with self.db_pool.acquire() as conn:
            # Use advisory lock for distributed coordination
            lock_acquired = await conn.fetchval(
                "SELECT pg_try_advisory_lock(hashtext('outbox_publisher'))"
            )
            
            if not lock_acquired:
                # Another publisher has the lock
                return 0
            
            try:
                return await self._process_batch_with_lock(conn)
            finally:
                await conn.execute(
                    "SELECT pg_advisory_unlock(hashtext('outbox_publisher'))"
                )
    
    async def _process_batch_with_lock(self, conn: asyncpg.Connection) -> int:
        """Process batch while holding the lock."""
        async with conn.transaction():
            # Fetch unpublished events
            events = await conn.fetch("""
                SELECT 
                    id, event_id, aggregate_type, aggregate_id, 
                    event_type, payload, created_at, publish_attempts
                FROM outbox
                WHERE published_at IS NULL
                AND publish_attempts < $1
                ORDER BY created_at
                LIMIT $2
                FOR UPDATE SKIP LOCKED
            """, self.config.max_publish_attempts, self.config.batch_size)
            
            if not events:
                return 0
            
            # Process each event
            published_ids = []
            failed_updates = []
            
            for row in events:
                event = OutboxEvent(
                    id=row['id'],
                    aggregate_type=row['aggregate_type'],
                    aggregate_id=row['aggregate_id'],
                    event_type=row['event_type'],
                    payload=json.loads(row['payload']),
                    created_at=row['created_at'],
                    publish_attempts=row['publish_attempts']
                )
                
                result = await self._publish_event(event)
                
                if result.success:
                    published_ids.append(event.id)
                    self.events_published += 1
                else:
                    failed_updates.append((event.id, result.error))
                    self.events_failed += 1
            
            # Mark successful events as published
            if published_ids:
                await conn.execute("""
                    UPDATE outbox
                    SET published_at = NOW()
                    WHERE id = ANY($1)
                """, published_ids)
            
            # Update failed events (increment attempts, record error)
            for event_id, error in failed_updates:
                await conn.execute("""
                    UPDATE outbox
                    SET 
                        publish_attempts = publish_attempts + 1,
                        last_error = $2,
                        last_attempt_at = NOW()
                    WHERE id = $1
                """, event_id, error[:1000])  # Truncate error
            
            self.last_publish_time = datetime.utcnow()
            return len(published_ids)
    
    async def _publish_event(self, event: OutboxEvent) -> PublishResult:
        """Publish a single event to Kafka."""
        topic = self.topic_resolver(event)
        
        try:
            # Serialize payload
            value = json.dumps({
                'event_id': event.id,
                'event_type': event.event_type,
                'aggregate_type': event.aggregate_type,
                'aggregate_id': event.aggregate_id,
                'payload': event.payload,
                'created_at': event.created_at.isoformat()
            }).encode('utf-8')
            
            key = event.aggregate_id.encode('utf-8')
            
            # Send to Kafka
            result = await self.kafka.send_and_wait(topic, value=value, key=key)
            
            logger.debug(
                f"Published event {event.id} to {topic}:{result.partition}@{result.offset}"
            )
            
            return PublishResult(
                event_id=event.id,
                success=True,
                topic=topic,
                partition=result.partition,
                offset=result.offset
            )
            
        except Exception as e:
            logger.warning(f"Failed to publish event {event.id}: {e}")
            return PublishResult(
                event_id=event.id,
                success=False,
                topic=topic,
                error=str(e)
            )
    
    def _default_topic_resolver(self, event: OutboxEvent) -> str:
        """Default topic naming: lowercase aggregate type + .events"""
        return f"{event.aggregate_type.lower()}.events"


# =============================================================================
# Outbox Cleaner (periodic cleanup)
# =============================================================================

class OutboxCleaner:
    """Cleans up old published events from the outbox."""
    
    def __init__(self, db_pool: asyncpg.Pool, config: OutboxConfig):
        self.db = db_pool
        self.config = config
        self._running = False
    
    async def run_once(self) -> int:
        """Run a single cleanup pass. Returns number of deleted rows."""
        cutoff = datetime.utcnow() - timedelta(hours=self.config.retention_hours)
        
        async with self.db.acquire() as conn:
            # PostgreSQL's DELETE has no LIMIT clause, so bound the batch
            # size with a subquery instead
            result = await conn.execute("""
                DELETE FROM outbox
                WHERE id IN (
                    SELECT id FROM outbox
                    WHERE published_at IS NOT NULL
                    AND published_at < $1
                    LIMIT $2
                )
            """, cutoff, self.config.cleanup_batch_size)
            
            deleted = int(result.split()[-1])
            if deleted > 0:
                logger.info(f"Cleaned up {deleted} old outbox events")
            return deleted
    
    async def run(self, interval_seconds: int = 3600):
        """Run cleanup periodically."""
        self._running = True
        while self._running:
            try:
                await self.run_once()
            except Exception as e:
                logger.error(f"Cleanup error: {e}")
            
            await asyncio.sleep(interval_seconds)
    
    def stop(self):
        self._running = False


# =============================================================================
# Database Schema
# =============================================================================

OUTBOX_SCHEMA = """
-- Outbox table for transactional outbox pattern
CREATE TABLE IF NOT EXISTS outbox (
    id              BIGSERIAL PRIMARY KEY,
    event_id        VARCHAR(64) NOT NULL UNIQUE,  -- Idempotency key
    aggregate_type  VARCHAR(255) NOT NULL,
    aggregate_id    VARCHAR(255) NOT NULL,
    event_type      VARCHAR(255) NOT NULL,
    payload         JSONB NOT NULL,
    created_at      TIMESTAMP NOT NULL DEFAULT NOW(),
    published_at    TIMESTAMP NULL,
    publish_attempts INTEGER NOT NULL DEFAULT 0,
    last_attempt_at TIMESTAMP NULL,
    last_error      TEXT NULL
);

-- Index for efficient polling of unpublished events
CREATE INDEX IF NOT EXISTS idx_outbox_unpublished 
ON outbox (created_at) 
WHERE published_at IS NULL;

-- Index for cleanup of old published events
CREATE INDEX IF NOT EXISTS idx_outbox_published 
ON outbox (published_at) 
WHERE published_at IS NOT NULL;

-- Note: no separate index is needed for event_id lookups (idempotency checks);
-- the UNIQUE constraint on event_id already creates one.
"""


# =============================================================================
# Example Usage
# =============================================================================

async def example_usage():
    """Complete example of using the transactional outbox."""
    
    config = OutboxConfig(
        db_dsn="postgresql://user:pass@localhost/mydb",
        kafka_bootstrap_servers="localhost:9092"
    )
    
    # Initialize components
    db_pool = await asyncpg.create_pool(config.db_dsn)
    outbox_writer = OutboxWriter(config)
    outbox_publisher = OutboxPublisher(config)
    
    # Start publisher in background
    await outbox_publisher.start()
    publisher_task = asyncio.create_task(outbox_publisher.run())
    
    try:
        # Application code: Create an order with outbox event
        async with db_pool.acquire() as conn:
            async with conn.transaction():
                # Business logic
                order = await conn.fetchrow("""
                    INSERT INTO orders (user_id, amount, status)
                    VALUES ($1, $2, 'created')
                    RETURNING *
                """, "user_123", 99.99)
                
                # Write to outbox (same transaction!)
                await outbox_writer.write(
                    conn,
                    aggregate_type="Order",
                    aggregate_id=str(order['id']),
                    event_type="OrderCreated",
                    payload={
                        "order_id": order['id'],
                        "user_id": order['user_id'],
                        "amount": float(order['amount']),
                        "status": order['status']
                    }
                )
                
                print(f"Created order {order['id']}")
        
        # Wait for event to be published
        await asyncio.sleep(2)
        
    finally:
        # Cleanup
        await outbox_publisher.stop()
        publisher_task.cancel()
        await db_pool.close()


if __name__ == "__main__":
    asyncio.run(example_usage())
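
The monitoring requirement from 5.1 boils down to two numbers: how many events are waiting, and how old the oldest one is. A sketch of a health check built on those (the query follows the schema in this chapter; the thresholds are illustrative defaults, not recommendations):

```python
# Outbox health-check sketch. LAG_QUERY reports backlog depth and the age of
# the oldest unpublished event; evaluate_health turns those into a status.

LAG_QUERY = """
    SELECT
        COUNT(*)        AS backlog,
        MIN(created_at) AS oldest_unpublished
    FROM outbox
    WHERE published_at IS NULL
"""

def evaluate_health(backlog: int, oldest_age_seconds: float,
                    max_backlog: int = 10_000,
                    max_age_seconds: float = 60.0) -> str:
    """Classify outbox health from backlog size and oldest-event age."""
    if backlog > max_backlog or oldest_age_seconds > max_age_seconds:
        return "unhealthy"   # publisher is falling behind; alert
    if backlog > max_backlog // 2 or oldest_age_seconds > max_age_seconds / 2:
        return "degraded"    # worth a warning before it becomes an incident
    return "healthy"
```

Exposing these two values as metrics (e.g. to Prometheus) gives early warning long before the "outbox table gets too large" scenario in Chapter 6.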

5.3 CDC Implementation with Debezium

For high-volume systems, CDC (Change Data Capture) is more efficient than polling:

# docker-compose.yml for Debezium CDC
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.0.1
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-kafka:7.0.1
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  postgres:
    image: postgres:14
    ports:
      - "5432:5432"
    environment:
      POSTGRES_PASSWORD: postgres
    command: 
      - "postgres"
      - "-c"
      - "wal_level=logical"  # Required for CDC

  debezium:
    image: debezium/connect:2.0
    depends_on:
      - kafka
      - postgres
    ports:
      - "8083:8083"
    environment:
      BOOTSTRAP_SERVERS: kafka:9092
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: debezium_configs
      OFFSET_STORAGE_TOPIC: debezium_offsets
      STATUS_STORAGE_TOPIC: debezium_status

// Debezium connector configuration
// POST to http://localhost:8083/connectors
{
  "name": "outbox-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "mydb",
    "topic.prefix": "myapp",
    
    "table.include.list": "public.outbox",
    
    "transforms": "outbox",
    "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
    "transforms.outbox.table.field.event.id": "event_id",
    "transforms.outbox.table.field.event.key": "aggregate_id",
    "transforms.outbox.table.field.event.type": "event_type",
    "transforms.outbox.table.field.event.payload": "payload",
    "transforms.outbox.route.by.field": "aggregate_type",
    "transforms.outbox.route.topic.replacement": "${routedByValue}.events"
  }
}
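
The connector JSON above is registered by POSTing it to Kafka Connect's REST API. A sketch using only the standard library (the URL matches the compose file; the file name `outbox-connector.json` is a hypothetical place to keep the config shown above):

```python
import json
import urllib.request

# Sketch: register the outbox connector through the Kafka Connect REST API.
# The Connect worker listens on port 8083, as mapped in the compose file.
CONNECT_URL = "http://localhost:8083/connectors"

def build_request(config: dict, url: str = CONNECT_URL) -> urllib.request.Request:
    """Build the POST request that creates the connector."""
    return urllib.request.Request(
        url,
        data=json.dumps(config).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )

def register_connector(config: dict, url: str = CONNECT_URL) -> int:
    """Submit the connector config; Connect answers 201 Created on success."""
    with urllib.request.urlopen(build_request(config, url)) as resp:
        return resp.status
```

Re-POSTing an existing connector name returns 409 Conflict, so registration is safe to script into deployment.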

How Debezium CDC works:

┌─────────────────────────────────────────────────────────────────────────┐
│                    DEBEZIUM CDC FLOW                                    │
│                                                                         │
│  1. Application writes to outbox table                                  │
│     INSERT INTO outbox (aggregate_type, ...) VALUES ('Order', ...)      │
│                              │                                          │
│                              ▼                                          │
│  2. PostgreSQL writes to WAL (Write-Ahead Log)                          │
│     ┌─────────────────────────────────────────────────────────────┐     │
│     │  WAL: [LSN 1234] INSERT outbox {id:1, type:'Order', ...}    │     │
│     └─────────────────────────────────────────────────────────────┘     │
│                              │                                          │
│                              ▼                                          │
│  3. Debezium reads WAL via logical replication                          │
│     ┌─────────────────────────────────────────────────────────────┐     │
│     │  Debezium: Reading WAL from LSN 1234...                     │     │
│     │  Found INSERT on outbox table                               │     │
│     └─────────────────────────────────────────────────────────────┘     │
│                              │                                          │
│                              ▼                                          │
│  4. Outbox Event Router transforms and routes                           │
│     ┌─────────────────────────────────────────────────────────────┐     │
│     │  Input:  {aggregate_type: 'Order', aggregate_id: '123', ...}│     │
│     │  Output: Topic 'order.events', Key '123', Value {...}       │     │
│     └─────────────────────────────────────────────────────────────┘     │
│                              │                                          │
│                              ▼                                          │
│  5. Event published to Kafka                                            │
│     ┌─────────────────────────────────────────────────────────────┐     │
│     │  Topic: order.events                                        │     │
│     │  Partition: 3 (based on key hash)                           │     │
│     │  Offset: 847                                                │     │
│     └─────────────────────────────────────────────────────────────┘     │
│                                                                         │
│  Latency: ~10-100ms from INSERT to Kafka                                │
│  Throughput: 10,000+ events/second                                      │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘

Chapter 6: Edge Cases and Error Handling

6.1 Edge Case 1: Outbox Table Gets Too Large

┌────────────────────────────────────────────────────────────────────────┐
│  SCENARIO: Kafka is down for 2 hours, outbox table grows to 1M rows    │
│                                                                        │
│  SYMPTOMS:                                                             │
│    • Outbox table size growing rapidly                                 │
│    • Database disk space alerts                                        │
│    • Queries on outbox table getting slow                              │
│    • Application writes slowing down (table bloat)                     │
│                                                                        │
│  SOLUTIONS:                                                            │
│                                                                        │
│  1. IMMEDIATE: Fix Kafka, clear backlog                                │
│     - Kafka comes back online                                          │
│     - Publisher catches up naturally                                   │
│     - May take hours depending on volume                               │
│                                                                        │
│  2. SCALE PUBLISHERS: Temporarily increase publisher throughput        │
│     - Add more publisher instances (with coordination)                 │
│     - Increase batch size                                              │
│     - Reduce poll interval                                             │
│                                                                        │
│  3. BACKPRESSURE: Slow down writes to outbox                           │
│     - Return 503 to clients (service degraded)                         │
│     - Queue requests at API level                                      │
│     - Better than losing events                                        │
│                                                                        │
│  4. PARTITIONING: For long-term scalability                            │
│     - Partition outbox table by created_at                             │
│     - Drop old partitions efficiently                                  │
│     - Prevents table bloat                                             │
│                                                                        │
└────────────────────────────────────────────────────────────────────────┘

Solution: Table Partitioning

-- Create partitioned outbox table
CREATE TABLE outbox (
    id              BIGSERIAL,
    event_id        VARCHAR(64) NOT NULL,
    aggregate_type  VARCHAR(255) NOT NULL,
    aggregate_id    VARCHAR(255) NOT NULL,
    event_type      VARCHAR(255) NOT NULL,
    payload         JSONB NOT NULL,
    created_at      TIMESTAMP NOT NULL DEFAULT NOW(),
    published_at    TIMESTAMP NULL,
    PRIMARY KEY (id, created_at)
) PARTITION BY RANGE (created_at);

-- Create partitions for each day
CREATE TABLE outbox_2024_01_15 PARTITION OF outbox
    FOR VALUES FROM ('2024-01-15') TO ('2024-01-16');
    
CREATE TABLE outbox_2024_01_16 PARTITION OF outbox
    FOR VALUES FROM ('2024-01-16') TO ('2024-01-17');

-- Drop old partitions (instant, no row-by-row delete)
DROP TABLE outbox_2024_01_08;  -- Drop week-old partition
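A small helper can generate this DDL for a daily maintenance job that creates tomorrow's partition ahead of time. This is a minimal sketch assuming the PostgreSQL partition syntax shown above; the function name and formatting are illustrative.

```python
# Sketch: generate the daily partition DDL shown above, so a cron job
# can create upcoming partitions (and drop week-old ones) automatically.
from datetime import date, timedelta

def partition_ddl(day: date) -> str:
    nxt = day + timedelta(days=1)
    name = f"outbox_{day:%Y_%m_%d}"
    return (
        f"CREATE TABLE IF NOT EXISTS {name} PARTITION OF outbox\n"
        f"    FOR VALUES FROM ('{day}') TO ('{nxt}');"
    )

ddl = partition_ddl(date(2024, 1, 15))
```

A scheduler would run this once a day, a few days ahead, so the publisher never writes into a missing partition.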

6.2 Edge Case 2: Publisher Processes Same Event Twice

┌────────────────────────────────────────────────────────────────────────┐
│  SCENARIO: Two publisher instances grab same event                     │
│                                                                        │
│  HOW IT HAPPENS:                                                       │
│    Publisher A: SELECT ... FOR UPDATE (gets event 123)                 │
│    Publisher A: Sends to Kafka                                         │
│    Publisher A: Network partition! Can't update database               │
│    Publisher A: Lock times out                                         │
│    Publisher B: SELECT ... FOR UPDATE (gets event 123 again!)          │
│    Publisher B: Sends to Kafka (DUPLICATE!)                            │
│                                                                        │
│  SOLUTIONS:                                                            │
│                                                                        │
│  1. ADVISORY LOCKS: Only one publisher active at a time                │
│     pg_try_advisory_lock(hashtext('outbox_publisher'))                 │
│     + Simple                                                           │
│     - Single point of failure                                          │
│                                                                        │
│  2. SKIP LOCKED: Concurrent publishers, no duplicates                  │
│     SELECT ... FOR UPDATE SKIP LOCKED                                  │
│     + Allows multiple publishers                                       │
│     + No duplicate processing                                          │
│     - Events may be processed out of order                             │
│                                                                        │
│  3. IDEMPOTENT CONSUMERS: Accept duplicates, deduplicate               │
│     Consumer checks event_id before processing                         │
│     + Most robust                                                      │
│     + Handles all edge cases                                           │
│     - Extra storage for dedup tracking                                 │
│                                                                        │
└────────────────────────────────────────────────────────────────────────┘
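Option 3 (idempotent consumers) can be sketched in a few lines. This is a minimal in-memory illustration: in production the processed-ID set would live in a durable store (e.g. a `processed_events` table), and the `charges` list stands in for the real side effect such as a payment call.

```python
# Minimal sketch of an idempotent consumer: duplicates are detected by
# event_id and skipped, so redelivery is harmless. In production the
# processed-ID set would live in a durable store, not in memory.
processed_ids = set()
charges = []  # stand-in for the real side effect (e.g. a payment call)

def handle_event(event):
    if event["event_id"] in processed_ids:
        return "duplicate-skipped"
    charges.append(event["payload"])    # perform the side effect once
    processed_ids.add(event["event_id"])
    return "processed"

# The same event delivered twice only takes effect once.
e = {"event_id": "evt_123", "payload": {"order_id": "order_456"}}
first = handle_event(e)
second = handle_event(e)
```

The dedup check and the side effect should themselves be atomic in a real system (e.g. insert the processed ID in the same transaction as the state change), otherwise a crash between the two reopens the duplicate window.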

6.3 Edge Case 3: Events Published Out of Order

┌───────────────────────────────────────────────────────────────────────┐
│  SCENARIO: OrderCreated arrives after OrderPaid                       │
│                                                                       │
│  HOW IT HAPPENS:                                                      │
│    T1: Order created, event 1 added to outbox                         │
│    T2: Order paid, event 2 added to outbox                            │
│    T3: Publisher processes event 2 first (skip locked, different tx)  │
│    T4: Publisher processes event 1 second                             │
│    Consumer sees: OrderPaid, then OrderCreated (wrong order!)         │
│                                                                       │
│  SOLUTIONS:                                                           │
│                                                                       │
│  1. SINGLE PUBLISHER: Process events strictly in order                │
│     + Guaranteed ordering                                             │
│     - Limited throughput                                              │
│     - Single point of failure                                         │
│                                                                       │
│  2. PARTITION BY AGGREGATE: One publisher per partition               │
│     Events for same aggregate go to same partition                    │
│     Kafka guarantees order within partition                           │
│     + Ordering per aggregate                                          │
│     + Parallel processing across aggregates                           │
│                                                                       │
│  3. CONSUMER HANDLES ORDERING: Reorder at consumer                    │
│     Buffer events, sort by sequence number                            │
│     + Most flexible                                                   │
│     - Complex consumer logic                                          │
│                                                                       │
│  RECOMMENDATION:                                                      │
│    Partition by aggregate_id in Kafka (already doing this!)           │
│    Events for same order always go to same partition                  │
│    Kafka guarantees order within partition                            │
│                                                                       │
└───────────────────────────────────────────────────────────────────────┘
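What "partition by aggregate" means in practice can be sketched as hashing the aggregate ID to a partition number, so every event for the same order lands on the same (ordered) partition. The hash below is illustrative, not Kafka's actual murmur2 partitioner.

```python
# Sketch of Kafka-style partition assignment: keying by aggregate_id
# means all events for one order land on one partition, where Kafka
# preserves order. Hash choice is illustrative, not Kafka's own.
import hashlib

NUM_PARTITIONS = 12

def partition_for(aggregate_id: str) -> int:
    digest = hashlib.sha256(aggregate_id.encode()).digest()
    return int.from_bytes(digest[:4], "big") % NUM_PARTITIONS

p1 = partition_for("order_456")
p2 = partition_for("order_456")  # same key -> same partition, every time
```

Because the mapping is deterministic, OrderCreated and OrderPaid for `order_456` always share a partition, and consumers see them in write order even with many publishers running.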

6.4 Error Handling Matrix

  Error                   Cause                    Handling                      Retry?
  ──────────────────────────────────────────────────────────────────────────────────────
  Kafka unavailable       Broker down              Back off, retry               Yes
  Kafka timeout           Network issues           Retry with backoff            Yes
  Serialization error     Invalid payload          Log error, skip event         No
  Database lock timeout   Contention               Retry immediately             Yes
  Out of disk space       Outbox too large         Alert, manual intervention    No
  Schema mismatch         Payload format changed   Fix schema, retry             Manual
  Max retries exceeded    Persistent failure       Move to error table, alert    No
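The matrix above maps naturally onto a small dispatch table plus exponential backoff for the retryable cases. The error names, actions, and backoff parameters below are illustrative, not a fixed API.

```python
# Sketch: the error-handling matrix as a lookup table. Each entry maps
# an error class to (action, retryable). Names are illustrative.
RETRY_POLICY = {
    "kafka_unavailable":   ("retry", True),
    "kafka_timeout":       ("retry", True),
    "serialization_error": ("skip", False),
    "lock_timeout":        ("retry", True),
    "disk_full":           ("alert", False),
    "schema_mismatch":     ("manual", False),
    "max_retries":         ("dead_letter", False),
}

def backoff_seconds(attempt: int, base: float = 0.5, cap: float = 30.0) -> float:
    """Exponential backoff: 0.5s, 1s, 2s, ... capped at 30s."""
    return min(cap, base * (2 ** attempt))

action, retryable = RETRY_POLICY["kafka_timeout"]
```

A publisher loop would consult the table on each failure, sleep `backoff_seconds(attempt)` for retryable errors, and route the rest to logging, alerting, or a dead-letter table.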

Part III: Real-World Application

Chapter 7: How Big Tech Does It

7.1 Case Study: Shopify — Transactional Outbox at Scale

┌────────────────────────────────────────────────────────────────────────┐
│                    SHOPIFY'S OUTBOX IMPLEMENTATION                     │
│                                                                        │
│  SCALE:                                                                │
│    • Millions of merchants                                             │
│    • Billions of events per day                                        │
│    • Black Friday: 10x normal traffic                                  │
│                                                                        │
│  ARCHITECTURE:                                                         │
│                                                                        │
│    ┌──────────────┐      ┌──────────────┐      ┌──────────────┐        │
│    │   Rails App  │      │   MySQL      │      │    Kafka     │        │
│    │              │─────▶│   + Outbox   │─────▶│              │        │
│    │  (Monolith)  │      │   Table      │ CDC  │              │        │
│    └──────────────┘      └──────────────┘      └──────────────┘        │
│                                   │                    │               │
│                                   │               ┌────┴────┐          │
│                            Debezium              │ Consumer │          │
│                            Connector             │ Services │          │
│                                                  └──────────┘          │
│                                                                        │
│  KEY DECISIONS:                                                        │
│                                                                        │
│  1. MySQL + Debezium CDC (not polling)                                 │
│     - Low latency: ~50ms from write to Kafka                           │
│     - No polling load on database                                      │
│     - Scales with database write throughput                            │
│                                                                        │
│  2. Outbox per service, not global                                     │
│     - Each service owns its outbox table                               │
│     - Reduces coupling                                                 │
│     - Easier to scale independently                                    │
│                                                                        │
│  3. Event schema registry                                              │
│     - All events have versioned schemas                                │
│     - Backwards compatibility enforced                                 │
│     - Consumers can evolve independently                               │
│                                                                        │
│  LESSONS LEARNED:                                                      │
│                                                                        │
│  • "Outbox saved us on Black Friday"                                   │
│    When Kafka had issues, events queued in outbox                      │
│    No data loss, consumers caught up after recovery                    │
│                                                                        │
│  • "CDC > Polling at scale"                                            │
│    Polling worked initially, but at scale caused DB load               │
│    Migrated to Debezium, saw 90% reduction in DB queries               │
│                                                                        │
│  • "Idempotency is non-negotiable"                                     │
│    Every consumer must handle duplicates                               │
│    Saved them during Kafka rebalances and retries                      │
│                                                                        │
└────────────────────────────────────────────────────────────────────────┘

Reference: Shopify Engineering Blog — "Capturing Every Change"

7.2 Case Study: Segment — Outbox for Event Routing

┌────────────────────────────────────────────────────────────────────────┐
│                    SEGMENT'S EVENT ARCHITECTURE                        │
│                                                                        │
│  CHALLENGE:                                                            │
│    Route customer events to 300+ integrations                          │
│    Each integration has different format requirements                  │
│    Can't lose any events                                               │
│                                                                        │
│  ARCHITECTURE:                                                         │
│                                                                        │
│    Customer Event                                                      │
│         │                                                              │
│         ▼                                                              │
│    ┌─────────────┐      ┌─────────────┐                                │
│    │  Intake API │─────▶│  PostgreSQL │                                │
│    │             │      │  + Outbox   │                                │
│    └─────────────┘      └──────┬──────┘                                │
│                                │                                       │
│                         CDC (Debezium)                                 │
│                                │                                       │
│                                ▼                                       │
│                         ┌─────────────┐                                │
│                         │    Kafka    │                                │
│                         │ (Raw Events)│                                │
│                         └──────┬──────┘                                │
│                                │                                       │
│         ┌──────────────────────┼──────────────────────┐                │
│         │                      │                      │                │
│         ▼                      ▼                      ▼                │
│    ┌─────────┐           ┌─────────┐           ┌─────────┐             │
│    │ Google  │           │ Sales-  │           │ Mixpanel│             │
│    │Analytics│           │  force  │           │         │             │
│    │ Worker  │           │ Worker  │           │ Worker  │             │
│    └─────────┘           └─────────┘           └─────────┘             │
│                                                                        │
│  KEY INNOVATION: "Centrifuge" System                                   │
│                                                                        │
│    • Events stored in outbox with delivery_targets JSONB field         │
│    • Each target has its own cursor (offset) in the stream             │
│    • Failed deliveries don't block other destinations                  │
│    • Replay possible per-destination                                   │
│                                                                        │
│  OUTBOX SCHEMA:                                                        │
│    events:                                                             │
│      id, event_type, payload, created_at                               │
│    delivery_status:                                                    │
│      event_id, destination, status, attempts, last_error               │
│                                                                        │
└────────────────────────────────────────────────────────────────────────┘

7.3 Case Study: DoorDash — Outbox for Order Events

┌───────────────────────────────────────────────────────────────────────┐
│                    DOORDASH ORDER EVENT SYSTEM                        │
│                                                                       │
│  REQUIREMENTS:                                                        │
│    • Order events must trigger: driver dispatch, payment,             │
│      notifications                                                    │
│    • Zero tolerance for lost orders                                   │
│    • Events must be in order per order                                │
│                                                                       │
│  IMPLEMENTATION:                                                      │
│                                                                       │
│    Order Service                                                      │
│    ┌─────────────────────────────────────────────────────────────┐    │
│    │  async def create_order(order_data):                        │    │
│    │      async with db.transaction():                           │    │
│    │          order = await db.insert_order(order_data)          │    │
│    │          await db.insert_outbox({                           │    │
│    │              'aggregate_id': order.id,                      │    │
│    │              'event_type': 'OrderCreated',                  │    │
│    │              'payload': order.to_dict()                     │    │
│    │          })                                                 │    │
│    │      return order                                           │    │
│    └─────────────────────────────────────────────────────────────┘    │
│                                                                       │
│  OPTIMIZATIONS:                                                       │
│                                                                       │
│    1. Batch publishing: 100 events per Kafka round-trip               │
│    2. Async outbox write: Non-blocking after transaction commits      │
│    3. Partition by order_id: Ordering guaranteed per order            │
│    4. Compaction: Only keep latest state per order for replay         │
│                                                                       │
│  RESULTS:                                                             │
│    • 99.99% event delivery rate                                       │
│    • < 100ms p99 latency from order creation to event publish         │
│    • Zero lost orders in 3 years of operation                         │
│                                                                       │
└───────────────────────────────────────────────────────────────────────┘

7.4 Comparison: Implementation Approaches

  Company    Database     CDC Tool        Outbox Approach
  ─────────────────────────────────────────────────────────────────────
  Shopify    MySQL        Debezium        Per-service outbox
  Segment    PostgreSQL   Debezium        Centralized with delivery tracking
  DoorDash   PostgreSQL   Custom poller   Polling with batching
  Uber       MySQL        Custom CDC      Sharded outbox tables
  Netflix    Cassandra    Custom          Event sourcing (no outbox needed)

Chapter 8: Common Mistakes to Avoid

8.1 Mistake 1: Writing to Outbox Outside Transaction

┌────────────────────────────────────────────────────────────────────────┐
│  ❌ WRONG: Separate transactions                                       │
│                                                                        │
│    async def create_order(order_data):                                 │
│        # Transaction 1: Create order                                   │
│        async with db.transaction():                                    │
│            order = await db.insert_order(order_data)                   │
│                                                                        │
│        # Transaction 2: Write to outbox (WRONG!)                       │
│        async with db.transaction():                                    │
│            await db.insert_outbox(order)                               │
│                                                                        │
│        # If app crashes between transactions, event is lost!           │
│                                                                        │
├────────────────────────────────────────────────────────────────────────┤
│  ✅ CORRECT: Same transaction                                          │
│                                                                        │
│    async def create_order(order_data):                                 │
│        async with db.transaction():                                    │
│            order = await db.insert_order(order_data)                   │
│            await db.insert_outbox(order)  # Same transaction!          │
│                                                                        │
│        # Both succeed or both fail. Atomic.                            │
│                                                                        │
└────────────────────────────────────────────────────────────────────────┘

8.2 Mistake 2: Blocking on Kafka Publish

┌────────────────────────────────────────────────────────────────────────┐
│  ❌ WRONG: Publishing in the request path                              │
│                                                                        │
│    async def create_order(order_data):                                 │
│        async with db.transaction():                                    │
│            order = await db.insert_order(order_data)                   │
│                                                                        │
│        # Blocking call to Kafka in request path!                       │
│        await kafka.publish("order.created", order)  # 50-100ms added   │
│                                                                        │
│        return order  # User waits for Kafka                            │
│                                                                        │
│    Problems:                                                           │
│    • Added latency to every request                                    │
│    • If Kafka is slow, all requests are slow                           │
│    • If Kafka is down, all requests fail                               │
│                                                                        │
├────────────────────────────────────────────────────────────────────────┤
│  ✅ CORRECT: Async publishing via outbox                               │
│                                                                        │
│    async def create_order(order_data):                                 │
│        async with db.transaction():                                    │
│            order = await db.insert_order(order_data)                   │
│            await db.insert_outbox(order)                               │
│                                                                        │
│        return order  # User doesn't wait for Kafka                     │
│                                                                        │
│    # Background publisher handles Kafka (async)                        │
│                                                                        │
│    Benefits:                                                           │
│    • Request latency = only database write                             │
│    • Kafka issues don't affect user requests                           │
│    • Natural buffering during Kafka downtime                           │
│                                                                        │
└────────────────────────────────────────────────────────────────────────┘

8.3 Mistake 3: Not Making Consumers Idempotent

┌────────────────────────────────────────────────────────────────────────┐
│  ❌ WRONG: Assuming exactly-once delivery                              │
│                                                                        │
│    async def handle_order_created(event):                              │
│        # Charge customer (no idempotency check!)                       │
│        await payment_service.charge(                                   │
│            user_id=event['user_id'],                                   │
│            amount=event['amount']                                      │
│        )                                                               │
│        # If event delivered twice, customer charged twice!             │
│                                                                        │
├────────────────────────────────────────────────────────────────────────┤
│  ✅ CORRECT: Idempotent consumer                                       │
│                                                                        │
│    async def handle_order_created(event):                              │
│        # Check if already processed                                    │
│        if await is_event_processed(event['event_id']):                 │
│            logger.info(f"Event {event['event_id']} already processed") │
│            return                                                      │
│                                                                        │
│        # Process with idempotency key                                  │
│        await payment_service.charge(                                   │
│            user_id=event['user_id'],                                   │
│            amount=event['amount'],                                     │
│            idempotency_key=event['event_id']  # Prevents double charge │
│        )                                                               │
│                                                                        │
│        # Mark as processed                                             │
│        await mark_event_processed(event['event_id'])                   │
│                                                                        │
└────────────────────────────────────────────────────────────────────────┘

8.4 Mistake 4: Not Monitoring Outbox Lag

┌───────────────────────────────────────────────────────────────────────┐
│  ❌ WRONG: No visibility into outbox health                           │
│                                                                       │
│    # No monitoring, no alerts                                         │
│    # Outbox grows to 10M rows before anyone notices                   │
│    # Database runs out of disk space                                  │
│    # Production incident at 3 AM                                      │
│                                                                       │
├───────────────────────────────────────────────────────────────────────┤
│  ✅ CORRECT: Monitor and alert                                        │
│                                                                       │
│    # Key metrics to track:                                            │
│                                                                       │
│    1. Unpublished event count                                         │
│       SELECT COUNT(*) FROM outbox WHERE published_at IS NULL          │
│       Alert if > 10,000                                               │
│                                                                       │
│    2. Oldest unpublished event age                                    │
│       SELECT NOW() - MIN(created_at) FROM outbox                      │
│       WHERE published_at IS NULL                                      │
│       Alert if > 5 minutes                                            │
│                                                                       │
│    3. Events published per minute                                     │
│       Track in publisher, expose via /metrics                         │
│       Alert if drops to 0                                             │
│                                                                       │
│    4. Publisher error rate                                            │
│       Track failed publishes                                          │
│       Alert if > 1%                                                   │
│                                                                       │
└───────────────────────────────────────────────────────────────────────┘
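The first two metrics can be computed with a few lines. In this sketch the outbox rows are plain dicts for illustration; in production these numbers come from the SQL queries shown above, exported to your metrics system.

```python
# Sketch: compute the two headline outbox-health metrics
# (unpublished count, oldest unpublished age) from outbox rows.
from datetime import datetime, timedelta

def outbox_health(rows, now):
    unpublished = [r for r in rows if r["published_at"] is None]
    oldest_age = (
        now - min(r["created_at"] for r in unpublished)
        if unpublished else timedelta(0)
    )
    return {"unpublished_count": len(unpublished), "oldest_age": oldest_age}

now = datetime(2024, 1, 15, 12, 0)
rows = [
    {"created_at": now - timedelta(minutes=7), "published_at": None},
    {"created_at": now - timedelta(minutes=1), "published_at": now},
]
health = outbox_health(rows, now)
# One unpublished event, 7 minutes old: past the 5-minute alert threshold
```

Exposing these two numbers on a `/metrics` endpoint and alerting on them catches a stuck publisher long before the disk fills up.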

8.5 Mistake Checklist

Before deploying a transactional outbox, verify:

  • Same transaction — Business data and outbox event in one transaction
  • Idempotent consumers — Handle duplicate events gracefully
  • Event ID — Every event has a unique, deterministic ID
  • Monitoring — Track unpublished count, lag, error rate
  • Cleanup — Old published events are deleted or archived
  • Partitioning — For high-volume tables, use time-based partitions
  • Retry logic — Failed publishes are retried with backoff
  • Dead letter handling — Events that can't be published go somewhere
  • Ordering — Partition key ensures order within aggregate

Part IV: Interview Preparation

Chapter 9: Interview Tips and Phrases

9.1 When to Bring Up Transactional Outbox

Bring up the outbox pattern when:

  • The system writes to a database AND publishes events
  • Reliability requirements are high (no lost events)
  • The interviewer mentions "event-driven architecture"
  • You're designing microservices that need to communicate
  • The system involves payments, orders, or other critical data

9.2 Key Phrases to Use

┌────────────────────────────────────────────────────────────────────────┐
│                    INTERVIEW PHRASES                                   │
│                                                                        │
│  Introducing the pattern:                                              │
│                                                                        │
│    "There's a dual-write problem here. If we save to the database      │
│     and then publish to Kafka, we could crash in between and lose      │
│     the event. I'd use the transactional outbox pattern: write the     │
│     event to an outbox table in the same transaction as the business   │
│     data, then have a separate publisher process send it to Kafka."    │
│                                                                        │
│  Explaining how it works:                                              │
│                                                                        │
│    "The key insight is that we reduce two writes to one. Both the      │
│     order and the event go into the database in a single transaction.  │
│     Either both succeed or both fail—atomicity. Then a background      │
│     process reads unpublished events and sends them to Kafka."         │
│                                                                        │
│  Discussing implementation options:                                    │
│                                                                        │
│    "For the publisher, we have two options: polling or CDC.            │
│     Polling is simpler—just query for unpublished events every         │
│     second. CDC with Debezium is more efficient at scale because       │
│     it reads the database's write-ahead log directly, giving us        │
│     millisecond latency without the polling load."                     │
│                                                                        │
│  Addressing exactly-once:                                              │
│                                                                        │
│    "The outbox gives us at-least-once delivery—if the publisher        │
│     crashes after sending but before marking as published, the event   │
│     will be sent again. That's why consumers must be idempotent.       │
│     We'd use the event ID for deduplication."                          │
│                                                                        │
└────────────────────────────────────────────────────────────────────────┘

9.3 Questions to Ask Interviewer

  • "What are the consistency requirements? Is it acceptable if events are delayed by a few seconds?"
  • "How critical is it that events are never lost? Are we dealing with financial transactions?"
  • "What's the expected event volume? This affects whether we'd use polling or CDC."
  • "Are there ordering requirements? Do events for the same entity need to be in order?"

9.4 Common Follow-up Questions

Q: "What if Kafka is down for an hour?"
A: "Events queue up in the outbox table. When Kafka recovers, the publisher catches up. We might need to scale the publisher temporarily to clear the backlog, but no events are lost."

Q: "How do you handle publisher failures?"
A: "The publisher marks events as published only after Kafka acknowledges. If it crashes before marking, the event is republished—that's why consumers must be idempotent."

Q: "Doesn't this add latency?"
A: "For the user request, no—we return immediately after the database transaction. Events are published asynchronously. There's a small delay (milliseconds with CDC, up to a second with polling) before consumers see the event."

Q: "How do you keep the outbox table from growing forever?"
A: "We have a cleanup process that deletes published events older than 7 days. For high-volume tables, we'd use time-based partitioning and drop old partitions."
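The cleanup described in the last answer can be sketched as a scheduled job. The SQL and the hypothetical `db.execute` call are illustrative; column names match the outbox schema used throughout:

```python
from datetime import datetime, timedelta, timezone

# SQL run by the scheduled cleanup job; deletes only already-published rows.
CLEANUP_SQL = (
    "DELETE FROM outbox "
    "WHERE published_at IS NOT NULL AND published_at < %(cutoff)s"
)

def cleanup_cutoff(retention_days=7, now=None):
    """Timestamp before which published events are safe to delete."""
    now = now or datetime.now(timezone.utc)
    return now - timedelta(days=retention_days)

# A cron/scheduler would run:
#   db.execute(CLEANUP_SQL, {"cutoff": cleanup_cutoff()})
```

Unpublished rows (`published_at IS NULL`) are never touched, so a long Kafka outage cannot cause the cleanup job to discard pending events.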

Chapter 10: Practice Problems

Problem 1: E-commerce Order System

Setup: You're designing an order service. When an order is created:

  1. Save order to database
  2. Notify payment service to charge the customer
  3. Notify inventory service to reserve items
  4. Notify notification service to send confirmation email

Requirements:

  • No order can be lost
  • Payment must not be charged twice
  • All services need to know about the order

Questions to consider:

  1. Why can't you just call each service synchronously?
  2. How would you use the transactional outbox here?
  3. What's the partition key for the events?
  4. How do you handle payment service failures?
Hints:

  • Synchronous calls create tight coupling and cascading failures
  • One outbox event can fan out to multiple consumers
  • Partition by order_id to keep events in order
  • Payment service must be idempotent (use order_id as idempotency key)

Architecture:

  1. Order service writes order + outbox event in one transaction
  2. Publisher sends to Kafka topic "orders.events"
  3. Each downstream service is a consumer group:
    • Payment service: charges customer (idempotent by order_id)
    • Inventory service: reserves items
    • Notification service: sends email

Event:

{
  "event_id": "evt_123",
  "event_type": "OrderCreated",
  "aggregate_id": "order_456",
  "payload": {
    "order_id": "order_456",
    "user_id": "user_789",
    "items": [...],
    "total": 99.99
  }
}

Key decisions:

  • Partition by order_id: ensures order events stay in order
  • Consumer groups: each service processes independently
  • Idempotency: each service uses order_id for deduplication
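Step 1 of the architecture—order and outbox event in one transaction—can be sketched as follows. The in-memory `FakeDB` is a stand-in so the sketch is self-contained; a real service would issue both INSERTs under one BEGIN/COMMIT through an async driver such as asyncpg:

```python
import json
import uuid

class FakeDB:
    """In-memory stand-in; a real DB runs both INSERTs in one transaction."""

    def __init__(self):
        self.orders, self.outbox = [], []

    def transaction(self, order, event):
        self.orders.append(order)   # INSERT INTO orders ...
        self.outbox.append(event)   # INSERT INTO outbox ... (same transaction)


def create_order(db, order_data):
    order = dict(order_data)
    event = {
        "event_id": f"evt_{uuid.uuid4().hex[:8]}",
        "event_type": "OrderCreated",
        "aggregate_id": order["order_id"],  # also the Kafka partition key
        "payload": json.dumps(order),
    }
    db.transaction(order, event)  # both rows or neither, never a dual write
    return order
```

Because the event row commits with the order row, a crash at any point either leaves both or neither, eliminating the dual-write gap from the preface.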

Problem 2: Multi-Step Saga

Setup: Design a hotel booking system. A booking involves:

  1. Reserve room (Hotel Service)
  2. Charge payment (Payment Service)
  3. Send confirmation (Notification Service)

If payment fails, the room reservation must be cancelled.

Requirements:

  • Booking must be atomic (all or nothing)
  • Each step is a separate service
  • No distributed transactions

Questions to consider:

  1. How do you coordinate the multi-step process?
  2. How does the saga pattern fit with the outbox pattern?
  3. What events do you need?
  4. How do you handle compensation (rollback)?
Hints:

  • Saga orchestrator can use outbox for reliable event publishing
  • Each step publishes completion event via outbox
  • Compensation events also go through outbox
  • Consider using a saga state machine

Saga Flow:

BookingRequested
    │
    ├──▶ RoomReserved (Hotel Service)
    │         │
    │         ├──▶ PaymentCharged (Payment Service)
    │         │         │
    │         │         └──▶ BookingConfirmed
    │         │
    │         └── PaymentFailed
    │                   │
    │                   └──▶ RoomReservationCancelled (Compensation)
    │
    └── RoomNotAvailable
              │
              └──▶ BookingFailed

Outbox usage:

  • Each service uses outbox for its events
  • Saga orchestrator publishes commands via outbox
  • Compensation events are just regular events via outbox

Key events:

  • BookingRequested, RoomReserved, PaymentCharged, BookingConfirmed
  • PaymentFailed → triggers RoomReservationCancelled (compensation)
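The saga flow above can be expressed as a small routing table: for each incoming event, the orchestrator looks up the next command to publish (via its own outbox). Event and command names follow the diagram; the command names are illustrative:

```python
# Next command for each saga event; compensations are ordinary events too.
SAGA_ROUTES = {
    "BookingRequested": "ReserveRoom",
    "RoomReserved": "ChargePayment",
    "PaymentCharged": "ConfirmBooking",
    "PaymentFailed": "CancelRoomReservation",  # compensation path
    "RoomNotAvailable": "FailBooking",
}

def next_command(event_type):
    """Command the orchestrator publishes (through its outbox) next."""
    return SAGA_ROUTES.get(event_type)
```

A fuller implementation would persist the saga's current state alongside the outbox event, again in one transaction, so a crashed orchestrator resumes exactly where it left off.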

Problem 3: High-Volume Event System

Setup: Design an activity tracking system for a social network. Every user action (post, like, comment, share) generates an event that feeds:

  • Real-time activity feed
  • Analytics pipeline
  • Recommendation engine

Requirements:

  • 100,000 events per second at peak
  • Events must not be lost
  • Analytics needs complete data for accurate reports

Questions to consider:

  1. Polling or CDC for this volume?
  2. How do you partition the events?
  3. How do you handle the outbox table growth?
  4. What if Kafka is down?
Hints:

  • CDC is essential at this volume (polling would overload DB)
  • Partition by user_id for user-centric processing
  • Use table partitioning by time, drop old partitions
  • Outbox naturally buffers during Kafka outages

Architecture:

  • PostgreSQL with partitioned outbox table (daily partitions)
  • Debezium CDC connector reading WAL
  • Kafka with 100+ partitions
  • Multiple consumer groups for different use cases

Outbox table:

-- Columns are illustrative (add aggregate_type / aggregate_id as needed);
-- in PostgreSQL the partition key must be part of the primary key.
CREATE TABLE outbox (
    event_id     UUID        NOT NULL,
    event_type   TEXT        NOT NULL,
    payload      JSONB       NOT NULL,
    created_at   TIMESTAMPTZ NOT NULL DEFAULT now(),
    published_at TIMESTAMPTZ,
    PRIMARY KEY (event_id, created_at)
) PARTITION BY RANGE (created_at);

-- Create daily partitions automatically (e.g., with pg_partman)
-- Drop partitions older than 7 days

Scaling:

  • CDC scales with database write throughput
  • Add Kafka partitions for more parallelism
  • Each consumer group scales independently

Kafka outage handling:

  • Events queue in outbox (database can handle hours of events)
  • Debezium tracks position in WAL
  • When Kafka recovers, CDC catches up automatically

Chapter 11: Mock Interview Dialogue

Scenario: Design a Payment Notification System

Interviewer: "We need to notify merchants when payments are processed. How would you design this?"

You: "Sure. Let me first understand the requirements. When you say notify merchants, is this via webhook, email, or push notification?"

Interviewer: "Primarily webhooks to the merchant's server, but also email as a backup."

You: "Got it. And reliability—can we lose any notifications, or must every payment trigger a notification?"

Interviewer: "Every payment must be notified. We're dealing with financial data, so reliability is critical."

You: "Understood. Let me outline the architecture.

The core challenge here is the dual-write problem. When a payment is processed, we need to save it to our database AND publish an event for notifications. If we do these as two separate operations, we risk losing events.

I'd use the transactional outbox pattern. Let me draw it:

┌────────────────────────────────────────────────────────────────────────┐
│                    PAYMENT NOTIFICATION ARCHITECTURE                   │
│                                                                        │
│    Payment Service                                                     │
│    ┌─────────────────────────────────────────────────────────────┐     │
│    │  async with db.transaction():                               │     │
│    │      payment = save_payment(payment_data)                   │     │
│    │      save_outbox_event(payment)  # Same transaction         │     │
│    └─────────────────────────────────────────────────────────────┘     │
│                              │                                         │
│                              ▼                                         │
│    ┌─────────────────────────────────────────────────────────────┐     │
│    │                    DATABASE                                 │     │
│    │    payments table    │    outbox table                      │     │
│    │    (payment data)    │    (PaymentProcessed events)         │     │
│    └─────────────────────────────────────────────────────────────┘     │
│                              │                                         │
│                         CDC / Polling                                  │
│                              │                                         │
│                              ▼                                         │
│    ┌─────────────────────────────────────────────────────────────┐     │
│    │                    KAFKA                                    │     │
│    │                payment.events topic                         │     │
│    └─────────────────────────────────────────────────────────────┘     │
│                              │                                         │
│              ┌───────────────┴───────────────┐                         │
│              ▼                               ▼                         │
│    ┌─────────────────┐             ┌─────────────────┐                 │
│    │ Webhook Service │             │  Email Service  │                 │
│    │ (Consumer Group)│             │ (Consumer Group)│                 │
│    └─────────────────┘             └─────────────────┘                 │
│                                                                        │
└────────────────────────────────────────────────────────────────────────┘

The key insight is that we write the payment AND the event to the database in a single transaction. Either both succeed or both fail—we can never have a payment without its corresponding event."

Interviewer: "What if the publisher process crashes after reading from the outbox but before publishing to Kafka?"

You: "Great question. If that happens, the event remains in the outbox with published_at = NULL. When the publisher restarts, it queries for unpublished events and sends them again.

This means events might be published more than once—we get at-least-once delivery. To handle this, both the webhook service and email service must be idempotent.

For webhooks, we'd include a unique event ID. The merchant's system should check if they've already processed that event ID. Many payment providers like Stripe do exactly this—they include an idempotent event ID in every webhook.

For emails, we'd track which payments have been emailed in a separate table, checking before sending."
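The republish-on-restart behavior described above follows directly from the publisher loop. A minimal sketch, with a plain list standing in for the `SELECT ... FOR UPDATE SKIP LOCKED` query a real publisher would run:

```python
def publish_pending(outbox_rows, send):
    """Send each row lacking published_at; mark it only after the broker acks."""
    published = 0
    for row in outbox_rows:
        if row.get("published_at") is not None:
            continue  # already delivered in an earlier run
        send(row)                      # raises if the broker is down
        row["published_at"] = "now()"  # crash before this line => row is resent
        published += 1
    return published
```

The ordering is the crux: marking after the send gives at-least-once delivery; marking before the send would risk losing events, which is the one failure mode the outbox exists to prevent.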

Interviewer: "How do you ensure the merchant receives webhooks in order?"

You: "I'd partition the Kafka topic by merchant_id. Kafka guarantees ordering within a partition, so all events for a given merchant are delivered in order.

The webhook service then processes events for each merchant sequentially. Even if we have multiple webhook workers, each partition (and thus each merchant) is assigned to exactly one worker."
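Per-merchant ordering rests on keyed partitioning: every event keyed by the same merchant_id hashes to the same partition. A sketch of the idea (real Kafka clients hash keys with murmur2, and the partition count here is illustrative):

```python
import hashlib

NUM_PARTITIONS = 12  # illustrative topic size

def partition_for(merchant_id, num_partitions=NUM_PARTITIONS):
    """Stable key -> partition mapping (Kafka's default uses murmur2)."""
    digest = hashlib.md5(merchant_id.encode()).digest()
    return int.from_bytes(digest[:4], "big") % num_partitions

# Every event for a merchant lands on the same partition, so Kafka's
# per-partition ordering yields per-merchant ordering.
same = partition_for("merchant_42") == partition_for("merchant_42")
```

One caveat worth mentioning in an interview: increasing the partition count later changes key-to-partition assignments, which can briefly interleave old and new events for a merchant.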

Interviewer: "What happens if the merchant's webhook endpoint is down?"

You: "The webhook service would implement retry with exponential backoff. After several failed attempts, the event goes to a dead letter queue for investigation.

We'd also have a dashboard showing failed webhooks, allowing ops to see which merchants have connectivity issues and manually retry once the merchant confirms their endpoint is back up.

The email backup is also important here—even if the webhook fails, the merchant gets an email notification."
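The retry policy from this answer can be sketched as exponential backoff plus a dead letter list. Delays are computed rather than slept so the sketch stays self-contained; a real worker would sleep between attempts and persist the dead letter queue:

```python
def backoff_delays(max_attempts=5, base=1.0, cap=60.0):
    """Delay before each attempt: 1s, 2s, 4s, ... capped at `cap` seconds."""
    return [min(cap, base * (2 ** i)) for i in range(max_attempts)]

def deliver_with_retry(send, event, max_attempts=5, dead_letter=None):
    """Try send(event) up to max_attempts times, then dead-letter it."""
    for delay in backoff_delays(max_attempts):
        try:
            send(event)
            return True
        except Exception:
            pass  # a real worker would time.sleep(delay) and log here
    if dead_letter is not None:
        dead_letter.append(event)  # surfaces on the failed-webhooks dashboard
    return False
```

The dead letter list is what the ops dashboard reads from; manual retry is just re-running `deliver_with_retry` on those events once the merchant's endpoint is back.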

Interviewer: "What volume can this handle?"

You: "With CDC (like Debezium reading the database's write-ahead log), we can publish tens of thousands of events per second. The bottleneck is usually the database write throughput, and the outbox adds minimal overhead—it's just one extra row per payment.

If we need to scale further, we could shard the outbox table by merchant or use multiple databases. But for most payment systems, a single outbox with CDC handles plenty of volume."


Summary

┌────────────────────────────────────────────────────────────────────────┐
│                    DAY 2 KEY TAKEAWAYS                                 │
│                                                                        │
│  CORE CONCEPT:                                                         │
│  • Dual-write problem: DB + Kafka writes can fail independently        │
│  • Transactional outbox: Write event to DB, publish async              │
│  • Atomicity: Both writes in one transaction                           │
│                                                                        │
│  PATTERNS:                                                             │
│  • Polling: Simple, query outbox periodically                          │
│  • CDC: Low latency, reads database WAL directly                       │
│  • Choose based on volume and latency requirements                     │
│                                                                        │
│  IMPLEMENTATION:                                                       │
│  • Outbox table with: aggregate_type, aggregate_id, event_type, payload│
│  • Publisher: SELECT ... FOR UPDATE SKIP LOCKED                        │
│  • Mark published_at after Kafka acknowledges                          │
│  • Cleanup old published events periodically                           │
│                                                                        │
│  TRADE-OFFS:                                                           │
│  • Polling: Simpler setup, higher latency, DB load                     │
│  • CDC: Low latency, complex setup, requires WAL access                │
│                                                                        │
│  INTERVIEW TIPS:                                                       │
│  • Mention dual-write problem when DB + messaging involved             │
│  • Explain atomicity: both writes or neither                           │
│  • Remember: consumers must be idempotent                              │
│                                                                        │
│  DEFAULT CHOICE:                                                       │
│  • Start with polling (simpler)                                        │
│  • Move to CDC when volume or latency demands it                       │
│                                                                        │
└────────────────────────────────────────────────────────────────────────┘

📚 Further Reading

Books

  • "Designing Data-Intensive Applications" by Martin Kleppmann — Chapter 11: Stream Processing
  • "Microservices Patterns" by Chris Richardson — Chapter 3: Transactional Messaging


End of Day 2: Transactional Outbox

Tomorrow: Day 3 — Backpressure and Flow Control. We now have reliable message publishing with the outbox pattern. But what happens when producers send messages faster than consumers can process them? Tomorrow we'll learn how to detect and handle backpressure, preventing queue explosions and cascading system failures when traffic spikes hit.