Himanshu Kukreja

Week 5 — Day 3: Saga Orchestration Deep Dive

System Design Mastery Series


Preface

Yesterday, we learned the Saga pattern — how to coordinate distributed transactions using local transactions with compensation. We built a basic orchestrator that executes steps and handles failures.

But our orchestrator had limitations:

YESTERDAY'S ORCHESTRATOR LIMITATIONS

1. CRASH RECOVERY
   ├── What if orchestrator crashes mid-step?
   ├── We persist state to DB, but recovery is manual
   ├── What if the step was half-executed?
   └── Did the external API receive our request or not?

2. LONG-RUNNING WORKFLOWS
   ├── What if a step takes hours? (human approval)
   ├── What if we need to wait for external events?
   ├── Holding connections open isn't scalable
   └── Polling the database is inefficient

3. VERSIONING
   ├── What if we need to change the saga logic?
   ├── In-flight sagas use old code
   ├── How do we migrate without breaking things?
   └── What about A/B testing saga variations?

4. VISIBILITY
   ├── Where is this saga right now?
   ├── How long has it been stuck?
   ├── What's the history of this execution?
   └── Can we replay a failed saga?

Today, we'll solve these problems with workflow orchestration engines, specifically Temporal (a fork of Uber's Cadence, created by Cadence's original authors). These systems provide durable execution: workflow state is never lost, so your code survives worker and server crashes, resumes where it left off, and can wait for days or even years if needed.


Part I: Foundations

Chapter 1: What Is Durable Execution?

1.1 The Problem with Normal Code

# Normal code - NOT durable

async def process_order(order_id: str):
    # Step 1: Reserve inventory
    reservation = await inventory_service.reserve(order_id)
    
    # CRASH HERE - what happens?
    # - reservation exists in inventory service
    # - but we lost track of it
    # - if we restart, do we reserve again? (double reserve!)
    # - if we don't, order is stuck
    
    # Step 2: Charge payment
    payment = await payment_service.charge(order_id)
    
    # Step 3: Ship
    shipment = await shipping_service.create(order_id)
    
    return shipment.tracking_number

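The problem: the function's progress (which steps finished and what they returned) lives only in memory and is lost on a crash. After a restart we don't know where we were or which side effects already happened.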

1.2 Durable Execution: Code That Survives Anything

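# Durable execution with Temporal

from datetime import timedelta

from temporalio import workflow

# reserve_inventory, charge_payment and create_shipment are activities
# (see Chapter 3 for their definitions)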

@workflow.defn
class OrderWorkflow:
    @workflow.run
    async def run(self, order_id: str) -> str:
        # Step 1: Reserve inventory
        reservation = await workflow.execute_activity(
            reserve_inventory,
            order_id,
            start_to_close_timeout=timedelta(minutes=5)
        )
        
        # CRASH HERE - no problem!
        # Temporal recorded that reserve_inventory completed
        # On restart, it SKIPS step 1 and continues from step 2
        # The reservation result is replayed from history
        
        # Step 2: Charge payment
        payment = await workflow.execute_activity(
            charge_payment,
            order_id,
            start_to_close_timeout=timedelta(minutes=5)
        )
        
        # Step 3: Ship
        shipment = await workflow.execute_activity(
            create_shipment,
            order_id,
            start_to_close_timeout=timedelta(minutes=5)
        )
        
        return shipment["tracking_number"]

How it works:

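  1. Every activity completion is recorded in the workflow's event history, which Temporal persists
  2. If the worker running the workflow crashes, another worker replays the workflow code from the beginning against that history
  3. Completed activities return their recorded results instead of being re-executed
  4. Once replay catches up with the history, execution continues from where it left off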
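Here is a toy, in-memory model of steps 1 to 4. It is not how Temporal is implemented. `ToyDurableRuntime` keeps a list of recorded activity results and hands them back by position, so only activities past the end of the history actually run. The runtime and the stub activities are hypothetical.

```python
import asyncio
from typing import Any, Awaitable, Callable, List


class ToyDurableRuntime:
    """Replays recorded activity results by position; runs only new activities."""

    def __init__(self, history: List[Any]):
        self.history = history          # persisted results, in call order
        self.position = 0               # how far this (re)run has progressed
        self.executed: List[str] = []   # activities actually run on this attempt

    async def execute_activity(self, fn: Callable[[str], Awaitable[Any]], arg: str) -> Any:
        if self.position < len(self.history):
            result = self.history[self.position]   # replay: reuse the recorded result
        else:
            result = await fn(arg)                  # new work: run it and record the result
            self.history.append(result)
            self.executed.append(fn.__name__)
        self.position += 1
        return result


async def reserve_inventory(order_id: str) -> dict:
    return {"reservation_id": f"res-{order_id}"}


async def charge_payment(order_id: str) -> dict:
    return {"payment_id": f"pay-{order_id}"}


async def create_shipment(order_id: str) -> dict:
    return {"tracking_number": f"trk-{order_id}"}


async def order_workflow(rt: ToyDurableRuntime, order_id: str) -> str:
    await rt.execute_activity(reserve_inventory, order_id)
    await rt.execute_activity(charge_payment, order_id)
    shipment = await rt.execute_activity(create_shipment, order_id)
    return shipment["tracking_number"]


# Pretend the process crashed right after reserve_inventory completed
history = [{"reservation_id": "res-order-123"}]
rt = ToyDurableRuntime(history)
tracking = asyncio.run(order_workflow(rt, "order-123"))
```

After the "restart", `rt.executed` lists only `charge_payment` and `create_shipment`. The reservation came from history.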

1.3 The Magic of Event Sourcing for Workflows

WORKFLOW EVENT SOURCING

Temporal stores workflow history as events:

┌────────────────────────────────────────────────────────────────────────┐
│                    WORKFLOW HISTORY                                    │
│                                                                        │
│  Event 1:  WorkflowExecutionStarted                                    │
│            {order_id: "order-123"}                                     │
│                                                                        │
│  Event 2:  ActivityTaskScheduled                                       │
│            {activity: "reserve_inventory", input: "order-123"}         │
│                                                                        │
│  Event 3:  ActivityTaskCompleted                                       │
│            {result: {reservation_id: "res-456"}}                       │
│                                                                        │
│  Event 4:  ActivityTaskScheduled                                       │
│            {activity: "charge_payment", input: "order-123"}            │
│                                                                        │
│  <<< CRASH HERE >>>                                                    │
│                                                                        │
│  --- On recovery, workflow replays: ---                                │
│                                                                        │
│  Replay Event 1: Start workflow with order_id                          │
│  Replay Event 2: Schedule reserve_inventory                            │
│  Replay Event 3: Return stored result (DON'T re-execute!)              │
│  Replay Event 4: Schedule charge_payment                               │
│  Continue:       Activity worker picks up charge_payment               │
│                                                                        │
└────────────────────────────────────────────────────────────────────────┘

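Key insight: completed activities are NOT re-executed on replay; their recorded results are returned to the workflow code.
An activity that was still in flight at the crash (charge_payment above) has no recorded result yet, so it may run again: activities execute at least once and should be idempotent.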
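Replay only works if the workflow code makes the same calls, in the same order, as the run that produced the history. Temporal's SDKs compare the commands emitted during replay with the recorded events. On a mismatch, they fail the workflow task with a non-determinism error.

The simplified model below (hypothetical names, plain Python) shows why inserting a step breaks in-flight workflows. This is the problem Section 4.2 addresses with versioning.

```python
from typing import List


class NondeterminismError(Exception):
    pass


class ReplayChecker:
    """Compares activities scheduled by the code with those recorded in history."""

    def __init__(self, recorded: List[str]):
        self.recorded = recorded
        self.position = 0

    def schedule(self, activity_name: str) -> None:
        if self.position < len(self.recorded):
            expected = self.recorded[self.position]
            if expected != activity_name:
                raise NondeterminismError(
                    f"history has {expected!r} at step {self.position}, "
                    f"code scheduled {activity_name!r}"
                )
        else:
            self.recorded.append(activity_name)  # beyond history: a new command
        self.position += 1


def workflow_v1(checker: ReplayChecker) -> None:
    checker.schedule("reserve_inventory")
    checker.schedule("charge_payment")


def workflow_v2(checker: ReplayChecker) -> None:
    checker.schedule("reserve_inventory")
    checker.schedule("check_fraud")        # new step deployed while v1 runs are in flight
    checker.schedule("charge_payment")


history = ["reserve_inventory", "charge_payment"]   # written by the v1 code

v1 = ReplayChecker(list(history))
workflow_v1(v1)                                     # same commands: replay succeeds

error = None
try:
    workflow_v2(ReplayChecker(list(history)))
except NondeterminismError as exc:
    error = str(exc)
```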

Chapter 2: Temporal Architecture

2.1 Core Components

TEMPORAL ARCHITECTURE

┌───────────────────────────────────────────────────────────────────────┐
│                                                                       │
│                         TEMPORAL CLUSTER                              │
│                                                                       │
│  ┌────────────────────────────────────────────────────────────────┐   │
│  │                      TEMPORAL SERVER                           │   │
│  │                                                                │   │
│  │  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐          │   │
│  │  │   Frontend   │  │   History    │  │   Matching   │          │   │
│  │  │   Service    │  │   Service    │  │   Service    │          │   │
│  │  └──────────────┘  └──────────────┘  └──────────────┘          │   │
│  │                                                                │   │
│  │  ┌─────────────────────────────────────────────────────────┐   │   │
│  │  │                    Persistence                          │   │   │
│  │  │            (PostgreSQL / Cassandra / MySQL)             │   │   │
│  │  └─────────────────────────────────────────────────────────┘   │   │
│  │                                                                │   │
│  └────────────────────────────────────────────────────────────────┘   │
│                                                                       │
└───────────────────────────────────────────────────────────────────────┘
                                    │
                                    │ gRPC
                                    │
         ┌──────────────────────────┼──────────────────────────┐
         │                          │                          │
         ▼                          ▼                          ▼
┌─────────────────┐      ┌─────────────────┐      ┌─────────────────┐
│ WORKFLOW WORKER │      │ ACTIVITY WORKER │      │     CLIENT      │
│                 │      │                 │      │                 │
│ Executes        │      │ Executes        │      │ Starts          │
│ workflow code   │      │ activity code   │      │ workflows       │
│ (deterministic) │      │ (side effects)  │      │ Sends signals   │
│                 │      │                 │      │ Queries state   │
└─────────────────┘      └─────────────────┘      └─────────────────┘


COMPONENT RESPONSIBILITIES:

Frontend Service:
  - API gateway for clients
  - Rate limiting, auth
  - Request routing

History Service:
  - Per-workflow state and event history storage
  - Durable timers and state transitions
  - Creates workflow and activity tasks (the workflow code itself runs on workers)

Matching Service:
  - Task queue management
  - Worker polling
  - Task dispatch

Workers (your own processes, not part of the server):
  - Workflow Worker: Runs workflow code (decision making)
  - Activity Worker: Runs activity code (actual work)
  - In practice, one worker process often hosts both

2.2 Workflows vs Activities

WORKFLOWS vs ACTIVITIES

WORKFLOWS:
├── Deterministic code (same input → same decisions)
├── Orchestration logic (what to do next)
├── Can run for years
├── Replayed on crash (must be deterministic!)
├── NO side effects allowed
└── Examples: if/else, loops, wait for signal

ACTIVITIES:
├── Non-deterministic code (can have side effects)
├── Actual work (API calls, DB writes)
├── Timeout and retry policies (at-least-once, so make them idempotent)
├── NOT replayed (result stored and returned)
├── Side effects ARE allowed
└── Examples: HTTP calls, database operations


DETERMINISM RULES FOR WORKFLOWS:

❌ DON'T DO THIS IN WORKFLOWS:
   - Random numbers (use workflow.random())
   - Current time (use workflow.now())
   - UUID generation (use workflow.uuid4())
   - HTTP calls (use activities)
   - Database queries (use activities)
   - File I/O (use activities)
   - Blocking sleep such as time.sleep() (use asyncio.sleep(), which the Python SDK turns into a durable timer)

✅ DO THIS:
   - Pure logic (if/else, loops)
   - Call activities for side effects
   - Use workflow.* APIs for non-deterministic needs
   - Signal handling
   - State queries
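Why use `workflow.random()` instead of Python's `random` module? Roughly, the SDK gives the workflow a generator whose seed is fixed for that execution, so a replay draws the same numbers and takes the same branches.

The sketch below models only that idea with the standard library. `workflow_logic` and both runner functions are hypothetical names.

```python
import random


def workflow_logic(rng: random.Random) -> str:
    # A workflow decision that depends on a random draw
    return "variant_a" if rng.random() < 0.5 else "variant_b"


def run_with_recorded_seed(seed: int) -> str:
    # Deterministic: the seed is fixed for this execution, so every replay
    # draws the same number and takes the same branch
    return workflow_logic(random.Random(seed))


def run_with_fresh_randomness() -> str:
    # Non-deterministic: each replay gets a new OS-seeded generator, so the
    # branch can differ from the one recorded in history
    return workflow_logic(random.Random())


first_run = run_with_recorded_seed(seed=1234)
replays = {run_with_recorded_seed(seed=1234) for _ in range(100)}
```

All 100 seeded replays agree with the first run. With `run_with_fresh_randomness`, a replay can pick a different branch and then schedule different activities than the history recorded.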

2.3 Task Queues

TASK QUEUES

Task queues decouple workflow scheduling from execution.

                         Temporal Server
                               │
                    ┌──────────┴──────────┐
                    │                     │
                    ▼                     ▼
            ┌─────────────┐       ┌─────────────┐
            │  Workflow   │       │  Activity   │
            │ Task Queue  │       │ Task Queue  │
            │  "orders"   │       │  "orders"   │
            └──────┬──────┘       └──────┬──────┘
                   │                     │
         ┌─────────┼─────────┐    ┌──────┼──────┐
         │         │         │    │      │      │
         ▼         ▼         ▼    ▼      ▼      ▼
      Worker    Worker    Worker  Worker Worker Worker
        1         2         3       1      2      3


Benefits:
├── Scale workers independently
├── Different queues for different priorities
├── Workers can be language-specific
├── Easy deployment (add/remove workers)
└── Rate limiting per queue
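The decoupling works because the server never pushes work to a specific worker. It enqueues tasks, and any worker polling that queue can take the next one.

A minimal in-process model using `asyncio.Queue` shows these competing consumers. It is not Temporal's matching service, and the names are hypothetical.

```python
import asyncio
from typing import Dict, List


async def worker(name: str, queue: asyncio.Queue, handled: Dict[str, List[str]]) -> None:
    # Each worker polls the same queue; whichever is free takes the next task
    while True:
        task = await queue.get()
        await asyncio.sleep(0)  # stand-in for real work
        handled.setdefault(name, []).append(task)
        queue.task_done()


async def main(num_tasks: int, num_workers: int) -> Dict[str, List[str]]:
    queue: asyncio.Queue = asyncio.Queue()
    handled: Dict[str, List[str]] = {}
    for i in range(num_tasks):
        queue.put_nowait(f"task-{i}")   # producer only enqueues; it never picks a worker
    workers = [asyncio.create_task(worker(f"worker-{w}", queue, handled))
               for w in range(num_workers)]
    await queue.join()                  # wait until every task has been processed
    for w in workers:
        w.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return handled


handled = asyncio.run(main(num_tasks=9, num_workers=3))
```

Adding capacity means starting another `worker` on the same queue. Nothing about the producer changes.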

Chapter 3: Writing Temporal Workflows

3.1 Basic Workflow Structure

# Basic Temporal Workflow

import asyncio
import json
from dataclasses import dataclass
from datetime import timedelta
from typing import Optional

import stripe
from temporalio import activity, workflow
from temporalio.common import RetryPolicy

# get_db_connection, shipping_api and notification_service are placeholders
# for your own database pool and service clients; they are not Temporal APIs.


# =============================================================================
# Data Classes
# =============================================================================

@dataclass
class OrderInput:
    order_id: str
    customer_id: str
    items: list
    total_amount: float
    shipping_address: str


@dataclass
class OrderResult:
    order_id: str
    status: str
    tracking_number: Optional[str] = None
    error: Optional[str] = None


# =============================================================================
# Activities (Side Effects)
# =============================================================================

class InsufficientInventoryError(Exception):
    """Not enough stock. Non-retryable: the workflow's RetryPolicy lists this name."""


class PaymentDeclinedError(Exception):
    """Card declined. Non-retryable: retrying won't change the answer."""


@activity.defn
async def reserve_inventory(order_id: str, items: list) -> dict:
    """
    Activity: Reserve inventory.
    
    This is where actual work happens (DB calls, API calls).
    Activities can fail and will be retried according to policy.
    """
    async with get_db_connection() as db:
        # One transaction: if a later item is short, earlier decrements roll back
        # (asyncpg-style connection API assumed)
        async with db.transaction():
            reservation_id = await db.fetchval(
                "INSERT INTO reservations (order_id, items, status) VALUES ($1, $2, 'active') RETURNING id",
                order_id, json.dumps(items)
            )
            
            for item in items:
                status = await db.execute(
                    "UPDATE inventory SET quantity = quantity - $1 WHERE product_id = $2 AND quantity >= $1",
                    item['quantity'], item['product_id']
                )
                if status == "UPDATE 0":  # no row matched: stock too low
                    raise InsufficientInventoryError(f"Not enough {item['product_id']}")
            
            return {"reservation_id": str(reservation_id)}


@activity.defn
async def release_inventory(reservation_id: str) -> dict:
    """Activity: Release inventory reservation (compensation)."""
    async with get_db_connection() as db:
        async with db.transaction():
            # Lock the row so two concurrent compensations can't both release it
            reservation = await db.fetchrow(
                "SELECT items FROM reservations WHERE id = $1 AND status = 'active' FOR UPDATE",
                reservation_id
            )
            
            if not reservation:
                return {"status": "already_released"}  # idempotent: safe to retry
            
            items = json.loads(reservation['items'])
            for item in items:
                await db.execute(
                    "UPDATE inventory SET quantity = quantity + $1 WHERE product_id = $2",
                    item['quantity'], item['product_id']
                )
            
            await db.execute(
                "UPDATE reservations SET status = 'released' WHERE id = $1",
                reservation_id
            )
    
    return {"status": "released"}


@activity.defn
async def charge_payment(
    customer_id: str,
    amount: float,
    idempotency_key: str
) -> dict:
    """Activity: Charge customer payment."""
    try:
        # stripe-python is synchronous, so run it off the event loop.
        # The idempotency key makes retries safe: Stripe returns the original
        # charge instead of charging the customer twice.
        result = await asyncio.to_thread(
            stripe.Charge.create,
            amount=round(amount * 100),  # cents; round() avoids float truncation
            currency="usd",
            customer=customer_id,
            idempotency_key=idempotency_key
        )
    except stripe.error.CardError as e:
        # A decline is a business outcome, not a transient error
        raise PaymentDeclinedError(str(e)) from e
    return {"payment_id": result.id, "status": result.status}


@activity.defn
async def refund_payment(payment_id: str, idempotency_key: str) -> dict:
    """Activity: Refund payment (compensation)."""
    # payment_id is a Charge ID (ch_...), so refund by charge, not payment_intent
    result = await asyncio.to_thread(
        stripe.Refund.create,
        charge=payment_id,
        idempotency_key=idempotency_key
    )
    return {"refund_id": result.id, "status": result.status}


@activity.defn
async def create_shipment(order_id: str, address: str, items: list) -> dict:
    """Activity: Create shipment with carrier."""
    shipment = await shipping_api.create_shipment(
        reference=order_id,
        destination=address,
        items=items
    )
    return {
        "shipment_id": shipment.id,
        "tracking_number": shipment.tracking_number
    }


@activity.defn
async def send_notification(
    customer_id: str,
    notification_type: str,
    data: dict
) -> dict:
    """Activity: Send notification to customer."""
    await notification_service.send(
        customer_id=customer_id,
        type=notification_type,
        data=data
    )
    return {"sent": True}


# =============================================================================
# Workflow (Orchestration Logic)
# =============================================================================

@workflow.defn
class OrderWorkflow:
    """
    Order processing workflow with saga compensation.
    
    This workflow:
    1. Reserves inventory
    2. Charges payment
    3. Creates shipment (pivot point)
    4. Sends confirmation
    
    If any step before shipment fails, it compensates.
    """
    
    def __init__(self):
        self.status = "pending"
        self.reservation_id = None
        self.payment_id = None
        self.tracking_number = None
    
    @workflow.run
    async def run(self, input: OrderInput) -> OrderResult:
        """Main workflow execution."""
        
        # Retry policy for activities
        retry_policy = RetryPolicy(
            initial_interval=timedelta(seconds=1),
            maximum_interval=timedelta(minutes=1),
            backoff_coefficient=2.0,
            maximum_attempts=5,
            non_retryable_error_types=["InsufficientInventoryError", "PaymentDeclinedError"]
        )
        
        try:
            # Step 1: Reserve inventory
            self.status = "reserving_inventory"
            reservation = await workflow.execute_activity(
                reserve_inventory,
                args=[input.order_id, input.items],
                start_to_close_timeout=timedelta(minutes=5),
                retry_policy=retry_policy
            )
            self.reservation_id = reservation["reservation_id"]
            
            # Step 2: Charge payment
            self.status = "charging_payment"
            payment = await workflow.execute_activity(
                charge_payment,
                args=[input.customer_id, input.total_amount, f"order-{input.order_id}"],
                start_to_close_timeout=timedelta(minutes=5),
                retry_policy=retry_policy
            )
            self.payment_id = payment["payment_id"]
            
            # Step 3: Create shipment (PIVOT - no compensation after this)
            self.status = "creating_shipment"
            shipment = await workflow.execute_activity(
                create_shipment,
                args=[input.order_id, input.shipping_address, input.items],
                start_to_close_timeout=timedelta(minutes=10),
                retry_policy=RetryPolicy(maximum_attempts=0)  # Retry forever after pivot
            )
            self.tracking_number = shipment["tracking_number"]
            
            # Step 4: Send confirmation
            self.status = "sending_confirmation"
            await workflow.execute_activity(
                send_notification,
                args=[input.customer_id, "order_confirmation", {
                    "order_id": input.order_id,
                    "tracking_number": self.tracking_number
                }],
                start_to_close_timeout=timedelta(minutes=1),
                retry_policy=retry_policy
            )
            
            self.status = "completed"
            return OrderResult(
                order_id=input.order_id,
                status="completed",
                tracking_number=self.tracking_number
            )
            
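        except Exception as e:
            workflow.logger.error(f"Order workflow failed: {e}")
            
            if self.tracking_number is not None:
                # Past the pivot: the order has shipped, so never refund or
                # release stock. Only the confirmation can have failed here.
                self.status = "completed"
                return OrderResult(
                    order_id=input.order_id,
                    status="completed",
                    tracking_number=self.tracking_number,
                    error=f"Confirmation not sent: {e}"
                )
            
            # Before the pivot: compensate completed steps in reverse order
            await self._compensate(input)
            
            return OrderResult(
                order_id=input.order_id,
                status="failed",
                error=str(e)
            )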
    
    async def _compensate(self, input: OrderInput):
        """Execute compensation for completed steps."""
        self.status = "compensating"
        
        compensation_policy = RetryPolicy(
            maximum_attempts=0  # Retry forever - compensation must succeed
        )
        
        # Refund payment if charged
        if self.payment_id:
            try:
                await workflow.execute_activity(
                    refund_payment,
                    args=[self.payment_id, f"refund-{input.order_id}"],
                    start_to_close_timeout=timedelta(minutes=5),
                    retry_policy=compensation_policy
                )
            except Exception as e:
                workflow.logger.error(f"Refund failed: {e}")
        
        # Release inventory if reserved
        if self.reservation_id:
            try:
                await workflow.execute_activity(
                    release_inventory,
                    args=[self.reservation_id],
                    start_to_close_timeout=timedelta(minutes=5),
                    retry_policy=compensation_policy
                )
            except Exception as e:
                workflow.logger.error(f"Release inventory failed: {e}")
        
        # Notify customer of failure
        await workflow.execute_activity(
            send_notification,
            args=[input.customer_id, "order_failed", {
                "order_id": input.order_id,
                "reason": "Payment or inventory issue"
            }],
            start_to_close_timeout=timedelta(minutes=1),
            retry_policy=compensation_policy
        )
        
        self.status = "compensated"
    
    @workflow.query
    def get_status(self) -> str:
        """Query: Get current workflow status."""
        return self.status
    
    @workflow.query
    def get_tracking_number(self) -> Optional[str]:
        """Query: Get tracking number if available."""
        return self.tracking_number
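Stripped of Temporal, the compensation logic in OrderWorkflow is a stack:

- Each completed step pushes its undo action.
- A failure pops the undo actions and runs them in reverse.
- Passing the pivot turns compensation off.

Here is a plain-Python sketch of that rule. The `Saga` class and `run_order` helper are hypothetical, not Temporal APIs.

```python
from typing import Callable, List, Optional


class Saga:
    """Compensation stack: undo actions run in reverse (LIFO) order,
    but only while the saga has not passed its pivot step."""

    def __init__(self) -> None:
        self.compensations: List[Callable[[], None]] = []
        self.past_pivot = False
        self.log: List[str] = []

    def step(self, name: str, action: Callable[[], None],
             compensation: Optional[Callable[[], None]] = None,
             pivot: bool = False) -> None:
        action()                      # raises on failure; nothing is recorded
        self.log.append(f"done:{name}")
        if compensation is not None:
            self.compensations.append(compensation)
        if pivot:
            self.past_pivot = True    # from here on, only forward recovery

    def compensate(self) -> None:
        if self.past_pivot:
            return
        while self.compensations:
            self.compensations.pop()()


def run_order(fail_at: str) -> List[str]:
    saga = Saga()

    def action(name: str) -> Callable[[], None]:
        def run() -> None:
            if name == fail_at:
                raise RuntimeError(f"{name} failed")
        return run

    def undo(name: str) -> Callable[[], None]:
        return lambda: saga.log.append(f"undo:{name}")

    try:
        saga.step("reserve_inventory", action("reserve_inventory"), undo("reserve_inventory"))
        saga.step("charge_payment", action("charge_payment"), undo("charge_payment"))
        saga.step("create_shipment", action("create_shipment"), pivot=True)
        saga.step("send_notification", action("send_notification"))
    except RuntimeError:
        saga.compensate()
    return saga.log
```

Failing at `create_shipment` produces both undo entries, refund first and then release. Failing at `send_notification` leaves nothing to undo, because the pivot has already succeeded.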

3.2 Signals: External Events

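# Workflows with Signals

import asyncio  # workflow.wait_condition raises asyncio.TimeoutError on timeout

from temporalio.client import Client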

@workflow.defn
class OrderWithApprovalWorkflow:
    """
    Order workflow that waits for manager approval.
    
    Demonstrates:
    - Signals for external events
    - Waiting for signals with timeout
    - Workflow state queries
    """
    
    def __init__(self):
        self.status = "pending"
        self.approved = None  # None = waiting, True/False = decided
        self.approver = None
        self.rejection_reason = None  # set by the reject signal
    
    @workflow.run
    async def run(self, input: OrderInput) -> OrderResult:
        # Step 1: Reserve inventory
        reservation = await workflow.execute_activity(
            reserve_inventory,
            args=[input.order_id, input.items],
            start_to_close_timeout=timedelta(minutes=5)
        )
        
        # Step 2: Wait for approval if order > $1000
        if input.total_amount > 1000:
            self.status = "pending_approval"
            
            # Send approval request notification
            await workflow.execute_activity(
                send_notification,
                args=["managers", "approval_required", {
                    "order_id": input.order_id,
                    "amount": input.total_amount,
                    "customer": input.customer_id
                }],
                start_to_close_timeout=timedelta(minutes=1)
            )
            
            # Wait for approval signal (up to 24 hours)
            try:
                await workflow.wait_condition(
                    lambda: self.approved is not None,
                    timeout=timedelta(hours=24)
                )
            except asyncio.TimeoutError:
                # No approval received - auto-reject
                self.approved = False
                self.approver = "system_timeout"
            
            if not self.approved:
                # Rejected - release the reservation and return.
                # _compensate is a helper like OrderWorkflow._compensate (not shown)
                await self._compensate(input, reservation["reservation_id"])
                return OrderResult(
                    order_id=input.order_id,
                    status="rejected",
                    error=f"Rejected by {self.approver}"
                )
        
        # Step 3: Continue with payment and shipment
        self.status = "processing"
        # ... rest of workflow
    
    @workflow.signal
    async def approve(self, approver: str):
        """Signal: Approve the order."""
        self.approved = True
        self.approver = approver
    
    @workflow.signal
    async def reject(self, approver: str, reason: str):
        """Signal: Reject the order."""
        self.approved = False
        self.approver = approver
        self.rejection_reason = reason
    
    @workflow.query
    def get_approval_status(self) -> dict:
        """Query: Get approval status."""
        return {
            "status": self.status,
            "approved": self.approved,
            "approver": self.approver
        }


# Client code to send signals
async def approve_order(order_id: str, manager_id: str):
    """Manager approves an order."""
    client = await Client.connect("localhost:7233")
    
    handle = client.get_workflow_handle(f"order-{order_id}")
    await handle.signal(OrderWithApprovalWorkflow.approve, manager_id)


async def reject_order(order_id: str, manager_id: str, reason: str):
    """Manager rejects an order."""
    client = await Client.connect("localhost:7233")
    
    handle = client.get_workflow_handle(f"order-{order_id}")
    # A signal with several arguments takes them as args=[...]
    await handle.signal(OrderWithApprovalWorkflow.reject, args=[manager_id, reason])
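The signal-plus-timeout pattern can be modeled in plain asyncio to see both outcomes. This sketch keeps everything in memory. In Temporal, by contrast, the 24-hour wait is a durable timer and the signal is recorded in history, so both survive restarts. `ApprovalState` and `approval_flow` are hypothetical names.

```python
import asyncio
from typing import Callable, Optional, Tuple


class ApprovalState:
    def __init__(self) -> None:
        self.approved: Optional[bool] = None
        self.approver: Optional[str] = None
        self._changed = asyncio.Event()

    def approve(self, approver: str) -> None:
        # Plays the role of a signal handler: mutate state, then wake waiters
        self.approved, self.approver = True, approver
        self._changed.set()

    async def wait_condition(self, predicate: Callable[[], bool], timeout: float) -> None:
        # Re-check the predicate each time state changes; give up after `timeout`
        async def _wait() -> None:
            while not predicate():
                self._changed.clear()
                await self._changed.wait()
        await asyncio.wait_for(_wait(), timeout)


async def approval_flow(signal_delay: Optional[float], timeout: float) -> Tuple[bool, str]:
    state = ApprovalState()
    if signal_delay is not None:
        # Simulate a manager sending the approve signal after signal_delay seconds
        asyncio.get_running_loop().call_later(signal_delay, state.approve, "manager-7")
    try:
        await state.wait_condition(lambda: state.approved is not None, timeout)
    except asyncio.TimeoutError:
        state.approved, state.approver = False, "system_timeout"  # auto-reject
    return state.approved, state.approver


approved_case = asyncio.run(approval_flow(signal_delay=0.01, timeout=1.0))
timeout_case = asyncio.run(approval_flow(signal_delay=None, timeout=0.05))
```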

3.3 Child Workflows

# Parent-Child Workflow Pattern

@workflow.defn
class BatchOrderWorkflow:
    """
    Process multiple orders as a batch.
    
    Each order runs as a child workflow.
    Parent tracks overall progress.
    """
    
    @workflow.run
    async def run(self, order_ids: list[str]) -> dict:
        results = {}
        
        # Start all orders as child workflows
        handles = []
        for order_id in order_ids:
            # Get order details (this would be an activity in real code)
            order_input = OrderInput(
                order_id=order_id,
                customer_id="batch-customer",
                items=[],
                total_amount=100.0,
                shipping_address="123 Main St"
            )
            
            # Start child workflow
            handle = await workflow.start_child_workflow(
                OrderWorkflow.run,
                order_input,
                id=f"order-{order_id}",
                task_queue="orders"
            )
            handles.append((order_id, handle))
        
        # Wait for all child workflows to complete
        for order_id, handle in handles:
            try:
                result = await handle
                results[order_id] = {
                    "status": result.status,
                    "tracking": result.tracking_number
                }
            except Exception as e:
                results[order_id] = {
                    "status": "failed",
                    "error": str(e)
                }
        
        return {
            "total": len(order_ids),
            "succeeded": sum(1 for r in results.values() if r["status"] == "completed"),
            "failed": sum(1 for r in results.values() if r["status"] == "failed"),
            "results": results
        }
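The parent above starts every child before awaiting any of them, so the orders run concurrently. Below is the same fan-out/fan-in shape in plain asyncio. A stub `process_order` (hypothetical) stands in for the child workflow. The sketch uses `asyncio.gather(..., return_exceptions=True)` so that one failure doesn't cancel the rest.

```python
import asyncio
from typing import List


async def process_order(order_id: str) -> dict:
    # Stand-in for a child workflow: odd-numbered orders fail
    await asyncio.sleep(0)
    if int(order_id.split("-")[1]) % 2:
        raise RuntimeError(f"{order_id}: payment declined")
    return {"status": "completed", "tracking": f"trk-{order_id}"}


async def batch(order_ids: List[str]) -> dict:
    # Fan out: start all children at once. Fan in: collect every outcome,
    # keeping exceptions as values instead of aborting the batch
    outcomes = await asyncio.gather(*(process_order(o) for o in order_ids),
                                    return_exceptions=True)
    results = {
        oid: ({"status": "failed", "error": str(out)} if isinstance(out, Exception) else out)
        for oid, out in zip(order_ids, outcomes)
    }
    return {
        "total": len(order_ids),
        "succeeded": sum(r["status"] == "completed" for r in results.values()),
        "failed": sum(r["status"] == "failed" for r in results.values()),
        "results": results,
    }


summary = asyncio.run(batch([f"order-{i}" for i in range(1, 6)]))
```

For very large batches, keep in mind that every child adds events to the parent's history, and Temporal limits history size. Big batches are therefore often split into smaller parents.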

Chapter 4: Advanced Patterns

4.1 Saga State Machine

# State Machine Pattern for Complex Sagas

from enum import Enum
from typing import Dict, Callable, Optional
from dataclasses import dataclass


class SagaState(Enum):
    INITIALIZED = "initialized"
    RESERVING_INVENTORY = "reserving_inventory"
    INVENTORY_RESERVED = "inventory_reserved"
    CHARGING_PAYMENT = "charging_payment"
    PAYMENT_CHARGED = "payment_charged"
    CREATING_SHIPMENT = "creating_shipment"
    SHIPMENT_CREATED = "shipment_created"
    COMPLETING = "completing"
    COMPLETED = "completed"
    COMPENSATING = "compensating"
    COMPENSATED = "compensated"
    FAILED = "failed"


@dataclass
class StateTransition:
    from_state: SagaState
    to_state: SagaState
    action: Optional[str] = None        # name of the activity run on this transition
    compensation: Optional[str] = None  # name of the activity that undoes it


@workflow.defn
class StateMachineOrderWorkflow:
    """
    Order workflow using explicit state machine.
    
    Benefits:
    - Clear visualization of all states
    - Explicit transitions
    - Easy to add new states
    - Better for complex branching
    """
    
    # State machine definition (documents the allowed transitions;
    # _execute_state below implements them explicitly)
    TRANSITIONS = {
        SagaState.INITIALIZED: [
            StateTransition(
                SagaState.INITIALIZED,
                SagaState.RESERVING_INVENTORY
            )
        ],
        SagaState.RESERVING_INVENTORY: [
            StateTransition(
                SagaState.RESERVING_INVENTORY,
                SagaState.INVENTORY_RESERVED,
                action="reserve_inventory",
                compensation="release_inventory"
            ),
            StateTransition(
                SagaState.RESERVING_INVENTORY,
                SagaState.FAILED  # On error
            )
        ],
        SagaState.INVENTORY_RESERVED: [
            StateTransition(
                SagaState.INVENTORY_RESERVED,
                SagaState.CHARGING_PAYMENT
            )
        ],
        SagaState.CHARGING_PAYMENT: [
            StateTransition(
                SagaState.CHARGING_PAYMENT,
                SagaState.PAYMENT_CHARGED,
                action="charge_payment",
                compensation="refund_payment"
            ),
            StateTransition(
                SagaState.CHARGING_PAYMENT,
                SagaState.COMPENSATING  # On error, start compensation
            )
        ],
        # ... more transitions
    }
    
    def __init__(self):
        self.state = SagaState.INITIALIZED
        self.history: list[SagaState] = []
        self.results: dict = {}
    
    @workflow.run
    async def run(self, input: OrderInput) -> OrderResult:
        """Execute state machine."""
        
        while self.state not in [SagaState.COMPLETED, SagaState.COMPENSATED, SagaState.FAILED]:
            next_state = await self._execute_state(input)
            self.history.append(self.state)
            self.state = next_state
            
            workflow.logger.info(f"State transition: {self.history[-1]} -> {self.state}")
        
        return OrderResult(
            order_id=input.order_id,
            status=self.state.value,
            tracking_number=self.results.get("tracking_number")
        )
    
    async def _execute_state(self, input: OrderInput) -> SagaState:
        """Execute current state and return next state."""
        
        if self.state == SagaState.INITIALIZED:
            return SagaState.RESERVING_INVENTORY
        
        elif self.state == SagaState.RESERVING_INVENTORY:
            try:
                result = await workflow.execute_activity(
                    reserve_inventory,
                    args=[input.order_id, input.items],
                    start_to_close_timeout=timedelta(minutes=5)
                )
                self.results["reservation_id"] = result["reservation_id"]
                return SagaState.INVENTORY_RESERVED
            except Exception:
                return SagaState.FAILED
        
        elif self.state == SagaState.INVENTORY_RESERVED:
            return SagaState.CHARGING_PAYMENT
        
        elif self.state == SagaState.CHARGING_PAYMENT:
            try:
                result = await workflow.execute_activity(
                    charge_payment,
                    args=[input.customer_id, input.total_amount, f"order-{input.order_id}"],
                    start_to_close_timeout=timedelta(minutes=5)
                )
                self.results["payment_id"] = result["payment_id"]
                return SagaState.PAYMENT_CHARGED
            except Exception:
                return SagaState.COMPENSATING
        
        elif self.state == SagaState.PAYMENT_CHARGED:
            return SagaState.CREATING_SHIPMENT
        
        elif self.state == SagaState.CREATING_SHIPMENT:
            # This is the pivot - retry forever
            result = await workflow.execute_activity(
                create_shipment,
                args=[input.order_id, input.shipping_address, input.items],
                start_to_close_timeout=timedelta(minutes=10),
                retry_policy=RetryPolicy(maximum_attempts=0)
            )
            self.results["tracking_number"] = result["tracking_number"]
            return SagaState.SHIPMENT_CREATED
        
        elif self.state == SagaState.SHIPMENT_CREATED:
            return SagaState.COMPLETING
        
        elif self.state == SagaState.COMPLETING:
            await workflow.execute_activity(
                send_notification,
                args=[input.customer_id, "order_confirmation", self.results],
                start_to_close_timeout=timedelta(minutes=1)
            )
            return SagaState.COMPLETED
        
        elif self.state == SagaState.COMPENSATING:
            await self._run_compensation(input)
            return SagaState.COMPENSATED
        
        return SagaState.FAILED
    
    async def _run_compensation(self, input: OrderInput):
        """Run compensation based on history."""
        # Compensate in reverse order
        for state in reversed(self.history):
            if state == SagaState.CHARGING_PAYMENT and "payment_id" in self.results:
                await workflow.execute_activity(
                    refund_payment,
                    args=[self.results["payment_id"], f"refund-{input.order_id}"],
                    start_to_close_timeout=timedelta(minutes=5),
                    retry_policy=RetryPolicy(maximum_attempts=0)
                )
            
            elif state == SagaState.RESERVING_INVENTORY and "reservation_id" in self.results:
                await workflow.execute_activity(
                    release_inventory,
                    args=[self.results["reservation_id"]],
                    start_to_close_timeout=timedelta(minutes=5),
                    retry_policy=RetryPolicy(maximum_attempts=0)
                )
    
    @workflow.query
    def get_state(self) -> dict:
        """Query current state machine status."""
        return {
            "current_state": self.state.value,
            "history": [s.value for s in self.history],
            "results": self.results
        }
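The reverse walk in `_run_compensation` is the core of the saga, and you can exercise it without a Temporal server. The sketch below is a plain-Python model, not SDK code. The enum values and the `COMPENSATIONS` table are illustrative assumptions. The activity names come from the workflow above. A step is undone only if it recorded a result, which is why a failed charge produces no refund.

```python
from enum import Enum


class SagaState(Enum):
    # Illustrative subset of the workflow's states; values are assumptions
    RESERVING_INVENTORY = "reserving_inventory"
    CHARGING_PAYMENT = "charging_payment"
    CREATING_SHIPMENT = "creating_shipment"
    COMPENSATING = "compensating"


# state -> (result key proving the step took effect, compensating activity)
COMPENSATIONS = {
    SagaState.CHARGING_PAYMENT: ("payment_id", "refund_payment"),
    SagaState.RESERVING_INVENTORY: ("reservation_id", "release_inventory"),
}


def compensation_plan(history, results):
    """Return (activity_name, resource_id) pairs in the order to run them."""
    plan = []
    for state in reversed(history):  # newest work is undone first
        entry = COMPENSATIONS.get(state)
        if entry is None:
            continue  # nothing to undo for this state
        key, activity_name = entry
        if key in results:  # the step completed and stored an ID
            plan.append((activity_name, results[key]))
    return plan


# Payment failed, so there is no payment_id: only the reservation is released
history = [SagaState.RESERVING_INVENTORY, SagaState.CHARGING_PAYMENT,
           SagaState.COMPENSATING]
print(compensation_plan(history, {"reservation_id": "r-1"}))
# [('release_inventory', 'r-1')]
```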

4.2 Workflow Versioning

# Workflow Versioning for Safe Deployments

@workflow.defn
class VersionedOrderWorkflow:
    """
    Workflow with versioning for safe deployments.
    
    When you need to change workflow logic, use versioning
    to ensure in-flight workflows use old code while
    new workflows use new code.
    """
    
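    def __init__(self) -> None:
        # None until a reviewer responds via the signal below
        self.fraud_approved = None
    
    @workflow.signal
    def fraud_review_decision(self, approved: bool) -> None:
        """Signal (hypothetical name): outcome of manual fraud review."""
        self.fraud_approved = approved
    
    @workflow.run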
    async def run(self, input: OrderInput) -> OrderResult:
        # Reserve inventory (unchanged)
        reservation = await workflow.execute_activity(
            reserve_inventory,
            args=[input.order_id, input.items],
            start_to_close_timeout=timedelta(minutes=5)
        )
        
        # Version 1: Original flow - charge then ship
        # Version 2: New flow - fraud check before charge
        
        version = workflow.patched("fraud-check-v2")
        
        if version:
            # New code path: Add fraud check
            fraud_result = await workflow.execute_activity(
                check_fraud,
                args=[input.customer_id, input.total_amount],
                start_to_close_timeout=timedelta(minutes=2)
            )
            
            if fraud_result["risk_score"] > 0.8:
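                # High risk - require manual review
                try:
                    await workflow.wait_condition(
                        lambda: self.fraud_approved is not None,
                        timeout=timedelta(hours=24)
                    )
                except asyncio.TimeoutError:
                    # No reviewer decision within 24h: treat as rejected
                    self.fraud_approved = False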
                
                if not self.fraud_approved:
                    await self._compensate(input, reservation)
                    return OrderResult(
                        order_id=input.order_id,
                        status="rejected_fraud"
                    )
        
        # Continue with payment (both versions)
        payment = await workflow.execute_activity(
            charge_payment,
            args=[input.customer_id, input.total_amount, f"order-{input.order_id}"],
            start_to_close_timeout=timedelta(minutes=5)
        )
        
        # Version 3: Add loyalty points
        version_loyalty = workflow.patched("loyalty-points-v3")
        
        if version_loyalty:
            await workflow.execute_activity(
                award_loyalty_points,
                args=[input.customer_id, input.total_amount],
                start_to_close_timeout=timedelta(minutes=1)
            )
        
        # Continue with shipment...


"""
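VERSION BEHAVIOR:

Workflows that passed this point before the deploy (no marker in history):
├── workflow.patched("fraud-check-v2") returns False on replay
├── Replay follows the old code path (no fraud check)
└── Completes with original logic

Workflows that reach this point after the deploy (new or still in flight):
├── workflow.patched("fraud-check-v2") returns True and records a marker
├── Uses new code path (with fraud check)
└── Replays of that history keep returning True

This allows safe deployment without breaking in-flight workflows. Once no
open workflow can replay the old path, retire the branch with
workflow.deprecate_patch("fraud-check-v2").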
"""

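The rule behind `workflow.patched()` is small enough to model directly. The toy class below is a sketch and not the SDK. It encodes the documented behavior: a live (non-replaying) execution takes the new path and records a marker, and a replay follows whatever the recorded history says. One consequence follows from this. An in-flight workflow that reaches the patch point live after the deploy also takes the new path.

```python
from dataclasses import dataclass, field


@dataclass
class PatchModel:
    """Toy model of a patch check; NOT the Temporal SDK implementation."""
    markers: set = field(default_factory=set)  # patch IDs recorded in history
    replaying: bool = False

    def patched(self, patch_id: str) -> bool:
        if not self.replaying:
            # Executing live: take the new path and record a marker
            self.markers.add(patch_id)
            return True
        # Replaying: reproduce whatever the original execution did
        return patch_id in self.markers


# Brand-new execution after the deploy
new_run = PatchModel()
print(new_run.patched("fraud-check-v2"))  # True

# Replay of a history recorded before the deploy (no marker)
old_history = PatchModel(replaying=True)
print(old_history.patched("fraud-check-v2"))  # False

# Replay of the new execution's history on another worker
replay_new = PatchModel(markers=set(new_run.markers), replaying=True)
print(replay_new.patched("fraud-check-v2"))  # True
```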
4.3 Long-Running Workflows with Timers

# Long-Running Subscription Workflow

@workflow.defn
class SubscriptionWorkflow:
    """
    Subscription lifecycle workflow.
    
    This workflow can run for YEARS:
    - Handles monthly billing
    - Processes upgrades/downgrades
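    - Manages cancellation
    
    Running this long, it should also use continue-as-new to keep
    its event history bounded (see Chapter 8, Mistake 2).
    """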
    
    def __init__(self):
        self.status = "active"
        self.current_plan = None
        self.next_billing_date = None
        self.cancelled = False
        self.upgrade_request = None
    
    @workflow.run
    async def run(self, customer_id: str, initial_plan: str) -> dict:
        """Main subscription loop."""
        self.current_plan = initial_plan
        self.next_billing_date = workflow.now() + timedelta(days=30)
        
        while not self.cancelled:
            # Wait until next billing date OR a signal arrives
            try:
                await workflow.wait_condition(
                    lambda: self.cancelled or self.upgrade_request is not None,
                    timeout=self._time_until_billing()
                )
            except asyncio.TimeoutError:
                # Billing time!
                pass
            
            # Check for cancellation
            if self.cancelled:
                break
            
            # Check for upgrade/downgrade
            if self.upgrade_request:
                await self._process_plan_change()
                self.upgrade_request = None
                continue
            
            # Process billing
            await self._process_billing(customer_id)
            self.next_billing_date = workflow.now() + timedelta(days=30)
        
        # Subscription ended
        await self._process_cancellation(customer_id)
        
        return {
            "status": "cancelled",
            "final_billing_date": str(self.next_billing_date)
        }
    
    async def _process_billing(self, customer_id: str):
        """Process monthly billing."""
        plan_details = await workflow.execute_activity(
            get_plan_details,
            args=[self.current_plan],
            start_to_close_timeout=timedelta(minutes=1)
        )
        
        try:
            await workflow.execute_activity(
                charge_subscription,
                args=[customer_id, plan_details["price"], f"sub-{workflow.info().workflow_id}-{workflow.now().isoformat()}"],
                start_to_close_timeout=timedelta(minutes=5),
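                retry_policy=RetryPolicy(
                    maximum_attempts=5,
                    # Waits of 1h, 2h, 4h, 8h: about 15 hours in total,
                    # so a multi-day dunning window needs more attempts
                    initial_interval=timedelta(hours=1)
                )
            )
            self.status = "active"
            
        except ActivityError:  # temporalio.exceptions; raised once retries run out
            self.status = "payment_failed"
            # Could implement grace period, dunning, etc.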
    
    async def _process_plan_change(self):
        """Process plan upgrade/downgrade."""
        new_plan = self.upgrade_request
        
        # Prorate current billing period
        await workflow.execute_activity(
            prorate_subscription,
            args=[self.current_plan, new_plan, self.next_billing_date],
            start_to_close_timeout=timedelta(minutes=5)
        )
        
        self.current_plan = new_plan
    
    async def _process_cancellation(self, customer_id: str):
        """Process subscription cancellation."""
        await workflow.execute_activity(
            send_notification,
            args=[customer_id, "subscription_cancelled", {
                "plan": self.current_plan
            }],
            start_to_close_timeout=timedelta(minutes=1)
        )
    
    def _time_until_billing(self) -> timedelta:
        """Calculate time until next billing."""
        now = workflow.now()
        if self.next_billing_date > now:
            return self.next_billing_date - now
        return timedelta(seconds=0)
    
    @workflow.signal
    async def cancel(self):
        """Signal: Cancel subscription."""
        self.cancelled = True
    
    @workflow.signal
    async def change_plan(self, new_plan: str):
        """Signal: Change subscription plan."""
        self.upgrade_request = new_plan
    
    @workflow.query
    def get_subscription_status(self) -> dict:
        """Query: Get current subscription status."""
        return {
            "status": self.status,
            "plan": self.current_plan,
            "next_billing": str(self.next_billing_date),
            "cancelled": self.cancelled
        }
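`_process_billing` retries a failed charge with `initial_interval=timedelta(hours=1)` and `maximum_attempts=5`. The arithmetic below shows how long that actually takes. It is a sketch that assumes Temporal's documented retry defaults: a backoff coefficient of 2.0 and, when no maximum interval is set, a cap of 100 times the initial interval.

```python
from datetime import timedelta
from typing import Optional


def retry_delays(
    initial_interval: timedelta,
    maximum_attempts: int,
    backoff_coefficient: float = 2.0,
    maximum_interval: Optional[timedelta] = None,
) -> list:
    """Waits between attempts under capped exponential backoff."""
    if maximum_interval is None:
        maximum_interval = initial_interval * 100  # assumed default cap
    delays = []
    # maximum_attempts counts the first try, so there are N - 1 waits
    for retry in range(maximum_attempts - 1):
        delay = initial_interval * (backoff_coefficient ** retry)
        delays.append(min(delay, maximum_interval))
    return delays


delays = retry_delays(timedelta(hours=1), maximum_attempts=5)
print([d / timedelta(hours=1) for d in delays])  # [1.0, 2.0, 4.0, 8.0]
print(sum(delays, timedelta()))                   # 15:00:00
```

To spread billing retries over several days, you need more attempts or explicit `workflow.sleep` calls between charge attempts. The second option also keeps the dunning schedule visible in the workflow history.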

Part II: Production Implementation

Chapter 5: Worker Setup and Configuration

# Production Worker Configuration

import asyncio
import logging
import os
from datetime import timedelta

from temporalio.client import Client
from temporalio.worker import Worker

logger = logging.getLogger(__name__)


class WorkerConfig:
    """Configuration for Temporal workers."""
    
    def __init__(
        self,
        temporal_host: str = "localhost:7233",
        namespace: str = "default",
        task_queue: str = "orders",
        max_concurrent_activities: int = 100,
        max_concurrent_workflow_tasks: int = 100,
        sticky_queue_schedule_to_start_timeout: int = 10,
    ):
        self.temporal_host = temporal_host
        self.namespace = namespace
        self.task_queue = task_queue
        self.max_concurrent_activities = max_concurrent_activities
        self.max_concurrent_workflow_tasks = max_concurrent_workflow_tasks
        self.sticky_queue_schedule_to_start_timeout = sticky_queue_schedule_to_start_timeout


async def run_worker(config: WorkerConfig):
    """Run a Temporal worker."""
    
    # Connect to Temporal
    client = await Client.connect(
        config.temporal_host,
        namespace=config.namespace
    )
    
    # Create worker
    worker = Worker(
        client,
        task_queue=config.task_queue,
        
        # Register workflows
        workflows=[
            OrderWorkflow,
            OrderWithApprovalWorkflow,
            SubscriptionWorkflow,
            BatchOrderWorkflow,
        ],
        
        # Register activities
        activities=[
            reserve_inventory,
            release_inventory,
            charge_payment,
            refund_payment,
            create_shipment,
            send_notification,
            check_fraud,
            award_loyalty_points,
        ],
        
        # Concurrency limits
        max_concurrent_activities=config.max_concurrent_activities,
        max_concurrent_workflow_tasks=config.max_concurrent_workflow_tasks,
        
        # Sticky execution (workflow affinity)
        sticky_queue_schedule_to_start_timeout=timedelta(
            seconds=config.sticky_queue_schedule_to_start_timeout
        ),
    )
    
    logger.info(f"Starting worker on queue: {config.task_queue}")
    
    # Run worker until interrupted
    await worker.run()


async def run_activity_worker(config: WorkerConfig):
    """
    Run activity-only worker.
    
    Useful for scaling activities independently from workflows.
    """
    client = await Client.connect(
        config.temporal_host,
        namespace=config.namespace
    )
    
    worker = Worker(
        client,
        task_queue=config.task_queue,
        
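        # Only activities, no workflows. Any worker polling this task queue
        # can be handed any activity scheduled on it, so register the same
        # activities as the combined worker above.
        activities=[
            reserve_inventory,
            release_inventory,
            charge_payment,
            refund_payment,
            create_shipment,
            send_notification,
            check_fraud,
            award_loyalty_points,
        ],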
        
        max_concurrent_activities=config.max_concurrent_activities,
    )
    
    await worker.run()


# Entry point
if __name__ == "__main__":
    config = WorkerConfig(
        temporal_host=os.getenv("TEMPORAL_HOST", "localhost:7233"),
        namespace=os.getenv("TEMPORAL_NAMESPACE", "default"),
        task_queue=os.getenv("TASK_QUEUE", "orders"),
    )
    
    asyncio.run(run_worker(config))
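Every worker polling a task queue can be handed any activity task scheduled on it. If a task lands on a worker that lacks the registration, that attempt fails, so the registration lists of the two workers must not drift apart. A cheap guard is a startup or CI check. The sketch below uses plain strings for activity names, which is an assumption; real code would derive them from the registered functions. `activity_only` is a deliberately drifted, hypothetical example.

```python
def missing_registrations(required, workers):
    """Map each worker name to the activities it cannot execute."""
    gaps = {}
    for name, registered in workers.items():
        missing = set(required) - set(registered)
        if missing:
            gaps[name] = missing
    return gaps


# Everything workflows on the "orders" queue may schedule
REQUIRED = {
    "reserve_inventory", "release_inventory", "charge_payment",
    "refund_payment", "create_shipment", "send_notification",
    "check_fraud", "award_loyalty_points",
}

workers = {
    "combined": REQUIRED,
    # Hypothetical drifted list: forgot the two newest activities
    "activity_only": REQUIRED - {"check_fraud", "award_loyalty_points"},
}
print(missing_registrations(REQUIRED, workers))
```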

Chapter 6: Client Usage

# Production Client Usage

import asyncio
from datetime import timedelta

from temporalio.client import Client, WorkflowHandle
from temporalio.common import WorkflowIDReusePolicy


class OrderService:
    """
    Service for managing orders via Temporal workflows.
    """
    
    def __init__(self, temporal_client: Client):
        self.client = temporal_client
    
    async def create_order(self, input: OrderInput) -> str:
        """
        Start a new order workflow.
        
        Returns workflow ID for tracking.
        """
        # Use order ID as workflow ID for idempotency
        workflow_id = f"order-{input.order_id}"
        
        await self.client.start_workflow(
            OrderWorkflow.run,
            input,
            id=workflow_id,
            task_queue="orders",
            
            # An open run with this ID always blocks a second start;
            # REJECT_DUPLICATE also refuses it after the run has closed
            id_reuse_policy=WorkflowIDReusePolicy.REJECT_DUPLICATE,
            
            # Execution timeout (entire workflow)
            execution_timeout=timedelta(hours=24),
            
            # Run timeout (single run, before continue-as-new)
            run_timeout=timedelta(hours=1),
        )
        
        return workflow_id
    
    async def get_order_status(self, order_id: str) -> dict:
        """
        Query order status.
        
        Non-blocking - returns current state without waiting.
        """
        workflow_id = f"order-{order_id}"
        
        try:
            handle = self.client.get_workflow_handle(workflow_id)
            status = await handle.query(OrderWorkflow.get_status)
            tracking = await handle.query(OrderWorkflow.get_tracking_number)
            
            return {
                "order_id": order_id,
                "status": status,
                "tracking_number": tracking
            }
            
        except Exception as e:
            return {
                "order_id": order_id,
                "status": "unknown",
                "error": str(e)
            }
    
    async def wait_for_completion(
        self,
        order_id: str,
        timeout: timedelta = timedelta(minutes=30)
    ) -> OrderResult:
        """
        Wait for order to complete.
        
        Blocking - waits until workflow finishes.
        """
        workflow_id = f"order-{order_id}"
        
        # result_type lets the SDK decode the payload into OrderResult
        handle = self.client.get_workflow_handle(workflow_id, result_type=OrderResult)
        
        try:
            result = await asyncio.wait_for(
                handle.result(),
                timeout=timeout.total_seconds()
            )
            return result
        except asyncio.TimeoutError:
            raise TimeoutError(f"Order {order_id} did not complete in {timeout}")
    
    async def cancel_order(self, order_id: str):
        """
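        Cancel an order.
        
        Sends a cancellation request (not a signal). Inside the workflow
        it is delivered as asyncio.CancelledError, which the workflow can
        catch to run compensation before it finishes.
        """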
        workflow_id = f"order-{order_id}"
        
        handle = self.client.get_workflow_handle(workflow_id)
        await handle.cancel()
    
    async def approve_order(self, order_id: str, approver: str):
        """Send approval signal to order waiting for approval."""
        workflow_id = f"order-{order_id}"
        
        handle = self.client.get_workflow_handle(workflow_id)
        await handle.signal(OrderWithApprovalWorkflow.approve, approver)


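# FastAPI integration
from fastapi import FastAPI, HTTPException
from temporalio.exceptions import WorkflowAlreadyStartedError

app = FastAPI()
order_service: OrderService = None


@app.on_event("startup")
async def startup():
    global order_service
    client = await Client.connect("localhost:7233")
    order_service = OrderService(client)


@app.post("/orders")
async def create_order(request: OrderInput):
    """Create a new order."""
    try:
        workflow_id = await order_service.create_order(request)
    except WorkflowAlreadyStartedError:
        # This order ID already has (or had) a workflow: report a conflict
        raise HTTPException(status_code=409, detail="order already exists")
    return {"workflow_id": workflow_id, "status": "started"}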


@app.get("/orders/{order_id}")
async def get_order(order_id: str):
    """Get order status."""
    return await order_service.get_order_status(order_id)


@app.post("/orders/{order_id}/approve")
async def approve_order(order_id: str, approver: str):
    """Approve an order."""
    await order_service.approve_order(order_id, approver)
    return {"status": "approved"}
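Using the order ID as the workflow ID is what makes `POST /orders` safe to retry. The toy registry below models how a start request is accepted or refused. The member names mirror `WorkflowIDReusePolicy`, but the class itself is an illustrative assumption. It omits other members such as `TERMINATE_IF_RUNNING`.

```python
from enum import Enum, auto


class ReusePolicy(Enum):
    ALLOW_DUPLICATE = auto()
    ALLOW_DUPLICATE_FAILED_ONLY = auto()
    REJECT_DUPLICATE = auto()


class StartRegistry:
    """Toy model of starting workflows keyed by business ID."""

    def __init__(self):
        self.status = {}  # workflow_id -> "running" | "completed" | "failed"

    def start(self, workflow_id: str, policy: ReusePolicy) -> bool:
        current = self.status.get(workflow_id)
        if current == "running":
            return False  # an open run always blocks a second start
        if current == "completed" and policy is not ReusePolicy.ALLOW_DUPLICATE:
            return False
        if current == "failed" and policy is ReusePolicy.REJECT_DUPLICATE:
            return False
        self.status[workflow_id] = "running"
        return True

    def finish(self, workflow_id: str, outcome: str) -> None:
        self.status[workflow_id] = outcome


registry = StartRegistry()
print(registry.start("order-123", ReusePolicy.REJECT_DUPLICATE))  # True
print(registry.start("order-123", ReusePolicy.REJECT_DUPLICATE))  # False (running)
registry.finish("order-123", "completed")
print(registry.start("order-123", ReusePolicy.REJECT_DUPLICATE))  # False (closed)
```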

Part III: Real-World Application

Chapter 7: Case Studies

7.1 Case Study: Uber's Cadence (Temporal's Origin)

UBER CADENCE USAGE

Uber built Cadence (which became Temporal) for:

1. TRIP PROCESSING
   ├── Match rider to driver
   ├── Track driver arrival
   ├── Handle trip completion
   ├── Process payment
   └── Handle disputes

2. DRIVER ONBOARDING
   ├── Background check (days)
   ├── Document verification (hours)
   ├── Training completion (weeks)
   ├── Vehicle inspection (scheduled)
   └── Activation

3. SURGE PRICING
   ├── Monitor demand in real-time
   ├── Calculate pricing zones
   ├── Apply time-limited surges
   └── Automatic expiration

SCALE AT UBER:
├── 100,000+ workflow executions per second
├── Workflows running for months (subscriptions)
├── Sub-second activity completion
└── Cross-datacenter replication


KEY LEARNINGS:
├── Workflow-per-entity pattern (one workflow per trip, per driver)
├── Signals for external events (rider cancelled, driver arrived)
├── Queries for real-time status
└── Continue-as-new for long-running workflows

7.2 Case Study: Netflix Conductor

NETFLIX CONDUCTOR

Netflix's workflow orchestration (different from Temporal):

ARCHITECTURE:
├── JSON-based workflow definitions
├── REST API for operations
├── Pluggable task workers
└── Elasticsearch for search

USE CASES AT NETFLIX:

1. CONTENT ONBOARDING
   ├── Upload raw video
   ├── Transcode to multiple formats (parallel)
   ├── Generate thumbnails
   ├── Quality check
   ├── Metadata enrichment
   └── Publish to CDN

2. DATA PIPELINES
   ├── ETL workflows
   ├── ML model training
   ├── Report generation
   └── Analytics

3. INFRASTRUCTURE AUTOMATION
   ├── Server provisioning
   ├── Deployment pipelines
   ├── Chaos engineering tests
   └── Incident response


CONDUCTOR vs TEMPORAL:

Conductor:
├── JSON workflow definitions
├── REST-based
├── Simpler to read; control flow limited to the DSL's built-in task types
└── Good for microservices orchestration

Temporal:
├── Code-based workflow definitions
├── gRPC-based
├── Arbitrary control flow in a general-purpose language, plus signals, queries, child workflows
└── Better for complex business logic

7.3 Case Study: Stripe Payments

STRIPE-STYLE PAYMENT ORCHESTRATION

Complex payment flows benefit from workflow orchestration:

PAYMENT INTENT WORKFLOW:

┌────────────────────────────────────────────────────────────────────────┐
│                    PAYMENT INTENT STATE MACHINE                        │
│                                                                        │
│  ┌──────────┐    ┌──────────────┐    ┌──────────────┐    ┌──────────┐  │
│  │ Created  │───▶│ Processing   │───▶│ Requires     │───▶│Succeeded │  │
│  │          │    │              │    │ Action       │    │          │  │
│  └──────────┘    └──────────────┘    └──────────────┘    └──────────┘  │
│       │                │                    │                  │       │
│       │                │                    │                  │       │
│       ▼                ▼                    ▼                  ▼       │
│  ┌──────────┐    ┌──────────────┐    ┌──────────────┐    ┌──────────┐  │
│  │ Canceled │    │   Failed     │    │   Canceled   │    │Refunded  │  │
│  │          │    │              │    │              │    │(partial) │  │
│  └──────────┘    └──────────────┘    └──────────────┘    └──────────┘  │
│                                                                        │
└────────────────────────────────────────────────────────────────────────┘
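In a workflow engine, code enforces the state machine shown in the diagram. One lightweight approach is an explicit transition table plus an append-only history, which also yields the audit trail listed among the durable-execution benefits. The sketch reads its edges straight off the simplified diagram above. It is not Stripe's actual PaymentIntent status model.

```python
# Edges taken from the simplified diagram (not Stripe's real status list)
TRANSITIONS = {
    "created": {"processing", "canceled"},
    "processing": {"requires_action", "failed"},
    "requires_action": {"succeeded", "canceled"},
    "succeeded": {"refunded"},
    "canceled": set(),
    "failed": set(),
    "refunded": set(),
}


class InvalidTransition(Exception):
    pass


def transition(history, new_state):
    """Return a new history with new_state appended, if the move is legal."""
    current = history[-1]
    if new_state not in TRANSITIONS[current]:
        raise InvalidTransition(f"{current} -> {new_state}")
    return history + [new_state]  # append-only: old entries never change


h = transition(["created"], "processing")
h = transition(h, "requires_action")  # e.g. 3DS challenge
h = transition(h, "succeeded")
print(h)  # ['created', 'processing', 'requires_action', 'succeeded']
```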

ACTIVITIES IN PAYMENT WORKFLOW:

1. Fraud Check
   ├── Call ML model for fraud score
   ├── Apply rules engine
   └── Decision: approve, review, block

2. Card Network Authorization
   ├── Call card network (Visa, Mastercard)
   ├── Handle 3DS if required
   └── Store authorization code

3. Capture
   ├── Settle authorized transaction
   ├── Transfer funds
   └── Update ledger

4. Refund
   ├── Validate refund amount
   ├── Reverse settlement
   └── Update ledger

DURABLE EXECUTION BENEFITS:
├── 3DS waits for customer action (minutes/hours)
├── Retry network failures automatically
├── Audit trail of every step
└── Easy investigation of failed payments

Chapter 8: Common Mistakes

8.1 Mistake 1: Non-Deterministic Workflow Code

❌ WRONG: Non-deterministic workflow

@workflow.defn
class BadWorkflow:
    @workflow.run
    async def run(self, input: dict) -> dict:
        # DON'T: Use random in workflow
        if random.random() > 0.5:
            await self.do_something()
        
        # DON'T: Use current time directly
        timestamp = datetime.now()
        
        # DON'T: Use UUID directly
        id = str(uuid.uuid4())
        
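        # DON'T: Make HTTP calls in workflow
        async with httpx.AsyncClient() as http:
            response = await http.get("https://api.example.com/data")
        
        # On replay, these produce DIFFERENT values. Any that steer which
        # activities or timers get scheduled (like the random branch
        # above) cause non-determinism errors!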


✅ CORRECT: Deterministic workflow

@workflow.defn
class GoodWorkflow:
    @workflow.run
    async def run(self, input: dict) -> dict:
        # DO: Use workflow.random()
        if workflow.random().random() > 0.5:
            await self.do_something()
        
        # DO: Use workflow.now()
        timestamp = workflow.now()
        
        # DO: Use workflow.uuid4()
        id = str(workflow.uuid4())
        
        # DO: Use activities for side effects
        response = await workflow.execute_activity(
            fetch_data,
            args=["https://api.example.com/data"],
            start_to_close_timeout=timedelta(minutes=1)
        )
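`workflow.random()` and activities fix the problem because of how replay works. On replay, the engine re-runs the workflow function from the top. It feeds the function the recorded activity results and a random seed that stays fixed for that execution. The toy replayer below models only that behavior. It is an assumption-laden sketch, not how Temporal stores history. The side-effecting `fetch` runs once, and the branch chosen by the seeded RNG is identical on replay.

```python
import random


class ToyReplayer:
    """Records activity results on the first run and reuses them on replay."""

    def __init__(self, seed: int):
        self.history = []  # stand-in for the event history
        self.seed = seed   # assumption: the engine persists one seed per execution

    def run(self, workflow_fn, replay: bool = False):
        cursor = iter(self.history)
        rng = random.Random(self.seed)  # same seed -> same sequence

        def activity(fn, *args):
            if replay:
                return next(cursor)  # completed before: return stored result
            result = fn(*args)
            self.history.append(result)
            return result

        return workflow_fn(activity, rng)


calls = []


def fetch(x):
    calls.append(x)  # a side effect that must not repeat
    return x * 10


def order_flow(activity, rng):
    branch = "a" if rng.random() > 0.5 else "b"
    return branch, activity(fetch, 4)


replayer = ToyReplayer(seed=7)
first = replayer.run(order_flow)
second = replayer.run(order_flow, replay=True)
print(first == second, calls)  # True [4]
```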

8.2 Mistake 2: Unbounded Workflow History

❌ WRONG: Workflow that grows forever

@workflow.defn
class BadSubscriptionWorkflow:
    @workflow.run
    async def run(self, customer_id: str) -> None:
        while True:
            await workflow.sleep(timedelta(days=30))
            
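            # Each iteration appends events to this run's history.
            # History is never truncated, replay cost grows with it,
            # and Temporal caps one run's history (by default 51,200
            # events / 50 MB), so an endless loop eventually fails.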
            
            await workflow.execute_activity(
                charge_monthly,
                args=[customer_id],
                start_to_close_timeout=timedelta(minutes=5)
            )


✅ CORRECT: Use continue-as-new

@workflow.defn
class GoodSubscriptionWorkflow:
    @workflow.run
    async def run(self, customer_id: str, billing_count: int = 0) -> None:
        while billing_count < 12:  # One year max
            await workflow.sleep(timedelta(days=30))
            
            await workflow.execute_activity(
                charge_monthly,
                args=[customer_id],
                start_to_close_timeout=timedelta(minutes=5)
            )
            
            billing_count += 1
        
        # Continue with fresh history
        # Multiple arguments go in args=[...]; this call never returns
        workflow.continue_as_new(args=[customer_id, 0])
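Continue-as-new ends the current run and starts a new one with the given arguments and an empty history. Any state that must survive has to travel in those arguments. The plain-Python model below shows the mechanism. As in the Python SDK, continuing-as-new is an exception that unwinds the current run. `ContinueAsNew`, `drive`, and the running `total_billed` counter are hypothetical names for this sketch.

```python
class ContinueAsNew(Exception):
    """Stand-in for the SDK's continue-as-new: unwinds the current run."""


def drive(workflow_fn, *args, max_runs=100):
    """Restart workflow_fn with a fresh history each time it continues-as-new."""
    run_sizes = []
    while len(run_sizes) < max_runs:
        history = []  # every run starts with an empty event history
        try:
            result = workflow_fn(history, *args)
        except ContinueAsNew as exc:
            run_sizes.append(len(history))
            args = exc.args  # state carried into the next run
            continue
        run_sizes.append(len(history))
        return result, run_sizes
    raise RuntimeError("exceeded max_runs")


def subscription(history, customer_id, billing_count, total_billed):
    while billing_count < 12:  # at most 12 cycles per run
        history.append(("timer_fired", customer_id))
        history.append(("charged", customer_id))
        billing_count += 1
        total_billed += 1
        if total_billed == 30:  # hypothetical: customer cancels after 30 bills
            return total_billed
    raise ContinueAsNew(customer_id, 0, total_billed)


result, run_sizes = drive(subscription, "cust-1", 0, 0)
print(result, run_sizes)  # 30 [24, 24, 12]
```

No single run grows past 24 events, however long the subscription lasts.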

8.3 Mistake 3: Missing Activity Timeouts

❌ WRONG: No timeouts on activities

@workflow.defn
class BadWorkflow:
    @workflow.run
    async def run(self, input: dict) -> dict:
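        # No timeout: Temporal rejects this, since every activity needs
        # start_to_close or schedule_to_close. The subtler version of this
        # mistake is a timeout far longer than the call should ever take.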
        result = await workflow.execute_activity(
            call_external_api,
            args=[input]
        )


✅ CORRECT: Always set appropriate timeouts

@workflow.defn
class GoodWorkflow:
    @workflow.run
    async def run(self, input: dict) -> dict:
        result = await workflow.execute_activity(
            call_external_api,
            args=[input],
            
            # Schedule-to-start: Time in task queue
            schedule_to_start_timeout=timedelta(minutes=5),
            
            # Start-to-close: Time for activity to complete
            start_to_close_timeout=timedelta(minutes=10),
            
            # Schedule-to-close: Total budget across ALL attempts, retries included
            schedule_to_close_timeout=timedelta(minutes=15),
            
            # Heartbeat timeout: only useful if the activity calls activity.heartbeat()
            heartbeat_timeout=timedelta(minutes=1),
            
            # Retry policy
            retry_policy=RetryPolicy(
                initial_interval=timedelta(seconds=1),
                maximum_interval=timedelta(minutes=1),
                maximum_attempts=5
            )
        )
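These timeouts interact. `schedule_to_close_timeout` is the budget for the whole activity, retries included. Combined with a 10-minute `start_to_close_timeout`, it silently limits how many of the five attempts can even begin. The function below is a worst-case model. It assumes every attempt hangs until `start_to_close` ends it, ignores queue time, and applies exponential backoff with the retry policy's values.

```python
from datetime import timedelta


def attempts_that_can_start(
    schedule_to_close: timedelta,
    start_to_close: timedelta,
    initial_interval: timedelta,
    maximum_interval: timedelta,
    maximum_attempts: int,
    backoff_coefficient: float = 2.0,
) -> int:
    """Count attempts that begin before schedule_to_close expires (worst case)."""
    elapsed = timedelta(0)
    attempts = 0
    while attempts < maximum_attempts and elapsed < schedule_to_close:
        attempts += 1
        elapsed += start_to_close  # this attempt runs until it times out
        backoff = initial_interval * (backoff_coefficient ** (attempts - 1))
        elapsed += min(backoff, maximum_interval)  # wait before the next try
    return attempts


print(attempts_that_can_start(
    schedule_to_close=timedelta(minutes=15),
    start_to_close=timedelta(minutes=10),
    initial_interval=timedelta(seconds=1),
    maximum_interval=timedelta(minutes=1),
    maximum_attempts=5,
))  # 2
```

With a one-hour `schedule_to_close_timeout`, all five attempts fit. Choose the three values together rather than one at a time.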

8.4 Mistake Checklist

  • Workflow code is deterministic — No random, time, UUID, HTTP calls
  • Long workflows use continue-as-new — Prevent history bloat
  • All activities have timeouts — Never hang forever
  • Compensation activities retry forever — Must eventually succeed
  • Workflow IDs are idempotent — Use business IDs (order-123)
  • Signals are idempotent — Handle duplicate signals gracefully
  • Use versioning for changes — Don't break in-flight workflows
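"Signals are idempotent" in practice means the handler ignores repeats. A user double-clicking Approve, or a retried HTTP request, can deliver the same logical signal twice. This sketch assumes each signal carries a client-generated `request_id`, a hypothetical field. In a real workflow, the seen-set lives in workflow state. Signals are recorded in history, so replay rebuilds that set deterministically.

```python
class ApprovalState:
    """Signal handling that tolerates duplicate deliveries."""

    def __init__(self):
        self.seen_request_ids = set()
        self.approvals = []

    def on_approve(self, request_id: str, approver: str) -> bool:
        if request_id in self.seen_request_ids:
            return False  # duplicate delivery: no state change
        self.seen_request_ids.add(request_id)
        self.approvals.append(approver)
        return True


state = ApprovalState()
state.on_approve("req-1", "alice")
state.on_approve("req-1", "alice")  # retried request, ignored
print(state.approvals)  # ['alice']
```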

Part IV: Interview Preparation

Chapter 9: Interview Tips

9.1 Key Phrases

INTRODUCING WORKFLOW ORCHESTRATION:

"For this complex saga, I'd use a workflow orchestration 
engine like Temporal. It provides durable execution — the 
workflow survives any crash, automatically retries failures, 
and can wait for external events for years if needed."


EXPLAINING DURABLE EXECUTION:

"Durable execution means every step is recorded. If a worker 
crashes mid-workflow, a new worker picks up from exactly 
where it left off. Completed activities aren't re-executed — 
their results are replayed from the event history."


ON DETERMINISM:

"Workflow code must be deterministic because it replays on 
recovery. You can't use random numbers, current time, or make 
HTTP calls directly in workflow code. Those need to be in 
activities, which are recorded and replayed."


ON LONG-RUNNING WORKFLOWS:

"For workflows that run for months or years — like 
subscriptions — I'd use continue-as-new to keep the 
event history bounded. After N billing cycles, the 
workflow starts fresh with new history."


ON SIGNALS AND QUERIES:

"External events come in as signals — like a manager 
approving an order. The workflow waits for the signal, 
then continues. Queries let external systems check 
workflow state without blocking."

9.2 Common Questions

| Question | Good Answer |
|---|---|
| "How does Temporal handle crashes?" | "Every activity completion is recorded in Temporal's database. On crash, the workflow replays from the beginning, but completed activities return their stored results instead of re-executing. The workflow continues from where it left off." |
| "What's the difference between workflows and activities?" | "Workflows are deterministic orchestration code — they decide what to do next. Activities are where actual work happens — API calls, database operations. Workflows must be deterministic for replay; activities can have side effects." |
| "How do you handle workflows that run for years?" | "Use continue-as-new to prevent unbounded history growth. After N iterations, the workflow completes and starts a new execution with fresh history, passing along necessary state." |
| "When would you use Temporal vs custom saga orchestrator?" | "Custom orchestrators work for simple sagas. Temporal shines for complex workflows — long-running, waiting for external events, versioning, visibility. The operational burden of running Temporal is worth it for non-trivial workflows." |

Chapter 10: Practice Problems

Problem 1: Travel Booking with Temporal

Setup: Design a travel booking workflow using Temporal that books flight + hotel + car.

Requirements:

  • Flight has immediate payment
  • Hotel has pre-authorization
  • Car is free cancellation
  • User can modify booking for 24 hours

Questions:

  1. What's the workflow structure?
  2. How do you handle the 24-hour modification window?
  3. What happens if the hotel pre-auth expires?

Solution sketch:

@workflow.defn
class TravelBookingWorkflow:
    def __init__(self) -> None:
        self.modification_request = None  # set by the modify signal
    
    @workflow.run
    async def run(self, booking: TravelBooking) -> BookingResult:
        timeout = timedelta(minutes=2)
        
        # Book in order of cancellation difficulty (easiest to undo first)
        car = await workflow.execute_activity(
            book_car, booking.car, start_to_close_timeout=timeout
        )
        hotel_auth = await workflow.execute_activity(
            preauth_hotel, booking.hotel, start_to_close_timeout=timeout
        )
        flight = await workflow.execute_activity(  # PIVOT
            book_flight, booking.flight, start_to_close_timeout=timeout
        )
        
        # Wait up to 24 hours for a modification signal
        try:
            await workflow.wait_condition(
                lambda: self.modification_request is not None,
                timeout=timedelta(hours=24)
            )
            await self._handle_modification()
        except asyncio.TimeoutError:
            pass  # No modifications, proceed
        
        # Capture hotel payment
        hotel = await workflow.execute_activity(
            capture_hotel,
            hotel_auth["auth_id"],
            start_to_close_timeout=timeout
        )
        
        return BookingResult(flight=flight, hotel=hotel, car=car)
    
    @workflow.signal
    async def modify(self, modification: dict):
        self.modification_request = modification

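Hotel pre-auth expiration: Don't rely on the workflow execution timeout here. When it fires, the workflow is closed immediately and no compensation code runs. Instead, bound the modification wait with an in-workflow timer shorter than the pre-auth validity, so the capture happens before the authorization lapses. If the capture still fails, compensate the bookings that can still be undone and notify the user.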
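The timer in question is easy to compute. The sketch below assumes the hotel's pre-auth expiry time is known at booking. The `safety_margin` buffer is a hypothetical parameter. Inside the workflow, `deadline - workflow.now()` would then be the `timeout` passed to `wait_condition`.

```python
from datetime import datetime, timedelta


def capture_deadline(
    booked_at: datetime,
    preauth_expires_at: datetime,
    modification_window: timedelta = timedelta(hours=24),
    safety_margin: timedelta = timedelta(hours=2),  # hypothetical buffer
) -> datetime:
    """Latest moment the workflow should stop waiting and capture the hotel."""
    return min(booked_at + modification_window, preauth_expires_at - safety_margin)


booked = datetime(2024, 5, 1, 12, 0)
# Pre-auth valid for only 20 hours: the 24h window shrinks to 18h
print(capture_deadline(booked, booked + timedelta(hours=20)))  # 2024-05-02 06:00:00
```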

Problem 2: Content Moderation Pipeline

Setup: Design a content moderation workflow for a social media platform.

Requirements:

  • Automatic ML moderation (fast)
  • Human review queue for edge cases
  • Appeals process
  • SLA: 4 hours for human review

Questions:

  1. How do you model the review queue?
  2. How do you handle the SLA?
  3. What signals does the workflow need?

Solution sketch:

@workflow.defn
class ContentModerationWorkflow:
    def __init__(self) -> None:
        self.human_decision = None
        self.reviewer = None
        self.appeal_requested = False
        self.appeal_reason = None
    
    @workflow.run
    async def run(self, content_id: str) -> ModerationResult:
        # Step 1: ML moderation
        ml_result = await workflow.execute_activity(
            ml_moderate,
            content_id,
            start_to_close_timeout=timedelta(minutes=1)
        )
        
        if ml_result["confidence"] > 0.95:
            return ModerationResult(
                decision=ml_result["decision"],
                method="automatic"
            )
        
        # Step 2: Queue for human review (multiple arguments go in args=[...])
        await workflow.execute_activity(
            add_to_review_queue,
            args=[content_id, ml_result],
            start_to_close_timeout=timedelta(minutes=1)
        )
        
        # Wait for human decision with SLA
        try:
            await workflow.wait_condition(
                lambda: self.human_decision is not None,
                timeout=timedelta(hours=4)
            )
        except asyncio.TimeoutError:
            # SLA breached - escalate, then keep waiting
            await workflow.execute_activity(
                escalate_to_manager,
                content_id,
                start_to_close_timeout=timedelta(minutes=1)
            )
            await workflow.wait_condition(lambda: self.human_decision is not None)
        
        return ModerationResult(
            decision=self.human_decision,
            method="human_review"
        )
    
    @workflow.signal
    async def human_review_complete(self, decision: str, reviewer: str):
        self.human_decision = decision
        self.reviewer = reviewer
    
    @workflow.signal
    async def appeal(self, reason: str):
        self.appeal_requested = True
        self.appeal_reason = reason
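On the reviewer side, the queue that `add_to_review_queue` writes into could be ordered by SLA deadline, so the most urgent item is always served first. This is a hypothetical in-memory sketch built on `heapq`; a real system would keep the queue in a shared store. The workflow's own 4-hour `wait_condition` timeout still drives escalation. `breached()` simply gives reviewers the same view.

```python
import heapq
from datetime import datetime, timedelta

SLA = timedelta(hours=4)


class ReviewQueue:
    """Human-review queue ordered by SLA deadline, earliest first."""

    def __init__(self):
        self._heap = []  # (deadline, content_id)

    def push(self, content_id: str, enqueued_at: datetime) -> None:
        heapq.heappush(self._heap, (enqueued_at + SLA, content_id))

    def pop_most_urgent(self) -> str:
        return heapq.heappop(self._heap)[1]

    def breached(self, now: datetime) -> list:
        """Items whose deadline has passed: candidates for escalation."""
        return [cid for due, cid in sorted(self._heap) if due <= now]


t0 = datetime(2024, 1, 1, 9, 0)
queue = ReviewQueue()
queue.push("post-b", t0 + timedelta(hours=1))
queue.push("post-a", t0)
print(queue.breached(t0 + SLA))  # ['post-a']
```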

Chapter 11: Sample Interview Dialogue

Scenario: Design a Loan Application System

Interviewer: "Design a loan application processing system. Applications need credit check, income verification, and manager approval for large amounts."

You: "This sounds like a complex workflow with external dependencies and human approval. I'd use Temporal for durable execution. Let me sketch the workflow..."

You draw on the whiteboard:

Loan Application Workflow:

1. Validate Application (activity)
2. Credit Check (activity - external API)
3. Income Verification (activity - may take days)
4. Risk Assessment (activity - ML model)
5. If amount > $50K: Wait for Manager Approval (signal)
6. Decision: Approve or Reject
7. If Approved: Fund Disbursement (activity)
8. Notification (activity)

You: "Each step is an activity except the manager approval, which is a signal. The workflow can wait for days for income verification or manager approval without holding resources."

Interviewer: "What if income verification takes a week?"

You: "No problem with Temporal. The workflow sleeps — no resources consumed. When verification completes, an external system sends a signal, and the workflow continues.

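# Kick off verification (the activity only starts the external check)
await workflow.execute_activity(
    request_income_verification,
    application_id,
    start_to_close_timeout=timedelta(minutes=5)
)

# Wait for the verification signal (up to 14 days)
try:
    await workflow.wait_condition(
        lambda: self.income_verified is not None,
        timeout=timedelta(days=14)
    )
except asyncio.TimeoutError:
    return LoanResult(status='rejected', reason='income_verification_timeout')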

If we don't get verification in 14 days, we auto-reject with a timeout."
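Why can the workflow wait a week at no cost? Temporal records each completed activity and each delivered signal as events in the workflow's history; when the workflow needs to continue, a worker rebuilds its state by replaying that history, reusing recorded results instead of re-running side effects. The toy model below illustrates the idea and is not Temporal's implementation: `ToyContext`, `Suspend`, `run`, and the in-memory `history` list (standing in for durable storage) are hypothetical, and unlike Temporal it assumes the signal arrives only after the workflow has started waiting.

```python
calls = {"request_income_verification": 0}  # counts real side effects

def request_income_verification(application_id):
    """Hypothetical activity; a real one would call the verifier's API."""
    calls["request_income_verification"] += 1
    return f"requested:{application_id}"

class Suspend(Exception):
    """The workflow is blocked on an event that is not in history yet."""

class ToyContext:
    """Replays recorded events and executes only what history does not cover."""

    def __init__(self, history):
        self.history = history  # append-only log of ("activity", result) / ("signal", value)
        self.cursor = 0         # index of the next event to consume

    def _next(self, expected_kind):
        kind, value = self.history[self.cursor]
        if kind != expected_kind:
            raise RuntimeError("history mismatch: workflow code is nondeterministic")
        self.cursor += 1
        return value

    def activity(self, fn, *args):
        if self.cursor < len(self.history):
            return self._next("activity")  # replay: reuse the recorded result
        result = fn(*args)                 # first execution: real side effect
        self.history.append(("activity", result))
        self.cursor += 1
        return result

    def wait_signal(self):
        if self.cursor < len(self.history):
            return self._next("signal")    # signal already recorded
        raise Suspend()                    # park the workflow; nothing stays running

def loan_workflow(ctx, application_id):
    ctx.activity(request_income_verification, application_id)
    verified = ctx.wait_signal()
    return "continue" if verified else "rejected"

def run(history, application_id):
    """Start or resume the workflow purely from the persisted history."""
    try:
        return loan_workflow(ToyContext(history), application_id)
    except Suspend:
        return "waiting"

history = []                                  # would live in durable storage
print(run(history, "app-1"))                  # waiting
history.append(("signal", True))              # days later, the verifier's signal arrives
print(run(history, "app-1"))                  # continue
print(calls["request_income_verification"])   # 1: replay reused the recorded result
```

Between the two `run` calls nothing executes at all; the only state is the history list, which is the property that lets a real workflow sleep for 14 days.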

Interviewer: "How do you handle the manager approval?"

You: "Manager approval is a signal. The workflow waits for that signal, and a separate approval UI sends it when a manager approves or rejects. I'd also set an SLA:

if application.amount > 50000:
    self.status = 'pending_approval'
    
    try:
        await workflow.wait_condition(
            lambda: self.approval_decision is not None,
            timeout=timedelta(hours=24)
        )
    except asyncio.TimeoutError:
        # Escalate to senior manager
        await workflow.execute_activity(
            escalate_approval,
            application_id,
            start_to_close_timeout=timedelta(minutes=5),
        )
        await workflow.wait_condition(lambda: self.approval_decision is not None)

The workflow escalates automatically if no one responds within 24 hours."
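The SLA-then-escalate logic can be exercised without a Temporal server using plain `asyncio`: an `asyncio.Event` stands in for the field the approval signal sets, and `asyncio.wait_for` stands in for `wait_condition(..., timeout=...)`. This is a hypothetical, scaled-down simulation: `ApprovalState`, `await_approval`, and the sub-second timeouts replacing the 24-hour SLA are illustrative. Unlike Temporal's durable timer, this in-memory timer would not survive a process crash.

```python
import asyncio

class ApprovalState:
    """Hypothetical stand-in for the workflow's signal-driven fields."""

    def __init__(self):
        self.decision = None
        self.escalated = False
        self.changed = asyncio.Event()  # set whenever the approval signal arrives

    def approval_signal(self, decision):
        # Like a @workflow.signal handler: only record state, never block
        self.decision = decision
        self.changed.set()

    async def escalate(self):
        # Stand-in for the escalate_approval activity
        self.escalated = True

async def await_approval(state, sla_seconds):
    try:
        # Analogous to wait_condition(..., timeout=...) on the approval decision
        await asyncio.wait_for(state.changed.wait(), timeout=sla_seconds)
    except asyncio.TimeoutError:
        await state.escalate()       # SLA breached: escalate
        await state.changed.wait()   # then wait with no deadline
    return state.decision

async def demo():
    loop = asyncio.get_running_loop()

    fast = ApprovalState()           # manager answers inside the SLA
    loop.call_later(0.01, fast.approval_signal, "approved")
    fast_result = await await_approval(fast, sla_seconds=0.5)

    slow = ApprovalState()           # manager is late; a senior manager answers
    loop.call_later(0.2, slow.approval_signal, "rejected")
    slow_result = await await_approval(slow, sla_seconds=0.05)

    return (fast_result, fast.escalated), (slow_result, slow.escalated)

print(asyncio.run(demo()))  # (('approved', False), ('rejected', True))
```

The states are created inside the running event loop on purpose, so the `asyncio.Event` binds to the loop that `asyncio.run` starts.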

Interviewer: "What about the credit check failing?"

You: "A failed credit check can mean two different things. If the external API is down, that is an infrastructure failure, and the activity retries. If the check succeeds but the score is low, that is a business outcome, and the workflow rejects the application. I'd keep the two paths separate:

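try:
    credit_result = await workflow.execute_activity(
        check_credit,
        application_id,
        start_to_close_timeout=timedelta(minutes=1),  # per attempt; the SDK requires a timeout
        retry_policy=RetryPolicy(maximum_attempts=5)
    )
except ActivityError:  # temporalio.exceptions.ActivityError, raised once the activity finally fails
    # Infrastructure failure - route to manual review
    self.status = 'manual_review_required'
    return LoanResult(status='manual_review_required', reason='credit_check_unavailable')

if credit_result['score'] < 600:
    # Business rejection
    return LoanResult(status='rejected', reason='credit_score')
"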
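The infrastructure-versus-business split can be sketched without the SDK: transient errors are retried with exponential backoff, exhausting the retries routes the application to manual review, and a successful call with a low score is an ordinary rejection. In Temporal the retry loop is handled by `RetryPolicy` (errors named in `non_retryable_error_types` skip it); this hypothetical sketch only mirrors its `initial_interval`, `backoff_coefficient`, `maximum_interval`, and `maximum_attempts` ideas, computes delays instead of sleeping so it runs instantly, and uses illustrative names (`TransientError`, `call_with_retries`, `credit_decision`, this local `LoanResult`). The 600 cutoff and five attempts come from the dialogue.

```python
from dataclasses import dataclass

class TransientError(Exception):
    """Infrastructure failure (timeout, 503, ...) that is worth retrying."""

@dataclass
class LoanResult:
    status: str
    reason: str = ""

def backoff_delays(initial=1.0, coefficient=2.0, maximum=100.0, attempts=5):
    """Seconds to wait before each retry; there is no wait before the first try."""
    return [min(initial * coefficient ** i, maximum) for i in range(attempts - 1)]

def call_with_retries(call, attempts=5):
    """Return (result or None, delays a real worker would have slept)."""
    delays = backoff_delays(attempts=attempts)
    waited = []
    for attempt in range(attempts):
        try:
            return call(), waited
        except TransientError:
            if attempt < attempts - 1:
                waited.append(delays[attempt])  # a real worker would sleep here
    return None, waited

def credit_decision(check_credit):
    score, _ = call_with_retries(check_credit)
    if score is None:
        # Retries exhausted: an infrastructure problem, so a human takes over
        return LoanResult("manual_review_required", "credit_check_unavailable")
    if score < 600:
        # The check worked; the answer is simply "no"
        return LoanResult("rejected", "credit_score")
    return LoanResult("credit_ok")

print(backoff_delays())  # [1.0, 2.0, 4.0, 8.0]
```

The design choice is that only `TransientError` is retried; a low score never raises, so it can never be mistaken for an outage.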

---

# Summary

DAY 3 KEY TAKEAWAYS

DURABLE EXECUTION:
• Workflow state survives worker and process crashes
• Completed activities are recorded in history, not re-executed on replay
• Workflows can run for years
• Workflow state is event-sourced

TEMPORAL ARCHITECTURE:
• Workflows: deterministic orchestration logic
• Activities: the actual work, with side effects
• Signals: external events delivered into a workflow
• Queries: non-blocking inspection of workflow state
• Task queues: decouple scheduling from execution

DETERMINISM RULES:
• No random numbers, system time, or UUID generation in workflow code
• Use workflow.random(), workflow.now(), and workflow.uuid4() instead
• Put all side effects in activities
• Same input and history → same decisions

ADVANCED PATTERNS:
• State machines for complex flows
• Versioning for safe deployments
• Continue-as-new for long-running workflows
• Child workflows for parallelism
• Signals for external events

PRODUCTION CONCERNS:
• Set appropriate timeouts on all activities
• Use idempotency keys for external calls
• Monitor workflow history size
• Plan for versioning from the start

DEFAULT APPROACH:
• Simple sagas: custom orchestrator
• Complex workflows: Temporal
• Long-running: Temporal with continue-as-new
• Human-in-the-loop: Temporal with signals
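The determinism rule is easiest to see with a toy replay. A decision drawn from a generator seeded once per run comes out the same on every replay, which is the property Temporal's `workflow.random()` provides; a decision drawn from an unseeded generator can differ on replay and would no longer match the recorded history. `choose_reviewer`, the reviewer names, and `RUN_SEED` below are hypothetical.

```python
import random

REVIEWERS = ["alice", "bob", "carol", "dave"]  # hypothetical reviewer pool

def choose_reviewer(rng: random.Random) -> str:
    """A workflow-style decision that depends on randomness."""
    return rng.choice(REVIEWERS)

def replays_agree(make_rng, replays=20) -> bool:
    """Run the decision once, 'replay' it `replays` times, and compare."""
    original = choose_reviewer(make_rng())
    return all(choose_reviewer(make_rng()) == original for _ in range(replays))

RUN_SEED = 1234  # stands in for a seed fixed per workflow run and reused on replay

seeded = replays_agree(lambda: random.Random(RUN_SEED))  # like workflow.random()
unseeded = replays_agree(lambda: random.Random())        # fresh OS entropy each time
print(seeded)    # True
print(unseeded)  # almost always False: a replay would pick a different reviewer
```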


---

# 📚 Further Reading

## Documentation
- **Temporal Documentation**: https://docs.temporal.io/
- **Temporal Python SDK**: https://github.com/temporalio/sdk-python

## Videos
- **Temporal 101**: https://learn.temporal.io/
- **Uber Cadence Talk**: Search "Cadence Uber" on YouTube

## Books
- **"Practical Process Automation"** by Bernd Rücker
- **"Flow Architectures"** by James Urquhart

## Engineering Blogs
- **Temporal Blog**: https://temporal.io/blog
- **Netflix Conductor**: https://netflix.github.io/conductor/

---

*End of Day 3: Saga Orchestration Deep Dive*

**Tomorrow:** Day 4 — Conflict Resolution. We'll learn about last-write-wins problems, vector clocks, and CRDTs for automatic conflict resolution in distributed systems.