The Great Travel Booking Disaster: How 2PC Nearly Killed Our Business and Saga Pattern Saved It
8/4/2025

Understanding the Problem: When Dreams Meet Distributed Reality
Meet Alex Chen, CTO of WonderTravel, a rapidly growing online travel platform that promises "seamless one-click vacation bookings" to millions of customers worldwide. The company's ambitious vision: book flights, hotels, and car rentals in a single atomic transaction, ensuring customers never face the nightmare of partial bookings—like having a hotel reservation in Tokyo but no flight to get there.
It's Black Friday 2024, and WonderTravel expects their biggest sales day ever. At 12:01 AM EST, disaster strikes. Their distributed transaction system, built on Two-Phase Commit (2PC), begins failing catastrophically under load. Customers see flights booked but hotels failing, or worse—credit cards charged with no reservations at all.
The Crisis Numbers:
- 50,000 simultaneous booking attempts during peak hour
- 3 independent microservices across different continents
- Flight Service: US-East (New York) - 200ms average response
- Hotel Service: Europe (Amsterdam) - 180ms average response
- Car Service: Asia-Pacific (Singapore) - 300ms average response
- Network failures: 0.1% packet loss between regions
- Service crashes: 2-3 instances per hour during peak load
Our Mission: Design a bulletproof distributed transaction system that maintains atomicity across microservices while surviving network partitions, service failures, and the chaos of global scale.
The Distributed Transaction Challenge: The ACID Test
WonderTravel's Booking Architecture:
Customer Journey:
1. Select flight: JFK → NRT (Japan Airlines, $1,200)
2. Choose hotel: Tokyo Grand Hotel (3 nights, $450/night)
3. Pick rental: Toyota Camry (3 days, $80/day)
4. Click "Book Everything" → Must be atomic!
System Architecture:
[Flight Service] (US-East) ---> PostgreSQL (Seats DB)
[Hotel Service] (Europe) ---> MongoDB (Rooms DB)
[Car Service] (Asia-Pacific) ---> MySQL (Cars DB)
The Atomic Requirement:
- Success Case: All three services must successfully reserve resources
- Failure Case: If ANY service fails, ALL reservations must be rolled back
- Consistency: No partial bookings should ever exist
- Isolation: Concurrent bookings shouldn't interfere with each other
- Durability: Confirmed bookings must survive system failures
Attempt #1: Two-Phase Commit (2PC) - The "Obvious" Choice
How 2PC Works: The Coordinator's Dance
Two-Phase Commit implements distributed atomicity through a coordinator that orchestrates a two-stage process across all participants.
sequenceDiagram
participant C as Coordinator
participant F as Flight Service
participant H as Hotel Service
participant R as Car Service
Note over C: Phase 1: PREPARE
C->>F: PREPARE: Reserve JFK→NRT flight
C->>H: PREPARE: Reserve Tokyo hotel room
C->>R: PREPARE: Reserve Toyota Camry
F->>C: PREPARED (flight locked)
H->>C: PREPARED (room locked)
R->>C: PREPARED (car locked)
Note over C: All services ready!
Note over C: Phase 2: COMMIT
C->>F: COMMIT: Finalize flight booking
C->>H: COMMIT: Finalize hotel reservation
C->>R: COMMIT: Finalize car rental
F->>C: COMMITTED
H->>C: COMMITTED
R->>C: COMMITTED
Note over C: Success! All bookings confirmed
WonderTravel's 2PC Implementation
import asyncio
import logging
from enum import Enum
from dataclasses import dataclass
from typing import Dict, List, Optional
import uuid
class TransactionState(Enum):
INITIATED = "initiated"
PREPARED = "prepared"
COMMITTED = "committed"
ABORTED = "aborted"
@dataclass
class BookingRequest:
flight_details: dict
hotel_details: dict
car_details: dict
customer_id: str
total_amount: float
@dataclass
class ServiceResponse:
service_name: str
transaction_id: str
success: bool
resource_id: Optional[str] = None
error_message: Optional[str] = None
class TwoPCCoordinator:
"""Two-Phase Commit Coordinator for travel bookings"""
def __init__(self):
self.active_transactions: Dict[str, TransactionState] = {}
self.service_clients = {
'flight': FlightServiceClient(),
'hotel': HotelServiceClient(),
'car': CarServiceClient()
}
self.transaction_log = []
async def book_travel_package(self, booking: BookingRequest) -> dict:
"""Execute distributed travel booking transaction"""
transaction_id = str(uuid.uuid4())
self.active_transactions[transaction_id] = TransactionState.INITIATED
try:
# Phase 1: PREPARE - Lock all resources
prepare_results = await self._phase1_prepare(transaction_id, booking)
if self._all_services_prepared(prepare_results):
# Phase 2: COMMIT - Finalize all bookings
commit_results = await self._phase2_commit(transaction_id)
self.active_transactions[transaction_id] = TransactionState.COMMITTED
return {
'success': True,
'transaction_id': transaction_id,
'bookings': commit_results
}
else:
# Phase 2: ABORT - Release all locks
abort_results = await self._phase2_abort(transaction_id)
self.active_transactions[transaction_id] = TransactionState.ABORTED
return {
'success': False,
'transaction_id': transaction_id,
'error': 'One or more services failed to prepare',
'details': prepare_results
}
except Exception as e:
# Emergency abort on coordinator failure
await self._emergency_abort(transaction_id)
raise e
async def _phase1_prepare(self, transaction_id: str, booking: BookingRequest) -> List[ServiceResponse]:
"""Phase 1: Request all services to prepare (lock resources)"""
logging.info(f"Starting Phase 1 PREPARE for transaction {transaction_id}")
# Prepare requests for all services
prepare_tasks = [
self.service_clients['flight'].prepare(transaction_id, booking.flight_details),
self.service_clients['hotel'].prepare(transaction_id, booking.hotel_details),
self.service_clients['car'].prepare(transaction_id, booking.car_details)
]
# Execute all prepare operations in parallel with timeout
try:
results = await asyncio.wait_for(
asyncio.gather(*prepare_tasks, return_exceptions=True),
timeout=30.0 # 30 second timeout for prepare phase
)
# Process results and handle exceptions
responses = []
for i, result in enumerate(results):
service_name = ['flight', 'hotel', 'car'][i]
if isinstance(result, Exception):
responses.append(ServiceResponse(
service_name=service_name,
transaction_id=transaction_id,
success=False,
error_message=str(result)
))
else:
responses.append(result)
# Log prepare phase results
self._log_transaction_phase(transaction_id, "PREPARE", responses)
return responses
except asyncio.TimeoutError:
logging.error(f"Phase 1 PREPARE timeout for transaction {transaction_id}")
# Timeout handling - abort transaction
await self._emergency_abort(transaction_id)
raise Exception("Prepare phase timeout - transaction aborted")
async def _phase2_commit(self, transaction_id: str) -> List[ServiceResponse]:
"""Phase 2: Request all services to commit (finalize bookings)"""
logging.info(f"Starting Phase 2 COMMIT for transaction {transaction_id}")
commit_tasks = [
self.service_clients['flight'].commit(transaction_id),
self.service_clients['hotel'].commit(transaction_id),
self.service_clients['car'].commit(transaction_id)
]
try:
results = await asyncio.wait_for(
asyncio.gather(*commit_tasks, return_exceptions=True),
timeout=60.0 # Longer timeout for commit phase
)
# Process commit results
responses = []
for i, result in enumerate(results):
service_name = ['flight', 'hotel', 'car'][i]
if isinstance(result, Exception):
# CRITICAL: Commit phase failure is catastrophic!
logging.critical(f"COMMIT FAILED for {service_name} in transaction {transaction_id}: {result}")
responses.append(ServiceResponse(
service_name=service_name,
transaction_id=transaction_id,
success=False,
error_message=f"COMMIT_FAILURE: {result}"
))
else:
responses.append(result)
self._log_transaction_phase(transaction_id, "COMMIT", responses)
return responses
except asyncio.TimeoutError:
# Commit timeout is DISASTER - some services may have committed!
logging.critical(f"Phase 2 COMMIT timeout for transaction {transaction_id} - INCONSISTENT STATE!")
raise Exception("Commit phase timeout - system may be in inconsistent state")
async def _phase2_abort(self, transaction_id: str) -> List[ServiceResponse]:
"""Phase 2: Request all services to abort (release locks)"""
logging.info(f"Starting Phase 2 ABORT for transaction {transaction_id}")
abort_tasks = [
self.service_clients['flight'].abort(transaction_id),
self.service_clients['hotel'].abort(transaction_id),
self.service_clients['car'].abort(transaction_id)
]
# Abort operations should be best-effort
results = await asyncio.gather(*abort_tasks, return_exceptions=True)
responses = []
for i, result in enumerate(results):
service_name = ['flight', 'hotel', 'car'][i]
if isinstance(result, Exception):
logging.warning(f"Abort failed for {service_name}: {result}")
responses.append(ServiceResponse(
service_name=service_name,
transaction_id=transaction_id,
success=False,
error_message=f"ABORT_FAILED: {result}"
))
else:
responses.append(result)
self._log_transaction_phase(transaction_id, "ABORT", responses)
return responses
def _all_services_prepared(self, responses: List[ServiceResponse]) -> bool:
"""Check if all services successfully prepared"""
return all(response.success for response in responses)
def _log_transaction_phase(self, transaction_id: str, phase: str, responses: List[ServiceResponse]):
"""Log transaction phase for recovery purposes"""
log_entry = {
'transaction_id': transaction_id,
'phase': phase,
'timestamp': asyncio.get_event_loop().time(),
'responses': [
{
'service': r.service_name,
'success': r.success,
'error': r.error_message
} for r in responses
]
}
self.transaction_log.append(log_entry)
# Persistent logging for recovery
logging.info(f"Transaction {transaction_id} {phase}: {log_entry}")
# Flight Service Implementation
class FlightServiceClient:
def __init__(self):
self.prepared_reservations: Dict[str, dict] = {}
self.committed_bookings: Dict[str, dict] = {}
async def prepare(self, transaction_id: str, flight_details: dict) -> ServiceResponse:
"""Prepare phase: Lock flight seat without finalizing booking"""
try:
# Simulate network latency and occasional failures
await asyncio.sleep(0.2) # 200ms base latency
# Simulate 5% failure rate during peak load
import random
if random.random() < 0.05:
raise Exception("Flight service temporarily unavailable")
# Check seat availability
if not await self._check_seat_availability(flight_details['flight_id'], flight_details['seat_class']):
return ServiceResponse(
service_name='flight',
transaction_id=transaction_id,
success=False,
error_message="Requested seat not available"
)
# Reserve seat temporarily (with timeout lock)
reservation = {
'flight_id': flight_details['flight_id'],
'seat_class': flight_details['seat_class'],
'passenger_count': flight_details['passenger_count'],
'price': flight_details['price'],
'reserved_at': asyncio.get_event_loop().time()
}
self.prepared_reservations[transaction_id] = reservation
# Set timeout for reservation (auto-release after 5 minutes)
asyncio.create_task(self._auto_release_reservation(transaction_id, 300))
return ServiceResponse(
service_name='flight',
transaction_id=transaction_id,
success=True,
resource_id=f"flight_{flight_details['flight_id']}"
)
except Exception as e:
return ServiceResponse(
service_name='flight',
transaction_id=transaction_id,
success=False,
error_message=str(e)
)
async def commit(self, transaction_id: str) -> ServiceResponse:
"""Commit phase: Finalize flight booking"""
if transaction_id not in self.prepared_reservations:
return ServiceResponse(
service_name='flight',
transaction_id=transaction_id,
success=False,
error_message="No prepared reservation found"
)
try:
# Move from prepared to committed
reservation = self.prepared_reservations.pop(transaction_id)
self.committed_bookings[transaction_id] = reservation
# Generate booking confirmation
booking_reference = f"FL{transaction_id[:8].upper()}"
return ServiceResponse(
service_name='flight',
transaction_id=transaction_id,
success=True,
resource_id=booking_reference
)
except Exception as e:
return ServiceResponse(
service_name='flight',
transaction_id=transaction_id,
success=False,
error_message=f"Commit failed: {e}"
)
async def abort(self, transaction_id: str) -> ServiceResponse:
"""Abort phase: Release reserved seat"""
if transaction_id in self.prepared_reservations:
del self.prepared_reservations[transaction_id]
return ServiceResponse(
service_name='flight',
transaction_id=transaction_id,
success=True
)
async def _check_seat_availability(self, flight_id: str, seat_class: str) -> bool:
"""Check if seats are available for booking"""
# Simulate database query
await asyncio.sleep(0.05)
return True # Simplified - assume seats available
async def _auto_release_reservation(self, transaction_id: str, timeout_seconds: int):
"""Auto-release reservation after timeout"""
await asyncio.sleep(timeout_seconds)
if transaction_id in self.prepared_reservations:
logging.warning(f"Auto-releasing expired reservation {transaction_id}")
del self.prepared_reservations[transaction_id]
The Black Friday Disaster: When 2PC Goes Wrong
Scenario 1: The Coordinator Crashes
The Incident: At 12:15 AM, the 2PC coordinator service crashes due to memory exhaustion during peak load. 2,000 transactions are left in prepared state.
Current State at Crash:
[Flight Service] ---> 2,000 seats (LOCKED, awaiting coordinator)
[Hotel Service] ---> 2,000 rooms (LOCKED, awaiting coordinator)
[Car Service] ---> 2,000 cars (LOCKED, awaiting coordinator)
[Coordinator] ☠️ CRASHED ☠️
The Problem: All services are blocked, waiting for commit/abort decisions that will never come.
Customer Impact:
- 2,000 customers can't book travel packages (resources locked but not confirmed)
- 6,000 inventory items unusable until manual intervention
- Revenue loss: $2.4M in blocked bookings
- Recovery time: 4 hours to manually identify and unlock resources
Scenario 2: Network Partitions During Commit
The Incident: A network partition splits the coordinator from the Hotel Service during the commit phase.
sequenceDiagram
participant C as Coordinator
participant F as Flight Service
participant H as Hotel Service
participant R as Car Service
C->>F: COMMIT
C->>H: COMMIT (NETWORK FAILURE!)
C->>R: COMMIT
F->>C: COMMITTED ✅
Note over H: Network partition!<br/>Never receives COMMIT
R->>C: COMMITTED ✅
Note over C: 2/3 services committed<br/>What about Hotel?
Note over H: Still waiting for decision<br/>Resources locked indefinitely
The Disaster:
- Flight: ✅ Booked and charged customer
- Hotel: ❓ Locked but not confirmed
- Car: ✅ Booked and charged customer
- Customer: Has flight and car, but no hotel!
- System State: Inconsistent and corrupted
Why 2PC Fails in Microservices: The Fundamental Problems
Problem 1: Blocking and Availability
class ProblematicBlockingScenario:
"""Demonstration of 2PC blocking issues"""
def __init__(self):
self.blocked_transactions = {}
self.resource_locks = {}
def simulate_coordinator_failure(self, active_transactions: List[str]):
"""What happens when coordinator dies"""
for transaction_id in active_transactions:
# All resources remain locked indefinitely
self.resource_locks[transaction_id] = {
'flight_seats': 'LOCKED',
'hotel_rooms': 'LOCKED',
'rental_cars': 'LOCKED',
'locked_since': time.time(),
'can_proceed': False, # ❌ BLOCKED!
'requires_manual_intervention': True
}
# System statistics during blocking
total_locked_resources = sum(
len(locks) for locks in self.resource_locks.values()
)
return {
'blocked_transactions': len(active_transactions),
'locked_resources': total_locked_resources,
'system_availability': 0, # ❌ System effectively down
'customer_impact': 'SEVERE',
'recovery_method': 'MANUAL_INTERVENTION_REQUIRED'
}
# Real blocking statistics from WonderTravel
blocking_stats = {
'average_transaction_duration': 1200, # 20 minutes (prepare + commit)
'resources_locked_per_transaction': 3,
'concurrent_transactions_peak': 50000,
'total_resources_at_risk': 150000, # 50k × 3 resources
'coordinator_uptime': 99.5, # 99.5% uptime = 4.38 hours downtime/month
'blocked_customers_per_outage': 25000, # Customers affected during coordinator crash
}
Problem 2: CAP Theorem Violations
WonderTravel's CAP Analysis:
CAP Theorem Trade-offs in 2PC:
Consistency (C): ✅ STRONG
- All services see the same data
- No partial bookings exist
- ACID guarantees maintained
Availability (A): ❌ POOR
- System blocks during network partitions
- Coordinator failures cause total outages
- Services can't make progress independently
Partition Tolerance (P): ❌ POOR
- Network splits cause indefinite blocking
- Cannot distinguish between slow network and crashed node
- Manual intervention required for recovery
Problem 3: Performance Degradation
class PerformanceAnalysis:
"""2PC Performance characteristics under load"""
def __init__(self):
self.latency_measurements = []
self.throughput_measurements = []
def measure_2pc_performance(self, concurrent_transactions: int) -> dict:
"""Measure 2PC performance under various loads"""
# Base latencies for each service
base_latencies = {
'flight_service': 200, # ms
'hotel_service': 180, # ms
'car_service': 300 # ms
}
# 2PC adds coordination overhead
coordination_overhead = 150 # ms per phase
network_roundtrips = 2 # prepare + commit phases
# Calculate total transaction time
max_service_latency = max(base_latencies.values())
total_transaction_time = (
(max_service_latency + coordination_overhead) * network_roundtrips
)
# Throughput degradation with locks
lock_contention_factor = 1 + (concurrent_transactions / 1000)
effective_throughput = 1000 / lock_contention_factor # TPS
# Failure amplification
individual_failure_rate = 0.01 # 1% per service
combined_failure_rate = 1 - ((1 - individual_failure_rate) ** 3) # ~3%
return {
'transaction_latency_ms': total_transaction_time * lock_contention_factor,
'throughput_tps': effective_throughput,
'failure_rate': combined_failure_rate,
'availability': 1 - combined_failure_rate,
'resource_utilization': concurrent_transactions / 10000 * 100,
'coordinator_bottleneck': True
}
# Performance comparison
low_load = PerformanceAnalysis().measure_2pc_performance(1000)
high_load = PerformanceAnalysis().measure_2pc_performance(50000)
print("2PC Performance Analysis:")
print(f"Low Load (1K concurrent): {low_load['transaction_latency_ms']:.0f}ms, {low_load['throughput_tps']:.0f} TPS")
print(f"High Load (50K concurrent): {high_load['transaction_latency_ms']:.0f}ms, {high_load['throughput_tps']:.0f} TPS")
print(f"Performance degradation: {high_load['transaction_latency_ms'] / low_load['transaction_latency_ms']:.1f}x slower")
The Saga Pattern: Choreographed Resilience
Understanding Saga: The Netflix Model
After the 2PC disaster, Alex's team discovered the Saga Pattern—a way to manage distributed transactions without blocking or coordinators. Instead of locking resources and coordinating commits, Saga breaks the transaction into a sequence of local transactions, each with a corresponding compensating action.
Core Saga Principles:
- No locks: Each service commits immediately to its local database
- Compensating actions: For every action, define how to undo it
- Event-driven: Services communicate through events, not direct calls
- Eventually consistent: System reaches consistency over time, not immediately
Traditional 2PC Flow:
1. Lock flight → Lock hotel → Lock car
2. Commit flight → Commit hotel → Commit car
❌ Problem: If ANY step fails, ALL are blocked
Saga Pattern Flow:
1. Book flight ✅ → Publish "FlightBooked" event
2. Book hotel ✅ → Publish "HotelBooked" event
3. Book car ✅ → Publish "CarBooked" event
✅ Success: All booked independently
Failure Scenario:
1. Book flight ✅ → Publish "FlightBooked" event
2. Book hotel ✅ → Publish "HotelBooked" event
3. Book car ❌ → Publish "CarBookingFailed" event
4. Cancel hotel ✅ → Publish "HotelCancelled" event
5. Cancel flight ✅ → Publish "FlightCancelled" event
✅ Recovery: System self-heals through compensation
Orchestration vs Choreography: Two Saga Styles
Option 1: Orchestrated Saga (Centralized)
class TravelBookingSagaOrchestrator:
"""Centralized orchestrator managing booking workflow"""
def __init__(self):
self.event_store = EventStore()
self.service_clients = {
'flight': FlightServiceClient(),
'hotel': HotelServiceClient(),
'car': CarServiceClient()
}
self.compensation_handlers = {
'flight': self._compensate_flight_booking,
'hotel': self._compensate_hotel_booking,
'car': self._compensate_car_booking
}
async def execute_travel_booking_saga(self, booking_request: BookingRequest) -> dict:
"""Execute travel booking saga with orchestration"""
saga_id = str(uuid.uuid4())
saga_state = SagaState(saga_id, booking_request)
try:
# Step 1: Book Flight
saga_state.current_step = "booking_flight"
flight_result = await self._execute_step_with_retry(
self.service_clients['flight'].book_flight,
booking_request.flight_details,
saga_state
)
if not flight_result.success:
return await self._handle_saga_failure(saga_state, "flight_booking_failed")
saga_state.completed_steps.append({
'step': 'flight_booking',
'result': flight_result,
'compensation_required': True
})
# Step 2: Book Hotel
saga_state.current_step = "booking_hotel"
hotel_result = await self._execute_step_with_retry(
self.service_clients['hotel'].book_hotel,
booking_request.hotel_details,
saga_state
)
if not hotel_result.success:
return await self._handle_saga_failure(saga_state, "hotel_booking_failed")
saga_state.completed_steps.append({
'step': 'hotel_booking',
'result': hotel_result,
'compensation_required': True
})
# Step 3: Book Car
saga_state.current_step = "booking_car"
car_result = await self._execute_step_with_retry(
self.service_clients['car'].book_car,
booking_request.car_details,
saga_state
)
if not car_result.success:
return await self._handle_saga_failure(saga_state, "car_booking_failed")
saga_state.completed_steps.append({
'step': 'car_booking',
'result': car_result,
'compensation_required': True
})
# Success: All bookings completed
saga_state.status = SagaStatus.COMPLETED
await self._persist_saga_state(saga_state)
return {
'success': True,
'saga_id': saga_id,
'bookings': {
'flight': flight_result.booking_id,
'hotel': hotel_result.booking_id,
'car': car_result.booking_id
},
'total_amount': booking_request.total_amount
}
except Exception as e:
logging.error(f"Saga {saga_id} failed with exception: {e}")
return await self._handle_saga_failure(saga_state, f"unexpected_error: {e}")
async def _handle_saga_failure(self, saga_state: SagaState, reason: str) -> dict:
"""Handle saga failure by executing compensating actions"""
logging.warning(f"Saga {saga_state.saga_id} failed: {reason}")
saga_state.status = SagaStatus.COMPENSATING
saga_state.failure_reason = reason
# Execute compensating actions in reverse order
compensation_results = []
for step in reversed(saga_state.completed_steps):
if step['compensation_required']:
try:
compensation_result = await self._execute_compensation(
step['step'],
step['result'],
saga_state
)
compensation_results.append(compensation_result)
except Exception as e:
logging.error(f"Compensation failed for {step['step']}: {e}")
# Continue with other compensations even if one fails
compensation_results.append({
'step': step['step'],
'success': False,
'error': str(e)
})
saga_state.status = SagaStatus.COMPENSATED
await self._persist_saga_state(saga_state)
return {
'success': False,
'saga_id': saga_state.saga_id,
'reason': reason,
'compensations': compensation_results
}
async def _execute_step_with_retry(self, step_function, params, saga_state, max_retries=3):
"""Execute saga step with exponential backoff retry"""
for attempt in range(max_retries):
try:
result = await step_function(params)
# Log successful step
await self._log_saga_event(
saga_state.saga_id,
f"{saga_state.current_step}_success",
{'attempt': attempt + 1, 'result': result.__dict__}
)
return result
except Exception as e:
wait_time = (2 ** attempt) * 1.0 # Exponential backoff
if attempt < max_retries - 1:
logging.warning(f"Step {saga_state.current_step} failed (attempt {attempt + 1}): {e}. Retrying in {wait_time}s")
await asyncio.sleep(wait_time)
else:
logging.error(f"Step {saga_state.current_step} failed after {max_retries} attempts: {e}")
raise e
async def _execute_compensation(self, step_name: str, step_result, saga_state):
"""Execute compensating action for a completed step"""
compensation_function = self.compensation_handlers.get(step_name.split('_'))
if compensation_function:
return await compensation_function(step_result, saga_state)
else:
logging.warning(f"No compensation handler found for step: {step_name}")
return {'success': False, 'error': 'No compensation handler'}
async def _compensate_flight_booking(self, flight_result, saga_state):
"""Compensate flight booking by cancelling reservation"""
try:
cancellation_result = await self.service_clients['flight'].cancel_booking(
flight_result.booking_id
)
await self._log_saga_event(
saga_state.saga_id,
"flight_booking_compensated",
{'booking_id': flight_result.booking_id, 'result': cancellation_result.__dict__}
)
return {
'step': 'flight_compensation',
'success': cancellation_result.success,
'booking_id': flight_result.booking_id
}
except Exception as e:
logging.error(f"Flight compensation failed: {e}")
return {
'step': 'flight_compensation',
'success': False,
'error': str(e)
}
async def _compensate_hotel_booking(self, hotel_result, saga_state):
"""Compensate hotel booking by cancelling reservation"""
# Similar implementation to flight compensation
pass
async def _compensate_car_booking(self, car_result, saga_state):
"""Compensate car booking by cancelling reservation"""
# Similar implementation to flight compensation
pass
@dataclass
class SagaState:
saga_id: str
booking_request: BookingRequest
status: SagaStatus = SagaStatus.STARTED
current_step: str = ""
completed_steps: List[dict] = None
failure_reason: str = ""
def __post_init__(self):
if self.completed_steps is None:
self.completed_steps = []
class SagaStatus(Enum):
STARTED = "started"
IN_PROGRESS = "in_progress"
COMPLETED = "completed"
COMPENSATING = "compensating"
COMPENSATED = "compensated"
FAILED = "failed"
Option 2: Choreographed Saga (Decentralized)
class ChoreographedTravelBookingSaga:
"""Event-driven choreographed saga without central orchestrator"""
def __init__(self):
self.event_bus = EventBus()
self.saga_repository = SagaRepository()
self.event_handlers = {
'TravelBookingRequested': self._handle_travel_booking_requested,
'FlightBooked': self._handle_flight_booked,
'FlightBookingFailed': self._handle_flight_booking_failed,
'HotelBooked': self._handle_hotel_booked,
'HotelBookingFailed': self._handle_hotel_booking_failed,
'CarBooked': self._handle_car_booked,
'CarBookingFailed': self._handle_car_booking_failed,
}
# Subscribe to all relevant events
for event_type, handler in self.event_handlers.items():
self.event_bus.subscribe(event_type, handler)
async def start_travel_booking_saga(self, booking_request: BookingRequest) -> str:
"""Start choreographed saga by publishing initial event"""
saga_id = str(uuid.uuid4())
# Create saga state
saga_state = ChoreographedSagaState(
saga_id=saga_id,
booking_request=booking_request,
status='STARTED'
)
await self.saga_repository.save_saga_state(saga_state)
# Publish initial event to start the saga
await self.event_bus.publish(Event(
type='TravelBookingRequested',
saga_id=saga_id,
data={
'booking_request': booking_request.__dict__,
'timestamp': time.time()
}
))
return saga_id
async def _handle_travel_booking_requested(self, event: Event):
"""Handle initial travel booking request"""
saga_state = await self.saga_repository.get_saga_state(event.saga_id)
# Start with flight booking
await self.event_bus.publish(Event(
type='BookFlightRequested',
saga_id=event.saga_id,
data={
'flight_details': saga_state.booking_request.flight_details,
'saga_context': {
'next_steps': ['book_hotel', 'book_car'],
'compensation_stack': []
}
}
))
async def _handle_flight_booked(self, event: Event):
"""Handle successful flight booking"""
saga_state = await self.saga_repository.get_saga_state(event.saga_id)
# Update saga state
saga_state.flight_booking = event.data['booking_details']
saga_state.compensation_stack.append('cancel_flight')
await self.saga_repository.save_saga_state(saga_state)
# Trigger next step: hotel booking
await self.event_bus.publish(Event(
type='BookHotelRequested',
saga_id=event.saga_id,
data={
'hotel_details': saga_state.booking_request.hotel_details,
'saga_context': {
'next_steps': ['book_car'],
'compensation_stack': saga_state.compensation_stack
}
}
))
async def _handle_flight_booking_failed(self, event: Event):
"""Handle flight booking failure"""
saga_state = await self.saga_repository.get_saga_state(event.saga_id)
saga_state.status = 'FAILED'
saga_state.failure_reason = 'Flight booking failed'
await self.saga_repository.save_saga_state(saga_state)
# No compensations needed since flight was the first step
await self.event_bus.publish(Event(
type='TravelBookingSagaFailed',
saga_id=event.saga_id,
data={
'reason': 'Flight booking failed',
'compensations_completed': []
}
))
async def _handle_hotel_booked(self, event: Event):
"""Handle successful hotel booking"""
saga_state = await self.saga_repository.get_saga_state(event.saga_id)
# Update saga state
saga_state.hotel_booking = event.data['booking_details']
saga_state.compensation_stack.append('cancel_hotel')
await self.saga_repository.save_saga_state(saga_state)
# Trigger final step: car booking
await self.event_bus.publish(Event(
type='BookCarRequested',
saga_id=event.saga_id,
data={
'car_details': saga_state.booking_request.car_details,
'saga_context': {
'next_steps': [], # Final step
'compensation_stack': saga_state.compensation_stack
}
}
))
async def _handle_hotel_booking_failed(self, event: Event):
"""Handle hotel booking failure - trigger compensation"""
saga_state = await self.saga_repository.get_saga_state(event.saga_id)
saga_state.status = 'COMPENSATING'
await self.saga_repository.save_saga_state(saga_state)
# Start compensation process
await self._execute_compensation_chain(event.saga_id, saga_state.compensation_stack)
async def _handle_car_booked(self, event: Event):
"""Handle successful car booking - saga completion"""
saga_state = await self.saga_repository.get_saga_state(event.saga_id)
# Update saga state
saga_state.car_booking = event.data['booking_details']
saga_state.status = 'COMPLETED'
await self.saga_repository.save_saga_state(saga_state)
# Publish saga completion event
await self.event_bus.publish(Event(
type='TravelBookingSagaCompleted',
saga_id=event.saga_id,
data={
'bookings': {
'flight': saga_state.flight_booking,
'hotel': saga_state.hotel_booking,
'car': saga_state.car_booking
},
'completion_time': time.time()
}
))
async def _handle_car_booking_failed(self, event: Event):
"""Handle car booking failure - trigger compensation"""
saga_state = await self.saga_repository.get_saga_state(event.saga_id)
saga_state.status = 'COMPENSATING'
await self.saga_repository.save_saga_state(saga_state)
# Start compensation for flight and hotel
await self._execute_compensation_chain(event.saga_id, saga_state.compensation_stack)
async def _execute_compensation_chain(self, saga_id: str, compensation_stack: List[str]):
"""Execute compensation actions in reverse order"""
for compensation_action in reversed(compensation_stack):
await self.event_bus.publish(Event(
type=f'{compensation_action.title()}Requested',
saga_id=saga_id,
data={
'reason': 'saga_compensation',
'timestamp': time.time()
}
))
# Add delay between compensations to avoid overwhelming services
await asyncio.sleep(0.1)
@dataclass
class ChoreographedSagaState:
saga_id: str
booking_request: BookingRequest
status: str
flight_booking: Optional[dict] = None
hotel_booking: Optional[dict] = None
car_booking: Optional[dict] = None
compensation_stack: List[str] = None
failure_reason: str = ""
def __post_init__(self):
if self.compensation_stack is None:
self.compensation_stack = []
Handling Partial Failures and Idempotency
The Idempotency Challenge
class IdempotentServiceOperation:
"""Ensure operations can be safely retried without side effects"""
def __init__(self):
self.operation_log = {} # In production: use Redis or database
self.booking_repository = BookingRepository()
async def idempotent_flight_booking(self, booking_request: dict, idempotency_key: str) -> dict:
"""Book flight with idempotency guarantees"""
# Check if operation already completed
if idempotency_key in self.operation_log:
cached_result = self.operation_log[idempotency_key]
if cached_result['status'] == 'COMPLETED':
logging.info(f"Returning cached result for idempotency key {idempotency_key}")
return cached_result['result']
elif cached_result['status'] == 'IN_PROGRESS':
# Operation in progress - return appropriate response
return {
'success': False,
'error': 'Operation in progress',
'retry_after': 5
}
# Mark operation as in progress
self.operation_log[idempotency_key] = {
'status': 'IN_PROGRESS',
'started_at': time.time(),
'request': booking_request
}
try:
# Check if booking already exists (database-level idempotency)
existing_booking = await self.booking_repository.find_by_request_hash(
self._hash_booking_request(booking_request)
)
if existing_booking:
result = {
'success': True,
'booking_id': existing_booking.booking_id,
'amount': existing_booking.amount,
'status': 'already_exists'
}
else:
# Perform actual booking
result = await self._perform_flight_booking(booking_request)
# Cache successful result
self.operation_log[idempotency_key] = {
'status': 'COMPLETED',
'result': result,
'completed_at': time.time()
}
return result
except Exception as e:
# Remove from operation log on failure (allows retry)
if idempotency_key in self.operation_log:
del self.operation_log[idempotency_key]
logging.error(f"Flight booking failed for idempotency key {idempotency_key}: {e}")
raise e
async def idempotent_flight_cancellation(self, booking_id: str, idempotency_key: str) -> dict:
"""Cancel flight booking idempotently"""
# Check cache
if idempotency_key in self.operation_log:
cached_result = self.operation_log[idempotency_key]
if cached_result['status'] == 'COMPLETED':
return cached_result['result']
# Mark as in progress
self.operation_log[idempotency_key] = {
'status': 'IN_PROGRESS',
'started_at': time.time()
}
try:
# Check current booking status
booking = await self.booking_repository.get_booking(booking_id)
if not booking:
result = {
'success': True,
'status': 'not_found',
'message': 'Booking does not exist (may already be cancelled)'
}
elif booking.status == 'CANCELLED':
result = {
'success': True,
'status': 'already_cancelled',
'cancellation_id': booking.cancellation_id
}
else:
# Perform actual cancellation
result = await self._perform_flight_cancellation(booking_id)
# Cache result
self.operation_log[idempotency_key] = {
'status': 'COMPLETED',
'result': result,
'completed_at': time.time()
}
return result
except Exception as e:
# Remove from cache on failure
if idempotency_key in self.operation_log:
del self.operation_log[idempotency_key]
raise e
def _hash_booking_request(self, booking_request: dict) -> str:
"""Create deterministic hash of booking request for deduplication"""
# Extract key fields for hashing
key_fields = {
'customer_id': booking_request.get('customer_id'),
'flight_id': booking_request.get('flight_id'),
'departure_date': booking_request.get('departure_date'),
'passenger_count': booking_request.get('passenger_count'),
'seat_class': booking_request.get('seat_class')
}
# Create deterministic hash
import hashlib
import json
key_string = json.dumps(key_fields, sort_keys=True)
return hashlib.sha256(key_string.encode()).hexdigest()
# Distributed idempotency using Redis
class DistributedIdempotencyManager:
"""Manage idempotency across multiple service instances"""
def __init__(self, redis_client):
self.redis = redis_client
self.default_ttl = 86400 # 24 hours
async def with_idempotency(self, operation_func, idempotency_key: str, ttl: int = None):
"""Execute operation with distributed idempotency"""
ttl = ttl or self.default_ttl
lock_key = f"idempotency:lock:{idempotency_key}"
result_key = f"idempotency:result:{idempotency_key}"
# Check for cached result first
cached_result = await self.redis.get(result_key)
if cached_result:
return json.loads(cached_result)
# Acquire distributed lock
lock_acquired = await self.redis.set(lock_key, "locked", ex=300, nx=True) # 5 min lock
if not lock_acquired:
# Another instance is processing this operation
# Poll for result or return "in progress" status
for _ in range(30): # Poll for up to 30 seconds
cached_result = await self.redis.get(result_key)
if cached_result:
return json.loads(cached_result)
await asyncio.sleep(1)
return {
'success': False,
'error': 'Operation timeout - another instance may be processing',
'retry_after': 60
}
try:
# Execute the operation
result = await operation_func()
# Cache the result
await self.redis.set(result_key, json.dumps(result), ex=ttl)
return result
finally:
# Always release the lock
await self.redis.delete(lock_key)
Production Decision: 2PC vs Saga Pattern
Decision Matrix: When to Choose What
| Factor | 2PC | Saga Pattern | WonderTravel Choice |
|---|---|---|---|
| Consistency | Strong (ACID) | Eventual | Saga - eventual OK for travel |
| Availability | Poor (blocking) | High | Saga - 99.99% availability required |
| Performance | Degrades with load | Scales horizontally | Saga - handles 50K concurrent |
| Complexity | Medium (coordinator) | High (compensation logic) | Saga - worth the complexity |
| Failure Recovery | Manual intervention | Self-healing | Saga - automatic recovery |
| Network Partitions | Blocks indefinitely | Continues operating | Saga - partition tolerance |
| Development Time | Faster (simpler) | Slower (more complex) | Saga - long-term investment |
The Rare Cases for 2PC
class RareUseCasesFor2PC:
"""When 2PC might still be appropriate"""
@staticmethod
def financial_transfers():
"""Bank account transfers between different banks"""
return {
'use_case': 'Interbank money transfers',
'why_2pc': 'Absolute consistency required - no partial transfers allowed',
'constraints': 'Lower volume, can tolerate blocking',
'example': 'Transfer $10,000 from Bank A to Bank B - must be atomic',
'acceptable_trade_offs': 'Lower availability for perfect consistency'
}
@staticmethod
def inventory_allocation():
"""Critical inventory allocation for limited items"""
return {
'use_case': 'Concert tickets, limited edition products',
'why_2pc': 'Cannot oversell - inventory must be exactly tracked',
'constraints': 'Short time windows, manageable scale',
'example': '100 VIP tickets - ensure exactly 100 are sold',
'acceptable_trade_offs': 'System slowdown during high demand'
}
@staticmethod
def regulatory_compliance():
"""Operations requiring audit trails and exact consistency"""
return {
'use_case': 'Healthcare records, legal documents',
'why_2pc': 'Regulatory requirements for atomic operations',
'constraints': 'Lower volume, compliance over performance',
'example': 'Medical procedure billing - all records must match exactly',
'acceptable_trade_offs': 'Performance for compliance'
}
# When Saga Patterns Shine
class SagaPatternAdvantages:
"""Why Saga patterns are ideal for most business scenarios"""
@staticmethod
def e_commerce_scenarios():
"""Online shopping, order processing"""
return {
'use_case': 'Order processing: payment + inventory + shipping',
'why_saga': 'Can handle payment failures gracefully',
'benefits': [
'Immediate payment processing',
'Inventory reserved but recoverable',
'Shipping cancellable if needed'
],
'real_world_example': 'Amazon order processing'
}
@staticmethod
def travel_booking():
"""Flight + hotel + car booking"""
return {
'use_case': 'WonderTravel booking system',
'why_saga': 'Services are independent, compensation natural',
'benefits': [
'Each service highly available',
'Natural cancellation policies',
'User-friendly partial failure handling'
],
'real_world_example': 'Expedia, Booking.com'
}
@staticmethod
def microservices_architectures():
"""Modern distributed systems"""
return {
'use_case': 'Any multi-service business process',
'why_saga': 'Aligns with microservice independence',
'benefits': [
'Services evolve independently',
'No coordinator bottleneck',
'Resilient to service failures'
],
'real_world_example': 'Netflix, Uber, Airbnb'
}
WonderTravel's Production Architecture Decision
class WonderTravelProductionDecision:
"""Final architecture chosen for WonderTravel"""
def __init__(self):
self.chosen_pattern = "Hybrid Saga Pattern"
self.reasons = [
"High availability requirements (99.99%)",
"Global scale (50K+ concurrent bookings)",
"Independent service evolution",
"Natural compensation actions in travel domain",
"Better user experience during failures"
]
def production_implementation(self):
"""Production saga implementation strategy"""
return {
'pattern': 'Orchestrated Saga',
'orchestrator': 'Dedicated saga orchestrator service',
'event_store': 'Apache Kafka for event sourcing',
'compensation_strategy': 'Automatic with manual fallback',
'monitoring': 'Real-time saga state tracking',
'recovery': 'Automatic retry with exponential backoff',
'service_implementations': {
'flight_service': {
'database': 'PostgreSQL with saga state table',
'idempotency': 'Redis-based distributed locks',
'compensation': 'Automatic cancellation with refund'
},
'hotel_service': {
'database': 'MongoDB with saga context',
'idempotency': 'Document-level versioning',
'compensation': 'Free cancellation policy'
},
'car_service': {
'database': 'MySQL with saga tracking',
'idempotency': 'Unique constraint on booking hash',
'compensation': 'Automated cancellation'
}
},
'monitoring_metrics': [
'Saga completion rate',
'Compensation success rate',
'Average saga duration',
'Failed saga recovery time'
]
}
def business_impact_results(self):
"""Results after implementing Saga pattern"""
return {
'availability_improvement': '99.5% → 99.97%',
'booking_success_rate': '94% → 99.2%',
'peak_load_handling': '10K → 75K concurrent bookings',
'customer_satisfaction': '3.2 → 4.7 stars',
'revenue_impact': '$2.4M recovered from failed bookings',
'operational_overhead': 'Reduced manual interventions by 85%'
}
# Production monitoring and alerting
class SagaMonitoring:
"""Production monitoring for saga-based systems"""
def __init__(self):
self.metrics_collector = PrometheusMetrics()
self.alert_manager = AlertManager()
def define_saga_metrics(self):
"""Key metrics for saga health monitoring"""
return {
'saga_duration_histogram': 'Time from start to completion/compensation',
'saga_success_rate': 'Percentage of sagas completing successfully',
'compensation_success_rate': 'Percentage of compensations succeeding',
'saga_step_failure_rate': 'Failure rate by individual step',
'pending_sagas_gauge': 'Number of sagas currently in progress',
'saga_recovery_time': 'Time to recover from saga failures'
}
def setup_alerting_rules(self):
"""Critical alerts for saga operations"""
return {
'saga_success_rate_low': {
'condition': 'success_rate < 95%',
'severity': 'critical',
'action': 'Page on-call engineer'
},
'compensation_failure_spike': {
'condition': 'compensation_failures > 10 in 5min',
'severity': 'warning',
'action': 'Investigate failing compensations'
},
'saga_duration_anomaly': {
'condition': 'avg_duration > 3 × historical_avg',
'severity': 'warning',
'action': 'Check service health'
},
'pending_sagas_accumulation': {
'condition': 'pending_sagas > 1000',
'severity': 'critical',
'action': 'Check for saga orchestrator issues'
}
}
Advanced Patterns and Future Evolution
Saga with Machine Learning Compensation
class IntelligentSagaCompensation:
"""AI-powered compensation strategies"""
def __init__(self):
self.ml_model = self._load_compensation_model()
self.compensation_history = CompensationHistoryRepository()
async def predict_optimal_compensation(self, saga_context: dict, failure_reason: str) -> dict:
"""Use ML to determine best compensation strategy"""
features = self._extract_features(saga_context, failure_reason)
# Predict best compensation approach
compensation_scores = self.ml_model.predict_proba([features])
strategies = [
'immediate_full_refund',
'voucher_compensation',
'alternative_booking',
'partial_compensation',
'manual_review_required'
]
best_strategy = strategies[compensation_scores.argmax()]
confidence = compensation_scores.max()
return {
'recommended_strategy': best_strategy,
'confidence': confidence,
'alternative_strategies': [
{'strategy': strategies[i], 'score': score}
for i, score in enumerate(compensation_scores)
if i != compensation_scores.argmax()
]
}
def _extract_features(self, saga_context: dict, failure_reason: str) -> List[float]:
"""Extract features for ML model"""
return [
saga_context.get('customer_tier', 0), # VIP, Gold, Silver, Bronze
saga_context.get('booking_value', 0), # Total booking amount
saga_context.get('advance_booking_days', 0), # How far in advance
saga_context.get('failure_step', 0), # Which step failed
saga_context.get('historical_cancellations', 0), # Customer history
len(failure_reason), # Complexity of failure
saga_context.get('seasonal_demand', 0), # High/low season
]
class EventSourcingSaga:
"""Saga with complete event sourcing for auditability"""
def __init__(self):
self.event_store = EventStore()
self.saga_projector = SagaStateProjector()
async def execute_saga_with_event_sourcing(self, saga_id: str, booking_request: dict):
"""Execute saga with complete event history"""
# Start saga
await self._emit_event(saga_id, 'SagaStarted', booking_request)
# Reconstruct current state from events
current_state = await self._rebuild_saga_state_from_events(saga_id)
# Execute next step based on current state
next_step = self._determine_next_step(current_state)
if next_step:
await self._execute_saga_step(saga_id, next_step, current_state)
async def _rebuild_saga_state_from_events(self, saga_id: str) -> dict:
"""Rebuild saga state by replaying all events"""
events = await self.event_store.get_events_for_saga(saga_id)
state = {}
for event in events:
state = self.saga_projector.apply_event(state, event)
return state
async def _emit_event(self, saga_id: str, event_type: str, event_ dict):
"""Emit saga event to event store"""
event = {
'saga_id': saga_id,
'event_type': event_type,
'event_data': event_data,
'timestamp': time.time(),
'sequence_number': await self._get_next_sequence_number(saga_id)
}
await self.event_store.append_event(event)
# Also publish to event bus for real-time processing
await self.event_bus.publish(event)
Summary: The Distributed Transaction Revolution
The journey from Two-Phase Commit to Saga Pattern represents more than a technical migration—it's a fundamental shift in how we think about distributed systems, consistency, and user experience.
The Technical Transformation:
- From blocking to non-blocking: 2PC's coordinator bottleneck eliminated
- From strong to eventual consistency: Trading immediate consistency for availability
- From centralized to distributed resilience: Each service handles its own failures
- From manual to automatic recovery: Self-healing through compensation
The Business Impact:
- Availability: 99.5% → 99.97% uptime (from hours to minutes of downtime)
- Scale: 10K → 75K concurrent bookings handled seamlessly
- Customer satisfaction: 3.2 → 4.7 stars through better failure handling
- Revenue recovery: $2.4M in previously failed bookings now successful
The Architectural Wisdom:
- Embrace eventual consistency: Most business processes can tolerate brief inconsistency
- Design for failure: Compensation actions should be first-class citizens
- Choose based on context: 2PC has rare but valid use cases (banking, compliance)
- Monitor saga health: Success rates, compensation patterns, and recovery times
The Universal Lessons:
- Distributed systems require different thinking: What works in monoliths fails at scale
- Business requirements drive architecture: Travel booking tolerates eventual consistency, banking does not
- Operational complexity is worth business resilience: Saga's complexity pays dividends in reliability
- Recovery is as important as the happy path: Design compensation actions with the same care as primary actions
As WonderTravel's customers now book their dream vacations without fear of partial failures, and the system gracefully handles everything from network partitions to service crashes, the Saga pattern proves that sometimes the most robust solutions come not from preventing failures, but from embracing them and building systems that heal themselves.
The next time you face a distributed transaction challenge, remember: perfect consistency is the enemy of high availability. Sometimes the best way to ensure everything works is to accept that sometimes things fail—and build systems that know how to fix themselves.
The Final Truth: In distributed systems, it's not about avoiding failures—it's about failing gracefully and recovering automatically. Saga patterns don't eliminate the complexity of distributed transactions; they make that complexity manageable, observable, and ultimately, reliable at scale.