Himanshu Kukreja

Week 5 — Day 2: Distributed Transactions — The Saga Pattern

System Design Mastery Series


Preface

Yesterday, we learned about consistency models — how to reason about when reads see writes in distributed systems. We discussed strong consistency, eventual consistency, and everything in between.

But there's a harder problem we glossed over:

THE DISTRIBUTED TRANSACTION PROBLEM

Single database transaction:
  BEGIN;
    UPDATE accounts SET balance = balance - 100 WHERE id = 'A';
    UPDATE accounts SET balance = balance + 100 WHERE id = 'B';
  COMMIT;
  
  If anything fails → ROLLBACK (automatic)
  Money never disappears. ACID guarantees.


Microservices transaction:
  
  Account Service          Payment Service          Inventory Service
  ┌─────────────┐         ┌─────────────┐         ┌─────────────┐
  │ Debit $100  │────────▶│ Charge Card │────────▶│ Reserve Item│
  │   ✓ Done    │         │   ✓ Done    │         │   ✗ FAILED  │
  └─────────────┘         └─────────────┘         └─────────────┘
  
  Inventory service failed AFTER:
    - Account was debited
    - Card was charged
  
  We can't just ROLLBACK — these are different databases!
  
  What do we do?

This is the problem that Sagas solve.

Today, you'll learn:

  • Why traditional distributed transactions (2PC) don't work in microservices
  • The Saga pattern for distributed transactions
  • Choreography vs Orchestration approaches
  • How to design compensation logic
  • Handling failures in compensation

Part I: Foundations

Chapter 1: The Problem with Distributed Transactions

1.1 What We Want: ACID Across Services

ACID PROPERTIES

A - Atomicity:    All operations succeed or all fail
C - Consistency:  Data remains valid after transaction
I - Isolation:    Concurrent transactions don't interfere
D - Durability:   Committed data survives failures

In a single database, we get these for free.
In microservices, each service has its own database.
How do we get ACID across services?
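Within one database, atomicity really is built in. Here is a small illustration using Python's standard-library sqlite3 module with an in-memory database, chosen only because it needs no setup. The `transfer` and `balances` helpers are made up for this example. A transfer that fails after the debit is rolled back as a unit, so the money never disappears:

```python
import sqlite3


def transfer(conn: sqlite3.Connection, src: str, dst: str, amount: int,
             fail: bool = False) -> None:
    """Move `amount` from src to dst; `fail=True` simulates a crash after the debit."""
    # `with conn:` commits if the block finishes and rolls back if it raises.
    with conn:
        conn.execute("UPDATE accounts SET balance = balance - ? WHERE id = ?", (amount, src))
        if fail:
            raise RuntimeError("simulated failure after the debit")
        conn.execute("UPDATE accounts SET balance = balance + ? WHERE id = ?", (amount, dst))


def balances(conn: sqlite3.Connection) -> dict:
    return dict(conn.execute("SELECT id, balance FROM accounts ORDER BY id"))


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE accounts (id TEXT PRIMARY KEY, balance INTEGER)")
conn.executemany("INSERT INTO accounts VALUES (?, ?)", [("A", 500), ("B", 0)])
conn.commit()

try:
    transfer(conn, "A", "B", 100, fail=True)
except RuntimeError:
    pass
print(balances(conn))  # {'A': 500, 'B': 0}: the debit was rolled back

transfer(conn, "A", "B", 100)
print(balances(conn))  # {'A': 400, 'B': 100}
```

The point of the microservices problem is that no single `with conn:` block can span several services' databases.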

1.2 The Traditional Solution: Two-Phase Commit (2PC)

TWO-PHASE COMMIT PROTOCOL

Phase 1: PREPARE
  
  Coordinator                 Participants
       │                           │
       │─────── PREPARE ──────────▶│ Service A
       │◀────── READY ─────────────│
       │                           │
       │─────── PREPARE ──────────▶│ Service B
       │◀────── READY ─────────────│
       │                           │
       │─────── PREPARE ──────────▶│ Service C
       │◀────── READY ─────────────│


Phase 2: COMMIT (if all ready)
  
  Coordinator                 Participants
       │                           │
       │─────── COMMIT ───────────▶│ Service A
       │◀────── ACK ───────────────│
       │                           │
       │─────── COMMIT ───────────▶│ Service B
       │◀────── ACK ───────────────│
       │                           │
       │─────── COMMIT ───────────▶│ Service C
       │◀────── ACK ───────────────│


Phase 2: ABORT (if any not ready)
  
  Coordinator                 Participants
       │                           │
       │─────── ABORT ────────────▶│ All participants
       │                           │
       │        (Everyone rolls back)
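The protocol's decision rule fits in a few lines. This is a minimal in-memory sketch with a hypothetical `Participant` class. It deliberately leaves out what makes real 2PC hard, such as durable logs, timeouts, and coordinator recovery. It shows only the rule itself: a single NO vote aborts everyone, and a unanimous READY commits everyone.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Participant:
    """Hypothetical 2PC participant; `can_commit` decides how it votes on PREPARE."""
    name: str
    can_commit: bool = True
    state: str = "idle"

    def prepare(self) -> bool:
        # A real participant would durably log its changes and hold its locks here.
        self.state = "prepared" if self.can_commit else "aborted"
        return self.can_commit

    def commit(self) -> None:
        self.state = "committed"

    def abort(self) -> None:
        self.state = "aborted"


def two_phase_commit(participants: List[Participant]) -> bool:
    # Phase 1: all() stops at the first NO vote; one NO decides the outcome.
    if all(p.prepare() for p in participants):
        # Phase 2 (COMMIT): unanimous READY, so everyone commits.
        for p in participants:
            p.commit()
        return True
    # Phase 2 (ABORT): at least one NO, so everyone aborts, including unprepared ones.
    for p in participants:
        p.abort()
    return False


group = [Participant("A"), Participant("B", can_commit=False), Participant("C")]
print(two_phase_commit(group), [p.state for p in group])
# False ['aborted', 'aborted', 'aborted']
```

Between `prepare()` and the phase-2 call, a real participant is stuck holding locks. That window is exactly where the problems listed next come from.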

1.3 Why 2PC Fails in Microservices

PROBLEMS WITH 2PC

1. BLOCKING PROTOCOL
   ├── Participants HOLD LOCKS from PREPARE until the final decision arrives
   ├── If the coordinator crashes after PREPARE, participants block until it recovers
   ├── Locked resources block other transactions
   └── Under contention, system throughput collapses

2. COORDINATOR IS A SINGLE POINT OF FAILURE
   ├── Coordinator crashes mid-transaction
   ├── Participants that voted READY can't safely commit or abort on their own
   ├── Recovery needs a durable coordinator log and extra protocol machinery
   └── Resolving in-doubt transactions by hand can leave data inconsistent

3. SYNCHRONOUS COMMUNICATION
   ├── All services must be available simultaneously
   ├── One slow service slows entire transaction
   ├── Network partition = stuck transaction
   └── Violates microservices independence

4. DOESN'T SCALE
   ├── Lock duration proportional to slowest participant
   ├── Adding services increases failure probability
   ├── Each commit costs at least two round trips, more if participants are contacted one by one
   └── Some business flows touch 10+ services per transaction

5. NOT SUPPORTED BY MOST SERVICES
   ├── Many NoSQL databases don't support 2PC
   ├── Many message brokers don't support 2PC
   ├── Third-party APIs don't support 2PC
   └── You can't tell Stripe to "prepare" a charge
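The claim that adding services increases failure probability can be put in numbers. Assume, purely for illustration, that each participant is independently available 99.9% of the time. A protocol that needs every participant up at once then succeeds with probability 0.999^n:

```python
def all_available(per_service: float, n: int) -> float:
    """Probability that n independently failing services are all up at once."""
    return per_service ** n


for n in (1, 3, 10, 30):
    print(f"{n:>2} services: {all_available(0.999, n):.4f}")
```

With ten participants, about 1% of commit attempts find at least one service down. That is roughly ten times the single-service rate. Real failures are often correlated, so treat this as intuition rather than a model.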

1.4 The Alternative: Sagas

SAGA: A DIFFERENT APPROACH

Instead of:
  "Lock everything, do everything, unlock everything"

Sagas say:
  "Do each step. If a step fails, UNDO previous steps."


Key insight:
  Each step is a LOCAL transaction (fast, independent)
  If step N fails, run COMPENSATING transactions for steps 1..N-1
  No distributed locks; no participant blocks waiting on a coordinator
  
  
SAGA STRUCTURE:

  T1 ──────▶ T2 ──────▶ T3 ──────▶ T4 ──────▶ SUCCESS
   │          │          │          │
   ▼          ▼          ▼          ▼
  C1         C2         C3         C4
  (undo)     (undo)     (undo)     (undo)
  
  
If T3 fails:
  
  T1 ──────▶ T2 ──────▶ T3 ✗
                         │
                         ▼
              C2 ◀────── C3 not needed (T3 didn't complete)
               │
               ▼
              C1
               │
               ▼
            ROLLED BACK
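The T/C structure above can be expressed as a small function. This is a minimal, synchronous sketch. It assumes each step is a hypothetical `(name, action, compensation)` triple of plain functions and that a failing action raises an exception. Steps run in order; on failure, the compensations of the completed steps run in reverse. Persistence, retries, and the pivot are left out on purpose.

```python
from typing import Callable, List, Optional, Tuple

# (name, action, compensation); compensation may be None when there is nothing to undo.
Step = Tuple[str, Callable[[], None], Optional[Callable[[], None]]]


def run_saga(steps: List[Step]) -> Tuple[bool, List[str]]:
    """Run steps in order; on failure, compensate completed steps in reverse."""
    log: List[str] = []
    completed: List[Step] = []
    for step in steps:
        name, action, _ = step
        try:
            action()
        except Exception as exc:
            log.append(f"{name} failed: {exc}")
            # The failed step is not compensated: its local transaction never committed.
            for done_name, _, compensation in reversed(completed):
                if compensation is not None:
                    compensation()
                    log.append(f"compensated {done_name}")
            return False, log
        completed.append(step)
        log.append(f"{name} done")
    return True, log


def noop() -> None:
    pass


def out_of_stock() -> None:
    raise RuntimeError("out of stock")


ok, log = run_saga([("T1", noop, noop), ("T2", noop, noop), ("T3", out_of_stock, noop)])
print(ok, log)
# False ['T1 done', 'T2 done', 'T3 failed: out of stock', 'compensated T2', 'compensated T1']
```

The log reproduces the diagram: C3 is skipped, then C2 and C1 run in that order.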

Chapter 2: Saga Fundamentals

2.1 What Is a Saga?

A Saga is a sequence of local transactions where:

  1. Each transaction updates a single service
  2. Each transaction has a compensating transaction
  3. If any transaction fails, compensating transactions undo previous work

SAGA TERMINOLOGY

Transaction (Ti):     A local database transaction in one service
Compensation (Ci):    Undoes the effect of Ti (semantically, not literally)
Pivot Transaction:    Point of no return - after this, saga must complete
Retriable Transaction: Follows the pivot; idempotent, so it is retried until it succeeds

Example: Order Processing Saga

T1: Reserve inventory       C1: Release inventory
T2: Create payment          C2: Refund payment
T3: Ship order (PIVOT)      C3: (None - can't unship!)
T4: Send confirmation       C4: (None - just notification)

T3 is the pivot - once shipped, we commit to completing.
Before T3, we can compensate. After T3, we must proceed.

2.2 Compensation vs Rollback

COMPENSATION IS NOT ROLLBACK

Database rollback:
  - Undoes changes as if they never happened
  - Uses transaction log
  - Automatic, built-in

Saga compensation:
  - Creates NEW transactions that reverse the effect
  - Business logic, not database feature
  - You must design and implement it

Example: Payment Compensation

Original:     charge_customer(customer_id, $100)
              Creates: Payment record, charge on card

Compensation: refund_customer(payment_id)
              Creates: Refund record, credit on card
              
              The original charge STILL EXISTS in history
              The refund is a NEW transaction
              Net effect: customer not charged
              But audit trail shows both events
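An append-only ledger makes the difference visible. In this sketch, the `Ledger` and `Entry` classes and their fields are illustrative assumptions, not a payment provider's API. The refund is a new entry that references the original charge. Nothing is deleted, the balance nets to zero, and the history keeps both events.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class Entry:
    entry_id: str
    kind: str                        # "charge" or "refund"
    amount_cents: int                # positive for charges, negative for refunds
    refers_to: Optional[str] = None  # for a refund, the charge it reverses


@dataclass
class Ledger:
    entries: List[Entry] = field(default_factory=list)

    def charge(self, entry_id: str, amount_cents: int) -> str:
        self.entries.append(Entry(entry_id, "charge", amount_cents))
        return entry_id

    def refund(self, entry_id: str, payment_id: str) -> None:
        # Compensation appends a reversing entry; the original charge is never deleted.
        original = next(e for e in self.entries if e.entry_id == payment_id)
        self.entries.append(Entry(entry_id, "refund", -original.amount_cents, payment_id))

    def balance_cents(self) -> int:
        return sum(e.amount_cents for e in self.entries)


ledger = Ledger()
payment_id = ledger.charge("pay-1", 10_000)   # $100.00
ledger.refund("ref-1", payment_id)
print(ledger.balance_cents())                 # 0
print([e.kind for e in ledger.entries])       # ['charge', 'refund']
```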

2.3 Designing Compensating Transactions

COMPENSATION DESIGN PRINCIPLES

1. SEMANTIC REVERSAL
   ├── Undo the business effect, not the technical operation
   ├── create_order → cancel_order (not DELETE)
   ├── reserve_inventory → release_inventory
   └── charge_payment → refund_payment

2. IDEMPOTENT
   ├── Compensation might run multiple times (retries)
   ├── Running twice should have same effect as once
   └── Use compensation IDs to prevent double-refunds

3. COMMUTATIVE WHERE POSSIBLE
   ├── Order of compensations shouldn't matter
   ├── Reduces coordination complexity
   └── Not always possible (dependencies)

4. HANDLE MISSING ORIGINAL
   ├── What if T2 compensation runs but T2 never completed?
   ├── Compensation should check state first
   └── Be a no-op if nothing to compensate

5. NEVER FAIL (or be retriable)
   ├── Compensation failure = stuck saga
   ├── Design compensations to always succeed
   └── If a compensation can still fail, alert for manual intervention
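Principles 2 and 4 can be combined in one guard. This is a minimal sketch. An in-memory `PaymentStore` stands in for the payment service's tables, and the caller passes a hypothetical `compensation_id`, for example saga ID plus step name. The refund checks the original's state first and remembers which compensations it has already applied. As a result, retries and compensating before the original landed both become harmless. In a real database, the status change and the compensation-ID record would be written in one local transaction.

```python
from typing import Dict, Set


class PaymentStore:
    """In-memory stand-in for a payment service's tables (illustrative only)."""

    def __init__(self) -> None:
        self.payments: Dict[str, str] = {}  # payment_id -> "completed" | "refunded"
        self.applied: Set[str] = set()      # compensation IDs already carried out
        self.refunds_issued = 0

    def refund(self, compensation_id: str, payment_id: str) -> str:
        # Principle 2: a compensation ID seen before is acknowledged, not repeated.
        if compensation_id in self.applied:
            return "already-compensated"
        # Principle 4: check state first; no completed charge means nothing to undo.
        # Not recorded as applied, so a retry after a late-arriving charge still refunds it.
        if self.payments.get(payment_id) != "completed":
            return "nothing-to-compensate"
        self.payments[payment_id] = "refunded"
        self.refunds_issued += 1
        self.applied.add(compensation_id)
        return "refunded"


store = PaymentStore()
store.payments["pay-1"] = "completed"
print(store.refund("saga-42:refund", "pay-1"))  # refunded
print(store.refund("saga-42:refund", "pay-1"))  # already-compensated (retry)
print(store.refund("saga-43:refund", "pay-9"))  # nothing-to-compensate
print(store.refunds_issued)                     # 1
```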

Chapter 3: Choreography vs Orchestration

3.1 Choreography: Event-Driven Sagas

In choreography, each service listens for events and decides what to do next.

CHOREOGRAPHY PATTERN

No central coordinator. Services react to events.

Order Saga via Choreography:

  ┌─────────────┐     order.created      ┌─────────────┐
  │   Order     │─────────────────────▶  │  Inventory  │
  │   Service   │                        │   Service   │
  └─────────────┘                        └──────┬──────┘
                                                │
                              inventory.reserved│
                                                ▼
                                         ┌─────────────┐
                                         │   Payment   │
                                         │   Service   │
                                         └──────┬──────┘
                                                │
                               payment.completed│
                                                ▼
                                         ┌─────────────┐
                                         │  Shipping   │
                                         │   Service   │
                                         └──────┬──────┘
                                                │
                               shipping.created │
                                                ▼
                                         ┌─────────────┐
                                         │   Order     │
                                         │  (complete) │
                                         └─────────────┘


If Payment fails:

  payment.failed event triggers:
    - Inventory listens → releases reservation
    - Order listens → marks order as failed

Pros:

  • Simple for small sagas (2-3 steps)
  • Loose coupling between services
  • No single point of failure

Cons:

  • Hard to understand full saga flow
  • Cyclic dependencies possible
  • Difficult to add/change steps
  • Testing requires all services

3.2 Orchestration: Centralized Coordinator

In orchestration, a central coordinator tells each service what to do.

ORCHESTRATION PATTERN

Central orchestrator manages the saga flow.

Order Saga via Orchestration:

                        ┌─────────────────┐
                        │   Orchestrator  │
                        │   (Order Saga)  │
                        └────────┬────────┘
                                 │
          ┌──────────────────────┼──────────────────────┐
          │                      │                      │
          ▼                      ▼                      ▼
  ┌─────────────┐        ┌─────────────┐        ┌─────────────┐
  │  Inventory  │        │   Payment   │        │  Shipping   │
  │   Service   │        │   Service   │        │   Service   │
  └─────────────┘        └─────────────┘        └─────────────┘


Flow:
  1. Orchestrator: "Inventory, reserve items"
  2. Inventory: "Reserved" (or "Failed")
  3. Orchestrator: "Payment, charge customer"
  4. Payment: "Charged" (or "Failed")
  5. Orchestrator: "Shipping, create shipment"
  6. Shipping: "Created"
  7. Orchestrator: "Saga complete"


If Payment fails:
  
  Orchestrator handles compensation:
  1. Orchestrator: "Inventory, release reservation"
  2. Orchestrator: "Order, mark as failed"

Pros:

  • Easy to understand (linear flow)
  • Centralized saga logic
  • Easy to add/modify steps
  • Better for complex sagas

Cons:

  • Orchestrator is a coupling point
  • Orchestrator must be highly available
  • Risk of orchestrator becoming a monolith

3.3 When to Use Each

CHOOSING BETWEEN CHOREOGRAPHY AND ORCHESTRATION

Use CHOREOGRAPHY when:
├── Saga has 2-4 steps
├── Services already publish relevant events
├── Teams are independent and don't want coordination
├── Low complexity, well-understood domain
└── Example: Simple order → payment → notification

Use ORCHESTRATION when:
├── Saga has 5+ steps
├── Complex branching logic
├── Need visibility into saga state
├── Multiple sagas share services
├── Need to change saga logic frequently
└── Example: Travel booking (flight + hotel + car + insurance)

Real-world observation:
├── Start with choreography
├── Switch to orchestration when choreography becomes confusing
└── Many companies use both (simple sagas choreographed, complex orchestrated)

Chapter 4: Saga Execution Semantics

4.1 Forward Recovery vs Backward Recovery

RECOVERY STRATEGIES

BACKWARD RECOVERY (Compensate)
├── Step fails → undo previous steps
├── Return system to original state
├── Use when failure means "give up"
└── Example: Payment failed → cancel order

FORWARD RECOVERY (Retry)
├── Step fails → retry until success
├── Eventually complete the saga
├── Use when saga MUST complete
└── Example: Shipment tracking update (eventually consistent)


Many sagas use BOTH:

  T1 ──────▶ T2 ──────▶ T3 (pivot) ──────▶ T4 ──────▶ T5
  
  Before pivot: Backward recovery (compensate)
  After pivot:  Forward recovery (retry until success)
  
  
Example: E-commerce Order

  Reserve    ──▶ Charge    ──▶ Ship     ──▶ Notify   ──▶ Complete
  Inventory      Payment       (PIVOT)      Customer     Order
  
  Before shipping: Can compensate (release inventory, refund)
  After shipping:  Must complete (can't unship, so notify and complete)
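The rule "compensate up to the pivot, retry after it" reduces to a small decision function. This sketch uses conventions invented for it: steps are indexed from 0, and `pivot` is the index of the pivot step. A failure at or before the pivot means the pivot never committed, so completed steps are compensated in reverse. A failure after the pivot can only be resolved by retrying forward.

```python
from typing import List, Tuple


def recovery_plan(step_names: List[str], pivot: int, failed: int) -> Tuple[str, List[str]]:
    """Decide how to recover when step `failed` fails (0-based indices)."""
    if failed <= pivot:
        # Backward recovery: undo the steps that completed, newest first.
        return "backward", [f"compensate {name}" for name in reversed(step_names[:failed])]
    # Forward recovery: the pivot has committed, so push the failed step through.
    return "forward", [f"retry {step_names[failed]}"]


steps = ["reserve_inventory", "charge_payment", "ship", "notify_customer", "complete_order"]
print(recovery_plan(steps, pivot=2, failed=1))
# ('backward', ['compensate reserve_inventory'])
print(recovery_plan(steps, pivot=2, failed=3))
# ('forward', ['retry notify_customer'])
```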

4.2 The Saga Execution Coordinator (SEC)

SAGA EXECUTION COORDINATOR (SEC)

The SEC is responsible for:
1. Starting sagas
2. Tracking saga state
3. Executing steps (or triggering them)
4. Handling failures and compensation
5. Ensuring saga completes (one way or another)

SEC State Machine:

                        ┌──────────────────┐
                        │     STARTED      │
                        └────────┬─────────┘
                                 │
                        ┌────────▼─────────┐
            ┌───────────│    EXECUTING     │──────────┐
            │           └────────┬─────────┘          │
            │                    │                    │
       step fails            all steps             timeout
            │                succeeded                │
            │                    │                    ▼
            │                    │           ┌─────────────────┐
            │                    │           │  COMPENSATION   │
            │                    │           │     STARTED     │
            │                    │           └────────┬────────┘
            ▼                    │                    │
   ┌─────────────────┐           │                    │
   │  COMPENSATING   │◀──────────┼────────────────────┘
   └────────┬────────┘           │
            │                    │
      compensation               │
        complete                 │
            │                    │
            ▼                    ▼
   ┌─────────────────┐  ┌─────────────────┐
   │   COMPENSATED   │  │    COMPLETED    │
   │    (failed)     │  │   (succeeded)   │
   └─────────────────┘  └─────────────────┘
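One way to keep an SEC honest is to encode the state diagram as an explicit transition table and reject anything else. This sketch uses the state names from the diagram. The event names (`begin`, `step_failed`, and so on) are illustrative assumptions, and the sketch assumes every compensating path ends in COMPENSATED.

```python
from enum import Enum


class SecState(Enum):
    STARTED = "started"
    EXECUTING = "executing"
    COMPENSATION_STARTED = "compensation_started"
    COMPENSATING = "compensating"
    COMPENSATED = "compensated"
    COMPLETED = "completed"


# (current state, event) -> next state. Anything missing is an illegal transition.
TRANSITIONS = {
    (SecState.STARTED, "begin"): SecState.EXECUTING,
    (SecState.EXECUTING, "all_steps_succeeded"): SecState.COMPLETED,
    (SecState.EXECUTING, "step_failed"): SecState.COMPENSATING,
    (SecState.EXECUTING, "timeout"): SecState.COMPENSATION_STARTED,
    (SecState.COMPENSATION_STARTED, "begin_compensation"): SecState.COMPENSATING,
    (SecState.COMPENSATING, "compensation_complete"): SecState.COMPENSATED,
}


def advance(state: SecState, event: str) -> SecState:
    """Return the next state, or raise if the diagram has no such arrow."""
    try:
        return TRANSITIONS[(state, event)]
    except KeyError:
        raise ValueError(f"illegal transition: {state.name} on {event!r}") from None


state = SecState.STARTED
for event in ("begin", "timeout", "begin_compensation", "compensation_complete"):
    state = advance(state, event)
print(state.name)  # COMPENSATED
```

Persisting the state after every `advance` call is what lets a restarted SEC resume a saga where it left off.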

Part II: Implementation

Chapter 5: Choreography Implementation

# Saga Choreography Implementation

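import asyncio
import uuid
from dataclasses import dataclass, field
from typing import Optional, Dict, Any, List
from datetime import datetime
from enum import Enum
import json
import logging

logger = logging.getLogger(__name__)


def generate_id() -> str:
    """Unique ID for orders, events, reservations and payments (assumed helper)."""
    return str(uuid.uuid4())


class InsufficientInventoryError(Exception):
    """Raised when stock cannot cover a reservation."""


class PaymentError(Exception):
    """Raised when the payment gateway declines a charge."""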


# =============================================================================
# Domain Events
# =============================================================================

@dataclass
class DomainEvent:
    """Base class for domain events."""
    event_id: str
    correlation_id: str  # Saga ID
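    # kw_only (Python 3.10+) moves this defaulted field out of the positional
    # order, so subclasses can declare required fields such as order_id.
    timestamp: datetime = field(default_factory=datetime.utcnow, kw_only=True)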
    
    def to_dict(self) -> dict:
        return {
            "event_id": self.event_id,
            "correlation_id": self.correlation_id,
            "timestamp": self.timestamp.isoformat(),
            "event_type": self.__class__.__name__,
            "data": self._event_data()
        }
    
    def _event_data(self) -> dict:
        raise NotImplementedError


@dataclass
class OrderCreated(DomainEvent):
    order_id: str
    customer_id: str
    items: List[dict]
    total_amount: float
    
    def _event_data(self) -> dict:
        return {
            "order_id": self.order_id,
            "customer_id": self.customer_id,
            "items": self.items,
            "total_amount": self.total_amount
        }


@dataclass
class InventoryReserved(DomainEvent):
    order_id: str
    reservation_id: str
    items: List[dict]
    
    def _event_data(self) -> dict:
        return {
            "order_id": self.order_id,
            "reservation_id": self.reservation_id,
            "items": self.items
        }


@dataclass
class InventoryReservationFailed(DomainEvent):
    order_id: str
    reason: str
    
    def _event_data(self) -> dict:
        return {"order_id": self.order_id, "reason": self.reason}


@dataclass
class PaymentCompleted(DomainEvent):
    order_id: str
    payment_id: str
    amount: float
    
    def _event_data(self) -> dict:
        return {
            "order_id": self.order_id,
            "payment_id": self.payment_id,
            "amount": self.amount
        }


@dataclass
class PaymentFailed(DomainEvent):
    order_id: str
    reason: str
    
    def _event_data(self) -> dict:
        return {"order_id": self.order_id, "reason": self.reason}


@dataclass  
class InventoryReleased(DomainEvent):
    order_id: str
    reservation_id: str
    
    def _event_data(self) -> dict:
        return {
            "order_id": self.order_id,
            "reservation_id": self.reservation_id
        }


@dataclass
class OrderCompleted(DomainEvent):
    order_id: str
    
    def _event_data(self) -> dict:
        return {"order_id": self.order_id}


@dataclass
class OrderFailed(DomainEvent):
    order_id: str
    reason: str
    
    def _event_data(self) -> dict:
        return {"order_id": self.order_id, "reason": self.reason}


# =============================================================================
# Event Publisher/Subscriber
# =============================================================================

class EventBus:
    """Simple in-memory event bus for demonstration."""
    
    def __init__(self):
        self._handlers: Dict[str, List[callable]] = {}
    
    def subscribe(self, event_type: str, handler: callable):
        if event_type not in self._handlers:
            self._handlers[event_type] = []
        self._handlers[event_type].append(handler)
    
    async def publish(self, event: DomainEvent):
        event_type = event.__class__.__name__
        logger.info(f"Publishing event: {event_type} for saga {event.correlation_id}")
        
        handlers = self._handlers.get(event_type, [])
        for handler in handlers:
            try:
                await handler(event)
            except Exception as e:
                logger.error(f"Handler failed for {event_type}: {e}")


# =============================================================================
# Services (Choreography Participants)
# =============================================================================

class OrderService:
    """Order service - initiates and tracks order saga."""
    
    def __init__(self, db, event_bus: EventBus):
        self.db = db
        self.events = event_bus
        
        # Subscribe to events
        self.events.subscribe("InventoryReserved", self._on_inventory_reserved)
        self.events.subscribe("InventoryReservationFailed", self._on_inventory_failed)
        self.events.subscribe("PaymentCompleted", self._on_payment_completed)
        self.events.subscribe("PaymentFailed", self._on_payment_failed)
    
    async def create_order(self, customer_id: str, items: List[dict]) -> str:
        """Create order and start saga via event."""
        order_id = generate_id()
        total = sum(item['price'] * item['quantity'] for item in items)
        
        # Save order in PENDING state
        await self.db.execute(
            """
            INSERT INTO orders (id, customer_id, items, total, status, created_at)
            VALUES ($1, $2, $3, $4, 'PENDING', NOW())
            """,
            order_id, customer_id, json.dumps(items), total
        )
        
        # Publish event to start saga
        event = OrderCreated(
            event_id=generate_id(),
            correlation_id=order_id,  # Order ID is the saga ID
            order_id=order_id,
            customer_id=customer_id,
            items=items,
            total_amount=total
        )
        await self.events.publish(event)
        
        return order_id
    
    async def _on_inventory_reserved(self, event: InventoryReserved):
        """Handle inventory reserved - update order state."""
        await self.db.execute(
            """
            UPDATE orders 
            SET status = 'INVENTORY_RESERVED', reservation_id = $2
            WHERE id = $1
            """,
            event.order_id, event.reservation_id
        )
    
    async def _on_inventory_failed(self, event: InventoryReservationFailed):
        """Handle inventory failure - fail order."""
        await self.db.execute(
            "UPDATE orders SET status = 'FAILED', failure_reason = $2 WHERE id = $1",
            event.order_id, event.reason
        )
        
        await self.events.publish(OrderFailed(
            event_id=generate_id(),
            correlation_id=event.correlation_id,
            order_id=event.order_id,
            reason=event.reason
        ))
    
    async def _on_payment_completed(self, event: PaymentCompleted):
        """Handle payment completed - complete order."""
        await self.db.execute(
            """
            UPDATE orders 
            SET status = 'COMPLETED', payment_id = $2
            WHERE id = $1
            """,
            event.order_id, event.payment_id
        )
        
        await self.events.publish(OrderCompleted(
            event_id=generate_id(),
            correlation_id=event.correlation_id,
            order_id=event.order_id
        ))
    
    async def _on_payment_failed(self, event: PaymentFailed):
        """Handle payment failure - will trigger compensation."""
        await self.db.execute(
            "UPDATE orders SET status = 'PAYMENT_FAILED', failure_reason = $2 WHERE id = $1",
            event.order_id, event.reason
        )


class InventoryService:
    """Inventory service - reserves and releases inventory."""
    
    def __init__(self, db, event_bus: EventBus):
        self.db = db
        self.events = event_bus
        
        # Subscribe to events
        self.events.subscribe("OrderCreated", self._on_order_created)
        self.events.subscribe("PaymentFailed", self._on_payment_failed)
    
    async def _on_order_created(self, event: OrderCreated):
        """React to order created - try to reserve inventory."""
        try:
            reservation_id = await self._reserve_inventory(
                event.order_id, 
                event.items
            )
            
            await self.events.publish(InventoryReserved(
                event_id=generate_id(),
                correlation_id=event.correlation_id,
                order_id=event.order_id,
                reservation_id=reservation_id,
                items=event.items
            ))
            
        except InsufficientInventoryError as e:
            await self.events.publish(InventoryReservationFailed(
                event_id=generate_id(),
                correlation_id=event.correlation_id,
                order_id=event.order_id,
                reason=str(e)
            ))
    
    async def _on_payment_failed(self, event: PaymentFailed):
        """
        React to payment failed - COMPENSATE by releasing inventory.
        
        This is the compensation transaction triggered by choreography.
        """
        # Find the reservation for this order
        reservation = await self.db.fetch_one(
            "SELECT id FROM inventory_reservations WHERE order_id = $1 AND status = 'active'",
            event.order_id
        )
        
        if reservation:
            await self._release_inventory(reservation['id'])
            
            await self.events.publish(InventoryReleased(
                event_id=generate_id(),
                correlation_id=event.correlation_id,
                order_id=event.order_id,
                reservation_id=reservation['id']
            ))
    
    async def _reserve_inventory(self, order_id: str, items: List[dict]) -> str:
        """Reserve inventory for order items."""
        reservation_id = generate_id()
        
        async with self.db.transaction(isolation="SERIALIZABLE"):
            for item in items:
                # Check and decrement inventory
                result = await self.db.execute(
                    """
                    UPDATE inventory 
                    SET quantity = quantity - $2
                    WHERE product_id = $1 AND quantity >= $2
                    """,
                    item['product_id'], item['quantity']
                )
                
                if result.rowcount == 0:
                    raise InsufficientInventoryError(
                        f"Insufficient inventory for {item['product_id']}"
                    )
            
            # Create reservation record
            await self.db.execute(
                """
                INSERT INTO inventory_reservations (id, order_id, items, status, created_at)
                VALUES ($1, $2, $3, 'active', NOW())
                """,
                reservation_id, order_id, json.dumps(items)
            )
        
        return reservation_id
    
    async def _release_inventory(self, reservation_id: str):
        """Release inventory reservation (compensation)."""
        async with self.db.transaction():
            reservation = await self.db.fetch_one(
                """
                SELECT items FROM inventory_reservations 
                WHERE id = $1 AND status = 'active'
                FOR UPDATE
                """,
                reservation_id
            )
            
            if not reservation:
                logger.warning(f"Reservation {reservation_id} not found or not active")
                return  # Idempotent - already released
            
            items = json.loads(reservation['items'])
            
            for item in items:
                await self.db.execute(
                    "UPDATE inventory SET quantity = quantity + $2 WHERE product_id = $1",
                    item['product_id'], item['quantity']
                )
            
            await self.db.execute(
                "UPDATE inventory_reservations SET status = 'released' WHERE id = $1",
                reservation_id
            )


class PaymentService:
    """Payment service - charges and refunds payments."""
    
    def __init__(self, db, event_bus: EventBus, payment_gateway):
        self.db = db
        self.events = event_bus
        self.gateway = payment_gateway
        
        # Subscribe to events
        self.events.subscribe("InventoryReserved", self._on_inventory_reserved)
    
    async def _on_inventory_reserved(self, event: InventoryReserved):
        """React to inventory reserved - charge payment."""
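        # Get order details. Reading the orders table here assumes a shared
        # database; with a database per service, the event payload (or a local
        # copy built from OrderCreated) should carry customer_id and total.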
        order = await self.db.fetch_one(
            "SELECT customer_id, total FROM orders WHERE id = $1",
            event.order_id
        )
        
        try:
            payment_id = await self._charge_customer(
                order['customer_id'],
                order['total'],
                event.order_id
            )
            
            await self.events.publish(PaymentCompleted(
                event_id=generate_id(),
                correlation_id=event.correlation_id,
                order_id=event.order_id,
                payment_id=payment_id,
                amount=order['total']
            ))
            
        except PaymentError as e:
            await self.events.publish(PaymentFailed(
                event_id=generate_id(),
                correlation_id=event.correlation_id,
                order_id=event.order_id,
                reason=str(e)
            ))
    
    async def _charge_customer(
        self,
        customer_id: str,
        amount: float,
        order_id: str
    ) -> str:
        """Charge customer via payment gateway."""
        payment_id = generate_id()
        
        # Get customer payment method
        customer = await self.db.fetch_one(
            "SELECT payment_method_id FROM customers WHERE id = $1",
            customer_id
        )
        
        # Charge via gateway
        result = await self.gateway.charge(
            payment_method_id=customer['payment_method_id'],
            amount=amount,
            idempotency_key=f"order-{order_id}"  # Idempotent!
        )
        
        if not result.success:
            raise PaymentError(result.error_message)
        
        # Record payment
        await self.db.execute(
            """
            INSERT INTO payments (id, order_id, customer_id, amount, status, gateway_ref)
            VALUES ($1, $2, $3, $4, 'completed', $5)
            """,
            payment_id, order_id, customer_id, amount, result.reference
        )
        
        return payment_id
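The in-memory EventBus above calls each handler once per publish. Production brokers, however, commonly deliver at least once, so a handler such as a compensation may see the same event twice. Below is a minimal idempotent-consumer wrapper. It assumes events carry the `event_id` field defined on DomainEvent. The in-memory `seen` set is an illustrative stand-in for a processed-events table, which should be written in the same local transaction as the handler's own changes.

```python
import asyncio
from types import SimpleNamespace
from typing import Any, Awaitable, Callable, Set

Handler = Callable[[Any], Awaitable[None]]


def idempotent(handler: Handler) -> Handler:
    """Wrap an async event handler so redeliveries of the same event_id are skipped."""
    seen: Set[str] = set()  # stand-in for a durable processed-events table

    async def wrapper(event: Any) -> None:
        if event.event_id in seen:
            return  # duplicate delivery: this event was already handled
        await handler(event)
        # Recorded only after success, so a failed attempt can be redelivered.
        # Not safe against two concurrent deliveries; a unique key in the DB closes that gap.
        seen.add(event.event_id)

    return wrapper


released = []


async def release_inventory(event: Any) -> None:
    released.append(event.order_id)


async def main() -> None:
    handler = idempotent(release_inventory)
    event = SimpleNamespace(event_id="evt-1", order_id="order-7")
    await handler(event)
    await handler(event)  # broker redelivers the same event


asyncio.run(main())
print(released)  # ['order-7']
```

With the EventBus above, the wrapper could be applied at subscription time, for example `bus.subscribe("PaymentFailed", idempotent(handler))`.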

Chapter 6: Orchestration Implementation

# Saga Orchestration Implementation

from dataclasses import dataclass, field
from typing import Optional, List, Dict, Any, Callable
from enum import Enum
from datetime import datetime, timedelta
import asyncio
import json
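import logging
import uuid

logger = logging.getLogger(__name__)


def generate_id() -> str:
    """Unique ID for saga executions (assumed helper; not shown in the original)."""
    return str(uuid.uuid4())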


# =============================================================================
# Saga Definition
# =============================================================================

class SagaStepStatus(Enum):
    PENDING = "pending"
    EXECUTING = "executing"
    COMPLETED = "completed"
    FAILED = "failed"
    COMPENSATING = "compensating"
    COMPENSATED = "compensated"


class SagaStatus(Enum):
    STARTED = "started"
    EXECUTING = "executing"
    COMPLETED = "completed"
    COMPENSATING = "compensating"
    COMPENSATED = "compensated"
    FAILED = "failed"


@dataclass
class SagaStep:
    """Definition of a saga step."""
    name: str
    action: Callable  # The forward action
    compensation: Optional[Callable]  # The compensation action
    is_pivot: bool = False  # After pivot, no compensation - must complete
    retryable: bool = True
    max_retries: int = 3
    timeout: timedelta = field(default_factory=lambda: timedelta(seconds=30))


@dataclass
class SagaStepResult:
    """Result of executing a saga step."""
    success: bool
    data: Optional[Dict[str, Any]] = None
    error: Optional[str] = None


@dataclass
class SagaState:
    """Persistent state of a saga execution."""
    saga_id: str
    saga_type: str
    status: SagaStatus
    current_step: int
    input_data: Dict[str, Any]
    step_results: Dict[str, Any]  # Results from each step
    created_at: datetime
    updated_at: datetime
    error: Optional[str] = None


# =============================================================================
# Saga Definition DSL
# =============================================================================

class SagaDefinition:
    """
    Fluent interface for defining sagas.
    
    Usage:
        saga = (SagaDefinition("OrderSaga")
            .step("reserve_inventory", reserve_fn, release_fn)
            .step("charge_payment", charge_fn, refund_fn)
            .step("create_shipment", ship_fn, None, is_pivot=True)
            .step("send_notification", notify_fn, None)
            .build())
    """
    
    def __init__(self, name: str):
        self.name = name
        self.steps: List[SagaStep] = []
    
    def step(
        self,
        name: str,
        action: Callable,
        compensation: Optional[Callable] = None,
        is_pivot: bool = False,
        retryable: bool = True,
        max_retries: int = 3,
        timeout_seconds: int = 30
    ) -> 'SagaDefinition':
        """Add a step to the saga."""
        self.steps.append(SagaStep(
            name=name,
            action=action,
            compensation=compensation,
            is_pivot=is_pivot,
            retryable=retryable,
            max_retries=max_retries,
            timeout=timedelta(seconds=timeout_seconds)
        ))
        return self
    
    def build(self) -> 'Saga':
        """Build the saga."""
        return Saga(self.name, self.steps)


class Saga:
    """Executable saga definition."""
    
    def __init__(self, name: str, steps: List[SagaStep]):
        self.name = name
        self.steps = steps
        
        # Validate: steps after pivot should not have compensation
        pivot_found = False
        for step in steps:
            if step.is_pivot:
                pivot_found = True
            elif pivot_found and step.compensation:
                logger.warning(
                    f"Step {step.name} has compensation after pivot - will be ignored"
                )


# =============================================================================
# Saga Orchestrator
# =============================================================================

class SagaOrchestrator:
    """
    Central orchestrator for executing sagas.
    
    Responsibilities:
    - Execute saga steps in order
    - Persist saga state for recovery
    - Handle failures and compensation
    - Retry failed steps
    """
    
    def __init__(self, db, sagas: Dict[str, Saga]):
        self.db = db
        self.sagas = sagas  # Registered saga definitions
        # Strong references to running saga tasks: the event loop keeps only
        # weak references, so an unreferenced task may be garbage-collected
        self._tasks = set()
    
    async def start_saga(
        self,
        saga_type: str,
        input_data: Dict[str, Any]
    ) -> str:
        """Start a new saga execution."""
        saga = self.sagas.get(saga_type)
        if not saga:
            raise ValueError(f"Unknown saga type: {saga_type}")
        
        saga_id = generate_id()
        
        state = SagaState(
            saga_id=saga_id,
            saga_type=saga_type,
            status=SagaStatus.STARTED,
            current_step=0,
            input_data=input_data,
            step_results={},
            created_at=datetime.utcnow(),
            updated_at=datetime.utcnow()
        )
        
        await self._save_state(state)
        
        # Execute asynchronously, holding a reference until the task finishes
        task = asyncio.create_task(self._execute_saga(state))
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)
        
        return saga_id
    
    async def _execute_saga(self, state: SagaState):
        """Execute the saga steps."""
        saga = self.sagas[state.saga_type]
        
        state.status = SagaStatus.EXECUTING
        await self._save_state(state)
        
        try:
            # Execute each step. On recovery, skip steps whose results are
            # already persisted so they are not executed a second time.
            for i, step in enumerate(saga.steps):
                if step.name in state.step_results:
                    continue
                state.current_step = i
                await self._save_state(state)
                
                result = await self._execute_step(step, state)
                
                if result.success:
                    state.step_results[step.name] = result.data
                    await self._save_state(state)
                else:
                    # Step failed
                    state.error = result.error
                    await self._save_state(state)
                    
                    if self._past_pivot(saga, i):
                        # The pivot already committed: complete the saga
                        # with forward recovery
                        await self._forward_recovery(saga, state, i)
                    else:
                        # The failed step is the pivot itself or precedes it,
                        # so nothing irreversible has happened: compensate
                        await self._compensate(saga, state, i)
                    return
            
            # All steps completed successfully
            state.status = SagaStatus.COMPLETED
            await self._save_state(state)
            logger.info(f"Saga {state.saga_id} completed successfully")
            
        except Exception as e:
            logger.error(f"Saga {state.saga_id} failed with error: {e}")
            state.status = SagaStatus.FAILED
            state.error = str(e)
            await self._save_state(state)
    
    async def _execute_step(
        self,
        step: SagaStep,
        state: SagaState
    ) -> SagaStepResult:
        """Execute a single saga step with retry logic."""
        last_error = None
        
        for attempt in range(step.max_retries):
            try:
                # Build step context with all previous results
                context = {
                    "saga_id": state.saga_id,
                    "input": state.input_data,
                    "results": state.step_results
                }
                
                # Execute with timeout
                result = await asyncio.wait_for(
                    step.action(context),
                    timeout=step.timeout.total_seconds()
                )
                
                return SagaStepResult(success=True, data=result)
                
            except asyncio.TimeoutError:
                last_error = f"Step {step.name} timed out"
                logger.warning(f"{last_error}, attempt {attempt + 1}/{step.max_retries}")
                
            except Exception as e:
                last_error = str(e)
                logger.warning(
                    f"Step {step.name} failed: {e}, attempt {attempt + 1}/{step.max_retries}"
                )
            
            if not step.retryable:
                break  # Non-retryable step: give up after the first failure
            if attempt < step.max_retries - 1:
                # Exponential backoff: 1s, 2s, 4s, ...
                await asyncio.sleep(2 ** attempt)
        
        return SagaStepResult(success=False, error=last_error)
    
    async def _compensate(self, saga: Saga, state: SagaState, failed_step: int):
        """Execute compensation for completed steps in reverse order."""
        state.status = SagaStatus.COMPENSATING
        await self._save_state(state)
        
        logger.info(f"Starting compensation for saga {state.saga_id}")
        all_compensated = True
        
        # Compensate in reverse order, excluding the failed step
        for i in range(failed_step - 1, -1, -1):
            step = saga.steps[i]
            
            if step.is_pivot:
                # Defensive: a committed pivot is never compensated. Checked
                # first because a pivot usually has no compensation at all.
                logger.info(f"Reached pivot step {step.name}, stopping compensation")
                break
            
            if step.compensation is None:
                logger.info(f"No compensation for step {step.name}")
                continue
            
            # Keep going even if one compensation fails; undo as much as possible
            if not await self._execute_compensation(step, state):
                all_compensated = False
        
        # Report COMPENSATED only if every compensation actually succeeded
        state.status = SagaStatus.COMPENSATED if all_compensated else SagaStatus.FAILED
        await self._save_state(state)
        logger.info(f"Saga {state.saga_id} compensation finished: {state.status.value}")
    
    async def _execute_compensation(self, step: SagaStep, state: SagaState) -> bool:
        """Execute compensation for a single step; return True on success."""
        context = {
            "saga_id": state.saga_id,
            "input": state.input_data,
            "results": state.step_results,
            "step_result": state.step_results.get(step.name)
        }
        
        # Compensation must not be abandoned lightly: retry with backoff
        max_attempts = 10  # More retries for compensation than for forward steps
        for attempt in range(max_attempts):
            try:
                await asyncio.wait_for(
                    step.compensation(context),
                    timeout=step.timeout.total_seconds() * 2
                )
                logger.info(f"Compensated step {step.name}")
                return True
                
            except Exception as e:
                logger.error(
                    f"Compensation for {step.name} failed: {e}, "
                    f"attempt {attempt + 1}/{max_attempts}"
                )
                if attempt < max_attempts - 1:
                    await asyncio.sleep(2 ** attempt)
        
        # Compensation failed - this is bad!
        logger.critical(
            f"COMPENSATION FAILED for step {step.name} in saga {state.saga_id}. "
            f"Manual intervention required!"
        )
        # Alert operations team, create incident ticket, etc.
        return False
    
    async def _forward_recovery(
        self,
        saga: Saga,
        state: SagaState,
        failed_step: int
    ):
        """Retry steps after pivot until success."""
        logger.info(f"Forward recovery for saga {state.saga_id} from step {failed_step}")
        
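        for i in range(failed_step, len(saga.steps)):
            step = saga.steps[i]
            
            # Retry with capped backoff; 100 attempts stands in for "until success"
            for attempt in range(100):
                result = await self._execute_step(step, state)
                
                if result.success:
                    state.step_results[step.name] = result.data
                    await self._save_state(state)
                    break
                
                wait_time = min(300, 2 ** attempt)  # Cap at 5 minutes
                logger.warning(
                    f"Forward recovery step {step.name} failed, "
                    f"retrying in {wait_time}s"
                )
                await asyncio.sleep(wait_time)
            else:
                # Retries exhausted: never report such a saga as COMPLETED
                state.status = SagaStatus.FAILED
                state.error = f"Forward recovery exhausted at step {step.name}"
                await self._save_state(state)
                logger.critical(
                    f"{state.error} in saga {state.saga_id}. "
                    f"Manual intervention required!"
                )
                return
        
        state.status = SagaStatus.COMPLETED
        await self._save_state(state)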
    
    def _past_pivot(self, saga: Saga, step_index: int) -> bool:
        """Check if we're past the pivot point."""
        for i in range(step_index):
            if saga.steps[i].is_pivot:
                return True
        return False
    
    async def _save_state(self, state: SagaState):
        """Persist saga state to database."""
        state.updated_at = datetime.utcnow()
        
        await self.db.execute(
            """
            INSERT INTO saga_state (saga_id, saga_type, status, current_step, 
                                   input_data, step_results, error, created_at, updated_at)
            VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
            ON CONFLICT (saga_id) DO UPDATE SET
                status = $3, current_step = $4, step_results = $6, 
                error = $7, updated_at = $9
            """,
            state.saga_id, state.saga_type, state.status.value, state.current_step,
            json.dumps(state.input_data), json.dumps(state.step_results),
            state.error, state.created_at, state.updated_at
        )
    
    async def recover_sagas(self):
        """
        Recover incomplete sagas after restart.
        
        Called on startup to resume interrupted sagas.
        """
        incomplete = await self.db.fetch(
            """
            SELECT * FROM saga_state 
            WHERE status IN ('started', 'executing', 'compensating')
            """
        )
        
        for row in incomplete:
            state = SagaState(
                saga_id=row['saga_id'],
                saga_type=row['saga_type'],
                status=SagaStatus(row['status']),
                current_step=row['current_step'],
                input_data=json.loads(row['input_data']),
                step_results=json.loads(row['step_results']),
                created_at=row['created_at'],
                updated_at=row['updated_at'],
                error=row['error']
            )
            
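            logger.info(f"Recovering saga {state.saga_id} from step {state.current_step}")
            if state.status == SagaStatus.COMPENSATING:
                # Resume undoing; never re-run the forward steps of a compensating saga
                saga = self.sagas[state.saga_type]
                asyncio.create_task(self._compensate(saga, state, state.current_step))
            else:
                asyncio.create_task(self._execute_saga(state))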


# =============================================================================
# Order Saga Definition
# =============================================================================

class OrderSagaSteps:
    """Step implementations for order saga."""
    
    def __init__(self, inventory_service, payment_service, shipping_service, notification_service):
        self.inventory = inventory_service
        self.payment = payment_service
        self.shipping = shipping_service
        self.notification = notification_service
    
    async def reserve_inventory(self, context: dict) -> dict:
        """Step 1: Reserve inventory."""
        order = context['input']
        
        reservation_id = await self.inventory.reserve(
            order_id=context['saga_id'],
            items=order['items']
        )
        
        return {"reservation_id": reservation_id}
    
    async def release_inventory(self, context: dict):
        """Compensation for reserve_inventory."""
        reservation_id = context['results'].get('reserve_inventory', {}).get('reservation_id')
        
        if reservation_id:
            await self.inventory.release(reservation_id)
    
    async def charge_payment(self, context: dict) -> dict:
        """Step 2: Charge payment."""
        order = context['input']
        
        payment_id = await self.payment.charge(
            customer_id=order['customer_id'],
            amount=order['total'],
            idempotency_key=f"saga-{context['saga_id']}"
        )
        
        return {"payment_id": payment_id}
    
    async def refund_payment(self, context: dict):
        """Compensation for charge_payment."""
        payment_id = context['results'].get('charge_payment', {}).get('payment_id')
        
        if payment_id:
            await self.payment.refund(
                payment_id=payment_id,
                idempotency_key=f"refund-{context['saga_id']}"
            )
    
    async def create_shipment(self, context: dict) -> dict:
        """Step 3: Create shipment (PIVOT - no compensation after this)."""
        order = context['input']
        
        shipment_id = await self.shipping.create_shipment(
            order_id=context['saga_id'],
            items=order['items'],
            address=order['shipping_address']
        )
        
        return {"shipment_id": shipment_id}
    
    async def send_notification(self, context: dict) -> dict:
        """Step 4: Send confirmation notification."""
        order = context['input']
        shipment_id = context['results']['create_shipment']['shipment_id']
        
        await self.notification.send_order_confirmation(
            customer_id=order['customer_id'],
            order_id=context['saga_id'],
            shipment_id=shipment_id
        )
        
        return {"notification_sent": True}


def create_order_saga(steps: OrderSagaSteps) -> Saga:
    """Create the order processing saga."""
    return (SagaDefinition("OrderSaga")
        .step(
            "reserve_inventory",
            steps.reserve_inventory,
            steps.release_inventory
        )
        .step(
            "charge_payment",
            steps.charge_payment,
            steps.refund_payment
        )
        .step(
            "create_shipment",
            steps.create_shipment,
            None,  # No compensation - this is the pivot
            is_pivot=True
        )
        .step(
            "send_notification",
            steps.send_notification,
            None  # Notification doesn't need compensation
        )
        .build())
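To watch the compensate-versus-forward-recover decision in isolation, here is a compact in-memory sketch. It is not the orchestrator above: it assumes no persistence, timeouts, or per-step retries, and `run_saga`, `Step`, and `fake_step` are hypothetical names. Until the pivot step commits, a failure compensates the completed steps newest first; once the pivot has committed, a failing step is retried instead.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, List, Optional

Action = Callable[[], Awaitable[None]]


@dataclass
class Step:
    name: str
    action: Action
    compensation: Optional[Action] = None
    is_pivot: bool = False


async def run_saga(steps: List[Step], max_forward_attempts: int = 5) -> str:
    """Return 'completed', 'compensated', or 'stuck' (needs a human)."""
    completed: List[Step] = []
    pivot_committed = False
    for step in steps:
        if not pivot_committed:
            try:
                await step.action()
            except Exception:
                # Pivot not committed yet: undo completed steps, newest first
                for done in reversed(completed):
                    if done.compensation is not None:
                        await done.compensation()
                return "compensated"
            completed.append(step)
            if step.is_pivot:
                pivot_committed = True
        else:
            # Past the pivot: forward recovery, retry the same step
            for _ in range(max_forward_attempts):
                try:
                    await step.action()
                    break
                except Exception:
                    await asyncio.sleep(0)  # real code would back off here
            else:
                return "stuck"
    return "completed"


def fake_step(log: List[str], label: str, fail_times: int = 0) -> Action:
    """Hypothetical service call that logs itself and fails its first `fail_times` calls."""
    calls = {"n": 0}

    async def call() -> None:
        calls["n"] += 1
        log.append(label)
        if calls["n"] <= fail_times:
            raise RuntimeError(f"{label} failed")

    return call


log: List[str] = []
steps = [
    Step("reserve_inventory", fake_step(log, "reserve"), fake_step(log, "release")),
    Step("charge_payment", fake_step(log, "charge", fail_times=1), fake_step(log, "refund")),
    Step("create_shipment", fake_step(log, "ship"), is_pivot=True),
    Step("send_notification", fake_step(log, "notify")),
]
print(asyncio.run(run_saga(steps)), log)
# compensated ['reserve', 'charge', 'release']
```

With a payment failure, the inventory reservation is released and the refund is never called, because the charge itself never completed.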

Part III: Real-World Application

Chapter 7: Case Studies

7.1 Case Study: Uber Trip Saga

UBER TRIP SAGA

A trip involves multiple services that must coordinate:

FORWARD FLOW:
┌──────────────┐    ┌──────────────┐    ┌──────────────┐
│   Matching   │───▶│   Pricing    │───▶│   Dispatch   │
│   Service    │    │   Service    │    │   Service    │
└──────────────┘    └──────────────┘    └──────────────┘
       │                   │                   │
       ▼                   ▼                   ▼
  Match rider         Calculate fare      Assign driver
  to driver           and hold funds      and notify


COMPENSATION (if dispatch fails):
┌──────────────┐    ┌──────────────┐
│   Pricing    │◀───│   Matching   │
│   Service    │    │   Service    │
└──────────────┘    └──────────────┘
       │                   │
       ▼                   ▼
  Release held        Unmatch rider
  funds               and driver


KEY DECISIONS:
├── Dispatch is the pivot point (driver is on the way)
├── After dispatch, must complete (pay driver even if rider cancels)
├── Before dispatch, can compensate (no charge to rider)
└── Uses orchestration (centralized trip service)

7.2 Case Study: Airbnb Booking Saga

AIRBNB BOOKING SAGA

Steps:
  T1: Hold dates on calendar        C1: Release dates
  T2: Pre-authorize payment         C2: Void pre-auth
  T3: Confirm with host (async)     C3: Notify cancellation
  T4: Capture payment (PIVOT)       
  T5: Send confirmation emails

INTERESTING ASPECTS:

1. ASYNC HOST CONFIRMATION
   ├── Host has 24 hours to accept
   ├── Saga enters "waiting" state
   ├── Timer triggers compensation if host doesn't respond
   └── Implement with saga timeout/deadline

2. PIVOT AT PAYMENT CAPTURE
   ├── Once payment captured, booking is confirmed
   ├── Cancellation after capture = refund (different flow)
   └── Forward recovery ensures emails sent

3. IDEMPOTENCY
   ├── Pre-auth must be idempotent (same key = same auth)
   ├── Calendar hold must be idempotent
   └── Confirmation email should have dedup

7.3 Common Patterns Across Companies

Company   Saga Type                 Pivot Point            Key Learning
───────   ─────────                 ───────────            ────────────
Uber      Orchestrated              Driver dispatch        Compensate matching before dispatch
Airbnb    Hybrid                    Payment capture        Async steps with timeouts
Stripe    Orchestrated              Charge capture         Idempotency keys for all steps
DoorDash  Choreographed             Restaurant acceptance  Events for loose coupling
Netflix   Orchestrated (Conductor)  N/A (internal)         Workflow visualization

Chapter 8: Common Mistakes

8.1 Mistake 1: Compensation That Can Fail

❌ WRONG: Compensation with external dependency

async def refund_payment(context):
    """Compensation that calls external API."""
    payment_id = context['results']['charge_payment']['payment_id']
    
    # This can fail! Network error, API down, etc.
    await stripe.refunds.create(payment=payment_id)
    
    # If this fails, saga is stuck in partial state!


✅ CORRECT: Idempotent compensation with retries

async def refund_payment(context):
    """Idempotent compensation with guaranteed completion."""
    payment_id = context['results']['charge_payment']['payment_id']
    saga_id = context['saga_id']
    
    # Idempotency key ensures we can retry safely
    idempotency_key = f"refund-{saga_id}-{payment_id}"
    
    # Record intent to refund FIRST (in our database)
    await db.execute(
        """
        INSERT INTO pending_refunds (saga_id, payment_id, status)
        VALUES ($1, $2, 'pending')
        ON CONFLICT (saga_id, payment_id) DO NOTHING
        """,
        saga_id, payment_id
    )
    
    # Background job will retry until Stripe confirms
    # Even if this call fails, job will pick it up
    try:
        await stripe.refunds.create(
            payment=payment_id,
            idempotency_key=idempotency_key
        )
        
        await db.execute(
            """
            UPDATE pending_refunds SET status = 'completed'
            WHERE saga_id = $1 AND payment_id = $2
            """,
            saga_id, payment_id
        )
    except Exception as e:
        logger.warning(f"Refund failed, will retry: {e}")
        # Background job handles retry
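The background job in the correct version can be sketched as a loop over the recorded `pending_refunds` intents. Here an in-memory dict stands in for the table, and `FlakyPaymentAPI` is a hypothetical provider stub (not the Stripe SDK) that fails a few calls and deduplicates on idempotency key. The shape is what matters: the intent is durable, the provider call is idempotent, and an intent is marked completed only after the provider confirms.

```python
import asyncio
from typing import Dict, Tuple


class FlakyPaymentAPI:
    """Hypothetical provider: fails the first N calls, dedups on idempotency key."""

    def __init__(self, failures: int):
        self.failures = failures
        self.refunds: Dict[str, str] = {}  # idempotency_key -> refund_id

    async def refund(self, payment_id: str, idempotency_key: str) -> str:
        if self.failures > 0:
            self.failures -= 1
            raise ConnectionError("provider unavailable")
        # Same key, same refund: a retry after a lost response is harmless
        if idempotency_key not in self.refunds:
            self.refunds[idempotency_key] = f"re_{len(self.refunds) + 1}"
        return self.refunds[idempotency_key]


async def retry_pending_refunds(
    pending: Dict[Tuple[str, str], str], api: FlakyPaymentAPI, max_passes: int = 10
) -> None:
    """Retry every 'pending' (saga_id, payment_id) intent until it is 'completed'."""
    for _ in range(max_passes):
        todo = [key for key, status in pending.items() if status == "pending"]
        if not todo:
            return
        for saga_id, payment_id in todo:
            try:
                await api.refund(payment_id, idempotency_key=f"refund-{saga_id}-{payment_id}")
                pending[(saga_id, payment_id)] = "completed"
            except ConnectionError:
                pass  # leave it pending; the next pass retries
        await asyncio.sleep(0)  # a real job would sleep for its polling interval


pending = {("saga-1", "pay-1"): "pending", ("saga-2", "pay-2"): "pending"}
api = FlakyPaymentAPI(failures=3)
asyncio.run(retry_pending_refunds(pending, api))
print(pending)
```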

8.2 Mistake 2: Not Handling Partial Compensation

❌ WRONG: Assuming compensation is all-or-nothing

async def compensate_order(saga_id):
    """Compensate all steps - but what if one fails?"""
    await release_inventory(saga_id)  # Success
    await refund_payment(saga_id)     # FAILS! 
    await cancel_shipment(saga_id)    # Never runs!
    
    # Inventory released, but payment not refunded
    # Inconsistent state!


✅ CORRECT: Track compensation progress

async def compensate_order(saga_id):
    """Compensate with progress tracking."""
    # Compensate in reverse order of the forward steps (newest first)
    compensation_steps = [
        ("shipment", cancel_shipment),
        ("payment", refund_payment),
        ("inventory", release_inventory),
    ]
    
    for step_name, compensation_fn in compensation_steps:
        # Check if already compensated
        status = await get_compensation_status(saga_id, step_name)
        
        if status == 'completed':
            continue
        
        try:
            await compensation_fn(saga_id)
            await mark_compensation_complete(saga_id, step_name)
        except Exception as e:
            # Log and continue - will retry later
            logger.error(f"Compensation {step_name} failed: {e}")
            await mark_compensation_failed(saga_id, step_name, str(e))
    
    # Background job retries failed compensations

8.3 Mistake 3: Non-Idempotent Steps

❌ WRONG: Step that charges multiple times on retry

async def charge_payment(context):
    """Non-idempotent payment - dangerous!"""
    customer_id = context['input']['customer_id']
    amount = context['input']['total']
    
    # If this times out and retries, customer charged twice!
    result = await stripe.charges.create(
        amount=amount,
        customer=customer_id
    )
    
    return {"charge_id": result.id}


✅ CORRECT: Idempotent step with idempotency key

async def charge_payment(context):
    """Idempotent payment - safe to retry."""
    customer_id = context['input']['customer_id']
    amount = context['input']['total']
    saga_id = context['saga_id']
    
    # Same key = same charge, even if called multiple times
    idempotency_key = f"charge-{saga_id}"
    
    result = await stripe.charges.create(
        amount=amount,
        customer=customer_id,
        idempotency_key=idempotency_key
    )
    
    return {"charge_id": result.id}
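On the receiving side, an idempotency key only helps if the payment service remembers what it returned for each key. A minimal sketch, assuming an in-memory store (a real service would use a durable table with an expiry); `PaymentService` and `IdempotencyConflict` are hypothetical. Rejecting a reused key that arrives with different parameters is a common safeguard, and treating it as an error here is a design assumption.

```python
import asyncio
import itertools
from typing import Dict, Tuple


class IdempotencyConflict(Exception):
    """The same key was reused with different request parameters."""


class PaymentService:
    """Hypothetical payment service that deduplicates charges by idempotency key."""

    def __init__(self):
        self._ids = itertools.count(1)
        # idempotency_key -> (request fingerprint, stored response)
        self._seen: Dict[str, Tuple[Tuple[str, int], str]] = {}
        self.charges_made = 0

    async def charge(self, customer_id: str, amount: int, idempotency_key: str) -> str:
        request = (customer_id, amount)
        # No await between check and store, so this is atomic within one event loop
        if idempotency_key in self._seen:
            stored_request, charge_id = self._seen[idempotency_key]
            if stored_request != request:
                raise IdempotencyConflict(idempotency_key)
            return charge_id  # replay: same result, no second charge
        charge_id = f"ch_{next(self._ids)}"
        self.charges_made += 1
        self._seen[idempotency_key] = (request, charge_id)
        return charge_id


async def demo():
    svc = PaymentService()
    first = await svc.charge("cus_1", 5000, idempotency_key="charge-saga-42")
    retry = await svc.charge("cus_1", 5000, idempotency_key="charge-saga-42")
    return first, retry, svc.charges_made


print(asyncio.run(demo()))  # ('ch_1', 'ch_1', 1)
```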

8.4 Mistake Checklist

  • Every step has idempotency key — Safe to retry
  • Compensation recorded before execution — Can resume
  • Compensation steps are idempotent — Safe to retry compensation
  • Pivot point is clearly defined — Know when to compensate vs forward recover
  • Timeouts on all external calls — Don't hang forever
  • Saga state is persisted — Can recover after crash
  • Compensation has unlimited retries — Must eventually succeed

Part IV: Interview Preparation

Chapter 9: Interview Tips

9.1 Key Phrases

INTRODUCING SAGAS:

"Since we have multiple services with separate databases, 
we can't use a traditional distributed transaction. Instead, 
I'd use the Saga pattern — a sequence of local transactions 
with compensating transactions if something fails."


EXPLAINING COMPENSATION:

"Compensation isn't rollback — it's a new transaction that 
semantically undoes the previous one. For example, if we 
charged a customer and then the saga fails, we don't 'rollback' 
the charge — we issue a refund, which is a new transaction."


ON CHOREOGRAPHY VS ORCHESTRATION:

"For simple sagas with 2-4 steps, choreography works well:
each service reacts to events. But for complex flows, I prefer
orchestration with a central coordinator. It's easier to 
understand, modify, and debug."


ON FAILURE HANDLING:

"The tricky part is compensation failures. If the refund fails, 
we can't just give up — we need to retry until it succeeds, 
possibly with human intervention. I'd use idempotency keys 
and a background job for retrying stuck compensations."


ON PIVOT POINTS:

"After a certain point, compensation doesn't make sense. For 
example, once we've shipped a physical item, we can't 'unship' 
it. That's the pivot point — after that, we use forward 
recovery and retry until the saga completes."

9.2 Common Questions

Q: "Why not use 2PC?"
A: "2PC makes every participant hold its locks from the prepare phase until the coordinator announces commit or abort. That makes it blocking (if the coordinator fails mid-protocol, participants stay locked), it scales poorly because locks are held across network round trips, and the coordinator is a single point of failure. Many components can't take part at all: most NoSQL stores, message brokers, and third-party APIs such as payment providers don't support it."

Q: "What if compensation fails?"
A: "Compensation must be designed to eventually succeed. Use idempotency keys, persist the compensation intent first, and have a background job that retries failed compensations indefinitely. As a last resort, alert for human intervention."

Q: "How do you handle concurrent sagas?"
A: "Sagas don't provide isolation, so one saga can observe another's intermediate writes. If two sagas conflict (e.g., over the same inventory), the first to commit its local transaction wins; the others fail and compensate. For high-contention resources, consider reservation patterns."

Q: "Choreography or orchestration?"
A: "Depends on complexity. Simple sagas (2-4 steps, clear flow): choreography. Complex sagas (5+ steps, branching, shared services): orchestration. Many teams use both."

Chapter 10: Practice Problems

Problem 1: Travel Booking Saga

Setup: Design a saga for booking a trip that includes flight + hotel + car rental.

Requirements:

  • All three must succeed, or none should be booked
  • Each vendor has different cancellation policies
  • Hotels require pre-authorization, flights require immediate payment
  • Users want to see progress

Questions:

  1. What's the order of steps and why?
  2. What's the pivot point?
  3. How do you handle hotel pre-auth expiration?

Order of steps:

  1. Reserve car (easiest to cancel, hold for 24h)
  2. Pre-authorize hotel (hold for 24-48h)
  3. Book flight (PIVOT - immediate charge, expensive to cancel)
  4. Capture hotel payment
  5. Confirm car rental

Pivot point: Flight booking. Airlines charge immediately and have expensive change fees.

Hotel pre-auth expiration:

  • Set saga deadline to 23 hours (before 24h expiration)
  • If saga not complete by deadline, compensate all steps
  • Background job monitors saga deadlines
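The deadline check can be a pure function that the monitoring job calls on each pass. A sketch assuming each saga record carries `status`, `started_at`, and a `past_pivot` flag; `OpenSaga` and `sagas_past_deadline` are hypothetical names. The `past_pivot` guard is an assumption layered on the answer above, following the pivot rule: once the flight is booked, the saga is driven forward rather than compensated.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import List

# 23h leaves a one-hour margin before a 24h pre-authorization hold lapses
SAGA_DEADLINE = timedelta(hours=23)
# Statuses that need no deadline action (finished, or already being undone)
NO_ACTION = {"completed", "compensating", "compensated", "failed"}


@dataclass
class OpenSaga:
    saga_id: str
    status: str
    started_at: datetime
    past_pivot: bool  # True once the flight (the pivot) is booked


def sagas_past_deadline(sagas: List[OpenSaga], now: datetime) -> List[str]:
    """IDs of sagas the monitor should compensate because time ran out."""
    return [
        s.saga_id
        for s in sagas
        if s.status not in NO_ACTION
        and not s.past_pivot  # past the pivot: drive forward, don't compensate
        and now - s.started_at >= SAGA_DEADLINE
    ]


now = datetime(2024, 5, 2, 12, 0, tzinfo=timezone.utc)
sagas = [
    OpenSaga("trip-1", "executing", now - timedelta(hours=23, minutes=5), past_pivot=False),
    OpenSaga("trip-2", "executing", now - timedelta(hours=2), past_pivot=False),
    OpenSaga("trip-3", "executing", now - timedelta(hours=30), past_pivot=True),
    OpenSaga("trip-4", "completed", now - timedelta(hours=40), past_pivot=True),
]
print(sagas_past_deadline(sagas, now))  # ['trip-1']
```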

Problem 2: Subscription Renewal Saga

Setup: Design a saga for monthly subscription renewal.

Requirements:

  • Charge payment
  • Update subscription status
  • Provision resources (if plan changed)
  • Send receipt email

Questions:

  1. What if payment fails?
  2. What if provisioning fails after payment?
  3. How do you handle retries for declined cards?

Payment failure:

  • No compensation needed (nothing to undo)
  • Mark subscription as "payment_failed"
  • Send payment failed email
  • Retry payment after 24h (grace period)

Provisioning failure after payment:

  • Refund payment (compensation)
  • Keep subscription on old plan
  • Send failure notification
  • Alert for manual review

Declined cards:

  • Not a saga failure — it's expected
  • Retry with exponential backoff (1, 3, 7 days)
  • After 3 failures, cancel subscription (separate saga)
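Read as days after the first decline, the 1, 3, 7 schedule is what doubling gaps of 1, 2, and 4 days produce, so it can be generated rather than hard-coded. A small sketch; `retry_schedule` and `next_action` are hypothetical helpers, and it assumes the three failures that trigger cancellation are the three retries.

```python
from datetime import date, timedelta
from typing import List

MAX_RETRIES = 3


def retry_schedule(first_decline: date, retries: int = MAX_RETRIES) -> List[date]:
    """Retry dates whose gaps double (1, 2, 4 days), i.e. offsets of 1, 3, 7 days."""
    return [first_decline + timedelta(days=2 ** n - 1) for n in range(1, retries + 1)]


def next_action(failed_retries: int) -> str:
    """Decide what to do after `failed_retries` consecutive failed retries."""
    if failed_retries < MAX_RETRIES:
        return "retry"
    # Cancellation runs as its own saga, not as compensation of this one
    return "start_cancellation_saga"


print(retry_schedule(date(2024, 3, 1)))
# [datetime.date(2024, 3, 2), datetime.date(2024, 3, 4), datetime.date(2024, 3, 8)]
```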

Chapter 11: Sample Interview Dialogue

Scenario: Design an Order Processing System

Interviewer: "Let's design an order processing system for an e-commerce site. Walk me through how you'd handle the transaction."

You: "First, let me identify the services involved and their transactions..."

You sketch on the whiteboard:

Order Processing Steps:

1. Inventory Service: Reserve items
2. Payment Service: Charge customer  
3. Fulfillment Service: Create shipment
4. Notification Service: Send confirmation

You: "Since each service has its own database, I can't use a traditional database transaction. Instead, I'll use the Saga pattern. Each step is a local transaction, and each step that can be undone has a compensating transaction that semantically reverses it if a later step fails."

Interviewer: "What if the payment fails after inventory is reserved?"

You: "That's exactly why we need compensation. If payment fails, I'd trigger the compensation for the inventory step — release the reservation. The key is that these compensations must be idempotent. If we retry the release multiple times, it should have the same effect as doing it once."

You add to the diagram:

Step                    Compensation
─────                   ────────────
Reserve inventory  ───▶ Release inventory
Charge payment     ───▶ Refund payment
Create shipment    ───▶ (PIVOT - no compensation)
Send notification  ───▶ (No compensation needed)

You: "Notice the shipment step is marked as the pivot. Once we've initiated shipment, we can't really undo it — the package is on its way. So after that point, we use forward recovery instead of compensation. We retry until success."

Interviewer: "How would you implement this? Would you use choreography or orchestration?"

You: "For four steps with a linear flow, either could work. But I'd lean toward orchestration — a central saga coordinator that explicitly calls each service in order. It's easier to add logging, monitoring, and to understand the flow."

You sketch the orchestrator:

┌─────────────────────┐
│  Saga Orchestrator  │
└──────────┬──────────┘
           │
     ┌─────┴─────┐
     │           │
     ▼           ▼
  Execute   Compensate
  forward   (on failure)

You: "The orchestrator persists its state to a database. If it crashes mid-saga, it can recover and continue from where it left off. Each step is called with an idempotency key, so retries are safe."

Interviewer: "What happens if the compensation itself fails?"

You: "Great question — that's the hardest part. Compensations must be designed to eventually succeed. I'd:

  1. Record the compensation intent in our database BEFORE calling the external service
  2. Use an idempotency key so retries are safe
  3. Have a background job that periodically checks for stuck compensations and retries them
  4. If it still fails after many retries, alert the operations team for manual intervention

The key insight is that compensation failure doesn't mean 'give up' — it means 'keep trying'. We'd rather have a background job retrying a refund for hours than leave a customer incorrectly charged."

Interviewer: "How do you prevent double-charging if the payment step times out?"

You: "Idempotency keys. When I call the payment service, I include a unique key based on the saga ID:

await payment_service.charge(
    amount=order.total,
    idempotency_key=f'saga-{saga_id}-payment'
)

If the call times out and we retry, the payment service recognizes the idempotency key and returns the same result without charging again. Stripe, for example, supports this natively."


Summary

DAY 2 KEY TAKEAWAYS

WHY SAGAS:
• 2PC is a poor fit for microservices (blocking, limited support)
• Sagas use local transactions + compensation
• Each step is independent, no distributed locks

COMPENSATION DESIGN:
• Semantic reversal, not rollback
• Must be idempotent (safe to retry)
• Must eventually succeed (retry forever)
• Record intent before execution

CHOREOGRAPHY VS ORCHESTRATION:
• Choreography: Events between services, decentralized
  └── Good for simple sagas (2-4 steps)
• Orchestration: Central coordinator, explicit flow
  └── Good for complex sagas (5+ steps)

PIVOT POINT:
• Before pivot: Compensate (backward recovery)
• After pivot: Retry until success (forward recovery)
• Choose pivot based on business logic

FAILURE HANDLING:
• Persist saga state for crash recovery
• Idempotency keys for all external calls
• Background jobs for stuck compensations
• Human intervention as last resort

DEFAULT APPROACH:
• Start with orchestration
• Use idempotency keys everywhere
• Design compensation for guaranteed success
• Monitor compensation failure rate

Part V: Production Patterns

Chapter 12: Saga State Persistence

12.1 Database Schema for Saga State

-- Saga State Persistence Schema (PostgreSQL)
--
-- PostgreSQL does not accept MySQL-style INDEX clauses inside CREATE TABLE,
-- so secondary indexes are created with separate CREATE INDEX statements.

-- Main saga state table
CREATE TABLE saga_instances (
    saga_id UUID PRIMARY KEY,
    saga_type VARCHAR(100) NOT NULL,
    status VARCHAR(50) NOT NULL,
    current_step INT NOT NULL DEFAULT 0,
    input_data JSONB NOT NULL,
    created_at TIMESTAMP NOT NULL DEFAULT NOW(),
    updated_at TIMESTAMP NOT NULL DEFAULT NOW(),
    completed_at TIMESTAMP,
    error_message TEXT,
    retry_count INT NOT NULL DEFAULT 0
);

-- Indexes for recovery queries ((status, updated_at) serves the stuck-saga scan)
CREATE INDEX idx_saga_status ON saga_instances (status, updated_at);
CREATE INDEX idx_saga_type_status ON saga_instances (saga_type, status);
CREATE INDEX idx_saga_created ON saga_instances (created_at);

-- Step execution history
CREATE TABLE saga_step_executions (
    id UUID PRIMARY KEY,
    saga_id UUID NOT NULL REFERENCES saga_instances(saga_id),
    step_name VARCHAR(100) NOT NULL,
    step_index INT NOT NULL,
    status VARCHAR(50) NOT NULL,
    input_data JSONB,
    output_data JSONB,
    error_message TEXT,
    started_at TIMESTAMP NOT NULL DEFAULT NOW(),
    completed_at TIMESTAMP,
    attempt_number INT NOT NULL DEFAULT 1,
    UNIQUE (saga_id, step_name, attempt_number)
);

CREATE INDEX idx_step_saga ON saga_step_executions (saga_id);

-- Compensation tracking
CREATE TABLE saga_compensations (
    id UUID PRIMARY KEY,
    saga_id UUID NOT NULL REFERENCES saga_instances(saga_id),
    step_name VARCHAR(100) NOT NULL,
    status VARCHAR(50) NOT NULL,  -- pending, executing, completed, failed
    started_at TIMESTAMP,
    completed_at TIMESTAMP,
    retry_count INT NOT NULL DEFAULT 0,
    last_error TEXT,
    next_retry_at TIMESTAMP,
    UNIQUE (saga_id, step_name)
);

CREATE INDEX idx_compensation_status ON saga_compensations (status);
CREATE INDEX idx_compensation_retry ON saga_compensations (status, next_retry_at);

-- Idempotency records
CREATE TABLE saga_idempotency_keys (
    idempotency_key VARCHAR(255) PRIMARY KEY,
    saga_id UUID NOT NULL,
    step_name VARCHAR(100) NOT NULL,
    response_data JSONB,
    created_at TIMESTAMP NOT NULL DEFAULT NOW(),
    expires_at TIMESTAMP NOT NULL
);

CREATE INDEX idx_idempotency_expires ON saga_idempotency_keys (expires_at);
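The `saga_idempotency_keys` table supports a check-then-record pattern: before running a step, look up its key; if a live record exists, replay the stored response instead of calling the service again. The sketch below shows that flow using Python's built-in `sqlite3` as a stand-in for PostgreSQL, with timestamps stored as epoch seconds and a hypothetical key format of `saga_id:step_name`. It is not atomic across the external call, so two concurrent callers can both execute the action; the downstream service still has to honor the idempotency key itself.

```python
import json
import sqlite3
import time
from typing import Any, Callable

# SQLite stand-in for the saga_idempotency_keys table (epoch-second timestamps)
SCHEMA = """
CREATE TABLE IF NOT EXISTS saga_idempotency_keys (
    idempotency_key TEXT PRIMARY KEY,
    saga_id TEXT NOT NULL,
    step_name TEXT NOT NULL,
    response_data TEXT,
    created_at REAL NOT NULL,
    expires_at REAL NOT NULL
)
"""


def run_idempotent(conn: sqlite3.Connection, saga_id: str, step_name: str,
                   action: Callable[[], Any], ttl_seconds: float = 86400.0) -> Any:
    """Run action() once per (saga_id, step_name) while its key is unexpired."""
    key = f"{saga_id}:{step_name}"  # hypothetical key format
    now = time.time()
    row = conn.execute(
        "SELECT response_data FROM saga_idempotency_keys "
        "WHERE idempotency_key = ? AND expires_at > ?",
        (key, now),
    ).fetchone()
    if row is not None:
        return json.loads(row[0])  # replay the recorded response

    response = action()
    # OR REPLACE overwrites an expired row the cleanup job has not deleted yet
    conn.execute(
        "INSERT OR REPLACE INTO saga_idempotency_keys "
        "(idempotency_key, saga_id, step_name, response_data, created_at, expires_at) "
        "VALUES (?, ?, ?, ?, ?, ?)",
        (key, saga_id, step_name, json.dumps(response), now, now + ttl_seconds),
    )
    conn.commit()
    return response


conn = sqlite3.connect(":memory:")
conn.execute(SCHEMA)
calls = []


def reserve():
    calls.append("reserve")
    return {"reservation_id": "reservation-123"}


first = run_idempotent(conn, "saga-123", "reserve_inventory", reserve)
second = run_idempotent(conn, "saga-123", "reserve_inventory", reserve)
print(first == second, len(calls))  # True 1
```

A retried step with the same saga and step name gets the recorded response back, and the action itself runs only once while the key is live.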

12.2 Recovery Job Implementation

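# Background Job for Saga Recovery
#
# Assumes `db` is an asyncpg Pool (fetch / fetchrow / execute) with a JSON
# codec registered so JSONB columns come back as Python objects, and that
# SagaOrchestrator, SagaState and SagaStatus are the classes defined earlier
# in this chapter.

import asyncio
import logging
from typing import Set

logger = logging.getLogger(__name__)


class SagaRecoveryJob:
    """
    Background job that recovers stuck sagas.
    
    Handles:
    - Sagas interrupted by crashes
    - Stuck compensations
    - Timed-out steps

    Several instances can run at once: rows are claimed with
    FOR UPDATE SKIP LOCKED and their timestamps pushed forward, so two
    workers do not pick up the same saga or compensation in one pass.
    Steps and compensations must still be idempotent, because a saga that
    is merely slow (not crashed) can be resumed while it is still running.
    """
    
    def __init__(self, db, orchestrator: "SagaOrchestrator",
                 interval_seconds: float = 30.0):
        self.db = db
        self.orchestrator = orchestrator
        self.interval_seconds = interval_seconds
        self.running = False
        # The event loop keeps only weak references to tasks, so hold strong
        # references to resumed sagas until they finish.
        self._tasks: Set[asyncio.Task] = set()
    
    async def start(self):
        """Run the recovery loop until stop() is called."""
        self.running = True
        
        while self.running:
            try:
                await self._recover_stuck_sagas()
                await self._retry_failed_compensations()
                await self._cleanup_expired_idempotency_keys()
            except Exception:
                # Log with traceback; one failed pass must not kill the loop
                logger.exception("Recovery job error")
            
            await asyncio.sleep(self.interval_seconds)  # 30 s by default
    
    async def stop(self):
        """Stop the loop once the current pass and sleep finish."""
        self.running = False
    
    async def _recover_stuck_sagas(self):
        """Claim and resume sagas with no progress for more than 5 minutes."""
        # Use the database clock (NOW()) so app-server clock skew doesn't matter.
        # Bumping updated_at claims the row; SKIP LOCKED lets concurrent
        # workers pass over rows another worker is claiming right now.
        stuck_sagas = await self.db.fetch(
            """
            UPDATE saga_instances
            SET updated_at = NOW()
            WHERE saga_id IN (
                SELECT saga_id FROM saga_instances
                WHERE status IN ('started', 'executing')
                AND updated_at < NOW() - INTERVAL '5 minutes'
                ORDER BY updated_at
                LIMIT 100
                FOR UPDATE SKIP LOCKED
            )
            RETURNING *
            """
        )
        
        for saga in stuck_sagas:
            logger.info("Recovering stuck saga: %s", saga['saga_id'])
            
            # Reconstruct state and resume in the background
            state = await self._reconstruct_state(saga)
            task = asyncio.create_task(self.orchestrator._execute_saga(state))
            self._tasks.add(task)
            task.add_done_callback(self._tasks.discard)
    
    async def _retry_failed_compensations(self):
        """Claim failed compensations that are due and retry them."""
        # Claiming pushes next_retry_at 5 minutes ahead as a lease: if this
        # worker dies mid-retry, the row becomes due again afterwards.
        pending = await self.db.fetch(
            """
            UPDATE saga_compensations AS c
            SET next_retry_at = NOW() + INTERVAL '5 minutes'
            FROM saga_instances AS s
            WHERE c.saga_id = s.saga_id
            AND c.id IN (
                SELECT id FROM saga_compensations
                WHERE status = 'failed'
                AND (next_retry_at IS NULL OR next_retry_at <= NOW())
                ORDER BY next_retry_at NULLS FIRST
                LIMIT 100
                FOR UPDATE SKIP LOCKED
            )
            RETURNING c.*, s.saga_type
            """
        )
        
        for comp in pending:
            logger.info(
                "Retrying compensation %s for saga %s",
                comp['step_name'], comp['saga_id']
            )
            
            try:
                await self._execute_compensation(comp)
                
                await self.db.execute(
                    """
                    UPDATE saga_compensations 
                    SET status = 'completed', completed_at = NOW(),
                        next_retry_at = NULL
                    WHERE id = $1
                    """,
                    comp['id']
                )
                
            except Exception as e:
                retry_count = comp['retry_count'] + 1
                # Capped exponential backoff: 2, 4, 8, ... seconds, max 300
                delay_seconds = min(300, 2 ** retry_count)
                
                await self.db.execute(
                    """
                    UPDATE saga_compensations 
                    SET retry_count = $2, last_error = $3,
                        next_retry_at = NOW() + make_interval(secs => $4)
                    WHERE id = $1
                    """,
                    comp['id'], retry_count, str(e), float(delay_seconds)
                )
                
                if retry_count > 10:
                    logger.critical(
                        "Compensation stuck after %d retries: %s/%s",
                        retry_count, comp['saga_id'], comp['step_name']
                    )
                    # Alert the operations team here (paging integration not shown)
    
    async def _cleanup_expired_idempotency_keys(self):
        """Clean up expired idempotency keys."""
        await self.db.execute(
            "DELETE FROM saga_idempotency_keys WHERE expires_at < NOW()"
        )
    
    async def _reconstruct_state(self, saga_row) -> "SagaState":
        """Reconstruct saga state from database rows."""
        step_results = {}
        
        # Outputs of completed steps, in execution order
        steps = await self.db.fetch(
            """
            SELECT step_name, output_data 
            FROM saga_step_executions 
            WHERE saga_id = $1 AND status = 'completed'
            ORDER BY step_index
            """,
            saga_row['saga_id']
        )
        
        for step in steps:
            step_results[step['step_name']] = step['output_data']
        
        return SagaState(
            saga_id=str(saga_row['saga_id']),
            saga_type=saga_row['saga_type'],
            status=SagaStatus(saga_row['status']),
            current_step=saga_row['current_step'],
            input_data=saga_row['input_data'],
            step_results=step_results,
            created_at=saga_row['created_at'],
            updated_at=saga_row['updated_at'],
            error=saga_row['error_message']
        )
    
    async def _execute_compensation(self, comp):
        """Execute one compensation (comp is a claimed saga_compensations row)."""
        saga = self.orchestrator.sagas.get(comp['saga_type'])
        if not saga:
            raise ValueError(f"Unknown saga type: {comp['saga_type']}")
        
        # Find the step
        step = next(
            (s for s in saga.steps if s.name == comp['step_name']),
            None
        )
        
        if step is None:
            # Saga definition changed since this saga ran; nothing to execute
            logger.warning(
                "Saga type %s has no step %s; skipping compensation",
                comp['saga_type'], comp['step_name']
            )
            return
        if not step.compensation:
            return  # step has nothing to undo
        
        # Get saga state for context
        saga_row = await self.db.fetchrow(
            "SELECT * FROM saga_instances WHERE saga_id = $1",
            comp['saga_id']
        )
        
        state = await self._reconstruct_state(saga_row)
        
        context = {
            "saga_id": state.saga_id,
            "input": state.input_data,
            "results": state.step_results,
            "step_result": state.step_results.get(comp['step_name'])
        }
        
        await step.compensation(context)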
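The retry schedule produced by `min(300, 2 ** retry_count)` is easy to tabulate. The sketch below reproduces it and adds an optional "full jitter" variant (a uniformly random delay between 0 and the capped value). Jitter is a swap-in that the job above does not use; the idea is that it spreads retries out when many compensations fail together, for example after a downstream outage. The function names are hypothetical.

```python
import random
from typing import List, Optional


def backoff_delay(retry_count: int, cap: float = 300.0,
                  rng: Optional[random.Random] = None) -> float:
    """Capped exponential backoff: min(cap, 2 ** retry_count) seconds.

    With an rng, apply "full jitter": a uniform delay in [0, capped delay].
    """
    # Integer power avoids float overflow for very large retry counts
    delay = float(min(cap, 2 ** retry_count))
    if rng is None:
        return delay
    return rng.uniform(0.0, delay)


def retry_schedule(max_retries: int) -> List[float]:
    """Delays scheduled after failed retries 1..max_retries (no jitter)."""
    return [backoff_delay(n) for n in range(1, max_retries + 1)]


schedule = retry_schedule(10)
print(schedule)       # [2.0, 4.0, 8.0, 16.0, 32.0, 64.0, 128.0, 256.0, 300.0, 300.0]
print(sum(schedule))  # 1110.0
```

Summing the first ten delays gives 2 + 4 + ... + 256 + 300 + 300 = 1,110 seconds, so a compensation that keeps failing spends roughly 18.5 minutes in backoff between its first and eleventh failed retry, when `retry_count > 10` triggers the critical log (ignoring the 30-second polling interval).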


Chapter 13: Monitoring and Observability

13.1 Saga Metrics

# Saga Monitoring and Metrics

import time
from typing import Dict

from prometheus_client import Counter, Gauge, Histogram


# Prometheus metrics
saga_started = Counter(
    'saga_started_total',
    'Total sagas started',
    ['saga_type']
)

saga_completed = Counter(
    'saga_completed_total',
    'Total sagas completed successfully',
    ['saga_type']
)

saga_failed = Counter(
    'saga_failed_total',
    'Total sagas that failed',
    ['saga_type', 'failed_step']
)

saga_compensated = Counter(
    'saga_compensated_total',
    'Total sagas that were compensated',
    ['saga_type']
)

saga_duration = Histogram(
    'saga_duration_seconds',
    'Time to complete saga',
    ['saga_type', 'outcome'],
    buckets=[0.1, 0.5, 1, 5, 10, 30, 60, 300, 600]
)

saga_step_duration = Histogram(
    'saga_step_duration_seconds',
    'Time to complete saga step',
    ['saga_type', 'step_name'],
    buckets=[0.01, 0.05, 0.1, 0.5, 1, 5, 10, 30]
)

saga_in_progress = Gauge(
    'saga_in_progress',
    'Number of sagas currently executing',
    ['saga_type']
)

compensation_retries = Counter(
    'saga_compensation_retries_total',
    'Total compensation retry attempts',
    ['saga_type', 'step_name']
)

# Compensations not yet completed, per saga type. The SagaStuckCompensations
# alert reads this gauge; something must refresh it periodically, for example
# the recovery job counting non-completed saga_compensations rows.
saga_compensations_pending = Gauge(
    'saga_compensations_pending',
    'Compensations not yet completed',
    ['saga_type']
)


class SagaMetricsCollector:
    """Collects and reports saga metrics.

    Start times are kept in this process's memory, so a saga resumed after a
    crash records no duration. time.monotonic() is used because durations
    must not jump when the wall clock is adjusted.
    """
    
    def __init__(self):
        self.start_times: Dict[str, float] = {}
        self.step_start_times: Dict[str, float] = {}
    
    def on_saga_started(self, saga_id: str, saga_type: str):
        saga_started.labels(saga_type=saga_type).inc()
        saga_in_progress.labels(saga_type=saga_type).inc()
        self.start_times[saga_id] = time.monotonic()
    
    def on_saga_completed(self, saga_id: str, saga_type: str):
        saga_completed.labels(saga_type=saga_type).inc()
        saga_in_progress.labels(saga_type=saga_type).dec()
        
        if saga_id in self.start_times:
            duration = time.monotonic() - self.start_times[saga_id]
            saga_duration.labels(
                saga_type=saga_type,
                outcome='completed'
            ).observe(duration)
            del self.start_times[saga_id]
    
    def on_saga_failed(self, saga_id: str, saga_type: str, failed_step: str):
        saga_failed.labels(
            saga_type=saga_type,
            failed_step=failed_step
        ).inc()
        saga_in_progress.labels(saga_type=saga_type).dec()
        
        if saga_id in self.start_times:
            duration = time.monotonic() - self.start_times[saga_id]
            saga_duration.labels(
                saga_type=saga_type,
                outcome='failed'
            ).observe(duration)
            del self.start_times[saga_id]
    
    def on_saga_compensated(self, saga_id: str, saga_type: str):
        saga_compensated.labels(saga_type=saga_type).inc()
    
    def on_step_started(self, saga_id: str, step_name: str):
        key = f"{saga_id}:{step_name}"
        self.step_start_times[key] = time.monotonic()
    
    def on_step_completed(
        self,
        saga_id: str,
        saga_type: str,
        step_name: str
    ):
        key = f"{saga_id}:{step_name}"
        if key in self.step_start_times:
            duration = time.monotonic() - self.step_start_times[key]
            saga_step_duration.labels(
                saga_type=saga_type,
                step_name=step_name
            ).observe(duration)
            del self.step_start_times[key]
    
    def on_compensation_retry(self, saga_type: str, step_name: str):
        compensation_retries.labels(
            saga_type=saga_type,
            step_name=step_name
        ).inc()
    
    def set_compensations_pending(self, saga_type: str, count: int):
        # Call periodically with the current count of non-completed
        # compensations for this saga type
        saga_compensations_pending.labels(saga_type=saga_type).set(count)
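The bucket boundaries chosen for `saga_duration_seconds` limit how precise any percentile read from it can be. As documented for PromQL, `histogram_quantile()` locates the bucket containing the target rank and interpolates linearly inside it; if the rank falls in the `+Inf` bucket it returns the highest finite bound. The function below is a simplified re-implementation for illustration only: it assumes sorted cumulative buckets with positive finite bounds followed by `+Inf`, and skips PromQL's other edge cases. The counts are hypothetical.

```python
import math
from typing import List, Tuple

# Upper bounds of saga_duration_seconds, plus Prometheus's implicit +Inf bucket
BOUNDS = [0.1, 0.5, 1, 5, 10, 30, 60, 300, 600, math.inf]


def histogram_quantile(q: float, buckets: List[Tuple[float, float]]) -> float:
    """Estimate the q-quantile from (upper_bound, cumulative_count) pairs."""
    if not 0 < q <= 1:
        raise ValueError("q must be in (0, 1]")
    total = buckets[-1][1]  # the +Inf bucket holds every observation
    if total == 0:
        return math.nan
    rank = q * total
    lower, below = 0.0, 0.0  # lower edge of current bucket, count beneath it
    for upper, cumulative in buckets:
        if cumulative >= rank:
            if math.isinf(upper):
                # Rank falls in the +Inf bucket: return the highest finite bound
                return lower
            # Linear interpolation within (lower, upper]
            return lower + (upper - lower) * (rank - below) / (cumulative - below)
        lower, below = upper, cumulative
    return lower  # not reached when the last bucket holds the total


# Hypothetical cumulative counts for 1,000 sagas
counts = [100, 400, 700, 900, 950, 980, 990, 998, 1000, 1000]
buckets = list(zip(BOUNDS, counts))

print(round(histogram_quantile(0.50, buckets), 3))  # 0.667
print(histogram_quantile(0.99, buckets))            # 60.0
print(histogram_quantile(0.999, buckets))           # 450.0
```

With these counts the p99 lands exactly on the 60-second bucket edge, so a `> 60` threshold stays silent, and anything between 60 and 300 seconds is interpolated across one wide bucket. A bucket boundary near an alert threshold sharpens the estimate around that threshold.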

13.2 Saga Dashboard and Alerting

# Alerting Rules (Prometheus rule-file format; Alertmanager routes the alerts)
#
# Ratios are aggregated with sum by (saga_type) before dividing:
# saga_failed_total carries an extra failed_step label, so dividing the raw
# series by saga_started_total would match no series and never fire.

ALERTING_RULES = """
groups:
  - name: saga_alerts
    rules:
      # High saga failure rate (per saga type)
      - alert: SagaHighFailureRate
        expr: |
          sum by (saga_type) (rate(saga_failed_total[5m]))
            / sum by (saga_type) (rate(saga_started_total[5m])) > 0.1
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High saga failure rate"
          description: "More than 10% of {{ $labels.saga_type }} sagas are failing"
      
      # Saga taking too long
      - alert: SagaSlowExecution
        expr: |
          histogram_quantile(0.99,
            sum by (le, saga_type) (rate(saga_duration_seconds_bucket[5m]))
          ) > 60
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Sagas taking too long"
          description: "p99 duration of {{ $labels.saga_type }} sagas is above 60 seconds"
      
      # Stuck compensations (gauge refreshed from saga_compensations)
      - alert: SagaStuckCompensations
        expr: |
          saga_compensations_pending > 10
        for: 15m
        labels:
          severity: critical
        annotations:
          summary: "Stuck saga compensations"
          description: "{{ $value }} {{ $labels.saga_type }} compensations pending; more than 10 for 15+ minutes"
      
      # Compensation retry storm
      - alert: SagaCompensationRetryStorm
        expr: |
          sum by (saga_type) (rate(saga_compensation_retries_total[5m])) > 1
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High compensation retry rate"
          description: "More than 1 compensation retry per second for {{ $labels.saga_type }}"
"""


# Dashboard query examples
DASHBOARD_QUERIES = {
    # Approximate: sagas still in flight count as started but not completed
    "saga_success_rate": """
        sum(rate(saga_completed_total[5m])) / 
        sum(rate(saga_started_total[5m]))
    """,
    
    "saga_p99_duration": """
        histogram_quantile(0.99, 
            sum(rate(saga_duration_seconds_bucket[5m])) by (le, saga_type)
        )
    """,
    
    "saga_failure_by_step": """
        sum(rate(saga_failed_total[1h])) by (saga_type, failed_step)
    """,
    
    "sagas_in_progress": """
        sum(saga_in_progress) by (saga_type)
    """,
    
    # Number of pending compensations (sum of the gauge, not a count of series)
    "compensation_pending": """
        sum(saga_compensations_pending) by (saga_type)
    """
}
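Why aggregate before dividing two counters? In PromQL, a binary operator between two instant vectors pairs series whose label sets are identical (apart from the metric name) unless `on` or `ignoring` says otherwise. The sketch below models only that default one-to-one rule with plain dictionaries and hypothetical per-second rates; it leaves out `on`/`ignoring`, group modifiers, and division by zero.

```python
from typing import Dict, FrozenSet, Iterable, Tuple

LabelSet = FrozenSet[Tuple[str, str]]
Vector = Dict[LabelSet, float]


def labels(**kv: str) -> LabelSet:
    """Build an immutable, hashable label set."""
    return frozenset(kv.items())


def divide(lhs: Vector, rhs: Vector) -> Vector:
    """Default one-to-one matching: pair only identical label sets."""
    return {ls: lhs[ls] / rhs[ls] for ls in lhs if ls in rhs}


def sum_by(vec: Vector, keep: Iterable[str]) -> Vector:
    """Like `sum by (...)`: drop other labels, then add up series that collide."""
    kept = set(keep)
    out: Vector = {}
    for ls, value in vec.items():
        key = frozenset((name, v) for name, v in ls if name in kept)
        out[key] = out.get(key, 0.0) + value
    return out


# Hypothetical per-second rates
failed = {
    labels(saga_type="OrderSaga", failed_step="charge_payment"): 0.5,
    labels(saga_type="OrderSaga", failed_step="reserve_inventory"): 0.25,
}
started = {labels(saga_type="OrderSaga"): 5.0}

print(divide(failed, started))  # {}
ratio = divide(sum_by(failed, ["saga_type"]), sum_by(started, ["saga_type"]))
print(ratio[labels(saga_type="OrderSaga")])  # 0.15
```

The raw division returns nothing because every failure series carries a `failed_step` label the started series lacks; after summing by `saga_type`, both sides share the label set `{saga_type="OrderSaga"}` and the ratio can be computed.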

Chapter 14: Testing Sagas

14.1 Unit Testing Saga Steps

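# Testing Saga Steps
# Requires pytest-asyncio (for @pytest.mark.asyncio); OrderSagaSteps is the
# step class defined earlier in this chapter.

import pytest
from unittest.mock import AsyncMock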


class TestOrderSagaSteps:
    """Unit tests for order saga steps."""
    
    @pytest.fixture
    def inventory_service(self):
        service = AsyncMock()
        service.reserve.return_value = "reservation-123"
        service.release.return_value = True
        return service
    
    @pytest.fixture
    def payment_service(self):
        service = AsyncMock()
        service.charge.return_value = "payment-456"
        service.refund.return_value = True
        return service
    
    @pytest.fixture
    def saga_steps(self, inventory_service, payment_service):
        return OrderSagaSteps(
            inventory_service=inventory_service,
            payment_service=payment_service,
            shipping_service=AsyncMock(),
            notification_service=AsyncMock()
        )
    
    @pytest.mark.asyncio
    async def test_reserve_inventory_success(self, saga_steps, inventory_service):
        """Test successful inventory reservation."""
        context = {
            'saga_id': 'saga-123',
            'input': {
                'items': [{'product_id': 'prod-1', 'quantity': 2}]
            },
            'results': {}
        }
        
        result = await saga_steps.reserve_inventory(context)
        
        assert result == {'reservation_id': 'reservation-123'}
        inventory_service.reserve.assert_called_once_with(
            order_id='saga-123',
            items=[{'product_id': 'prod-1', 'quantity': 2}]
        )
    
    @pytest.mark.asyncio
    async def test_release_inventory_compensation(self, saga_steps, inventory_service):
        """Test inventory release compensation."""
        context = {
            'saga_id': 'saga-123',
            'input': {},
            'results': {
                'reserve_inventory': {'reservation_id': 'reservation-123'}
            }
        }
        
        await saga_steps.release_inventory(context)
        
        inventory_service.release.assert_called_once_with('reservation-123')
    
    @pytest.mark.asyncio
    async def test_release_inventory_idempotent(self, saga_steps, inventory_service):
        """Test compensation is idempotent when no reservation exists."""
        context = {
            'saga_id': 'saga-123',
            'input': {},
            'results': {}  # No reservation result
        }
        
        # Should not raise, should be no-op
        await saga_steps.release_inventory(context)
        
        inventory_service.release.assert_not_called()
    
    @pytest.mark.asyncio
    async def test_charge_payment_uses_idempotency_key(
        self, 
        saga_steps, 
        payment_service
    ):
        """Test payment uses idempotency key."""
        context = {
            'saga_id': 'saga-123',
            'input': {
                'customer_id': 'cust-1',
                'total': 99.99
            },
            'results': {}
        }
        
        await saga_steps.charge_payment(context)
        
        payment_service.charge.assert_called_once()
        call_kwargs = payment_service.charge.call_args.kwargs
        assert call_kwargs['idempotency_key'] == 'saga-saga-123'
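`AsyncMock` verifies that calls happen, but it cannot show that a step or compensation is safe to run twice. A small stateful fake can. The class below is a hypothetical in-memory inventory service (not part of this chapter's code) whose `reserve(order_id=..., items=...)` and `release(reservation_id)` mirror the call shapes asserted in the tests above; reserving is idempotent per order, and releasing an unknown or already-released reservation is a no-op.

```python
import asyncio
from typing import Dict, List, Tuple


class FakeInventoryService:
    """Hypothetical in-memory inventory service for saga tests."""

    def __init__(self, stock: Dict[str, int]):
        self.stock = dict(stock)
        self.by_order: Dict[str, str] = {}  # order_id -> reservation_id
        self.reservations: Dict[str, Tuple[str, Dict[str, int]]] = {}
        self._next_id = 0

    async def reserve(self, order_id: str, items: List[dict]) -> str:
        # A retried reserve for the same order returns the same reservation
        if order_id in self.by_order:
            return self.by_order[order_id]
        needed: Dict[str, int] = {}
        for item in items:
            pid = item["product_id"]
            needed[pid] = needed.get(pid, 0) + item["quantity"]
        for pid, qty in needed.items():
            if self.stock.get(pid, 0) < qty:
                raise ValueError(f"insufficient stock for {pid}")
        for pid, qty in needed.items():
            self.stock[pid] = self.stock.get(pid, 0) - qty
        self._next_id += 1
        reservation_id = f"reservation-{self._next_id}"
        self.by_order[order_id] = reservation_id
        self.reservations[reservation_id] = (order_id, needed)
        return reservation_id

    async def release(self, reservation_id: str) -> bool:
        # Releasing twice (or releasing nothing) must not restock twice
        entry = self.reservations.pop(reservation_id, None)
        if entry is None:
            return False
        order_id, needed = entry
        for pid, qty in needed.items():
            self.stock[pid] += qty
        del self.by_order[order_id]
        return True


async def main():
    inventory = FakeInventoryService({"prod-1": 5})
    items = [{"product_id": "prod-1", "quantity": 2}]
    first = await inventory.reserve(order_id="saga-123", items=items)
    retry = await inventory.reserve(order_id="saga-123", items=items)
    assert first == retry and inventory.stock["prod-1"] == 3
    assert await inventory.release(first) is True
    assert await inventory.release(first) is False  # second release is a no-op
    assert inventory.stock["prod-1"] == 5


asyncio.run(main())
```

Passing a fake like this to `OrderSagaSteps` instead of an `AsyncMock` would let a test run a compensation twice and assert on stock levels rather than on call counts.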


14.2 Integration Testing Sagas

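# Integration Testing for Sagas
#
# Requires Docker (for testcontainers), asyncpg and pytest-asyncio.
# OrderSagaSteps, create_order_saga, SagaOrchestrator, SagaRecoveryJob,
# PaymentError and the Real*/Mock* services are defined earlier in this
# chapter. _setup_schema (not shown) should apply the schema from 12.1
# idempotently (e.g. CREATE TABLE IF NOT EXISTS), because the container
# is shared by every test in the class.

import asyncio
from unittest.mock import AsyncMock

import asyncpg
import pytest
import pytest_asyncio
from testcontainers.postgres import PostgresContainer

# Statuses after which a saga no longer changes (assumed status names)
TERMINAL_STATUSES = ('completed', 'compensated', 'failed')


class TestOrderSagaIntegration:
    """Integration tests for complete order saga."""
    
    @pytest.fixture(scope="class")
    def postgres(self):
        with PostgresContainer("postgres:14") as pg:
            yield pg
    
    @pytest_asyncio.fixture
    async def db(self, postgres):
        # get_connection_url() returns a SQLAlchemy-style URL such as
        # postgresql+psycopg2://...; asyncpg needs a plain postgresql:// URL
        url = postgres.get_connection_url().replace(
            "postgresql+psycopg2://", "postgresql://"
        )
        # A pool, not a single connection: the saga runs in a background task
        # while the test polls, and one asyncpg connection cannot run
        # overlapping queries.
        pool = await asyncpg.create_pool(url)
        await self._setup_schema(pool)
        yield pool
        await pool.close()
    
    @pytest.fixture
    def orchestrator(self, db):
        steps = OrderSagaSteps(
            inventory_service=RealInventoryService(db),
            payment_service=MockPaymentService(),
            shipping_service=MockShippingService(),
            notification_service=MockNotificationService()
        )
        
        saga = create_order_saga(steps)
        return SagaOrchestrator(db, {"OrderSaga": saga})
    
    async def _wait_for_terminal_status(self, db, saga_id, timeout: float = 10.0) -> str:
        """Poll the saga row instead of sleeping a fixed time (less flaky)."""
        loop = asyncio.get_running_loop()
        deadline = loop.time() + timeout
        while True:
            row = await db.fetchrow(
                "SELECT status FROM saga_instances WHERE saga_id = $1",
                saga_id
            )
            if row is not None and row['status'] in TERMINAL_STATUSES:
                return row['status']
            if loop.time() >= deadline:
                pytest.fail(f"Saga {saga_id} not finished after {timeout}s")
            await asyncio.sleep(0.1)
    
    @pytest.mark.asyncio
    async def test_successful_order_saga(self, orchestrator, db):
        """Test complete successful saga execution."""
        saga_id = await orchestrator.start_saga(
            "OrderSaga",
            {
                "customer_id": "cust-123",
                "items": [{"product_id": "prod-1", "quantity": 1}],
                "total": 99.99,
                "shipping_address": "123 Main St"
            }
        )
        
        # Wait for the saga to finish, then check it completed
        status = await self._wait_for_terminal_status(db, saga_id)
        assert status == 'completed'
        
        # Check all steps completed (and that some steps were recorded)
        steps = await db.fetch(
            """
            SELECT step_name, status FROM saga_step_executions 
            WHERE saga_id = $1 ORDER BY step_index
            """,
            saga_id
        )
        assert steps and all(s['status'] == 'completed' for s in steps)
    
    @pytest.mark.asyncio
    async def test_saga_compensation_on_payment_failure(
        self, 
        orchestrator, 
        db
    ):
        """Test saga compensates when payment fails."""
        # Make the payment step fail (steps[1] is assumed to be charge_payment)
        orchestrator.sagas["OrderSaga"].steps[1].action = AsyncMock(
            side_effect=PaymentError("Card declined")
        )
        
        saga_id = await orchestrator.start_saga(
            "OrderSaga",
            {
                "customer_id": "cust-123",
                "items": [{"product_id": "prod-1", "quantity": 1}],
                "total": 99.99
            }
        )
        
        # Check saga was compensated
        status = await self._wait_for_terminal_status(db, saga_id)
        assert status == 'compensated'
        
        # Check inventory was released
        compensation = await db.fetchrow(
            """
            SELECT status FROM saga_compensations 
            WHERE saga_id = $1 AND step_name = 'reserve_inventory'
            """,
            saga_id
        )
        assert compensation is not None
        assert compensation['status'] == 'completed'
    
    @pytest.mark.asyncio
    async def test_saga_recovery_after_crash(self, orchestrator, db):
        """Test saga recovers after a simulated crash."""
        saga_id = await orchestrator.start_saga(
            "OrderSaga",
            {"customer_id": "cust-123", "items": [], "total": 50.00}
        )
        
        # Simulated crash: this does not stop the original task. Backdating
        # updated_at makes the recovery job treat the saga as stuck; if the
        # original task is still running, the saga runs twice, which
        # idempotent steps must tolerate.
        await asyncio.sleep(0.1)
        
        await db.execute(
            """
            UPDATE saga_instances 
            SET updated_at = NOW() - INTERVAL '10 minutes'
            WHERE saga_id = $1
            """,
            saga_id
        )
        
        # Run one recovery pass
        recovery_job = SagaRecoveryJob(db, orchestrator)
        await recovery_job._recover_stuck_sagas()
        
        # Verify the saga eventually reaches a terminal state
        status = await self._wait_for_terminal_status(db, saga_id)
        assert status in ('completed', 'compensated')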

📚 Further Reading

Papers

  • "Sagas" by Hector Garcia-Molina and Kenneth Salem (1987) — Original paper
  • "Life Beyond Distributed Transactions" by Pat Helland (2007)

Books

  • "Microservices Patterns" by Chris Richardson — Chapter 4 covers sagas
  • "Designing Data-Intensive Applications" by Martin Kleppmann — Distributed transactions


End of Day 2: Distributed Transactions — The Saga Pattern

Tomorrow: Day 3 — Saga Orchestration Deep Dive. We'll explore workflow engines like Temporal, durable execution, and how to build production-grade saga orchestrators.