Week 5 — Day 2: Distributed Transactions — The Saga Pattern
System Design Mastery Series
Preface
Yesterday, we learned about consistency models — how to reason about when reads see writes in distributed systems. We discussed strong consistency, eventual consistency, and everything in between.
But there's a harder problem we glossed over:
THE DISTRIBUTED TRANSACTION PROBLEM
Single database transaction:
BEGIN;
UPDATE accounts SET balance = balance - 100 WHERE id = 'A';
UPDATE accounts SET balance = balance + 100 WHERE id = 'B';
COMMIT;
If anything fails → ROLLBACK (automatic)
Money never disappears. ACID guarantees.
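The same guarantee can be observed with Python's built-in `sqlite3` module standing in for "a single database". This is a minimal sketch. The `accounts` table layout and the `fail_midway` flag, which simulates a crash between the two updates, are illustrative assumptions.

```python
import sqlite3

def transfer(conn: sqlite3.Connection, src: str, dst: str, amount: int,
             fail_midway: bool = False) -> None:
    """Move `amount` from src to dst inside one local transaction."""
    # Used as a context manager, the connection commits if the block succeeds
    # and rolls back if an exception escapes it.
    with conn:
        conn.execute("UPDATE accounts SET balance = balance - ? WHERE id = ?", (amount, src))
        if fail_midway:
            raise RuntimeError("simulated crash between the two updates")
        conn.execute("UPDATE accounts SET balance = balance + ? WHERE id = ?", (amount, dst))

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE accounts (id TEXT PRIMARY KEY, balance INTEGER)")
conn.executemany("INSERT INTO accounts VALUES (?, ?)", [("A", 500), ("B", 0)])
conn.commit()

try:
    transfer(conn, "A", "B", 100, fail_midway=True)
except RuntimeError:
    pass
after_failure = dict(conn.execute("SELECT id, balance FROM accounts"))
print(after_failure)  # {'A': 500, 'B': 0}  (the debit was rolled back)

transfer(conn, "A", "B", 100)
after_success = dict(conn.execute("SELECT id, balance FROM accounts"))
print(after_success)  # {'A': 400, 'B': 100}
```

The half-finished transfer leaves no trace. That is the property we lose once the two updates live in different services.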
Microservices transaction:
Account Service         Payment Service        Inventory Service
┌─────────────┐         ┌─────────────┐         ┌─────────────┐
│ Debit $100  │────────▶│ Charge Card │────────▶│ Reserve Item│
│ ✓ Done      │         │ ✓ Done      │         │ ✗ FAILED    │
└─────────────┘         └─────────────┘         └─────────────┘
Inventory service failed AFTER:
- Account was debited
- Card was charged
We can't just ROLLBACK — these are different databases!
What do we do?
This is the problem that Sagas solve.
Today, you'll learn:
- Why traditional distributed transactions (2PC) don't work in microservices
- The Saga pattern for distributed transactions
- Choreography vs Orchestration approaches
- How to design compensation logic
- Handling failures in compensation
Part I: Foundations
Chapter 1: The Problem with Distributed Transactions
1.1 What We Want: ACID Across Services
ACID PROPERTIES
A - Atomicity: All operations succeed or all fail
C - Consistency: Data remains valid after transaction
I - Isolation: Concurrent transactions don't interfere
D - Durability: Committed data survives failures
In a single database, we get these for free.
In microservices, each service has its own database.
How do we get ACID across services?
1.2 The Traditional Solution: Two-Phase Commit (2PC)
TWO-PHASE COMMIT PROTOCOL
Phase 1: PREPARE
Coordinator                Participants
     │                           │
     │─────── PREPARE ──────────▶│ Service A
     │◀────── READY ─────────────│
     │                           │
     │─────── PREPARE ──────────▶│ Service B
     │◀────── READY ─────────────│
     │                           │
     │─────── PREPARE ──────────▶│ Service C
     │◀────── READY ─────────────│
Phase 2: COMMIT (if all ready)
Coordinator                Participants
     │                           │
     │─────── COMMIT ───────────▶│ Service A
     │◀────── ACK ───────────────│
     │                           │
     │─────── COMMIT ───────────▶│ Service B
     │◀────── ACK ───────────────│
     │                           │
     │─────── COMMIT ───────────▶│ Service C
     │◀────── ACK ───────────────│
Phase 2: ABORT (if any not ready)
Coordinator                Participants
     │                           │
     │─────── ABORT ────────────▶│ All participants
     │                           │
     │                           │ (Everyone rolls back)
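The two phases can be modelled as a toy, single-process coordinator. This is an illustrative sketch under simplifying assumptions: there is no network, no durable log, and no crash. The `Participant` class and its `can_commit` flag are hypothetical.

```python
from dataclasses import dataclass
from enum import Enum
from typing import List

class Vote(Enum):
    READY = "ready"
    ABORT = "abort"

@dataclass
class Participant:
    """Hypothetical in-memory participant that supports prepare/commit/abort."""
    name: str
    can_commit: bool = True
    state: str = "idle"  # idle -> prepared -> committed | aborted

    def prepare(self) -> Vote:
        # A real participant would durably log its vote and keep its locks here.
        if not self.can_commit:
            return Vote.ABORT
        self.state = "prepared"
        return Vote.READY

    def commit(self) -> None:
        self.state = "committed"

    def abort(self) -> None:
        self.state = "aborted"

def two_phase_commit(participants: List[Participant]) -> bool:
    # Phase 1: PREPARE. Collect a vote from every participant.
    votes = [p.prepare() for p in participants]
    if all(v is Vote.READY for v in votes):
        # Phase 2: COMMIT, because every participant voted READY.
        for p in participants:
            p.commit()
        return True
    # Phase 2: ABORT, because at least one participant could not prepare.
    for p in participants:
        p.abort()
    return False

print(two_phase_commit([Participant("A"), Participant("B"), Participant("C")]))  # True
print(two_phase_commit([Participant("A"), Participant("C", can_commit=False)]))  # False
```

The toy hides the hard part. If the coordinator stopped between the two phases, every participant that voted READY would sit in `prepared` with its locks held.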
1.3 Why 2PC Fails in Microservices
PROBLEMS WITH 2PC
1. BLOCKING PROTOCOL
├── Participants HOLD LOCKS from PREPARE until they receive COMMIT or ABORT
├── If the coordinator crashes after PREPARE, participants block until it recovers
├── Resources locked, other transactions blocked
└── System throughput collapses
2. COORDINATOR IS SINGLE POINT OF FAILURE
├── Coordinator crashes mid-transaction
├── Participants don't know whether to commit or abort
├── Need complex recovery protocols
└── Operators may force heuristic commit/abort decisions, which can leave data inconsistent
3. SYNCHRONOUS COMMUNICATION
├── All services must be available simultaneously
├── One slow service slows entire transaction
├── Network partition = stuck transaction
└── Violates microservices independence
4. DOESN'T SCALE
├── Lock duration proportional to slowest participant
├── Adding services increases failure probability
├── With sequential calls (as drawn above), latency is the sum of all service latencies
└── Real business flows can span many services, multiplying all of the above
5. NOT SUPPORTED BY MOST SERVICES
├── Many NoSQL databases don't support 2PC (XA)
├── Many message brokers (e.g., Kafka) don't support 2PC (XA)
├── Third-party APIs don't support 2PC
└── You can't tell Stripe to "prepare" a charge
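Problem 1 becomes concrete in a toy model. After voting READY, a participant may neither commit nor abort on its own, so its locks stay held until the coordinator's decision arrives. This is a hypothetical single-process sketch: a `threading.Lock` stands in for a database row lock, and the coordinator "crash" is simply the absence of a `decide()` call.

```python
import threading

class LockingParticipant:
    """Hypothetical participant that holds a row lock from PREPARE until it learns the outcome."""

    def __init__(self) -> None:
        self.row_lock = threading.Lock()
        self.state = "idle"

    def prepare(self) -> str:
        self.row_lock.acquire()  # Held while waiting for the coordinator's decision
        self.state = "prepared"  # "In doubt": voted READY, cannot decide alone
        return "READY"

    def decide(self, commit: bool) -> None:
        self.state = "committed" if commit else "aborted"
        self.row_lock.release()

p = LockingParticipant()
p.prepare()
# The coordinator crashes here and never sends COMMIT or ABORT.

# Another transaction that needs the same row cannot proceed; it times out.
blocked_ok = p.row_lock.acquire(timeout=0.1)
print(p.state, blocked_ok)  # prepared False

# Only when the coordinator recovers and sends a decision is the lock freed.
p.decide(commit=False)
after_ok = p.row_lock.acquire(timeout=0.1)
print(p.state, after_ok)  # aborted True
```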
1.4 The Alternative: Sagas
SAGA: A DIFFERENT APPROACH
Instead of:
"Lock everything, do everything, unlock everything"
Sagas say:
"Do each step. If a step fails, UNDO previous steps."
Key insight:
Each step is a LOCAL transaction (fast, independent)
If step N fails, run COMPENSATING transactions for steps N-1..1, in reverse order
No distributed locks, and no coordinator holding locks while it waits on other services
SAGA STRUCTURE:
  T1 ──────▶ T2 ──────▶ T3 ──────▶ T4 ──────▶ SUCCESS
  │          │          │          │
  ▼          ▼          ▼          ▼
  C1         C2         C3         C4
(undo)     (undo)     (undo)     (undo)
If T3 fails:
  T1 ──────▶ T2 ──────▶ T3 ✗
                        │
                        ▼
             C2 ◀────── C3 not needed (T3 didn't complete)
             │
             ▼
             C1
             │
             ▼
        ROLLED BACK
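The failure path above fits in a few lines of code. This is a hypothetical, in-process illustration. Each step is a `(name, action, compensation)` tuple, whereas real steps would be local transactions in separate services.

```python
from typing import Callable, List, Optional, Tuple

# (name, action, compensation); compensation may be None for steps with nothing to undo.
Step = Tuple[str, Callable[[], None], Optional[Callable[[], None]]]

def run_saga(steps: List[Step], log: List[str]) -> bool:
    """Run steps in order; if one fails, compensate the completed ones in reverse."""
    completed: List[Step] = []
    for step in steps:
        name, action, _ = step
        try:
            action()
        except Exception as exc:
            log.append(f"{name} failed: {exc}")
            # The failed step is not compensated: it never completed.
            for done_name, _, compensate in reversed(completed):
                if compensate is not None:
                    compensate()
                    log.append(f"compensated {done_name}")
            return False
        log.append(f"{name} done")
        completed.append(step)
    return True

def out_of_stock() -> None:
    raise RuntimeError("out of stock")

noop = lambda: None
log: List[str] = []
ok = run_saga([("T1", noop, noop), ("T2", noop, noop),
               ("T3", out_of_stock, noop), ("T4", noop, noop)], log)
print(ok)   # False
print(log)  # ['T1 done', 'T2 done', 'T3 failed: out of stock', 'compensated T2', 'compensated T1']
```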
Chapter 2: Saga Fundamentals
2.1 What Is a Saga?
A Saga is a sequence of local transactions where:
- Each transaction updates a single service
- Each transaction has a compensating transaction
- If any transaction fails, compensating transactions undo previous work
SAGA TERMINOLOGY
Transaction (Ti): A local database transaction in one service
Compensation (Ci): Undoes the effect of Ti (semantically, not literally)
Pivot Transaction: Point of no return - after this, saga must complete
Retriable Transaction: Follows the pivot and is retried until it succeeds (so it must be idempotent)
Example: Order Processing Saga
T1: Reserve inventory       C1: Release inventory
T2: Create payment          C2: Refund payment
T3: Ship order (PIVOT)      C3: (None - can't unship!)
T4: Send confirmation       C4: (None - just notification)
T3 is the pivot - once shipped, we commit to completing.
Before T3, we can compensate. After T3, we must proceed.
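The pivot rule reduces to a small decision function. The sketch below is hypothetical. Steps are tagged with the kinds from the terminology above (compensatable, pivot, retriable). It follows the common convention that a pivot which fails has not committed, so the saga can still compensate.

```python
from enum import Enum
from typing import List

class Kind(Enum):
    COMPENSATABLE = "compensatable"
    PIVOT = "pivot"
    RETRIABLE = "retriable"

def recovery_plan(kinds: List[Kind], failed_index: int) -> List[str]:
    """Decide what to do when step number `failed_index` (0-based) fails.

    If a pivot committed earlier, the saga must roll forward: retry the failed
    step and everything after it. Otherwise (including when the pivot itself
    fails), compensate the completed steps in reverse order.
    """
    pivot_committed = any(k is Kind.PIVOT for k in kinds[:failed_index])
    if pivot_committed:
        return [f"retry T{i + 1}" for i in range(failed_index, len(kinds))]
    return [f"run C{i + 1}" for i in range(failed_index - 1, -1, -1)]

order_saga = [Kind.COMPENSATABLE, Kind.COMPENSATABLE, Kind.PIVOT, Kind.RETRIABLE]
print(recovery_plan(order_saga, 1))  # ['run C1']            (T2 failed)
print(recovery_plan(order_saga, 2))  # ['run C2', 'run C1']  (pivot T3 failed)
print(recovery_plan(order_saga, 3))  # ['retry T4']          (T4 failed after the pivot)
```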
2.2 Compensation vs Rollback
COMPENSATION IS NOT ROLLBACK
Database rollback:
- Undoes changes as if they never happened
- Uses transaction log
- Automatic, built-in
Saga compensation:
- Creates NEW transactions that reverse the effect
- Business logic, not database feature
- You must design and implement it
Example: Payment Compensation
Original: charge_customer(customer_id, $100)
Creates: Payment record, charge on card
Compensation: refund_customer(payment_id)
Creates: Refund record, credit on card
The original charge STILL EXISTS in history
The refund is a NEW transaction
Net effect: customer not charged
But audit trail shows both events
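An append-only ledger makes the difference concrete. In this hypothetical sketch, amounts are integer cents and an in-memory list stands in for the Payment Service's payments table. The function names mirror the example above but are illustrative.

```python
from dataclasses import dataclass
from typing import List

@dataclass(frozen=True)
class LedgerEntry:
    entry_id: str
    kind: str            # "charge" or "refund"
    amount_cents: int    # positive for charges, negative for refunds
    refers_to: str = ""  # a refund points at the charge it reverses

ledger: List[LedgerEntry] = []

def charge_customer(payment_id: str, amount_cents: int) -> None:
    ledger.append(LedgerEntry(payment_id, "charge", amount_cents))

def refund_customer(payment_id: str) -> None:
    # Compensation appends a NEW entry; the original charge is never deleted.
    original = next(e for e in ledger if e.entry_id == payment_id and e.kind == "charge")
    ledger.append(LedgerEntry(f"refund-{payment_id}", "refund",
                              -original.amount_cents, refers_to=payment_id))

charge_customer("pay-1", 10_000)  # $100.00
refund_customer("pay-1")

net = sum(e.amount_cents for e in ledger)
print(net, [e.kind for e in ledger])  # 0 ['charge', 'refund']
```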
2.3 Designing Compensating Transactions
COMPENSATION DESIGN PRINCIPLES
1. SEMANTIC REVERSAL
├── Undo the business effect, not the technical operation
├── create_order → cancel_order (not DELETE)
├── reserve_inventory → release_inventory
└── charge_payment → refund_payment
2. IDEMPOTENT
├── Compensation might run multiple times (retries)
├── Running twice should have same effect as once
└── Use compensation IDs to prevent double-refunds
3. COMMUTATIVE WHERE POSSIBLE
├── Order of compensations shouldn't matter
├── Reduces coordination complexity
└── Not always possible (dependencies)
4. HANDLE MISSING ORIGINAL
├── What if T2 compensation runs but T2 never completed?
├── Compensation should check state first
└── Be a no-op if nothing to compensate
5. NEVER FAIL (or be retriable)
├── Compensation failure = stuck saga
├── Design compensations to always succeed
└── If it must fail, alert for manual intervention
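Principles 1, 2 and 4 can be combined in one compensation. This is a hypothetical in-memory sketch: `InventoryStore` and the caller-supplied `compensation_id` are illustrative. In a real service, the ID check, the marker insert, and the stock update would share one local transaction.

```python
from dataclasses import dataclass, field
from typing import Dict, Set, Tuple

@dataclass
class InventoryStore:
    """Hypothetical in-memory inventory, standing in for the Inventory Service's database."""
    stock: Dict[str, int]
    reservations: Dict[str, Tuple[str, int]] = field(default_factory=dict)  # id -> (sku, qty)
    applied_compensations: Set[str] = field(default_factory=set)

    def reserve(self, reservation_id: str, sku: str, qty: int) -> None:
        self.stock[sku] -= qty
        self.reservations[reservation_id] = (sku, qty)

    def release(self, reservation_id: str, compensation_id: str) -> bool:
        """Compensation for reserve(). Returns True only if it changed any state."""
        # Principle 2 (idempotent): a retried compensation with the same ID is a no-op.
        if compensation_id in self.applied_compensations:
            return False
        self.applied_compensations.add(compensation_id)
        # Principle 4 (missing original): the forward step may never have run.
        reservation = self.reservations.pop(reservation_id, None)
        if reservation is None:
            return False
        sku, qty = reservation
        self.stock[sku] += qty  # Principle 1: return the units; don't delete rows
        return True

store = InventoryStore(stock={"sku-1": 10})
store.reserve("r1", "sku-1", 3)
first = store.release("r1", compensation_id="comp-r1")
retry = store.release("r1", compensation_id="comp-r1")              # duplicate delivery
missing = store.release("r-never", compensation_id="comp-r-never")  # T never completed
print(store.stock, first, retry, missing)  # {'sku-1': 10} True False False
```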
Chapter 3: Choreography vs Orchestration
3.1 Choreography: Event-Driven Sagas
In choreography, each service listens for events and decides what to do next.
CHOREOGRAPHY PATTERN
No central coordinator. Services react to events.
Order Saga via Choreography:
┌─────────────┐    order.created     ┌─────────────┐
│    Order    │─────────────────────▶│  Inventory  │
│   Service   │                      │   Service   │
└─────────────┘                      └──────┬──────┘
                                            │
                                            │ inventory.reserved
                                            ▼
                                     ┌─────────────┐
                                     │   Payment   │
                                     │   Service   │
                                     └──────┬──────┘
                                            │
                                            │ payment.completed
                                            ▼
                                     ┌─────────────┐
                                     │  Shipping   │
                                     │   Service   │
                                     └──────┬──────┘
                                            │
                                            │ shipping.created
                                            ▼
                                     ┌─────────────┐
                                     │    Order    │
                                     │ (complete)  │
                                     └─────────────┘
If Payment fails:
payment.failed event triggers:
- Inventory listens → releases reservation
- Order listens → marks order as failed
Pros:
- Simple for small sagas (2-3 steps)
- Loose coupling between services
- No single point of failure
Cons:
- Hard to understand full saga flow
- Cyclic dependencies possible
- Difficult to add/change steps
- Testing requires all services
3.2 Orchestration: Centralized Coordinator
In orchestration, a central coordinator tells each service what to do.
ORCHESTRATION PATTERN
Central orchestrator manages the saga flow.
Order Saga via Orchestration:
                    ┌─────────────────┐
                    │  Orchestrator   │
                    │  (Order Saga)   │
                    └────────┬────────┘
                             │
       ┌─────────────────────┼─────────────────────┐
       │                     │                     │
       ▼                     ▼                     ▼
┌─────────────┐       ┌─────────────┐       ┌─────────────┐
│  Inventory  │       │   Payment   │       │  Shipping   │
│   Service   │       │   Service   │       │   Service   │
└─────────────┘       └─────────────┘       └─────────────┘
Flow:
1. Orchestrator: "Inventory, reserve items"
2. Inventory: "Reserved" (or "Failed")
3. Orchestrator: "Payment, charge customer"
4. Payment: "Charged" (or "Failed")
5. Orchestrator: "Shipping, create shipment"
6. Shipping: "Created"
7. Orchestrator: "Saga complete"
If Payment fails:
Orchestrator handles compensation:
1. Orchestrator: "Inventory, release reservation"
2. Orchestrator: "Order, mark as failed"
Pros:
- Easy to understand (linear flow)
- Centralized saga logic
- Easy to add/modify steps
- Better for complex sagas
Cons:
- Orchestrator is coupling point
- Orchestrator must be highly available
- Risk of orchestrator becoming a monolith
3.3 When to Use Each
CHOOSING BETWEEN CHOREOGRAPHY AND ORCHESTRATION
Use CHOREOGRAPHY when:
├── Saga has 2-4 steps
├── Services already publish relevant events
├── Teams are independent and don't want coordination
├── Low complexity, well-understood domain
└── Example: Simple order → payment → notification
Use ORCHESTRATION when:
├── Saga has 5+ steps
├── Complex branching logic
├── Need visibility into saga state
├── Multiple sagas share services
├── Need to change saga logic frequently
└── Example: Travel booking (flight + hotel + car + insurance)
Real-world observation:
├── Start with choreography
├── Switch to orchestration when choreography becomes confusing
└── Many companies use both (simple sagas choreographed, complex ones orchestrated)
Chapter 4: Saga Execution Semantics
4.1 Forward Recovery vs Backward Recovery
RECOVERY STRATEGIES
BACKWARD RECOVERY (Compensate)
├── Step fails → undo previous steps
├── Return system to original state
├── Use when failure means "give up"
└── Example: Payment failed → cancel order
FORWARD RECOVERY (Retry)
├── Step fails → retry until success
├── Eventually complete the saga
├── Use when saga MUST complete
└── Example: Shipment tracking update (eventually consistent)
Many sagas use BOTH:
T1 ──────▶ T2 ──────▶ T3 (pivot) ──────▶ T4 ──────▶ T5
Before pivot: Backward recovery (compensate)
After pivot: Forward recovery (retry until success)
Example: E-commerce Order
Reserve   ──▶ Charge    ──▶ Ship      ──▶ Notify    ──▶ Complete
Inventory     Payment       (PIVOT)       Customer      Order
Before shipping: Can compensate (release inventory, refund)
After shipping: Must complete (can't unship, so notify and complete)
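Forward recovery after the pivot is typically a retry loop with backoff. The sketch below is a hypothetical helper rather than part of any library. It assumes the step is idempotent and uses capped exponential backoff with full jitter so that many stuck sagas don't retry in lockstep.

```python
import asyncio
import random
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")

async def retry_until_success(
    step: Callable[[], Awaitable[T]],
    base_delay: float = 0.5,
    max_delay: float = 60.0,
) -> T:
    """Forward recovery: keep retrying a post-pivot step until it succeeds.

    Assumes `step` is idempotent, because an attempt that appeared to fail
    (for example, a timeout) may in fact have taken effect.
    """
    attempt = 0
    while True:
        try:
            return await step()
        except Exception:
            # Capped exponential backoff with full jitter; the exponent is capped
            # too, so the float math can't overflow during very long outages.
            delay = min(max_delay, base_delay * 2 ** min(attempt, 30))
            await asyncio.sleep(random.uniform(0, delay))
            attempt += 1

calls = {"n": 0}

async def flaky_notify() -> str:
    """Hypothetical post-pivot step that fails twice, then succeeds."""
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("email provider unavailable")
    return "sent"

result = asyncio.run(retry_until_success(flaky_notify, base_delay=0.01))
print(result, calls["n"])  # sent 3
```

Retrying forever only makes sense for steps that will eventually succeed. In practice you would also raise an alert once the retry count passes some threshold.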
4.2 The Saga Execution Coordinator (SEC)
SAGA EXECUTION COORDINATOR (SEC)
The SEC is responsible for:
1. Starting sagas
2. Tracking saga state
3. Executing steps (or triggering them)
4. Handling failures and compensation
5. Ensuring saga completes (one way or another)
SEC State Machine:
                    ┌─────────────────┐
                    │     STARTED     │
                    └────────┬────────┘
                             │
                    ┌────────▼────────┐
         ┌──────────│    EXECUTING    │──────────┐
         │          └────────┬────────┘          │
         │                   │                   │
    step fails           all steps            timeout
         │               succeeded               │
         ▼                   │                   ▼
┌─────────────────┐          │          ┌─────────────────┐
│  COMPENSATING   │          │          │  COMPENSATION   │
└────────┬────────┘          │          │     STARTED     │
         │                   │          └────────┬────────┘
   compensation              │                   │
     complete                │          ┌────────▼────────┐
         │                   │          │  COMPENSATING   │
         │                   │          └────────┬────────┘
         ▼                   ▼                   │
┌─────────────────┐ ┌─────────────────┐          │
│   COMPENSATED   │ │    COMPLETED    │          │
│    (failed)     │ │   (succeeded)   │          │
└────────▲────────┘ └─────────────────┘          │
         │                                       │
         └───────────────────────────────────────┘
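An SEC can reject impossible moves, such as COMPLETED back to COMPENSATING, by checking every status update against an explicit transition table. This hypothetical sketch encodes the diagram's states and assumes the timeout branch also ends in COMPENSATED. It is separate from the `SagaStatus` enum used in the orchestration code later in this lesson.

```python
from enum import Enum
from typing import Dict, Set

class SecState(Enum):
    STARTED = "started"
    EXECUTING = "executing"
    COMPENSATION_STARTED = "compensation_started"
    COMPENSATING = "compensating"
    COMPENSATED = "compensated"
    COMPLETED = "completed"

# Legal moves in the SEC state machine (timeout branch assumed to end in COMPENSATED).
TRANSITIONS: Dict[SecState, Set[SecState]] = {
    SecState.STARTED: {SecState.EXECUTING},
    SecState.EXECUTING: {
        SecState.COMPENSATING,          # a step failed
        SecState.COMPENSATION_STARTED,  # the saga timed out
        SecState.COMPLETED,             # all steps succeeded
    },
    SecState.COMPENSATION_STARTED: {SecState.COMPENSATING},
    SecState.COMPENSATING: {SecState.COMPENSATED},
    SecState.COMPENSATED: set(),  # terminal
    SecState.COMPLETED: set(),    # terminal
}

def transition(current: SecState, target: SecState) -> SecState:
    """Return `target` if the move is legal, otherwise raise ValueError."""
    if target not in TRANSITIONS[current]:
        raise ValueError(f"illegal transition {current.name} -> {target.name}")
    return target

state = SecState.STARTED
for nxt in (SecState.EXECUTING, SecState.COMPLETED):
    state = transition(state, nxt)
print(state.name)  # COMPLETED

try:
    transition(SecState.COMPLETED, SecState.COMPENSATING)
except ValueError as err:
    print(err)  # illegal transition COMPLETED -> COMPENSATING
```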
Part II: Implementation
Chapter 5: Choreography Implementation
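# Saga Choreography Implementation
import asyncio
import json
import logging
import uuid
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Any, Callable, Dict, List, Optional

logger = logging.getLogger(__name__)

# Helpers assumed by the code below. The `db` arguments are a hypothetical
# async database wrapper exposing execute(), fetch_one() and transaction().
def generate_id() -> str:
    return str(uuid.uuid4())

class InsufficientInventoryError(Exception):
    pass

class PaymentError(Exception):
    pass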
# =============================================================================
# Domain Events
# =============================================================================

@dataclass
class DomainEvent:
    """Base class for domain events."""
    event_id: str
    correlation_id: str  # Saga ID
    # kw_only (Python 3.10+) lets subclasses declare required fields after this
    # defaulted one; without it, every subclass below raises TypeError.
    timestamp: datetime = field(default_factory=datetime.utcnow, kw_only=True)

    def to_dict(self) -> dict:
        return {
            "event_id": self.event_id,
            "correlation_id": self.correlation_id,
            "timestamp": self.timestamp.isoformat(),
            "event_type": self.__class__.__name__,
            "data": self._event_data()
        }

    def _event_data(self) -> dict:
        raise NotImplementedError
@dataclass
class OrderCreated(DomainEvent):
    order_id: str
    customer_id: str
    items: List[dict]
    total_amount: float

    def _event_data(self) -> dict:
        return {
            "order_id": self.order_id,
            "customer_id": self.customer_id,
            "items": self.items,
            "total_amount": self.total_amount
        }

@dataclass
class InventoryReserved(DomainEvent):
    order_id: str
    reservation_id: str
    items: List[dict]

    def _event_data(self) -> dict:
        return {
            "order_id": self.order_id,
            "reservation_id": self.reservation_id,
            "items": self.items
        }

@dataclass
class InventoryReservationFailed(DomainEvent):
    order_id: str
    reason: str

    def _event_data(self) -> dict:
        return {"order_id": self.order_id, "reason": self.reason}

@dataclass
class PaymentCompleted(DomainEvent):
    order_id: str
    payment_id: str
    amount: float

    def _event_data(self) -> dict:
        return {
            "order_id": self.order_id,
            "payment_id": self.payment_id,
            "amount": self.amount
        }

@dataclass
class PaymentFailed(DomainEvent):
    order_id: str
    reason: str

    def _event_data(self) -> dict:
        return {"order_id": self.order_id, "reason": self.reason}

@dataclass
class InventoryReleased(DomainEvent):
    order_id: str
    reservation_id: str

    def _event_data(self) -> dict:
        return {
            "order_id": self.order_id,
            "reservation_id": self.reservation_id
        }

@dataclass
class OrderCompleted(DomainEvent):
    order_id: str

    def _event_data(self) -> dict:
        return {"order_id": self.order_id}

@dataclass
class OrderFailed(DomainEvent):
    order_id: str
    reason: str

    def _event_data(self) -> dict:
        return {"order_id": self.order_id, "reason": self.reason}
# =============================================================================
# Event Publisher/Subscriber
# =============================================================================

class EventBus:
    """Simple in-memory event bus for demonstration."""

    def __init__(self):
        self._handlers: Dict[str, List[Callable]] = {}

    def subscribe(self, event_type: str, handler: Callable):
        if event_type not in self._handlers:
            self._handlers[event_type] = []
        self._handlers[event_type].append(handler)

    async def publish(self, event: DomainEvent):
        event_type = event.__class__.__name__
        logger.info(f"Publishing event: {event_type} for saga {event.correlation_id}")
        handlers = self._handlers.get(event_type, [])
        for handler in handlers:
            try:
                await handler(event)
            except Exception as e:
                # Demo only: the error is logged and dropped, which can leave the
                # saga stuck. A real broker would redeliver or dead-letter the event.
                logger.error(f"Handler failed for {event_type}: {e}")
# =============================================================================
# Services (Choreography Participants)
# =============================================================================

class OrderService:
    """Order service - initiates and tracks order saga."""

    def __init__(self, db, event_bus: EventBus):
        self.db = db
        self.events = event_bus
        # Subscribe to events
        self.events.subscribe("InventoryReserved", self._on_inventory_reserved)
        self.events.subscribe("InventoryReservationFailed", self._on_inventory_failed)
        self.events.subscribe("PaymentCompleted", self._on_payment_completed)
        self.events.subscribe("PaymentFailed", self._on_payment_failed)

    async def create_order(self, customer_id: str, items: List[dict]) -> str:
        """Create order and start saga via event."""
        order_id = generate_id()
        total = sum(item['price'] * item['quantity'] for item in items)
        # Save order in PENDING state
        await self.db.execute(
            """
            INSERT INTO orders (id, customer_id, items, total, status, created_at)
            VALUES ($1, $2, $3, $4, 'PENDING', NOW())
            """,
            order_id, customer_id, json.dumps(items), total
        )
        # Publish event to start saga
        event = OrderCreated(
            event_id=generate_id(),
            correlation_id=order_id,  # Order ID is the saga ID
            order_id=order_id,
            customer_id=customer_id,
            items=items,
            total_amount=total
        )
        await self.events.publish(event)
        return order_id

    async def _on_inventory_reserved(self, event: InventoryReserved):
        """Handle inventory reserved - update order state."""
        await self.db.execute(
            """
            UPDATE orders
            SET status = 'INVENTORY_RESERVED', reservation_id = $2
            WHERE id = $1
            """,
            event.order_id, event.reservation_id
        )

    async def _on_inventory_failed(self, event: InventoryReservationFailed):
        """Handle inventory failure - fail order."""
        await self.db.execute(
            "UPDATE orders SET status = 'FAILED', failure_reason = $2 WHERE id = $1",
            event.order_id, event.reason
        )
        await self.events.publish(OrderFailed(
            event_id=generate_id(),
            correlation_id=event.correlation_id,
            order_id=event.order_id,
            reason=event.reason
        ))

    async def _on_payment_completed(self, event: PaymentCompleted):
        """Handle payment completed - complete order."""
        await self.db.execute(
            """
            UPDATE orders
            SET status = 'COMPLETED', payment_id = $2
            WHERE id = $1
            """,
            event.order_id, event.payment_id
        )
        await self.events.publish(OrderCompleted(
            event_id=generate_id(),
            correlation_id=event.correlation_id,
            order_id=event.order_id
        ))

    async def _on_payment_failed(self, event: PaymentFailed):
        """Handle payment failure - will trigger compensation."""
        await self.db.execute(
            "UPDATE orders SET status = 'PAYMENT_FAILED', failure_reason = $2 WHERE id = $1",
            event.order_id, event.reason
        )
class InventoryService:
    """Inventory service - reserves and releases inventory."""

    def __init__(self, db, event_bus: EventBus):
        self.db = db
        self.events = event_bus
        # Subscribe to events
        self.events.subscribe("OrderCreated", self._on_order_created)
        self.events.subscribe("PaymentFailed", self._on_payment_failed)

    async def _on_order_created(self, event: OrderCreated):
        """React to order created - try to reserve inventory."""
        try:
            reservation_id = await self._reserve_inventory(
                event.order_id,
                event.items
            )
            await self.events.publish(InventoryReserved(
                event_id=generate_id(),
                correlation_id=event.correlation_id,
                order_id=event.order_id,
                reservation_id=reservation_id,
                items=event.items
            ))
        except InsufficientInventoryError as e:
            await self.events.publish(InventoryReservationFailed(
                event_id=generate_id(),
                correlation_id=event.correlation_id,
                order_id=event.order_id,
                reason=str(e)
            ))

    async def _on_payment_failed(self, event: PaymentFailed):
        """
        React to payment failed - COMPENSATE by releasing inventory.
        This is the compensation transaction triggered by choreography.
        """
        # Find the reservation for this order
        reservation = await self.db.fetch_one(
            "SELECT id FROM inventory_reservations WHERE order_id = $1 AND status = 'active'",
            event.order_id
        )
        if reservation:
            await self._release_inventory(reservation['id'])
            await self.events.publish(InventoryReleased(
                event_id=generate_id(),
                correlation_id=event.correlation_id,
                order_id=event.order_id,
                reservation_id=reservation['id']
            ))

    async def _reserve_inventory(self, order_id: str, items: List[dict]) -> str:
        """Reserve inventory for order items."""
        reservation_id = generate_id()
        async with self.db.transaction(isolation="SERIALIZABLE"):
            for item in items:
                # Check and decrement inventory
                result = await self.db.execute(
                    """
                    UPDATE inventory
                    SET quantity = quantity - $2
                    WHERE product_id = $1 AND quantity >= $2
                    """,
                    item['product_id'], item['quantity']
                )
                if result.rowcount == 0:
                    raise InsufficientInventoryError(
                        f"Insufficient inventory for {item['product_id']}"
                    )
            # Create reservation record
            await self.db.execute(
                """
                INSERT INTO inventory_reservations (id, order_id, items, status, created_at)
                VALUES ($1, $2, $3, 'active', NOW())
                """,
                reservation_id, order_id, json.dumps(items)
            )
        return reservation_id

    async def _release_inventory(self, reservation_id: str):
        """Release inventory reservation (compensation)."""
        async with self.db.transaction():
            reservation = await self.db.fetch_one(
                """
                SELECT items FROM inventory_reservations
                WHERE id = $1 AND status = 'active'
                FOR UPDATE
                """,
                reservation_id
            )
            if not reservation:
                logger.warning(f"Reservation {reservation_id} not found or not active")
                return  # Idempotent - already released
            items = json.loads(reservation['items'])
            for item in items:
                await self.db.execute(
                    "UPDATE inventory SET quantity = quantity + $2 WHERE product_id = $1",
                    item['product_id'], item['quantity']
                )
            await self.db.execute(
                "UPDATE inventory_reservations SET status = 'released' WHERE id = $1",
                reservation_id
            )
class PaymentService:
    """Payment service - charges customers once inventory is reserved."""

    def __init__(self, db, event_bus: EventBus, payment_gateway):
        self.db = db
        self.events = event_bus
        self.gateway = payment_gateway
        # Subscribe to events
        self.events.subscribe("InventoryReserved", self._on_inventory_reserved)

    async def _on_inventory_reserved(self, event: InventoryReserved):
        """React to inventory reserved - charge payment."""
        # Get order details. Shortcut for brevity: this reads the Order Service's
        # table, which couples the two services' databases. Carrying customer_id
        # and the total in the events avoids that.
        order = await self.db.fetch_one(
            "SELECT customer_id, total FROM orders WHERE id = $1",
            event.order_id
        )
        try:
            payment_id = await self._charge_customer(
                order['customer_id'],
                order['total'],
                event.order_id
            )
            await self.events.publish(PaymentCompleted(
                event_id=generate_id(),
                correlation_id=event.correlation_id,
                order_id=event.order_id,
                payment_id=payment_id,
                amount=order['total']
            ))
        except PaymentError as e:
            await self.events.publish(PaymentFailed(
                event_id=generate_id(),
                correlation_id=event.correlation_id,
                order_id=event.order_id,
                reason=str(e)
            ))

    async def _charge_customer(
        self,
        customer_id: str,
        amount: float,
        order_id: str
    ) -> str:
        """Charge customer via payment gateway."""
        payment_id = generate_id()
        # Get customer payment method
        customer = await self.db.fetch_one(
            "SELECT payment_method_id FROM customers WHERE id = $1",
            customer_id
        )
        # Charge via gateway
        result = await self.gateway.charge(
            payment_method_id=customer['payment_method_id'],
            amount=amount,
            idempotency_key=f"order-{order_id}"  # Idempotent: a retry can't double-charge
        )
        if not result.success:
            raise PaymentError(result.error_message)
        # Record payment
        await self.db.execute(
            """
            INSERT INTO payments (id, order_id, customer_id, amount, status, gateway_ref)
            VALUES ($1, $2, $3, $4, 'completed', $5)
            """,
            payment_id, order_id, customer_id, amount, result.reference
        )
        return payment_id
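Real brokers typically deliver events at least once, so a handler such as `InventoryService._on_payment_failed` may see the same event twice. The listing above relies on state checks for this. A more general option is to deduplicate on the `event_id` that every `DomainEvent` already carries. The sketch below is a hypothetical wrapper with a minimal stand-in `Event` type. A production version would store processed IDs in the service's own database, in the same local transaction as the handler's writes, rather than in an in-memory set.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, List, Set

@dataclass
class Event:
    """Minimal stand-in for a DomainEvent: only the fields this sketch needs."""
    event_id: str
    order_id: str

Handler = Callable[[Event], Awaitable[None]]

def idempotent(handler: Handler, processed: Set[str]) -> Handler:
    """Wrap a handler so a redelivered event (same event_id) is handled only once."""
    async def wrapper(event: Event) -> None:
        if event.event_id in processed:
            return  # Duplicate delivery: already handled
        await handler(event)
        # Record the ID only after success, so a failed attempt can be redelivered.
        processed.add(event.event_id)
    return wrapper

released: List[str] = []

async def release_inventory(event: Event) -> None:
    released.append(event.order_id)

processed_ids: Set[str] = set()
on_payment_failed = idempotent(release_inventory, processed_ids)

async def main() -> None:
    evt = Event(event_id="evt-1", order_id="order-42")
    await on_payment_failed(evt)
    await on_payment_failed(evt)  # The broker redelivers the same event

asyncio.run(main())
print(released)  # ['order-42']
```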
Chapter 6: Orchestration Implementation
# Saga Orchestration Implementation
import asyncio
import json
import logging
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
from typing import Any, Callable, Dict, List, Optional

logger = logging.getLogger(__name__)

def generate_id() -> str:
    """Helper assumed by the code below (random UUID as a string)."""
    return str(uuid.uuid4())
# =============================================================================
# Saga Definition
# =============================================================================

class SagaStepStatus(Enum):
    PENDING = "pending"
    EXECUTING = "executing"
    COMPLETED = "completed"
    FAILED = "failed"
    COMPENSATING = "compensating"
    COMPENSATED = "compensated"

class SagaStatus(Enum):
    STARTED = "started"
    EXECUTING = "executing"
    COMPLETED = "completed"
    COMPENSATING = "compensating"
    COMPENSATED = "compensated"
    FAILED = "failed"

@dataclass
class SagaStep:
    """Definition of a saga step."""
    name: str
    action: Callable  # The forward action
    compensation: Optional[Callable]  # The compensation action
    is_pivot: bool = False  # After pivot, no compensation - must complete
    retryable: bool = True
    max_retries: int = 3
    timeout: timedelta = field(default_factory=lambda: timedelta(seconds=30))

@dataclass
class SagaStepResult:
    """Result of executing a saga step."""
    success: bool
    data: Optional[Dict[str, Any]] = None
    error: Optional[str] = None

@dataclass
class SagaState:
    """Persistent state of a saga execution."""
    saga_id: str
    saga_type: str
    status: SagaStatus
    current_step: int
    input_data: Dict[str, Any]
    step_results: Dict[str, Any]  # Results from each step
    created_at: datetime
    updated_at: datetime
    error: Optional[str] = None
# =============================================================================
# Saga Definition DSL
# =============================================================================

class SagaDefinition:
    """
    Fluent interface for defining sagas.
    Usage:
        saga = (SagaDefinition("OrderSaga")
                .step("reserve_inventory", reserve_fn, release_fn)
                .step("charge_payment", charge_fn, refund_fn)
                .step("create_shipment", ship_fn, None, is_pivot=True)
                .step("send_notification", notify_fn, None)
                .build())
    """

    def __init__(self, name: str):
        self.name = name
        self.steps: List[SagaStep] = []

    def step(
        self,
        name: str,
        action: Callable,
        compensation: Optional[Callable] = None,
        is_pivot: bool = False,
        retryable: bool = True,
        max_retries: int = 3,
        timeout_seconds: int = 30
    ) -> 'SagaDefinition':
        """Add a step to the saga."""
        self.steps.append(SagaStep(
            name=name,
            action=action,
            compensation=compensation,
            is_pivot=is_pivot,
            retryable=retryable,
            max_retries=max_retries,
            timeout=timedelta(seconds=timeout_seconds)
        ))
        return self

    def build(self) -> 'Saga':
        """Build the saga."""
        return Saga(self.name, self.steps)

class Saga:
    """Executable saga definition."""

    def __init__(self, name: str, steps: List[SagaStep]):
        self.name = name
        self.steps = steps
        # Validate: steps after pivot should not have compensation
        pivot_found = False
        for step in steps:
            if step.is_pivot:
                pivot_found = True
            elif pivot_found and step.compensation:
                logger.warning(
                    f"Step {step.name} has compensation after pivot - will be ignored"
                )
# =============================================================================
# Saga Orchestrator
# =============================================================================

class SagaOrchestrator:
    """
    Central orchestrator for executing sagas.
    Responsibilities:
    - Execute saga steps in order
    - Persist saga state for recovery
    - Handle failures and compensation
    - Retry failed steps
    """

    def __init__(self, db, sagas: Dict[str, Saga]):
        self.db = db
        self.sagas = sagas  # Registered saga definitions
        # asyncio keeps only weak references to tasks, so hold strong ones here;
        # otherwise a running saga task could be garbage-collected.
        self._tasks: set[asyncio.Task] = set()

    def _spawn(self, coro) -> None:
        task = asyncio.create_task(coro)
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)

    async def start_saga(
        self,
        saga_type: str,
        input_data: Dict[str, Any]
    ) -> str:
        """Start a new saga execution."""
        saga = self.sagas.get(saga_type)
        if not saga:
            raise ValueError(f"Unknown saga type: {saga_type}")
        saga_id = generate_id()
        state = SagaState(
            saga_id=saga_id,
            saga_type=saga_type,
            status=SagaStatus.STARTED,
            current_step=0,
            input_data=input_data,
            step_results={},
            created_at=datetime.utcnow(),
            updated_at=datetime.utcnow()
        )
        await self._save_state(state)
        # Execute asynchronously
        self._spawn(self._execute_saga(state))
        return saga_id

    async def _execute_saga(self, state: SagaState):
        """Execute the saga steps, resuming from state.current_step."""
        saga = self.sagas[state.saga_type]
        state.status = SagaStatus.EXECUTING
        await self._save_state(state)
        try:
            # Resume from the last recorded step (0 for a new saga). A step that was
            # in flight during a crash runs again, so steps must be idempotent.
            for i in range(state.current_step, len(saga.steps)):
                step = saga.steps[i]
                if step.name in state.step_results:
                    continue  # Completed before a restart
                state.current_step = i
                await self._save_state(state)
                result = await self._execute_step(step, state)
                if result.success:
                    state.step_results[step.name] = result.data
                    await self._save_state(state)
                else:
                    # Step failed
                    state.error = result.error
                    await self._save_state(state)
                    if self._past_pivot(saga, i):
                        # Pivot already committed - must complete with forward recovery
                        await self._forward_recovery(saga, state, i)
                    else:
                        # Pivot not committed (a failed pivot included) - compensate
                        await self._compensate(saga, state, i)
                    return
            # All steps completed successfully
            state.status = SagaStatus.COMPLETED
            await self._save_state(state)
            logger.info(f"Saga {state.saga_id} completed successfully")
        except Exception as e:
            logger.error(f"Saga {state.saga_id} failed with error: {e}")
            state.status = SagaStatus.FAILED
            state.error = str(e)
            await self._save_state(state)

    async def _execute_step(
        self,
        step: SagaStep,
        state: SagaState
    ) -> SagaStepResult:
        """Execute a single saga step with retry logic."""
        last_error = None
        for attempt in range(step.max_retries):
            try:
                # Build step context with all previous results
                context = {
                    "saga_id": state.saga_id,
                    "input": state.input_data,
                    "results": state.step_results
                }
                # Execute with timeout
                result = await asyncio.wait_for(
                    step.action(context),
                    timeout=step.timeout.total_seconds()
                )
                return SagaStepResult(success=True, data=result)
            except asyncio.TimeoutError:
                # A timeout does not prove the step had no effect.
                last_error = f"Step {step.name} timed out"
                logger.warning(f"{last_error}, attempt {attempt + 1}/{step.max_retries}")
            except Exception as e:
                last_error = str(e)
                logger.warning(
                    f"Step {step.name} failed: {e}, attempt {attempt + 1}/{step.max_retries}"
                )
            if not step.retryable:
                break  # Non-retryable steps get exactly one attempt
            if attempt < step.max_retries - 1:
                # Exponential backoff
                await asyncio.sleep(2 ** attempt)
        return SagaStepResult(success=False, error=last_error)

    async def _compensate(self, saga: Saga, state: SagaState, failed_step: int):
        """Execute compensation for completed steps in reverse order."""
        state.status = SagaStatus.COMPENSATING
        await self._save_state(state)
        logger.info(f"Starting compensation for saga {state.saga_id}")
        failed_compensations = []
        # Compensate in reverse order, excluding the failed step
        for i in range(failed_step - 1, -1, -1):
            step = saga.steps[i]
            if step.is_pivot:
                # Defensive: compensation only runs while the pivot is uncommitted
                logger.error(f"Reached pivot step {step.name} during compensation")
                break
            if step.compensation is None:
                logger.info(f"No compensation for step {step.name}")
                continue
            if not await self._execute_compensation(step, state):
                # Keep compensating the other steps, but remember the failure
                failed_compensations.append(step.name)
        if failed_compensations:
            # Don't report a saga as COMPENSATED when some compensation never ran
            state.status = SagaStatus.FAILED
            state.error = f"Compensation failed for: {', '.join(failed_compensations)}"
        else:
            state.status = SagaStatus.COMPENSATED
        await self._save_state(state)
        logger.info(f"Saga {state.saga_id} finished compensation: {state.status.value}")

    async def _execute_compensation(self, step: SagaStep, state: SagaState) -> bool:
        """Execute compensation for a single step. Returns False if it never succeeded."""
        context = {
            "saga_id": state.saga_id,
            "input": state.input_data,
            "results": state.step_results,
            "step_result": state.step_results.get(step.name)
        }
        max_attempts = 10  # More retries for compensation
        # Compensation should not fail - retry with backoff
        for attempt in range(max_attempts):
            try:
                await asyncio.wait_for(
                    step.compensation(context),
                    timeout=step.timeout.total_seconds() * 2
                )
                logger.info(f"Compensated step {step.name}")
                return True
            except Exception as e:
                logger.error(
                    f"Compensation for {step.name} failed: {e}, "
                    f"attempt {attempt + 1}/{max_attempts}"
                )
                if attempt < max_attempts - 1:
                    await asyncio.sleep(2 ** attempt)
        # Compensation failed - this is bad!
        logger.critical(
            f"COMPENSATION FAILED for step {step.name} in saga {state.saga_id}. "
            f"Manual intervention required!"
        )
        # Alert operations team, create incident ticket, etc.
        return False

    async def _forward_recovery(
        self,
        saga: Saga,
        state: SagaState,
        failed_step: int
    ):
        """Retry steps after pivot until success."""
        logger.info(f"Forward recovery for saga {state.saga_id} from step {failed_step}")
        for i in range(failed_step, len(saga.steps)):
            step = saga.steps[i]
            state.current_step = i
            await self._save_state(state)
            # Retry with backoff until success; each _execute_step call already
            # makes up to step.max_retries attempts of its own
            for attempt in range(100):  # Effectively unlimited
                result = await self._execute_step(step, state)
                if result.success:
                    state.step_results[step.name] = result.data
                    await self._save_state(state)
                    break
                wait_time = min(300, 2 ** attempt)  # Cap at 5 minutes
                logger.warning(
                    f"Forward recovery step {step.name} failed, "
                    f"retrying in {wait_time}s"
                )
                await asyncio.sleep(wait_time)
            else:
                # Retries exhausted: never mark the saga COMPLETED in this case
                logger.critical(
                    f"Forward recovery exhausted for step {step.name} in saga "
                    f"{state.saga_id}. Manual intervention required!"
                )
                state.status = SagaStatus.FAILED
                state.error = f"Forward recovery exhausted at step {step.name}"
                await self._save_state(state)
                return
        state.status = SagaStatus.COMPLETED
        await self._save_state(state)

    def _past_pivot(self, saga: Saga, step_index: int) -> bool:
        """Check whether a pivot step before step_index has already committed."""
        for i in range(step_index):
            if saga.steps[i].is_pivot:
                return True
        return False

    async def _save_state(self, state: SagaState):
        """Persist saga state to database."""
        state.updated_at = datetime.utcnow()
        await self.db.execute(
            """
            INSERT INTO saga_state (saga_id, saga_type, status, current_step,
                                    input_data, step_results, error, created_at, updated_at)
            VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
            ON CONFLICT (saga_id) DO UPDATE SET
                status = $3, current_step = $4, step_results = $6,
                error = $7, updated_at = $9
            """,
            state.saga_id, state.saga_type, state.status.value, state.current_step,
            json.dumps(state.input_data), json.dumps(state.step_results),
            state.error, state.created_at, state.updated_at
        )

    async def recover_sagas(self):
        """
        Recover incomplete sagas after restart.
        Called on startup to resume interrupted sagas.
        """
        incomplete = await self.db.fetch(
            """
            SELECT * FROM saga_state
            WHERE status IN ('started', 'executing', 'compensating')
            """
        )
        for row in incomplete:
            state = SagaState(
                saga_id=row['saga_id'],
                saga_type=row['saga_type'],
                status=SagaStatus(row['status']),
                current_step=row['current_step'],
                input_data=json.loads(row['input_data']),
                step_results=json.loads(row['step_results']),
                created_at=row['created_at'],
                updated_at=row['updated_at'],
                error=row['error']
            )
            logger.info(f"Recovering saga {state.saga_id} from step {state.current_step}")
            if state.status == SagaStatus.COMPENSATING:
                # Resume backward recovery; compensations are idempotent, so
                # re-running ones that already finished is safe
                saga = self.sagas[state.saga_type]
                self._spawn(self._compensate(saga, state, state.current_step))
            else:
                self._spawn(self._execute_saga(state))
# =============================================================================
# Order Saga Definition
# =============================================================================
class OrderSagaSteps:
"""Step implementations for order saga."""
def __init__(self, inventory_service, payment_service, shipping_service, notification_service):
self.inventory = inventory_service
self.payment = payment_service
self.shipping = shipping_service
self.notification = notification_service
async def reserve_inventory(self, context: dict) -> dict:
"""Step 1: Reserve inventory."""
order = context['input']
reservation_id = await self.inventory.reserve(
order_id=context['saga_id'],
items=order['items']
)
return {"reservation_id": reservation_id}
async def release_inventory(self, context: dict):
"""Compensation for reserve_inventory."""
reservation_id = context['results'].get('reserve_inventory', {}).get('reservation_id')
if reservation_id:
await self.inventory.release(reservation_id)
async def charge_payment(self, context: dict) -> dict:
"""Step 2: Charge payment."""
order = context['input']
payment_id = await self.payment.charge(
customer_id=order['customer_id'],
amount=order['total'],
idempotency_key=f"saga-{context['saga_id']}"
)
return {"payment_id": payment_id}
async def refund_payment(self, context: dict):
"""Compensation for charge_payment."""
payment_id = context['results'].get('charge_payment', {}).get('payment_id')
if payment_id:
await self.payment.refund(
payment_id=payment_id,
idempotency_key=f"refund-{context['saga_id']}"
)
async def create_shipment(self, context: dict) -> dict:
"""Step 3: Create shipment (PIVOT - no compensation after this)."""
order = context['input']
shipment_id = await self.shipping.create_shipment(
order_id=context['saga_id'],
items=order['items'],
address=order['shipping_address']
)
return {"shipment_id": shipment_id}
async def send_notification(self, context: dict) -> dict:
"""Step 4: Send confirmation notification."""
order = context['input']
shipment_id = context['results']['create_shipment']['shipment_id']
await self.notification.send_order_confirmation(
customer_id=order['customer_id'],
order_id=context['saga_id'],
shipment_id=shipment_id
)
return {"notification_sent": True}
def create_order_saga(steps: OrderSagaSteps) -> Saga:
"""Create the order processing saga."""
return (SagaDefinition("OrderSaga")
.step(
"reserve_inventory",
steps.reserve_inventory,
steps.release_inventory
)
.step(
"charge_payment",
steps.charge_payment,
steps.refund_payment
)
.step(
"create_shipment",
steps.create_shipment,
None, # No compensation - this is the pivot
is_pivot=True
)
.step(
"send_notification",
steps.send_notification,
None # Notification doesn't need compensation
)
.build())
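To see the control flow in isolation, here is a minimal, self-contained sketch of what an orchestrator does with a definition like the one above: run steps in order, compensate completed steps in reverse order if a step fails before the pivot, and retry forward once the pivot has completed. `MiniStep`, `run_saga`, and `make_steps` are hypothetical names for this illustration, not the `SagaOrchestrator` API; persistence, real backoff, and idempotency keys are left out.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, Optional


@dataclass
class MiniStep:
    """Hypothetical, simplified saga step for illustration."""
    name: str
    action: Callable[[dict], Awaitable[Optional[dict]]]
    compensation: Optional[Callable[[dict], Awaitable[None]]] = None
    is_pivot: bool = False


async def run_saga(steps, context, max_forward_retries=3):
    """Run steps in order; compensate before the pivot, retry forward after it."""
    completed = []
    pivot_passed = False
    for step in steps:
        attempts = max_forward_retries if pivot_passed else 1
        for _ in range(attempts):
            try:
                context["results"][step.name] = await step.action(context)
                break
            except Exception:
                if not pivot_passed:
                    # Backward recovery: undo completed steps in reverse order
                    for done in reversed(completed):
                        if done.compensation is not None:
                            await done.compensation(context)
                    return "compensated"
                await asyncio.sleep(0)  # stand-in for capped exponential backoff
        else:
            return "stuck"  # forward retries exhausted: escalate to an operator
        completed.append(step)
        if step.is_pivot:
            pivot_passed = True
    return "completed"


def make_steps(log, fail_payment=False, notify_failures=0):
    """Build a toy order saga whose steps record what they did in `log`."""
    remaining = {"notify": notify_failures}

    async def reserve(ctx):
        log.append("reserve")
        return {"reservation_id": "r-1"}

    async def release(ctx):
        log.append("release")

    async def charge(ctx):
        if fail_payment:
            raise RuntimeError("card declined")
        log.append("charge")
        return {"payment_id": "p-1"}

    async def refund(ctx):
        log.append("refund")

    async def ship(ctx):
        log.append("ship")
        return {"shipment_id": "s-1"}

    async def notify(ctx):
        if remaining["notify"] > 0:
            remaining["notify"] -= 1
            raise RuntimeError("email provider down")
        log.append("notify")
        return {"notification_sent": True}

    return [
        MiniStep("reserve_inventory", reserve, release),
        MiniStep("charge_payment", charge, refund),
        MiniStep("create_shipment", ship, None, is_pivot=True),
        MiniStep("send_notification", notify),
    ]
```

With `fail_payment=True` the saga returns `"compensated"` and only the inventory release runs, because the failed charge never completed. With `notify_failures=2` the notification step fails twice after the pivot and is retried until it succeeds, so the saga still completes.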
Part III: Real-World Application
Chapter 7: Case Studies
7.1 Case Study: Uber Trip Saga
UBER TRIP SAGA
A trip involves multiple services that must coordinate:
FORWARD FLOW:
┌──────────────┐    ┌──────────────┐    ┌──────────────┐
│   Matching   │───▶│   Pricing    │───▶│   Dispatch   │
│   Service    │    │   Service    │    │   Service    │
└──────────────┘    └──────────────┘    └──────────────┘
       │                   │                   │
       ▼                   ▼                   ▼
  Match rider       Calculate fare       Assign driver
  to driver         and hold funds       and notify
COMPENSATION (if dispatch fails; runs right to left, Pricing first):
┌──────────────┐    ┌──────────────┐
│   Matching   │◀───│   Pricing    │
│   Service    │    │   Service    │
└──────────────┘    └──────────────┘
       │                   │
       ▼                   ▼
  Unmatch rider     Release held
  and driver        funds
KEY DECISIONS:
├── Dispatch is the pivot point (driver is on the way)
├── After dispatch, must complete (pay driver even if rider cancels)
├── Before dispatch, can compensate (no charge to rider)
└── Uses orchestration (centralized trip service)
7.2 Case Study: Airbnb Booking Saga
AIRBNB BOOKING SAGA
Steps:
T1: Hold dates on calendar       C1: Release dates
T2: Pre-authorize payment        C2: Void pre-auth
T3: Confirm with host (async)    C3: Notify cancellation
T4: Capture payment (PIVOT)
T5: Send confirmation emails
INTERESTING ASPECTS:
1. ASYNC HOST CONFIRMATION
├── Host has 24 hours to accept
├── Saga enters "waiting" state
├── Timer triggers compensation if host doesn't respond
└── Implement with saga timeout/deadline
2. PIVOT AT PAYMENT CAPTURE
├── Once payment captured, booking is confirmed
├── Cancellation after capture = refund (different flow)
└── Forward recovery ensures emails sent
3. IDEMPOTENCY
├── Pre-auth must be idempotent (same key = same auth)
├── Calendar hold must be idempotent
└── Confirmation email should have dedup
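The host-confirmation wait can be sketched with `asyncio.wait_for`: the saga waits for a decision until a deadline, and a missed deadline is treated like a decline, which triggers C3, C2, and C1 in reverse order. This is an in-memory illustration only, with fractions of a second standing in for the 24-hour window; `booking_saga` and `await_host_decision` are hypothetical. A real booking saga would persist the deadline with the saga state and let a background job fire it, since an in-process wait does not survive a restart.

```python
import asyncio


async def await_host_decision(decision: asyncio.Future, deadline_s: float) -> str:
    """Wait for the host's reply; a missed deadline counts as "timeout"."""
    try:
        return await asyncio.wait_for(decision, timeout=deadline_s)
    except asyncio.TimeoutError:
        return "timeout"


async def booking_saga(host_reply, deadline_s, log):
    """Toy booking saga; a `host_reply` of None simulates a host who never answers."""
    log.append("hold_dates")       # T1
    log.append("preauthorize")     # T2
    loop = asyncio.get_running_loop()
    decision = loop.create_future()

    def reply():
        if not decision.done():    # the deadline may already have cancelled it
            decision.set_result(host_reply)

    if host_reply is not None:
        loop.call_later(0.01, reply)   # host answers shortly (T3)

    outcome = await await_host_decision(decision, deadline_s)
    if outcome != "accepted":
        # Compensate in reverse order: C3, C2, C1
        log.extend(["notify_cancellation", "void_preauth", "release_dates"])
        return "compensated"
    log.extend(["capture_payment", "send_emails"])   # T4 (pivot), then T5
    return "confirmed"
```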
7.3 Common Patterns Across Companies
| Company | Saga Type | Pivot Point | Key Learning |
|---|---|---|---|
| Uber | Orchestrated | Driver dispatch | Compensate matching before dispatch |
| Airbnb | Hybrid | Payment capture | Async steps with timeouts |
| Stripe | Orchestrated | Charge capture | Idempotency keys for all steps |
| DoorDash | Choreographed | Restaurant acceptance | Events for loose coupling |
| Netflix | Orchestrated (Conductor) | N/A (internal) | Workflow visualization |
Chapter 8: Common Mistakes
8.1 Mistake 1: Compensation That Can Fail
❌ WRONG: Compensation with external dependency
async def refund_payment(context):
"""Compensation that calls external API."""
payment_id = context['results']['charge_payment']['payment_id']
# This can fail! Network error, API down, etc.
    await stripe.refunds.create(charge=payment_id)
# If this fails, saga is stuck in partial state!
✅ CORRECT: Idempotent compensation with retries
async def refund_payment(context):
"""Idempotent compensation with guaranteed completion."""
payment_id = context['results']['charge_payment']['payment_id']
saga_id = context['saga_id']
# Idempotency key ensures we can retry safely
idempotency_key = f"refund-{saga_id}-{payment_id}"
# Record intent to refund FIRST (in our database)
await db.execute(
"""
INSERT INTO pending_refunds (saga_id, payment_id, status)
VALUES ($1, $2, 'pending')
ON CONFLICT (saga_id, payment_id) DO NOTHING
""",
saga_id, payment_id
)
# Background job will retry until Stripe confirms
# Even if this call fails, job will pick it up
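    try:
        # `stripe` is assumed to be an async wrapper around Stripe's Refunds API,
        # which identifies the original payment by `charge` (or `payment_intent`)
        await stripe.refunds.create(
            charge=payment_id,
            idempotency_key=idempotency_key
        )
        await db.execute(
            """
            UPDATE pending_refunds SET status = 'completed'
            WHERE saga_id = $1 AND payment_id = $2
            """,
            saga_id, payment_id
        )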
except Exception as e:
logger.warning(f"Refund failed, will retry: {e}")
# Background job handles retry
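The correct version relies on a background job to finish refunds that failed inline. A minimal sketch of that job, assuming the `pending_refunds` rows are modeled as dicts and `FlakyRefundAPI` is a hypothetical stand-in for the provider that fails a configurable number of times and deduplicates by idempotency key:

```python
import asyncio


class FlakyRefundAPI:
    """Hypothetical provider stub: fails the first `failures` calls, dedups by key."""

    def __init__(self, failures: int):
        self.failures = failures
        self.refunds = {}          # idempotency_key -> refund id

    async def refund(self, payment_id: str, idempotency_key: str) -> str:
        if self.failures > 0:
            self.failures -= 1
            raise ConnectionError("provider unavailable")
        if idempotency_key not in self.refunds:
            self.refunds[idempotency_key] = f"re_{len(self.refunds) + 1}"
        return self.refunds[idempotency_key]   # same key -> same refund


async def process_pending_refunds(pending: list, api: FlakyRefundAPI) -> None:
    """One pass of the background job over rows still marked 'pending'."""
    for row in pending:
        if row["status"] != "pending":
            continue
        key = f"refund-{row['saga_id']}-{row['payment_id']}"
        try:
            await api.refund(row["payment_id"], key)
            row["status"] = "completed"
        except ConnectionError:
            row["attempts"] += 1   # stays pending; the next pass retries


async def run_job(pending, api, max_passes=10, interval_s=0.0):
    """Repeat passes until nothing is pending; returns the number of passes used."""
    for n in range(1, max_passes + 1):
        await process_pending_refunds(pending, api)
        if all(row["status"] == "completed" for row in pending):
            return n
        await asyncio.sleep(interval_s)   # a real job might wait e.g. 30 seconds
    return None
```

Because the key is derived from the saga and payment ids, a retry after a timeout that actually succeeded returns the existing refund rather than issuing a second one.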
8.2 Mistake 2: Not Handling Partial Compensation
❌ WRONG: Assuming compensation is all-or-nothing
async def compensate_order(saga_id):
"""Compensate all steps - but what if one fails?"""
await release_inventory(saga_id) # Success
await refund_payment(saga_id) # FAILS!
await cancel_shipment(saga_id) # Never runs!
# Inventory released, but payment not refunded
# Inconsistent state!
✅ CORRECT: Track compensation progress
async def compensate_order(saga_id):
    """Compensate with progress tracking, in reverse order of the forward steps."""
    compensation_steps = [
        ("shipment", cancel_shipment),
        ("payment", refund_payment),
        ("inventory", release_inventory),
    ]
for step_name, compensation_fn in compensation_steps:
# Check if already compensated
status = await get_compensation_status(saga_id, step_name)
if status == 'completed':
continue
try:
await compensation_fn(saga_id)
await mark_compensation_complete(saga_id, step_name)
except Exception as e:
# Log and continue - will retry later
logger.error(f"Compensation {step_name} failed: {e}")
await mark_compensation_failed(saga_id, step_name, str(e))
# Background job retries failed compensations
8.3 Mistake 3: Non-Idempotent Steps
❌ WRONG: Step that charges multiple times on retry
async def charge_payment(context):
"""Non-idempotent payment - dangerous!"""
customer_id = context['input']['customer_id']
amount = context['input']['total']
# If this times out and retries, customer charged twice!
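    result = await stripe.charges.create(
        amount=amount,      # in the smallest currency unit, e.g. cents
        currency="usd",     # required by Stripe; USD is assumed here
        customer=customer_id
    )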
return {"charge_id": result.id}
✅ CORRECT: Idempotent step with idempotency key
async def charge_payment(context):
"""Idempotent payment - safe to retry."""
customer_id = context['input']['customer_id']
amount = context['input']['total']
saga_id = context['saga_id']
# Same key = same charge, even if called multiple times
idempotency_key = f"charge-{saga_id}"
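    result = await stripe.charges.create(
        amount=amount,      # in the smallest currency unit, e.g. cents
        currency="usd",     # required by Stripe; USD is assumed here
        customer=customer_id,
        idempotency_key=idempotency_key
    )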
return {"charge_id": result.id}
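On the receiving side, an idempotency key works because the service stores the result of the first request under that key and replays it for any repeat. The sketch below is a hypothetical in-memory service, not Stripe's implementation; a per-key lock makes concurrent retries wait for the first call instead of racing it. Production providers typically also persist keys durably, expire them after a retention window, and reject a reused key sent with different parameters.

```python
import asyncio
import itertools


class IdempotentPaymentService:
    """Hypothetical payment service that deduplicates charges by idempotency key."""

    def __init__(self):
        self._results = {}                   # idempotency_key -> charge id
        self._locks = {}                     # idempotency_key -> asyncio.Lock
        self._ids = itertools.count(1)
        self.charges_made = 0                # real charges actually executed

    async def charge(self, customer_id: str, amount: int, idempotency_key: str) -> str:
        lock = self._locks.setdefault(idempotency_key, asyncio.Lock())
        async with lock:                     # concurrent retries wait for the first call
            if idempotency_key in self._results:
                return self._results[idempotency_key]   # replay: no second charge
            await asyncio.sleep(0.01)        # simulated call to the card network
            self.charges_made += 1
            charge_id = f"ch_{next(self._ids)}"
            self._results[idempotency_key] = charge_id
            return charge_id
```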
8.4 Mistake Checklist
- Every step has idempotency key — Safe to retry
- Compensation recorded before execution — Can resume
- Compensation steps are idempotent — Safe to retry compensation
- Pivot point is clearly defined — Know when to compensate vs forward recover
- Timeouts on all external calls — Don't hang forever
- Saga state is persisted — Can recover after crash
- Compensation has unlimited retries — Must eventually succeed
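For the "timeouts on all external calls" item, a small helper can combine a per-attempt timeout with capped exponential backoff. This is a sketch with hypothetical names; the jitter (a random delay between zero and the backoff cap, often called "full jitter") is an added assumption that keeps retries from many sagas from arriving in lockstep. As the docstring notes, a call that timed out may still have succeeded, so this only belongs around idempotent calls.

```python
import asyncio
import random


async def call_with_retries(fn, *, timeout_s, max_attempts, base_delay_s=1.0, max_delay_s=300.0):
    """Await fn() with a per-attempt timeout and capped exponential backoff.

    Only safe for idempotent calls: a timed-out attempt may still have taken effect.
    """
    for attempt in range(max_attempts):
        try:
            return await asyncio.wait_for(fn(), timeout=timeout_s)
        except (asyncio.TimeoutError, ConnectionError):
            if attempt == max_attempts - 1:
                raise                        # out of attempts: surface the error
            delay = min(max_delay_s, base_delay_s * 2 ** attempt)
            await asyncio.sleep(random.uniform(0, delay))   # "full jitter"
```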
Part IV: Interview Preparation
Chapter 9: Interview Tips
9.1 Key Phrases
INTRODUCING SAGAS:
"Since we have multiple services with separate databases,
we can't use a traditional distributed transaction. Instead,
I'd use the Saga pattern — a sequence of local transactions
with compensating transactions if something fails."
EXPLAINING COMPENSATION:
"Compensation isn't rollback — it's a new transaction that
semantically undoes the previous one. For example, if we
charged a customer and then the saga fails, we don't 'rollback'
the charge — we issue a refund, which is a new transaction."
ON CHOREOGRAPHY VS ORCHESTRATION:
"For simple sagas with 2-3 steps, choreography works well —
each service reacts to events. But for complex flows, I prefer
orchestration with a central coordinator. It's easier to
understand, modify, and debug."
ON FAILURE HANDLING:
"The tricky part is compensation failures. If the refund fails,
we can't just give up — we need to retry until it succeeds,
possibly with human intervention. I'd use idempotency keys
and a background job for retrying stuck compensations."
ON PIVOT POINTS:
"After a certain point, compensation doesn't make sense. For
example, once we've shipped a physical item, we can't 'unship'
it. That's the pivot point — after that, we use forward
recovery and retry until the saga completes."
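The choreography phrase above ("each service reacts to events") can be made concrete with a toy in-process event bus. In this hypothetical sketch no component knows the whole flow: inventory reacts to `OrderCreated`, payment reacts to `InventoryReserved`, and a `PaymentFailed` event is what triggers the inventory release. The event names are illustrative. In production the bus would be a durable broker that usually delivers at least once, so every handler would also need to be idempotent.

```python
from collections import defaultdict


class EventBus:
    """Minimal synchronous pub/sub standing in for a durable broker."""

    def __init__(self):
        self.handlers = defaultdict(list)
        self.published = []                  # event types, in publish order

    def subscribe(self, event_type, handler):
        self.handlers[event_type].append(handler)

    def publish(self, event_type, payload):
        self.published.append(event_type)
        for handler in self.handlers[event_type]:
            handler(payload)


def wire_services(bus, card_ok):
    """Each service knows only the events it consumes and emits; no coordinator."""
    # Inventory service: reserves on OrderCreated, releases on PaymentFailed
    bus.subscribe("OrderCreated", lambda order: bus.publish("InventoryReserved", order))
    bus.subscribe("PaymentFailed", lambda order: bus.publish("InventoryReleased", order))

    # Payment service: charges once inventory is reserved
    def on_inventory_reserved(order):
        bus.publish("PaymentCharged" if card_ok else "PaymentFailed", order)

    bus.subscribe("InventoryReserved", on_inventory_reserved)

    # Order service: records the final outcome
    bus.subscribe("PaymentCharged", lambda order: order.update(status="confirmed"))
    bus.subscribe("InventoryReleased", lambda order: order.update(status="cancelled"))
```

The failure path is spread across subscriptions in several services, which is the debuggability cost that makes orchestration attractive as flows grow.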
9.2 Common Questions
| Question | Good Answer |
|---|---|
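| "Why not use 2PC?" | "2PC makes every participant hold locks from the prepare phase until the coordinator's commit or abort decision arrives. It's blocking: if the coordinator fails after prepare, participants stay locked until it recovers, so the coordinator is a single point of failure and throughput suffers under load. Many NoSQL stores, message brokers, and third-party APIs (such as payment providers) don't support it at all." |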
| "What if compensation fails?" | "Compensation must be designed to eventually succeed. Use idempotency keys, persist compensation intent first, and have a background job that retries failed compensations indefinitely. As a last resort, alert for human intervention." |
| "How do you handle concurrent sagas?" | "Each saga operates independently. If they conflict (e.g., same inventory), the first to commit wins. Others fail and compensate. For high-contention resources, consider reservation patterns." |
| "Choreography or orchestration?" | "Depends on complexity. Simple sagas (2-4 steps, clear flow): choreography. Complex sagas (5+ steps, branching, shared services): orchestration. Many teams use both." |
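The "reservation patterns" answer can be sketched as a store that holds stock for a saga with an expiry, so a saga that dies without compensating cannot keep stock locked forever. `InventoryReservations` is hypothetical and single-process; the TTL and the injectable clock are assumptions made so the behavior is easy to test.

```python
import time


class InventoryReservations:
    """Hypothetical single-process reservation store with expiring holds."""

    def __init__(self, stock, ttl_s, clock=time.monotonic):
        self.available = dict(stock)     # sku -> units free to reserve
        self.holds = {}                  # reservation_id -> (sku, qty, expires_at)
        self.ttl_s = ttl_s
        self.clock = clock

    def _expire(self):
        """Return stock from holds whose TTL has passed."""
        now = self.clock()
        for rid, (sku, qty, expires_at) in list(self.holds.items()):
            if expires_at <= now:
                self.available[sku] += qty
                del self.holds[rid]

    def reserve(self, rid, sku, qty):
        """Hold stock for a saga; False means contention, so that saga compensates."""
        self._expire()
        if rid in self.holds:
            return True                  # retried reserve while the hold is active
        if self.available.get(sku, 0) < qty:
            return False
        self.available[sku] -= qty
        self.holds[rid] = (sku, qty, self.clock() + self.ttl_s)
        return True

    def release(self, rid):
        """Compensation: releasing an unknown or already-released hold is a no-op."""
        hold = self.holds.pop(rid, None)
        if hold is not None:
            sku, qty, _ = hold
            self.available[sku] += qty

    def confirm(self, rid):
        """After payment, turn the hold into a permanent decrement."""
        self._expire()
        return self.holds.pop(rid, None) is not None
```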
Chapter 10: Practice Problems
Problem 1: Travel Booking Saga
Setup: Design a saga for booking a trip that includes flight + hotel + car rental.
Requirements:
- All three must succeed, or none should be booked
- Each vendor has different cancellation policies
- Hotels require pre-authorization, flights require immediate payment
- Users want to see progress
Questions:
- What's the order of steps and why?
- What's the pivot point?
- How do you handle hotel pre-auth expiration?
Order of steps:
- Reserve car (easiest to cancel, hold for 24h)
- Pre-authorize hotel (hold for 24-48h)
- Book flight (PIVOT - immediate charge, expensive to cancel)
- Capture hotel payment
- Confirm car rental
Pivot point: Flight booking. Airlines charge immediately and have expensive change fees.
Hotel pre-auth expiration:
- Set saga deadline to 23 hours (before 24h expiration)
- If saga not complete by deadline, compensate all steps
- Background job monitors saga deadlines
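A sketch of the deadline check the background job would run, assuming the 24-hour hold starts at pre-authorization time and using the 23-hour deadline from above (a 1-hour safety margin). The saga records are plain dicts here and the function names are hypothetical; in a real system they would come from the persisted saga state.

```python
from datetime import datetime, timedelta, timezone

PREAUTH_HOLD = timedelta(hours=24)     # assumed hotel pre-authorization lifetime
SAFETY_MARGIN = timedelta(hours=1)     # act before the hold actually lapses


def saga_deadline(preauth_at: datetime) -> datetime:
    return preauth_at + PREAUTH_HOLD - SAFETY_MARGIN


def sagas_to_compensate(sagas: list, now: datetime) -> list:
    """Ids of unfinished sagas whose deadline has passed."""
    return [
        s["saga_id"]
        for s in sagas
        if s["status"] not in ("completed", "compensated")
        and now >= saga_deadline(s["preauth_at"])
    ]
```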
Problem 2: Subscription Renewal Saga
Setup: Design a saga for monthly subscription renewal.
Requirements:
- Charge payment
- Update subscription status
- Provision resources (if plan changed)
- Send receipt email
Questions:
- What if payment fails?
- What if provisioning fails after payment?
- How do you handle retries for declined cards?
Payment failure:
- No compensation needed (nothing to undo)
- Mark subscription as "payment_failed"
- Send payment failed email
- Retry payment after 24h (grace period)
Provisioning failure after payment:
- Refund payment (compensation)
- Keep subscription on old plan
- Send failure notification
- Alert for manual review
Declined cards:
- Not a saga failure — it's expected
- Retry with exponential backoff (1, 3, 7 days)
- After 3 failures, cancel subscription (separate saga)
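The retry schedule can be written as offsets from the first decline. Assuming the 1, 3, and 7 days are measured from that first decline, the gaps between attempts are 1, 2, and 4 days, which is where the exponential backoff comes from. This sketch reads "after 3 failures" as "after all three scheduled retries have failed"; `next_dunning_action` is a hypothetical helper.

```python
from datetime import date, timedelta

RETRY_OFFSETS_DAYS = (1, 3, 7)   # days after the first decline: gaps of 1, 2, 4 days


def next_dunning_action(first_decline: date, failed_retries: int):
    """Return ("retry", when) for the next attempt, or ("cancel", None) when retries run out."""
    if failed_retries < len(RETRY_OFFSETS_DAYS):
        return ("retry", first_decline + timedelta(days=RETRY_OFFSETS_DAYS[failed_retries]))
    return ("cancel", None)   # hand off to the separate cancellation saga
```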
Chapter 11: Sample Interview Dialogue
Scenario: Design an Order Processing System
Interviewer: "Let's design an order processing system for an e-commerce site. Walk me through how you'd handle the transaction."
You: "First, let me identify the services involved and their transactions..."
You sketch on the whiteboard:
Order Processing Steps:
1. Inventory Service: Reserve items
2. Payment Service: Charge customer
3. Fulfillment Service: Create shipment
4. Notification Service: Send confirmation
You: "Since each service has its own database, I can't use a traditional database transaction. Instead, I'll use the Saga pattern. Each step is a local transaction, and each step that can be undone has a compensating transaction that semantically reverses it if a later step fails."
Interviewer: "What if the payment fails after inventory is reserved?"
You: "That's exactly why we need compensation. If payment fails, I'd trigger the compensation for the inventory step — release the reservation. The key is that these compensations must be idempotent. If we retry the release multiple times, it should have the same effect as doing it once."
You add to the diagram:
Step                     Compensation
────                     ────────────
Reserve inventory   ───▶ Release inventory
Charge payment      ───▶ Refund payment
Create shipment     ───▶ (PIVOT - no compensation)
Send notification   ───▶ (No compensation needed)
You: "Notice the shipment step is marked as the pivot. Once we've initiated shipment, we can't really undo it — the package is on its way. So after that point, we use forward recovery instead of compensation. We retry until success."
Interviewer: "How would you implement this? Would you use choreography or orchestration?"
You: "For four steps with a linear flow, either could work. But I'd lean toward orchestration — a central saga coordinator that explicitly calls each service in order. It's easier to add logging, monitoring, and to understand the flow."
You sketch the orchestrator:
┌─────────────────────┐
│  Saga Orchestrator  │
└──────────┬──────────┘
           │
     ┌─────┴─────┐
     │           │
     ▼           ▼
  Execute   Compensate
  forward  (on failure)
You: "The orchestrator persists its state to a database. If it crashes mid-saga, it can recover and continue from where it left off. Each step is called with an idempotency key, so retries are safe."
Interviewer: "What happens if the compensation itself fails?"
You: "Great question — that's the hardest part. Compensations must be designed to eventually succeed. I'd:
- Record the compensation intent in our database BEFORE calling the external service
- Use an idempotency key so retries are safe
- Have a background job that periodically checks for stuck compensations and retries them
- If it still fails after many retries, alert the operations team for manual intervention
The key insight is that compensation failure doesn't mean 'give up' — it means 'keep trying'. We'd rather have a background job retrying a refund for hours than leave a customer incorrectly charged."
Interviewer: "How do you prevent double-charging if the payment step times out?"
You: "Idempotency keys. When I call the payment service, I include a unique key based on the saga ID:
await payment_service.charge(
amount=order.total,
idempotency_key=f'saga-{saga_id}-payment'
)
If the call times out and we retry, the payment service recognizes the idempotency key and returns the same result without charging again. Stripe, for example, supports this natively."
Summary
DAY 2 KEY TAKEAWAYS
WHY SAGAS:
• 2PC is a poor fit for microservices (blocking, coordinator as a single point of failure, unsupported by many NoSQL stores and third-party APIs)
• Sagas use local transactions + compensation
• Each step is independent, no distributed locks
COMPENSATION DESIGN:
• Semantic reversal, not rollback
• Must be idempotent (safe to retry)
• Must eventually succeed (retry forever)
• Record intent before execution
CHOREOGRAPHY VS ORCHESTRATION:
• Choreography: Events between services, decentralized
└── Good for simple sagas (2-4 steps)
• Orchestration: Central coordinator, explicit flow
└── Good for complex sagas (5+ steps)
PIVOT POINT:
• Before pivot: Compensate (backward recovery)
• After pivot: Retry until success (forward recovery)
• Choose pivot based on business logic
FAILURE HANDLING:
• Persist saga state for crash recovery
• Idempotency keys for all external calls
• Background jobs for stuck compensations
• Human intervention as last resort
DEFAULT APPROACH:
• Start with orchestration
• Use idempotency keys everywhere
• Design compensation for guaranteed success
• Monitor compensation failure rate
Part V: Production Patterns
Chapter 12: Saga State Persistence
12.1 Database Schema for Saga State
-- Saga State Persistence Schema
-- Main saga state table
CREATE TABLE saga_instances (
saga_id UUID PRIMARY KEY,
saga_type VARCHAR(100) NOT NULL,
status VARCHAR(50) NOT NULL,
current_step INT NOT NULL DEFAULT 0,
input_data JSONB NOT NULL,
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
updated_at TIMESTAMP NOT NULL DEFAULT NOW(),
completed_at TIMESTAMP,
error_message TEXT,
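retry_count INT DEFAULT 0
);
-- Indexes for recovery queries (PostgreSQL has no inline INDEX clause in
-- CREATE TABLE, so they are created separately)
CREATE INDEX idx_saga_status ON saga_instances (status);
CREATE INDEX idx_saga_type_status ON saga_instances (saga_type, status);
CREATE INDEX idx_saga_created ON saga_instances (created_at);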
-- Step execution history
CREATE TABLE saga_step_executions (
id UUID PRIMARY KEY,
saga_id UUID NOT NULL REFERENCES saga_instances(saga_id),
step_name VARCHAR(100) NOT NULL,
step_index INT NOT NULL,
status VARCHAR(50) NOT NULL,
input_data JSONB,
output_data JSONB,
error_message TEXT,
started_at TIMESTAMP NOT NULL DEFAULT NOW(),
completed_at TIMESTAMP,
attempt_number INT NOT NULL DEFAULT 1,
UNIQUE (saga_id, step_name, attempt_number)
);
CREATE INDEX idx_step_saga ON saga_step_executions (saga_id);
-- Compensation tracking
CREATE TABLE saga_compensations (
id UUID PRIMARY KEY,
saga_id UUID NOT NULL REFERENCES saga_instances(saga_id),
step_name VARCHAR(100) NOT NULL,
status VARCHAR(50) NOT NULL, -- pending, executing, completed, failed
started_at TIMESTAMP,
completed_at TIMESTAMP,
retry_count INT DEFAULT 0,
last_error TEXT,
next_retry_at TIMESTAMP,
UNIQUE (saga_id, step_name)
);
CREATE INDEX idx_compensation_status ON saga_compensations (status);
CREATE INDEX idx_compensation_retry ON saga_compensations (status, next_retry_at);
-- Idempotency records
CREATE TABLE saga_idempotency_keys (
idempotency_key VARCHAR(255) PRIMARY KEY,
saga_id UUID NOT NULL,
step_name VARCHAR(100) NOT NULL,
response_data JSONB,
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
expires_at TIMESTAMP NOT NULL
);
CREATE INDEX idx_idempotency_expires ON saga_idempotency_keys (expires_at);
12.2 Recovery Job Implementation
# Background Job for Saga Recovery
import asyncio
import logging
from datetime import datetime, timedelta

logger = logging.getLogger(__name__)
class SagaRecoveryJob:
"""
Background job that recovers stuck sagas.
Handles:
- Sagas interrupted by crashes
- Stuck compensations
- Timed-out steps
"""
def __init__(self, db, orchestrator: SagaOrchestrator):
self.db = db
self.orchestrator = orchestrator
self.running = False
async def start(self):
"""Start the recovery job loop."""
self.running = True
while self.running:
try:
await self._recover_stuck_sagas()
await self._retry_failed_compensations()
await self._cleanup_expired_idempotency_keys()
except Exception as e:
logger.error(f"Recovery job error: {e}")
await asyncio.sleep(30) # Run every 30 seconds
async def stop(self):
"""Stop the recovery job."""
self.running = False
async def _recover_stuck_sagas(self):
"""Find and recover sagas stuck in executing state."""
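        # Find sagas stuck for more than 5 minutes
        cutoff = datetime.utcnow() - timedelta(minutes=5)
        # Claim them atomically by bumping updated_at, so the next 30-second
        # pass (or another recovery worker) does not resume the same saga twice
        stuck_sagas = await self.db.fetch(
            """
            UPDATE saga_instances
            SET updated_at = NOW()
            WHERE saga_id IN (
                SELECT saga_id FROM saga_instances
                WHERE status IN ('started', 'executing')
                  AND updated_at < $1
                FOR UPDATE SKIP LOCKED
            )
            RETURNING *
            """,
            cutoff
        )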
for saga in stuck_sagas:
logger.info(f"Recovering stuck saga: {saga['saga_id']}")
# Reconstruct state and resume
state = await self._reconstruct_state(saga)
asyncio.create_task(self.orchestrator._execute_saga(state))
async def _retry_failed_compensations(self):
"""Retry compensations that failed but are due for retry."""
now = datetime.utcnow()
pending = await self.db.fetch(
"""
SELECT c.*, s.saga_type
FROM saga_compensations c
JOIN saga_instances s ON c.saga_id = s.saga_id
WHERE c.status = 'failed'
AND c.next_retry_at <= $1
""",
now
)
for comp in pending:
logger.info(
f"Retrying compensation: {comp['step_name']} "
f"for saga {comp['saga_id']}"
)
try:
await self._execute_compensation(comp)
await self.db.execute(
"""
UPDATE saga_compensations
SET status = 'completed', completed_at = NOW()
WHERE id = $1
""",
comp['id']
)
except Exception as e:
retry_count = comp['retry_count'] + 1
next_retry = now + timedelta(seconds=min(300, 2 ** retry_count))
await self.db.execute(
"""
UPDATE saga_compensations
SET retry_count = $2, last_error = $3, next_retry_at = $4
WHERE id = $1
""",
comp['id'], retry_count, str(e), next_retry
)
if retry_count > 10:
logger.critical(
f"Compensation stuck after {retry_count} retries: "
f"{comp['saga_id']}/{comp['step_name']}"
)
# Alert operations team
async def _cleanup_expired_idempotency_keys(self):
"""Clean up expired idempotency keys."""
await self.db.execute(
"DELETE FROM saga_idempotency_keys WHERE expires_at < NOW()"
)
async def _reconstruct_state(self, saga_row) -> SagaState:
"""Reconstruct saga state from database."""
step_results = {}
# Get completed step results
steps = await self.db.fetch(
"""
SELECT step_name, output_data
FROM saga_step_executions
WHERE saga_id = $1 AND status = 'completed'
ORDER BY step_index
""",
saga_row['saga_id']
)
for step in steps:
step_results[step['step_name']] = step['output_data']
return SagaState(
saga_id=str(saga_row['saga_id']),
saga_type=saga_row['saga_type'],
status=SagaStatus(saga_row['status']),
current_step=saga_row['current_step'],
input_data=saga_row['input_data'],
step_results=step_results,
created_at=saga_row['created_at'],
updated_at=saga_row['updated_at'],
error=saga_row['error_message']
)
async def _execute_compensation(self, comp: dict):
"""Execute a single compensation."""
saga = self.orchestrator.sagas.get(comp['saga_type'])
if not saga:
raise ValueError(f"Unknown saga type: {comp['saga_type']}")
# Find the step
step = next(
(s for s in saga.steps if s.name == comp['step_name']),
None
)
if not step or not step.compensation:
return
# Get saga state for context
        saga_row = await self.db.fetchrow(
"SELECT * FROM saga_instances WHERE saga_id = $1",
comp['saga_id']
)
state = await self._reconstruct_state(saga_row)
context = {
"saga_id": state.saga_id,
"input": state.input_data,
"results": state.step_results,
"step_result": state.step_results.get(comp['step_name'])
}
await step.compensation(context)
Chapter 13: Monitoring and Observability
13.1 Saga Metrics
# Saga Monitoring and Metrics
import time
from typing import Dict

from prometheus_client import Counter, Gauge, Histogram
# Prometheus metrics
saga_started = Counter(
'saga_started_total',
'Total sagas started',
['saga_type']
)
saga_completed = Counter(
'saga_completed_total',
'Total sagas completed successfully',
['saga_type']
)
saga_failed = Counter(
'saga_failed_total',
'Total sagas that failed',
['saga_type', 'failed_step']
)
saga_compensated = Counter(
'saga_compensated_total',
'Total sagas that were compensated',
['saga_type']
)
saga_duration = Histogram(
'saga_duration_seconds',
'Time to complete saga',
['saga_type', 'outcome'],
buckets=[0.1, 0.5, 1, 5, 10, 30, 60, 300, 600]
)
saga_step_duration = Histogram(
'saga_step_duration_seconds',
'Time to complete saga step',
['saga_type', 'step_name'],
buckets=[0.01, 0.05, 0.1, 0.5, 1, 5, 10, 30]
)
saga_in_progress = Gauge(
'saga_in_progress',
'Number of sagas currently executing',
['saga_type']
)
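compensation_retries = Counter(
    'saga_compensation_retries_total',
    'Total compensation retry attempts',
    ['saga_type', 'step_name']
)
# Used by the alerting rules and dashboard; must be set by a periodic job,
# e.g. from a COUNT(*) over saga_compensations rows not yet 'completed'
saga_compensations_pending = Gauge(
    'saga_compensations_pending',
    'Compensations not yet completed',
    ['saga_type']
)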
class SagaMetricsCollector:
"""Collects and reports saga metrics."""
def __init__(self):
self.start_times: Dict[str, float] = {}
self.step_start_times: Dict[str, float] = {}
def on_saga_started(self, saga_id: str, saga_type: str):
saga_started.labels(saga_type=saga_type).inc()
saga_in_progress.labels(saga_type=saga_type).inc()
self.start_times[saga_id] = time.time()
def on_saga_completed(self, saga_id: str, saga_type: str):
saga_completed.labels(saga_type=saga_type).inc()
saga_in_progress.labels(saga_type=saga_type).dec()
if saga_id in self.start_times:
duration = time.time() - self.start_times[saga_id]
saga_duration.labels(
saga_type=saga_type,
outcome='completed'
).observe(duration)
del self.start_times[saga_id]
def on_saga_failed(self, saga_id: str, saga_type: str, failed_step: str):
saga_failed.labels(
saga_type=saga_type,
failed_step=failed_step
).inc()
saga_in_progress.labels(saga_type=saga_type).dec()
if saga_id in self.start_times:
duration = time.time() - self.start_times[saga_id]
saga_duration.labels(
saga_type=saga_type,
outcome='failed'
).observe(duration)
del self.start_times[saga_id]
def on_saga_compensated(self, saga_id: str, saga_type: str):
saga_compensated.labels(saga_type=saga_type).inc()
def on_step_started(self, saga_id: str, step_name: str):
key = f"{saga_id}:{step_name}"
self.step_start_times[key] = time.time()
def on_step_completed(
self,
saga_id: str,
saga_type: str,
step_name: str
):
key = f"{saga_id}:{step_name}"
if key in self.step_start_times:
duration = time.time() - self.step_start_times[key]
saga_step_duration.labels(
saga_type=saga_type,
step_name=step_name
).observe(duration)
del self.step_start_times[key]
def on_compensation_retry(self, saga_type: str, step_name: str):
compensation_retries.labels(
saga_type=saga_type,
step_name=step_name
).inc()
13.2 Saga Dashboard and Alerting
# Alerting Rules (Prometheus/Alertmanager format)
ALERTING_RULES = """
groups:
- name: saga_alerts
rules:
# High saga failure rate
- alert: SagaHighFailureRate
expr: |
          sum by (saga_type) (rate(saga_failed_total[5m])) / sum by (saga_type) (rate(saga_started_total[5m])) > 0.1
for: 5m
labels:
severity: warning
annotations:
summary: "High saga failure rate"
description: "More than 10% of sagas are failing"
# Saga taking too long
- alert: SagaSlowExecution
expr: |
          histogram_quantile(0.99, sum by (le, saga_type) (rate(saga_duration_seconds_bucket[5m]))) > 60
for: 5m
labels:
severity: warning
annotations:
summary: "Sagas taking too long"
description: "p99 saga duration is above 60 seconds"
# Stuck compensations
- alert: SagaStuckCompensations
expr: |
saga_compensations_pending > 10
for: 15m
labels:
severity: critical
annotations:
summary: "Stuck saga compensations"
        description: "{{ $value }} compensations pending (above 10 for 15+ minutes)"
# Compensation retry storm
- alert: SagaCompensationRetryStorm
expr: |
rate(saga_compensation_retries_total[5m]) > 1
for: 5m
labels:
severity: warning
annotations:
summary: "High compensation retry rate"
description: "Compensations are being retried frequently"
"""
# Dashboard query examples
DASHBOARD_QUERIES = {
"saga_success_rate": """
sum(rate(saga_completed_total[5m])) /
sum(rate(saga_started_total[5m]))
""",
"saga_p99_duration": """
histogram_quantile(0.99,
sum(rate(saga_duration_seconds_bucket[5m])) by (le, saga_type)
)
""",
"saga_failure_by_step": """
sum(rate(saga_failed_total[1h])) by (saga_type, failed_step)
""",
"sagas_in_progress": """
sum(saga_in_progress) by (saga_type)
""",
"compensation_pending": """
    sum(saga_compensations_pending) by (saga_type)
"""
}
Chapter 14: Testing Sagas
14.1 Unit Testing Saga Steps
# Testing Saga Steps
import pytest
from unittest.mock import AsyncMock, MagicMock
class TestOrderSagaSteps:
"""Unit tests for order saga steps."""
@pytest.fixture
def inventory_service(self):
service = AsyncMock()
service.reserve.return_value = "reservation-123"
service.release.return_value = True
return service
@pytest.fixture
def payment_service(self):
service = AsyncMock()
service.charge.return_value = "payment-456"
service.refund.return_value = True
return service
@pytest.fixture
def saga_steps(self, inventory_service, payment_service):
return OrderSagaSteps(
inventory_service=inventory_service,
payment_service=payment_service,
shipping_service=AsyncMock(),
notification_service=AsyncMock()
)
@pytest.mark.asyncio
async def test_reserve_inventory_success(self, saga_steps, inventory_service):
"""Test successful inventory reservation."""
context = {
'saga_id': 'saga-123',
'input': {
'items': [{'product_id': 'prod-1', 'quantity': 2}]
},
'results': {}
}
result = await saga_steps.reserve_inventory(context)
assert result == {'reservation_id': 'reservation-123'}
inventory_service.reserve.assert_called_once_with(
order_id='saga-123',
items=[{'product_id': 'prod-1', 'quantity': 2}]
)
@pytest.mark.asyncio
async def test_release_inventory_compensation(self, saga_steps, inventory_service):
"""Test inventory release compensation."""
context = {
'saga_id': 'saga-123',
'input': {},
'results': {
'reserve_inventory': {'reservation_id': 'reservation-123'}
}
}
await saga_steps.release_inventory(context)
inventory_service.release.assert_called_once_with('reservation-123')
@pytest.mark.asyncio
    async def test_release_inventory_idempotent(self, saga_steps, inventory_service):
        """Test compensation is a safe no-op when the reservation step never ran."""
context = {
'saga_id': 'saga-123',
'input': {},
'results': {} # No reservation result
}
# Should not raise, should be no-op
await saga_steps.release_inventory(context)
inventory_service.release.assert_not_called()
@pytest.mark.asyncio
async def test_charge_payment_uses_idempotency_key(
self,
saga_steps,
payment_service
):
"""Test payment uses idempotency key."""
context = {
'saga_id': 'saga-123',
'input': {
'customer_id': 'cust-1',
'total': 99.99
},
'results': {}
}
await saga_steps.charge_payment(context)
payment_service.charge.assert_called_once()
call_kwargs = payment_service.charge.call_args.kwargs
assert call_kwargs['idempotency_key'] == 'saga-saga-123'
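Mocks confirm that `release` was called, but they cannot show that calling it twice is harmless. A stateful fake can. This sketch uses a hypothetical `FakeInventory` whose `release` pops the reservation, so a retried compensation finds nothing to release:

```python
import asyncio


class FakeInventory:
    """Hypothetical stateful fake: tracks stock and reservations in memory."""

    def __init__(self, stock: int):
        self.stock = stock
        self.reservations = {}           # reservation_id -> quantity

    async def reserve(self, order_id: str, quantity: int) -> str:
        if order_id not in self.reservations:    # a retried reserve is a no-op
            self.reservations[order_id] = quantity
            self.stock -= quantity
        return order_id

    async def release(self, reservation_id: str) -> None:
        quantity = self.reservations.pop(reservation_id, 0)   # second release finds nothing
        self.stock += quantity


def test_release_twice_equals_release_once():
    async def scenario():
        inventory = FakeInventory(stock=10)
        reservation_id = await inventory.reserve("saga-123", 3)
        await inventory.release(reservation_id)
        once = (inventory.stock, dict(inventory.reservations))
        await inventory.release(reservation_id)          # simulated retried compensation
        twice = (inventory.stock, dict(inventory.reservations))
        return once, twice

    once, twice = asyncio.run(scenario())
    assert once == twice == (10, {})
```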
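14.2 Integration Testing Sagas
# Integration Testing for Sagas
import asyncio
from unittest.mock import AsyncMock

import asyncpg
import pytest
import pytest_asyncio
from testcontainers.postgres import PostgresContainer

class TestOrderSagaIntegration:
    """Integration tests for complete order saga."""

    @pytest.fixture(scope="class")
    def postgres(self):
        with PostgresContainer("postgres:14") as pg:
            yield pg

    # Async fixtures need pytest_asyncio.fixture unless asyncio_mode = "auto"
    @pytest_asyncio.fixture
    async def db(self, postgres):
        # get_connection_url() returns a SQLAlchemy-style URL
        # ("postgresql+psycopg2://..."); asyncpg expects "postgresql://..."
        url = postgres.get_connection_url().replace("postgresql+psycopg2://", "postgresql://")
        conn = await asyncpg.connect(url)
        await self._setup_schema(conn)
        yield conn
        await conn.close()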
@pytest.fixture
def orchestrator(self, db):
steps = OrderSagaSteps(
inventory_service=RealInventoryService(db),
payment_service=MockPaymentService(),
shipping_service=MockShippingService(),
notification_service=MockNotificationService()
)
saga = create_order_saga(steps)
return SagaOrchestrator(db, {"OrderSaga": saga})
@pytest.mark.asyncio
async def test_successful_order_saga(self, orchestrator, db):
"""Test complete successful saga execution."""
saga_id = await orchestrator.start_saga(
"OrderSaga",
{
"customer_id": "cust-123",
"items": [{"product_id": "prod-1", "quantity": 1}],
"total": 99.99,
"shipping_address": "123 Main St"
}
)
# Wait for saga to complete
await asyncio.sleep(2)
# Check saga completed
        state = await db.fetchrow(
"SELECT status FROM saga_instances WHERE saga_id = $1",
saga_id
)
assert state['status'] == 'completed'
# Check all steps completed
steps = await db.fetch(
"""
SELECT step_name, status FROM saga_step_executions
WHERE saga_id = $1 ORDER BY step_index
""",
saga_id
)
assert all(s['status'] == 'completed' for s in steps)
@pytest.mark.asyncio
async def test_saga_compensation_on_payment_failure(
self,
orchestrator,
db
):
"""Test saga compensates when payment fails."""
# Configure payment to fail
orchestrator.sagas["OrderSaga"].steps[1].action = AsyncMock(
side_effect=PaymentError("Card declined")
)
saga_id = await orchestrator.start_saga(
"OrderSaga",
{
"customer_id": "cust-123",
"items": [{"product_id": "prod-1", "quantity": 1}],
"total": 99.99
}
)
await asyncio.sleep(2)
# Check saga was compensated
state = await db.fetch_one(
"SELECT status FROM saga_instances WHERE saga_id = $1",
saga_id
)
assert state['status'] == 'compensated'
# Check inventory was released
compensation = await db.fetch_one(
"""
SELECT status FROM saga_compensations
WHERE saga_id = $1 AND step_name = 'reserve_inventory'
""",
saga_id
)
assert compensation['status'] == 'completed'
@pytest.mark.asyncio
async def test_saga_recovery_after_crash(self, orchestrator, db):
"""Test saga recovers after simulated crash."""
saga_id = await orchestrator.start_saga(
"OrderSaga",
{"customer_id": "cust-123", "items": [], "total": 50.00}
)
# Simulate crash by stopping orchestrator mid-execution
await asyncio.sleep(0.1)
# Mark saga as stuck
await db.execute(
"""
UPDATE saga_instances
SET updated_at = NOW() - INTERVAL '10 minutes'
WHERE saga_id = $1
""",
saga_id
)
# Run recovery
recovery_job = SagaRecoveryJob(db, orchestrator)
await recovery_job._recover_stuck_sagas()
await asyncio.sleep(2)
# Verify saga eventually completes
state = await db.fetch_one(
"SELECT status FROM saga_instances WHERE saga_id = $1",
saga_id
)
assert state['status'] in ('completed', 'compensated')
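The integration tests above wait with fixed `asyncio.sleep(2)` calls, which is simple but can fail on a slow CI machine and wastes time on a fast one. One alternative is to poll the saga status until it reaches a terminal state. A minimal sketch follows; `wait_for_status` is a hypothetical test helper, not part of pytest, pytest-asyncio, or asyncpg.

```python
import asyncio
import time
from typing import Any, Awaitable, Callable, Collection


async def wait_for_status(
    fetch_status: Callable[[], Awaitable[Any]],
    expected: Collection[str],
    timeout: float = 10.0,
    interval: float = 0.1,
) -> str:
    """Poll fetch_status() until it returns a value in `expected`.

    Hypothetical helper: returns the matching status, or raises
    TimeoutError if none is seen within `timeout` seconds.
    """
    deadline = time.monotonic() + timeout
    status = None
    while time.monotonic() < deadline:
        status = await fetch_status()
        if status in expected:
            return status
        # Not terminal yet; back off briefly before the next read
        await asyncio.sleep(interval)
    raise TimeoutError(
        f"last status {status!r} not in {sorted(expected)} after {timeout}s"
    )


async def _demo() -> None:
    # Fake status source standing in for a saga_instances lookup
    statuses = iter(["running", "running", "compensating", "compensated"])

    async def fake_fetch() -> str:
        return next(statuses)

    result = await wait_for_status(
        fake_fetch, {"completed", "compensated"}, interval=0.01
    )
    print(result)  # compensated


if __name__ == "__main__":
    asyncio.run(_demo())
```

Inside the tests above, `fetch_status` could wrap asyncpg's `Connection.fetchval`, for example `lambda: db.fetchval("SELECT status FROM saga_instances WHERE saga_id = $1", saga_id)`, so each test returns as soon as the saga reaches `completed` or `compensated` instead of always sleeping two seconds.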
📚 Further Reading
Papers
- "Sagas" by Hector Garcia-Molina and Kenneth Salem (1987) — Original paper
- "Life beyond Distributed Transactions: an Apostate's Opinion" by Pat Helland (2007)
Books
- "Microservices Patterns" by Chris Richardson — Chapter 4 covers sagas
- "Designing Data-Intensive Applications" by Martin Kleppmann — Chapter 9 covers distributed transactions and two-phase commit
Workflow Engines
- Uber Cadence: https://cadenceworkflow.io/
- Netflix Conductor: https://netflix.github.io/conductor/
- Temporal: https://temporal.io/
End of Day 2: Distributed Transactions — The Saga Pattern
Tomorrow: Day 3 — Saga Orchestration Deep Dive. We'll explore workflow engines like Temporal, durable execution, and how to build production-grade saga orchestrators.