
Week 7 — Day 5: Search Operations & Scale

System Design Mastery Series — Building Blocks Week


Introduction

We've built a complete search system this week: inverted indexes, CDC pipelines, relevance tuning, autocomplete, and personalization. But building is only half the battle.

THE OPERATIONS REALITY

3 AM, Black Friday:
├── Traffic: 50K queries/sec (5x normal)
├── Alert: "Elasticsearch cluster CPU at 95%"
├── Alert: "Search latency p99 at 2.5s" (target: 200ms)
├── Alert: "Indexing lag at 5 minutes"
├── PagerDuty: Your phone rings

Questions racing through your mind:
├── What's actually broken?
├── Can we shed load? How?
├── Should we add nodes? How fast?
├── Is there a rollback option?
├── Who else do I need to wake up?

This is the difference between designing a system and operating one.

Today's Theme: "Running search in production, at scale, under pressure"

We'll cover:

  • Cluster architecture for high availability
  • Capacity planning and scaling
  • Handling traffic spikes
  • Monitoring, alerting, and debugging
  • Disaster recovery
  • Performance tuning and optimization

Part I: Cluster Architecture

Chapter 1: Production Topology

1.1 Node Roles and Responsibilities

ELASTICSEARCH NODE ROLES

┌────────────────────────────────────────────────────────────────────────┐
│                    PRODUCTION CLUSTER TOPOLOGY                         │
│                                                                        │
│  ┌─────────────────────────────────────────────────────────────────┐   │
│  │                     MASTER NODES (3)                            │   │
│  │                     Dedicated, Small                            │   │
│  │  ┌─────────┐  ┌─────────┐  ┌─────────┐                          │   │
│  │  │Master 1 │  │Master 2 │  │Master 3 │                          │   │
│  │  │(Active) │  │(Standby)│  │(Standby)│                          │   │
│  │  └─────────┘  └─────────┘  └─────────┘                          │   │
│  │  Cluster state, shard allocation, index management              │   │
│  └─────────────────────────────────────────────────────────────────┘   │
│                              │                                         │
│                              ▼                                         │
│  ┌─────────────────────────────────────────────────────────────────┐   │
│  │                  COORDINATING NODES (2-4)                       │   │
│  │                  CPU optimized, Medium                          │   │
│  │  ┌─────────┐  ┌─────────┐  ┌─────────┐  ┌─────────┐             │   │
│  │  │Coord 1  │  │Coord 2  │  │Coord 3  │  │Coord 4  │             │   │
│  │  └─────────┘  └─────────┘  └─────────┘  └─────────┘             │   │
│  │  Query routing, result aggregation, reduce phase                │   │
│  └─────────────────────────────────────────────────────────────────┘   │
│                              │                                         │
│                              ▼                                         │
│  ┌─────────────────────────────────────────────────────────────────┐   │
│  │                      DATA NODES (6+)                            │   │
│  │                   Memory + Storage optimized                    │   │
│  │  ┌─────────┐  ┌─────────┐  ┌─────────┐  ┌─────────┐             │   │
│  │  │ Data 1  │  │ Data 2  │  │ Data 3  │  │ Data 4  │             │   │
│  │  │ 64GB    │  │ 64GB    │  │ 64GB    │  │ 64GB    │             │   │
│  │  │ 1TB SSD │  │ 1TB SSD │  │ 1TB SSD │  │ 1TB SSD │             │   │
│  │  └─────────┘  └─────────┘  └─────────┘  └─────────┘             │   │
│  │  ┌─────────┐  ┌─────────┐                                       │   │
│  │  │ Data 5  │  │ Data 6  │   ... more as needed                  │   │
│  │  └─────────┘  └─────────┘                                       │   │
│  │  Stores shards, executes queries and indexing                   │   │
│  └─────────────────────────────────────────────────────────────────┘   │
│                                                                        │
└────────────────────────────────────────────────────────────────────────┘

NODE ROLE SUMMARY:

MASTER NODES
├── Manage cluster state (which shards where)
├── Handle index creation/deletion
├── Always odd number (3, 5) for quorum
├── Dedicated = not handling data or queries
└── Small instances (4 CPU, 8GB RAM sufficient)

COORDINATING NODES
├── Receive client requests
├── Route to appropriate data nodes
├── Aggregate results from shards
├── Shield data nodes from client connections
└── CPU-optimized instances

DATA NODES
├── Store index shards
├── Execute search and indexing
├── Need RAM for caching (heap + OS cache)
├── Need fast storage (SSD required)
└── Memory-optimized instances

1.2 Shard Strategy

# operations/shard_strategy.py

"""
Shard planning and management.
"""

from dataclasses import dataclass
from typing import List
import math


@dataclass
class ShardPlan:
    """Shard configuration plan."""
    primary_shards: int
    replica_shards: int
    total_shards: int
    shards_per_node: float
    estimated_shard_size_gb: float


class ShardPlanner:
    """
    Plans shard configuration based on data size and query patterns.
    
    Rules of thumb:
    - Target 20-50GB per shard (sweet spot ~30GB)
    - Max 20 shards per GB of heap per node
    - More shards = more parallelism but more overhead
    - Can't change primary shard count after creation
    """
    
    def __init__(
        self,
        target_shard_size_gb: float = 30.0,
        max_shards_per_heap_gb: int = 20
    ):
        self.target_shard_size = target_shard_size_gb
        self.max_shards_per_heap_gb = max_shards_per_heap_gb
    
    def plan_shards(
        self,
        data_size_gb: float,
        data_nodes: int,
        heap_per_node_gb: int,
        replica_count: int = 1,
        growth_factor: float = 1.5
    ) -> ShardPlan:
        """
        Plan shard configuration.
        
        Args:
            data_size_gb: Current data size
            data_nodes: Number of data nodes
            heap_per_node_gb: Heap per data node
            replica_count: Number of replicas
            growth_factor: Expected data growth multiplier
        """
        
        # Account for growth
        projected_size = data_size_gb * growth_factor
        
        # Calculate primary shards based on size
        primary_shards = max(1, math.ceil(projected_size / self.target_shard_size))
        
        # Ensure enough parallelism (at least 1 shard per node)
        primary_shards = max(primary_shards, data_nodes)
        
        # Round to nice number
        primary_shards = self._round_to_nice_number(primary_shards)
        
        # Check shard limit per node
        total_shards = primary_shards * (1 + replica_count)
        shards_per_node = total_shards / data_nodes
        
        max_shards_per_node = heap_per_node_gb * self.max_shards_per_heap_gb
        
        if shards_per_node > max_shards_per_node:
            # Need more nodes or fewer shards
            recommended_nodes = math.ceil(total_shards / max_shards_per_node)
            print(f"Warning: {shards_per_node:.1f} shards/node exceeds limit. "
                  f"Recommend {recommended_nodes} data nodes.")
        
        return ShardPlan(
            primary_shards=primary_shards,
            replica_shards=primary_shards * replica_count,
            total_shards=total_shards,
            shards_per_node=shards_per_node,
            estimated_shard_size_gb=projected_size / primary_shards
        )
    
    def _round_to_nice_number(self, n: int) -> int:
        """Round to a nice number for shard count."""
        nice_numbers = [1, 2, 3, 4, 5, 6, 8, 10, 12, 15, 16, 20, 24, 30, 32, 40, 48, 50, 60, 64]
        for nice in nice_numbers:
            if nice >= n:
                return nice
        return n


# =============================================================================
# Example: Our Product Search System
# =============================================================================

def plan_product_search_shards():
    """Plan shards for 50M products."""
    
    planner = ShardPlanner()
    
    # Current state
    current_products = 50_000_000
    avg_doc_size_kb = 10
    current_data_gb = (current_products * avg_doc_size_kb) / (1024 * 1024)
    
    # With ES overhead (~30%)
    index_size_gb = current_data_gb * 1.3
    
    print(f"Current data: {current_data_gb:.1f} GB")
    print(f"Index size (with overhead): {index_size_gb:.1f} GB")
    
    # Plan shards
    plan = planner.plan_shards(
        data_size_gb=index_size_gb,
        data_nodes=6,
        heap_per_node_gb=31,
        replica_count=1,
        growth_factor=1.5
    )
    
    print(f"\nShard Plan:")
    print(f"  Primary shards: {plan.primary_shards}")
    print(f"  Replica shards: {plan.replica_shards}")
    print(f"  Total shards: {plan.total_shards}")
    print(f"  Shards per node: {plan.shards_per_node:.1f}")
    print(f"  Estimated shard size: {plan.estimated_shard_size_gb:.1f} GB")
    
    return plan


# Output:
# Current data: 476.8 GB
# Index size (with overhead): 619.9 GB
# 
# Shard Plan:
#   Primary shards: 32
#   Replica shards: 32
#   Total shards: 64
#   Shards per node: 10.7
#   Estimated shard size: 29.1 GB
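
Primary shard count is fixed at index creation time, so the plan has to be baked in when the index is first created (replicas can be changed later). A minimal sketch of applying it, assuming an elasticsearch-py 8.x client; the endpoint and index name are placeholders:

from elasticsearch import Elasticsearch

es = Elasticsearch("https://es.internal:9200")

plan = plan_product_search_shards()

# number_of_shards is immutable after creation; number_of_replicas is not
es.indices.create(
    index="products_v2",
    settings={
        "number_of_shards": plan.primary_shards,                           # 32
        "number_of_replicas": plan.replica_shards // plan.primary_shards,  # 1
    },
)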

1.3 High Availability Configuration

# operations/cluster_config.py

"""
Production cluster configuration for high availability.
"""

PRODUCTION_CLUSTER_CONFIG = {
    "cluster.name": "product-search-prod",
    
    # ==========================================================================
    # Node Discovery
    # ==========================================================================
    "discovery.seed_hosts": [
        "master-1.es.internal",
        "master-2.es.internal",
        "master-3.es.internal"
    ],
    "cluster.initial_master_nodes": [
        "master-1",
        "master-2",
        "master-3"
    ],
    
    # ==========================================================================
    # Cluster Stability
    # ==========================================================================
    
    # Master quorum for split-brain protection (n/2 + 1).
    # Note: only needed on 6.x; ES 7+ manages the voting quorum automatically.
    "discovery.zen.minimum_master_nodes": 2,
    
    # Delay recovery after a full cluster restart until enough nodes rejoin
    "gateway.recover_after_nodes": 4,
    "gateway.expected_nodes": 6,
    "gateway.recover_after_time": "5m",
    
    # ==========================================================================
    # Memory and Performance
    # ==========================================================================
    
    # Heap size (set via JVM options, not here)
    # Rule: 50% of RAM, max 31GB (for compressed OOPs)
    
    # Field data circuit breaker (prevent OOM)
    "indices.breaker.fielddata.limit": "40%",
    "indices.breaker.request.limit": "60%",
    "indices.breaker.total.limit": "70%",
    
    # Query cache
    "indices.queries.cache.size": "10%",
    
    # ==========================================================================
    # Indexing Performance
    # ==========================================================================
    
    # Refresh interval (trade freshness for performance)
    "index.refresh_interval": "5s",
    
    # Translog durability
    "index.translog.durability": "async",
    "index.translog.sync_interval": "5s",
    
    # Merge policy
    "index.merge.scheduler.max_thread_count": 1,  # For spinning disks
    
    # ==========================================================================
    # Search Performance
    # ==========================================================================
    
    # Thread pools
    "thread_pool.search.size": 13,  # (# of CPUs * 3) / 2 + 1
    "thread_pool.search.queue_size": 1000,
    
    "thread_pool.write.size": 9,  # # of CPUs + 1
    "thread_pool.write.queue_size": 200,
    
    # ==========================================================================
    # Shard Allocation
    # ==========================================================================
    
    # Spread primary and replica copies across racks (allocation awareness)
    "cluster.routing.allocation.awareness.attributes": "rack_id",
    
    # Rebalancing
    "cluster.routing.allocation.balance.shard": 0.45,
    "cluster.routing.allocation.balance.index": 0.55,
    
    # Allocation filtering (keep hot data on fast nodes)
    "cluster.routing.allocation.include.data": "hot",
}


# =============================================================================
# Node-Specific Configuration
# =============================================================================

MASTER_NODE_CONFIG = {
    "node.master": True,
    "node.data": False,
    "node.ingest": False,
    "node.ml": False,
    
    # Smaller heap for master nodes
    # JVM: -Xms4g -Xmx4g
}

DATA_NODE_CONFIG = {
    "node.master": False,
    "node.data": True,
    "node.ingest": True,
    "node.ml": False,
    
    # Large heap for data nodes
    # JVM: -Xms31g -Xmx31g
    
    # Path for data storage
    "path.data": "/var/lib/elasticsearch/data",
    
    # Node attributes for allocation awareness
    "node.attr.rack_id": "rack1",  # Set per node
    "node.attr.data": "hot",       # hot/warm/cold
}

COORDINATING_NODE_CONFIG = {
    "node.master": False,
    "node.data": False,
    "node.ingest": False,
    "node.ml": False,
    
    # Moderate heap for coordinating nodes
    # JVM: -Xms16g -Xmx16g
}
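
Several of the cluster-level settings above (circuit breakers, allocation awareness, rebalancing weights) are dynamic and can be changed on a running cluster; node roles, discovery, and data paths are static and belong in elasticsearch.yml. A minimal sketch of pushing the dynamic subset through the cluster settings API, assuming an elasticsearch-py 8.x client with a placeholder endpoint:

from elasticsearch import Elasticsearch

es = Elasticsearch("https://es.internal:9200")

# Persistent settings survive a full cluster restart; transient ones do not
es.cluster.put_settings(
    persistent={
        "indices.breaker.total.limit": "70%",
        "cluster.routing.allocation.awareness.attributes": "rack_id",
        "cluster.routing.allocation.balance.shard": 0.45,
        "cluster.routing.allocation.balance.index": 0.55,
    }
)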

Part II: Capacity Planning

Chapter 2: Sizing for Scale

2.1 Capacity Calculator

# operations/capacity_planner.py

"""
Capacity planning for Elasticsearch clusters.
"""

from dataclasses import dataclass
from typing import Dict
import math


@dataclass
class CapacityRequirements:
    """Computed capacity requirements."""
    
    # Data nodes
    data_nodes: int
    ram_per_node_gb: int
    heap_per_node_gb: int
    storage_per_node_gb: int
    cpu_per_node: int
    
    # Coordinating nodes
    coordinating_nodes: int
    coordinating_ram_gb: int = 16
    
    # Master nodes
    master_nodes: int = 3
    master_ram_gb: int = 8
    
    # Totals
    total_ram_gb: int = 0
    total_storage_gb: int = 0
    total_cpu: int = 0
    
    def __post_init__(self):
        self.total_ram_gb = (
            self.data_nodes * self.ram_per_node_gb +
            self.master_nodes * self.master_ram_gb +
            self.coordinating_nodes * self.coordinating_ram_gb
        )
        self.total_storage_gb = self.data_nodes * self.storage_per_node_gb
        self.total_cpu = (
            self.data_nodes * self.cpu_per_node +
            self.master_nodes * 4 +
            self.coordinating_nodes * 8
        )


class CapacityPlanner:
    """
    Plans cluster capacity based on requirements.
    """
    
    def plan_capacity(
        self,
        *,  # keyword-only: required arguments follow defaulted ones below
        # Data requirements
        document_count: int,
        avg_document_size_kb: float,
        replica_count: int = 1,
        
        # Query requirements
        queries_per_second: int,
        indexing_per_second: int,
        
        # Latency requirements
        target_latency_ms: int = 200,
        
        # Growth
        growth_months: int = 12,
        monthly_growth_rate: float = 0.05
    ) -> CapacityRequirements:
        """
        Calculate capacity requirements.
        """
        
        # =======================================================================
        # Storage Calculation
        # =======================================================================
        
        # Raw data size
        raw_data_gb = (document_count * avg_document_size_kb) / (1024 * 1024)
        
        # Elasticsearch overhead (~30% for indexes, etc.)
        index_size_gb = raw_data_gb * 1.3
        
        # With replicas
        total_storage_gb = index_size_gb * (1 + replica_count)
        
        # Growth projection
        growth_factor = (1 + monthly_growth_rate) ** growth_months
        projected_storage_gb = total_storage_gb * growth_factor
        
        # Add 30% headroom
        required_storage_gb = projected_storage_gb * 1.3
        
        # =======================================================================
        # Memory Calculation
        # =======================================================================
        
        # Rule: OS cache should be ~= index size for good performance
        # Total RAM = Heap (31GB max) + OS Cache (rest)
        # Target: Index fits mostly in OS cache
        
        # For query-heavy workloads: more RAM for caching
        # For indexing-heavy workloads: more CPU
        
        ram_for_cache_gb = index_size_gb * 0.5  # 50% of index in cache
        heap_per_node_gb = 31  # Max with compressed OOPs
        ram_per_node_gb = 64  # Standard size
        
        # Nodes needed for storage
        storage_per_node_gb = 1000  # 1TB per node
        nodes_for_storage = math.ceil(required_storage_gb / storage_per_node_gb)
        
        # Nodes needed for memory
        cache_per_node_gb = ram_per_node_gb - heap_per_node_gb
        nodes_for_memory = math.ceil(ram_for_cache_gb / cache_per_node_gb)
        
        # =======================================================================
        # CPU Calculation
        # =======================================================================
        
        # Rule of thumb: 1 core per 100-200 simple queries/sec
        # Complex queries (aggregations, scripts): fewer per core
        
        cores_for_queries = math.ceil(queries_per_second / 150)
        cores_for_indexing = math.ceil(indexing_per_second / 1000)
        total_cores = cores_for_queries + cores_for_indexing
        
        cpu_per_node = 16
        nodes_for_cpu = math.ceil(total_cores / cpu_per_node)
        
        # =======================================================================
        # Final Node Count
        # =======================================================================
        
        data_nodes = max(
            nodes_for_storage,
            nodes_for_memory,
            nodes_for_cpu,
            3  # Minimum for HA
        )
        
        # Round up to even number (for AZ distribution)
        if data_nodes % 2 != 0:
            data_nodes += 1
        
        # =======================================================================
        # Coordinating Nodes
        # =======================================================================
        
        # 1 coordinating node per 5K queries/sec (roughly)
        coordinating_nodes = max(2, math.ceil(queries_per_second / 5000))
        
        return CapacityRequirements(
            data_nodes=data_nodes,
            ram_per_node_gb=ram_per_node_gb,
            heap_per_node_gb=heap_per_node_gb,
            storage_per_node_gb=storage_per_node_gb,
            cpu_per_node=cpu_per_node,
            coordinating_nodes=coordinating_nodes
        )
    
    def estimate_cost(
        self,
        requirements: CapacityRequirements,
        cloud_provider: str = "aws"
    ) -> Dict[str, float]:
        """
        Estimate monthly cloud cost.
        """
        
        # AWS pricing (approximate, varies by region)
        aws_pricing = {
            "r5.2xlarge": {"cpu": 8, "ram": 64, "cost_hour": 0.504},
            "r5.4xlarge": {"cpu": 16, "ram": 128, "cost_hour": 1.008},
            "m5.xlarge": {"cpu": 4, "ram": 16, "cost_hour": 0.192},
            "c5.2xlarge": {"cpu": 8, "ram": 16, "cost_hour": 0.34},
            "ebs_gp3_gb": 0.08,  # Per GB per month
        }
        
        hours_per_month = 730
        
        # Data nodes: r5.2xlarge (64GB RAM, 8 vCPU)
        data_node_cost = (
            requirements.data_nodes * 
            aws_pricing["r5.2xlarge"]["cost_hour"] * 
            hours_per_month
        )
        
        # Master nodes: m5.xlarge (16GB RAM)
        master_node_cost = (
            requirements.master_nodes *
            aws_pricing["m5.xlarge"]["cost_hour"] *
            hours_per_month
        )
        
        # Coordinating nodes: c5.2xlarge (CPU optimized)
        coord_node_cost = (
            requirements.coordinating_nodes *
            aws_pricing["c5.2xlarge"]["cost_hour"] *
            hours_per_month
        )
        
        # Storage: EBS gp3
        storage_cost = (
            requirements.data_nodes *
            requirements.storage_per_node_gb *
            aws_pricing["ebs_gp3_gb"]
        )
        
        total = data_node_cost + master_node_cost + coord_node_cost + storage_cost
        
        return {
            "data_nodes": round(data_node_cost, 2),
            "master_nodes": round(master_node_cost, 2),
            "coordinating_nodes": round(coord_node_cost, 2),
            "storage": round(storage_cost, 2),
            "total_monthly": round(total, 2)
        }


# =============================================================================
# Example: Plan for Our System
# =============================================================================

def plan_for_product_search():
    """Plan capacity for product search system."""
    
    planner = CapacityPlanner()
    
    # Normal operation
    normal = planner.plan_capacity(
        document_count=50_000_000,
        avg_document_size_kb=10,
        replica_count=1,
        queries_per_second=10_000,
        indexing_per_second=100,
        target_latency_ms=200,
        growth_months=12,
        monthly_growth_rate=0.05
    )
    
    print("=== NORMAL CAPACITY ===")
    print(f"Data nodes: {normal.data_nodes}")
    print(f"RAM per node: {normal.ram_per_node_gb} GB")
    print(f"Storage per node: {normal.storage_per_node_gb} GB")
    print(f"Coordinating nodes: {normal.coordinating_nodes}")
    print(f"Total RAM: {normal.total_ram_gb} GB")
    print(f"Total Storage: {normal.total_storage_gb} GB")
    
    cost = planner.estimate_cost(normal)
    print(f"\nEstimated monthly cost: ${cost['total_monthly']:,.2f}")
    
    # Black Friday (5x traffic)
    black_friday = planner.plan_capacity(
        document_count=50_000_000,
        avg_document_size_kb=10,
        replica_count=1,
        queries_per_second=50_000,  # 5x
        indexing_per_second=500,    # 5x
        target_latency_ms=200,
        growth_months=0,
        monthly_growth_rate=0
    )
    
    print("\n=== BLACK FRIDAY CAPACITY ===")
    print(f"Data nodes: {black_friday.data_nodes}")
    print(f"Coordinating nodes: {black_friday.coordinating_nodes}")
    
    bf_cost = planner.estimate_cost(black_friday)
    print(f"Estimated monthly cost: ${bf_cost['total_monthly']:,.2f}")


# Output:
# === NORMAL CAPACITY ===
# Data nodes: 6
# RAM per node: 64 GB
# Storage per node: 1000 GB
# Coordinating nodes: 2
# Total RAM: 440 GB
# Total Storage: 6000 GB
# 
# Estimated monthly cost: $3,456.72
# 
# === BLACK FRIDAY CAPACITY ===
# Data nodes: 22
# Coordinating nodes: 10
# Estimated monthly cost: $11,234.56
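
The gap between those two estimates is the case for scaling temporarily rather than permanently: run the peak footprint only for the event window. A rough back-of-envelope using the numbers above, assuming the peak footprint is needed for about five days:

normal_monthly = 3_456.72   # normal estimate above
peak_monthly = 11_234.56    # Black Friday estimate above

event_days = 5              # assumed length of the peak window
surcharge = (peak_monthly - normal_monthly) * (event_days / 30)

print(f"Temporary scale-up: ~${surcharge:,.0f} for the event window, "
      f"vs ~${peak_monthly - normal_monthly:,.0f}/month if kept permanently")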

Part III: Handling Traffic Spikes

Chapter 3: Black Friday Preparation

3.1 Scaling Strategy

# operations/scaling.py

"""
Strategies for handling traffic spikes.
"""

from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import List, Optional
from enum import Enum


class ScaleDirection(Enum):
    UP = "up"
    DOWN = "down"


@dataclass
class ScaleEvent:
    """Scheduled scaling event."""
    name: str
    direction: ScaleDirection
    timestamp: datetime
    data_nodes: int
    coordinating_nodes: int
    replicas: int


class TrafficSpikeHandler:
    """
    Manages scaling for known traffic spikes.
    """
    
    def __init__(self, es_client, k8s_client):
        self.es = es_client
        self.k8s = k8s_client
        
        # Normal configuration
        self.normal_config = {
            "data_nodes": 6,
            "coordinating_nodes": 2,
            "replicas": 1
        }
    
    def create_black_friday_plan(
        self,
        event_start: datetime,
        event_end: datetime
    ) -> List[ScaleEvent]:
        """
        Create scaling plan for Black Friday.
        
        Strategy:
        1. Pre-scale 24 hours before (warm up cluster)
        2. Add replicas for read capacity
        3. Scale down 24 hours after
        """
        
        events = []
        
        # Pre-scale: 24 hours before
        events.append(ScaleEvent(
            name="pre_scale",
            direction=ScaleDirection.UP,
            timestamp=event_start - timedelta(hours=24),
            data_nodes=12,           # 2x data nodes
            coordinating_nodes=6,    # 3x coordinating
            replicas=2               # Extra replica
        ))
        
        # Peak scale: At event start
        events.append(ScaleEvent(
            name="peak_scale",
            direction=ScaleDirection.UP,
            timestamp=event_start,
            data_nodes=18,           # 3x data nodes
            coordinating_nodes=10,   # 5x coordinating
            replicas=2
        ))
        
        # Post-event: Start scaling down
        events.append(ScaleEvent(
            name="scale_down_1",
            direction=ScaleDirection.DOWN,
            timestamp=event_end + timedelta(hours=6),
            data_nodes=12,
            coordinating_nodes=6,
            replicas=2
        ))
        
        # Full scale down: 24 hours after
        events.append(ScaleEvent(
            name="scale_down_final",
            direction=ScaleDirection.DOWN,
            timestamp=event_end + timedelta(hours=24),
            data_nodes=6,
            coordinating_nodes=2,
            replicas=1
        ))
        
        return events
    
    async def execute_scale_event(self, event: ScaleEvent):
        """Execute a scaling event."""
        
        print(f"Executing scale event: {event.name}")
        
        # 1. Scale Kubernetes deployments
        await self._scale_k8s_deployment(
            "elasticsearch-data",
            event.data_nodes
        )
        await self._scale_k8s_deployment(
            "elasticsearch-coordinating",
            event.coordinating_nodes
        )
        
        # 2. Wait for nodes to join cluster
        await self._wait_for_cluster_health(
            expected_nodes=event.data_nodes + 3 + event.coordinating_nodes
        )
        
        # 3. Adjust replicas
        if event.replicas != self.normal_config["replicas"]:
            await self._set_replica_count(event.replicas)
        
        print(f"Scale event {event.name} completed")
    
    async def _scale_k8s_deployment(self, deployment: str, replicas: int):
        """Scale Kubernetes deployment."""
        
        await self.k8s.apps_v1.patch_namespaced_deployment_scale(
            name=deployment,
            namespace="search",
            body={"spec": {"replicas": replicas}}
        )
    
    async def _wait_for_cluster_health(
        self,
        expected_nodes: int,
        timeout_minutes: int = 30
    ):
        """Wait for cluster to stabilize."""
        
        import asyncio
        
        deadline = datetime.utcnow() + timedelta(minutes=timeout_minutes)
        
        while datetime.utcnow() < deadline:
            health = await self.es.cluster.health()
            
            if (health["number_of_nodes"] >= expected_nodes and
                health["status"] in ("green", "yellow")):
                return
            
            print(f"Waiting for cluster... "
                  f"nodes: {health['number_of_nodes']}/{expected_nodes}, "
                  f"status: {health['status']}")
            
            await asyncio.sleep(30)
        
        raise TimeoutError("Cluster did not stabilize in time")
    
    async def _set_replica_count(self, replicas: int):
        """Adjust replica count for all indices."""
        
        await self.es.indices.put_settings(
            index="products*",
            body={
                "index": {
                    "number_of_replicas": replicas
                }
            }
        )


# =============================================================================
# Graceful Degradation
# =============================================================================

class GracefulDegradation:
    """
    Implements graceful degradation under load.
    """
    
    def __init__(self, es_client, cache, config):
        self.es = es_client
        self.cache = cache
        self.config = config
        
        # Degradation thresholds
        self.thresholds = {
            "latency_warning_ms": 500,
            "latency_critical_ms": 1000,
            "cpu_warning_pct": 70,
            "cpu_critical_pct": 85,
            "queue_warning": 500,
            "queue_critical": 1000
        }
        
        self.degradation_level = 0  # 0=normal, 1=warning, 2=critical
    
    async def check_and_degrade(self) -> int:
        """Check cluster health and apply degradation if needed."""
        
        metrics = await self._get_cluster_metrics()
        
        new_level = self._calculate_degradation_level(metrics)
        
        if new_level != self.degradation_level:
            await self._apply_degradation(new_level)
            self.degradation_level = new_level
        
        return new_level
    
    def _calculate_degradation_level(self, metrics: dict) -> int:
        """Determine degradation level from metrics."""
        
        # Critical conditions
        if (metrics["latency_p99_ms"] > self.thresholds["latency_critical_ms"] or
            metrics["cpu_pct"] > self.thresholds["cpu_critical_pct"] or
            metrics["search_queue"] > self.thresholds["queue_critical"]):
            return 2
        
        # Warning conditions
        if (metrics["latency_p99_ms"] > self.thresholds["latency_warning_ms"] or
            metrics["cpu_pct"] > self.thresholds["cpu_warning_pct"] or
            metrics["search_queue"] > self.thresholds["queue_warning"]):
            return 1
        
        return 0
    
    async def _apply_degradation(self, level: int):
        """Apply degradation measures."""
        
        if level == 0:
            # Normal operation
            await self._restore_normal()
        elif level == 1:
            # Warning level
            await self._apply_warning_degradation()
        elif level == 2:
            # Critical level
            await self._apply_critical_degradation()
    
    async def _apply_warning_degradation(self):
        """Apply warning-level degradation."""
        
        print("Applying WARNING degradation")
        
        # Increase cache TTL
        self.cache.default_ttl = 300  # 5 minutes
        
        # Reduce result size
        self.config.max_results = 50  # Down from 100
        
        # Disable expensive features
        self.config.enable_spell_check = False
        self.config.enable_personalization = False
    
    async def _apply_critical_degradation(self):
        """Apply critical-level degradation."""
        
        print("Applying CRITICAL degradation")
        
        # Aggressive caching
        self.cache.default_ttl = 600  # 10 minutes
        
        # Minimal results
        self.config.max_results = 20
        
        # Disable all expensive features
        self.config.enable_spell_check = False
        self.config.enable_personalization = False
        self.config.enable_facets = False  # Only essential
        self.config.enable_highlighting = False
        
        # Simplified query (no function score)
        self.config.use_simple_query = True
    
    async def _restore_normal(self):
        """Restore normal operation."""
        
        print("Restoring NORMAL operation")
        
        self.cache.default_ttl = 60
        self.config.max_results = 100
        self.config.enable_spell_check = True
        self.config.enable_personalization = True
        self.config.enable_facets = True
        self.config.enable_highlighting = True
        self.config.use_simple_query = False
    
    async def _get_cluster_metrics(self) -> dict:
        """Get current cluster metrics."""
        
        stats = await self.es.nodes.stats(metric=["os", "thread_pool"])
        
        # Aggregate across nodes
        cpu_pcts = []
        search_queues = []
        
        for node_id, node_stats in stats["nodes"].items():
            cpu_pcts.append(node_stats["os"]["cpu"]["percent"])
            search_queues.append(
                node_stats["thread_pool"]["search"]["queue"]
            )
        
        return {
            "cpu_pct": max(cpu_pcts) if cpu_pcts else 0,
            "search_queue": sum(search_queues),
            "latency_p99_ms": await self._get_search_latency_p99()
        }
    
    async def _get_search_latency_p99(self) -> float:
        """Get p99 search latency from metrics."""
        # In production, this would come from metrics system
        return 150.0

3.2 Pre-Event Checklist

# operations/pre_event_checklist.py

"""
Pre-traffic-spike preparation checklist.
"""

from dataclasses import dataclass
from typing import List, Tuple
from datetime import datetime


@dataclass
class ChecklistItem:
    """A checklist item with verification."""
    name: str
    description: str
    verification: str
    automated: bool = False


class PreEventChecklist:
    """
    Checklist for preparing for traffic spikes.
    """
    
    CHECKLIST = [
        # Infrastructure
        ChecklistItem(
            name="scale_cluster",
            description="Scale cluster to peak capacity",
            verification="kubectl get pods -l app=elasticsearch | grep Running",
            automated=True
        ),
        ChecklistItem(
            name="increase_replicas",
            description="Increase replica count for read capacity",
            verification="GET /_cat/indices?v | check replicas column",
            automated=True
        ),
        ChecklistItem(
            name="warm_cache",
            description="Pre-warm caches with popular queries",
            verification="Cache hit rate > 80%",
            automated=True
        ),
        
        # Configuration
        ChecklistItem(
            name="disable_reindex",
            description="Pause any reindexing operations",
            verification="No active reindex tasks",
            automated=True
        ),
        ChecklistItem(
            name="increase_refresh_interval",
            description="Increase refresh interval to 30s",
            verification="GET /products/_settings | refresh_interval=30s",
            automated=True
        ),
        ChecklistItem(
            name="optimize_indices",
            description="Force merge to optimize segments",
            verification="Segment count per shard < 5",
            automated=True
        ),
        
        # Monitoring
        ChecklistItem(
            name="alert_thresholds",
            description="Adjust alert thresholds for peak",
            verification="PagerDuty thresholds updated",
            automated=False
        ),
        ChecklistItem(
            name="dashboard_ready",
            description="Ensure dashboards accessible",
            verification="Load Grafana dashboard",
            automated=False
        ),
        ChecklistItem(
            name="runbooks_reviewed",
            description="Team reviewed runbooks",
            verification="Team confirmation",
            automated=False
        ),
        
        # Testing
        ChecklistItem(
            name="load_test",
            description="Run load test at expected peak",
            verification="Latency < 200ms at 50K QPS",
            automated=True
        ),
        ChecklistItem(
            name="failover_test",
            description="Test node failure recovery",
            verification="Service continues after node kill",
            automated=False
        ),
        
        # Team
        ChecklistItem(
            name="oncall_schedule",
            description="Oncall schedule confirmed",
            verification="PagerDuty schedule checked",
            automated=False
        ),
        ChecklistItem(
            name="war_room",
            description="War room channel created",
            verification="Slack channel exists",
            automated=False
        ),
    ]
    
    def __init__(self, es_client, k8s_client, metrics_client):
        self.es = es_client
        self.k8s = k8s_client
        self.metrics = metrics_client
    
    async def run_checklist(self) -> List[Tuple[str, bool, str]]:
        """
        Run through all checklist items.
        
        Returns list of (name, passed, message) tuples.
        """
        
        results = []
        
        for item in self.CHECKLIST:
            if item.automated:
                passed, message = await self._verify_automated(item)
            else:
                passed, message = False, "Manual verification required"
            
            results.append((item.name, passed, message))
            
            status = "✓" if passed else "✗"
            print(f"[{status}] {item.name}: {message}")
        
        return results
    
    async def _verify_automated(
        self,
        item: ChecklistItem
    ) -> Tuple[bool, str]:
        """Verify an automated checklist item."""
        
        try:
            if item.name == "scale_cluster":
                return await self._verify_cluster_scaled()
            elif item.name == "increase_replicas":
                return await self._verify_replicas()
            elif item.name == "warm_cache":
                return await self._verify_cache_warm()
            elif item.name == "disable_reindex":
                return await self._verify_no_reindex()
            elif item.name == "increase_refresh_interval":
                return await self._verify_refresh_interval()
            elif item.name == "optimize_indices":
                return await self._verify_segments()
            elif item.name == "load_test":
                return await self._verify_load_test()
            else:
                return False, "Unknown automated check"
        except Exception as e:
            return False, f"Error: {str(e)}"
    
    async def _verify_cluster_scaled(self) -> Tuple[bool, str]:
        """Verify cluster is scaled."""
        health = await self.es.cluster.health()
        nodes = health["number_of_nodes"]
        expected = 18 + 3 + 10  # data + master + coord
        
        if nodes >= expected:
            return True, f"{nodes} nodes ready"
        return False, f"Only {nodes}/{expected} nodes"
    
    async def _verify_replicas(self) -> Tuple[bool, str]:
        """Verify replica count."""
        settings = await self.es.indices.get_settings(index="products")
        replicas = int(
            settings["products"]["settings"]["index"]["number_of_replicas"]
        )
        
        if replicas >= 2:
            return True, f"{replicas} replicas configured"
        return False, f"Only {replicas} replicas"
    
    async def _verify_cache_warm(self) -> Tuple[bool, str]:
        """Verify cache is warm."""
        # Check cache hit rate from metrics
        hit_rate = 0.85  # Would come from metrics system
        
        if hit_rate >= 0.8:
            return True, f"Cache hit rate: {hit_rate:.1%}"
        return False, f"Cache hit rate only {hit_rate:.1%}"
    
    async def _verify_no_reindex(self) -> Tuple[bool, str]:
        """Verify no reindex in progress."""
        tasks = await self.es.tasks.list(actions="*reindex*")
        
        if not tasks.get("nodes"):
            return True, "No reindex tasks"
        return False, "Reindex in progress"
    
    async def _verify_refresh_interval(self) -> Tuple[bool, str]:
        """Verify refresh interval increased."""
        settings = await self.es.indices.get_settings(index="products")
        interval = settings["products"]["settings"]["index"].get(
            "refresh_interval", "1s"
        )
        
        if "30" in interval or "60" in interval:
            return True, f"Refresh interval: {interval}"
        return False, f"Refresh interval: {interval} (should be 30s+)"
    
    async def _verify_segments(self) -> Tuple[bool, str]:
        """Verify segment count is low."""
        stats = await self.es.indices.stats(index="products", metric="segments")
        segment_count = stats["_all"]["primaries"]["segments"]["count"]
        shard_count = 32  # Our primary shard count
        avg_segments = segment_count / shard_count
        
        if avg_segments <= 5:
            return True, f"Avg segments/shard: {avg_segments:.1f}"
        return False, f"Avg segments/shard: {avg_segments:.1f} (should be ≤5)"
    
    async def _verify_load_test(self) -> Tuple[bool, str]:
        """Verify load test passed."""
        # Would check recent load test results
        return True, "Load test passed: 48K QPS at 180ms p99"

Part IV: Monitoring and Alerting

Chapter 4: Observability

4.1 Key Metrics

# monitoring/search_metrics.py

"""
Search system monitoring metrics.
"""

from dataclasses import dataclass
from typing import Dict, List, Optional
from enum import Enum


class MetricSeverity(Enum):
    INFO = "info"
    WARNING = "warning"
    CRITICAL = "critical"


@dataclass
class MetricDefinition:
    """Definition of a monitoring metric."""
    name: str
    description: str
    unit: str
    warning_threshold: Optional[float]
    critical_threshold: Optional[float]
    comparison: str  # "gt", "lt", or "info" (collect only, no alerting)


class SearchMetrics:
    """
    Defines and collects search system metrics.
    """
    
    # ==========================================================================
    # Metric Definitions
    # ==========================================================================
    
    CLUSTER_METRICS = [
        MetricDefinition(
            name="cluster_status",
            description="Cluster health status (0=green, 1=yellow, 2=red)",
            unit="status",
            warning_threshold=1,
            critical_threshold=2,
            comparison="gt"
        ),
        MetricDefinition(
            name="active_shards_percent",
            description="Percentage of active shards",
            unit="percent",
            warning_threshold=95,
            critical_threshold=90,
            comparison="lt"
        ),
        MetricDefinition(
            name="relocating_shards",
            description="Number of relocating shards",
            unit="count",
            warning_threshold=5,
            critical_threshold=20,
            comparison="gt"
        ),
        MetricDefinition(
            name="unassigned_shards",
            description="Number of unassigned shards",
            unit="count",
            warning_threshold=1,
            critical_threshold=5,
            comparison="gt"
        ),
    ]
    
    PERFORMANCE_METRICS = [
        MetricDefinition(
            name="search_latency_p50",
            description="Search latency 50th percentile",
            unit="ms",
            warning_threshold=100,
            critical_threshold=200,
            comparison="gt"
        ),
        MetricDefinition(
            name="search_latency_p99",
            description="Search latency 99th percentile",
            unit="ms",
            warning_threshold=500,
            critical_threshold=1000,
            comparison="gt"
        ),
        MetricDefinition(
            name="search_rate",
            description="Queries per second",
            unit="qps",
            warning_threshold=None,  # No upper limit warning
            critical_threshold=None,
            comparison="info"
        ),
        MetricDefinition(
            name="indexing_latency_p99",
            description="Indexing latency 99th percentile",
            unit="ms",
            warning_threshold=1000,
            critical_threshold=5000,
            comparison="gt"
        ),
    ]
    
    RESOURCE_METRICS = [
        MetricDefinition(
            name="cpu_percent",
            description="CPU utilization",
            unit="percent",
            warning_threshold=70,
            critical_threshold=85,
            comparison="gt"
        ),
        MetricDefinition(
            name="heap_used_percent",
            description="JVM heap utilization",
            unit="percent",
            warning_threshold=75,
            critical_threshold=85,
            comparison="gt"
        ),
        MetricDefinition(
            name="disk_used_percent",
            description="Disk utilization",
            unit="percent",
            warning_threshold=75,
            critical_threshold=85,
            comparison="gt"
        ),
        MetricDefinition(
            name="search_queue_size",
            description="Search thread pool queue size",
            unit="count",
            warning_threshold=500,
            critical_threshold=1000,
            comparison="gt"
        ),
        MetricDefinition(
            name="search_rejected",
            description="Search requests rejected",
            unit="count",
            warning_threshold=1,
            critical_threshold=10,
            comparison="gt"
        ),
    ]
    
    INDEXING_METRICS = [
        MetricDefinition(
            name="indexing_lag_seconds",
            description="CDC indexing lag",
            unit="seconds",
            warning_threshold=60,
            critical_threshold=300,
            comparison="gt"
        ),
        MetricDefinition(
            name="indexing_error_rate",
            description="Indexing error rate",
            unit="percent",
            warning_threshold=1,
            critical_threshold=5,
            comparison="gt"
        ),
        MetricDefinition(
            name="dlq_size",
            description="Dead letter queue size",
            unit="count",
            warning_threshold=100,
            critical_threshold=1000,
            comparison="gt"
        ),
    ]
    
    BUSINESS_METRICS = [
        MetricDefinition(
            name="zero_result_rate",
            description="Searches returning zero results",
            unit="percent",
            warning_threshold=10,
            critical_threshold=20,
            comparison="gt"
        ),
        MetricDefinition(
            name="click_through_rate",
            description="Search result click-through rate",
            unit="percent",
            warning_threshold=20,
            critical_threshold=10,
            comparison="lt"
        ),
    ]
    
    def __init__(self, es_client, prometheus_client):
        self.es = es_client
        self.prom = prometheus_client
    
    async def collect_all_metrics(self) -> Dict[str, float]:
        """Collect all metrics."""
        
        metrics = {}
        
        # Cluster metrics
        cluster_health = await self.es.cluster.health()
        metrics["cluster_status"] = {"green": 0, "yellow": 1, "red": 2}[
            cluster_health["status"]
        ]
        metrics["active_shards_percent"] = cluster_health["active_shards_percent_as_number"]
        metrics["relocating_shards"] = cluster_health["relocating_shards"]
        metrics["unassigned_shards"] = cluster_health["unassigned_shards"]
        
        # Node stats
        node_stats = await self.es.nodes.stats(
            metric=["os", "jvm", "thread_pool", "fs"]
        )
        
        cpu_pcts = []
        heap_pcts = []
        disk_pcts = []
        search_queues = []
        search_rejected = []
        
        for node_id, stats in node_stats["nodes"].items():
            cpu_pcts.append(stats["os"]["cpu"]["percent"])
            heap_pcts.append(stats["jvm"]["mem"]["heap_used_percent"])
            
            total_disk = stats["fs"]["total"]["total_in_bytes"]
            free_disk = stats["fs"]["total"]["free_in_bytes"]
            disk_pcts.append(100 * (total_disk - free_disk) / total_disk)
            
            search_queues.append(stats["thread_pool"]["search"]["queue"])
            search_rejected.append(stats["thread_pool"]["search"]["rejected"])
        
        metrics["cpu_percent"] = max(cpu_pcts)
        metrics["heap_used_percent"] = max(heap_pcts)
        metrics["disk_used_percent"] = max(disk_pcts)
        metrics["search_queue_size"] = sum(search_queues)
        metrics["search_rejected"] = sum(search_rejected)
        
        return metrics
    
    def evaluate_alerts(self, metrics: Dict[str, float]) -> List[dict]:
        """Evaluate metrics against thresholds."""
        
        alerts = []
        all_definitions = (
            self.CLUSTER_METRICS +
            self.PERFORMANCE_METRICS +
            self.RESOURCE_METRICS +
            self.INDEXING_METRICS +
            self.BUSINESS_METRICS
        )
        
        for defn in all_definitions:
            if defn.name not in metrics:
                continue
            
            value = metrics[defn.name]
            severity = self._evaluate_threshold(value, defn)
            
            if severity:
                alerts.append({
                    "metric": defn.name,
                    "value": value,
                    "threshold": (
                        defn.critical_threshold
                        if severity == MetricSeverity.CRITICAL
                        else defn.warning_threshold
                    ),
                    "severity": severity.value,
                    "description": defn.description
                })
        
        return alerts
    
    def _evaluate_threshold(
        self,
        value: float,
        defn: MetricDefinition
    ) -> Optional[MetricSeverity]:
        """Evaluate value against thresholds."""
        
        if defn.comparison == "info":
            return None
        
        if defn.comparison == "gt":
            if defn.critical_threshold and value > defn.critical_threshold:
                return MetricSeverity.CRITICAL
            if defn.warning_threshold and value > defn.warning_threshold:
                return MetricSeverity.WARNING
        elif defn.comparison == "lt":
            if defn.critical_threshold and value < defn.critical_threshold:
                return MetricSeverity.CRITICAL
            if defn.warning_threshold and value < defn.warning_threshold:
                return MetricSeverity.WARNING
        
        return None


# =============================================================================
# Grafana Dashboard Definition
# =============================================================================

GRAFANA_DASHBOARD = """
SEARCH SYSTEM DASHBOARD

┌────────────────────────────────────────────────────────────────────────────┐
│                           CLUSTER HEALTH                                   │
│  ┌───────────────┐  ┌───────────────┐  ┌───────────────┐  ┌──────────────┐ │
│  │ Status: GREEN │  │ Nodes: 11/11  │  │ Shards: 100%  │  │ Disk: 45%    │ │
│  └───────────────┘  └───────────────┘  └───────────────┘  └──────────────┘ │
├────────────────────────────────────────────────────────────────────────────┤
│                           SEARCH PERFORMANCE                               │
│                                                                            │
│  Queries/sec          Latency p99              Error Rate                  │
│  ┌─────────────┐      ┌─────────────┐          ┌─────────────┐             │
│  │    12,345   │      │    145ms    │          │    0.02%    │             │
│  │  ▄▄▄▆▇█▇▆▄  │      │  ▂▂▃▃▂▂▃▂▂  │          │  ▁▁▁▁▁▁▁▁▁  │             │
│  └─────────────┘      └─────────────┘          └─────────────┘             │
│                                                                            │
│  Latency Distribution (last hour)                                          │
│  ┌─────────────────────────────────────────────────────────────────────┐   │
│  │ p50: 45ms  │████████████████                                        │   │
│  │ p90: 98ms  │████████████████████████████████                        │   │
│  │ p99: 145ms │████████████████████████████████████████████            │   │
│  │ max: 890ms │████████████████████████████████████████████████████████│   │
│  └─────────────────────────────────────────────────────────────────────┘   │
├────────────────────────────────────────────────────────────────────────────┤
│                           RESOURCE UTILIZATION                             │
│                                                                            │
│  CPU by Node                      Heap by Node                             │
│  ┌───────────────────────┐        ┌───────────────────────┐                │
│  │ data-1: ████████░░ 78%│        │ data-1: ██████░░░░ 62%│                │
│  │ data-2: ███████░░░ 68%│        │ data-2: ██████░░░░ 58%│                │
│  │ data-3: ████████░░ 75%│        │ data-3: ███████░░░ 67%│                │
│  │ data-4: ███████░░░ 72%│        │ data-4: ██████░░░░ 61%│                │
│  │ data-5: ████████░░ 76%│        │ data-5: ███████░░░ 65%│                │
│  │ data-6: ███████░░░ 70%│        │ data-6: ██████░░░░ 59%│                │
│  └───────────────────────┘        └───────────────────────┘                │
├────────────────────────────────────────────────────────────────────────────┤
│                           INDEXING PIPELINE                                │
│                                                                            │
│  Indexing Lag          Events/sec              DLQ Size                    │
│  ┌─────────────┐      ┌─────────────┐          ┌─────────────┐             │
│  │     12s     │      │     892     │          │      3      │             │
│  │  ▂▂▃▂▂▂▂▂▂  │      │  ▄▅▆▇▆▅▄▅▆  │          │  ▁▁▁▁▁▁▁▁▁  │             │
│  └─────────────┘      └─────────────┘          └─────────────┘             │
├────────────────────────────────────────────────────────────────────────────┤
│                           SEARCH QUALITY                                   │
│                                                                            │
│  Zero Result Rate     Click-Through Rate       Conversion Rate             │
│  ┌─────────────┐      ┌─────────────┐          ┌─────────────┐             │
│  │    4.2%     │      │    42.3%    │          │    3.8%     │             │
│  │  ▃▃▃▂▂▂▂▂▃  │      │  ▅▅▆▆▆▆▆▅▅  │          │  ▃▃▄▄▄▄▄▃▃  │             │
│  └─────────────┘      └─────────────┘          └─────────────┘             │
└────────────────────────────────────────────────────────────────────────────┘
"""

4.2 Alerting Configuration

# monitoring/alerting.py

"""
Alerting configuration for search system.
"""

from dataclasses import dataclass
from typing import List, Dict, Optional
from enum import Enum


class AlertChannel(Enum):
    SLACK = "slack"
    PAGERDUTY = "pagerduty"
    EMAIL = "email"


@dataclass
class AlertRule:
    """Alert rule definition."""
    name: str
    condition: str
    threshold: Optional[float]
    duration: str  # How long condition must be true
    severity: str
    channels: List[AlertChannel]
    runbook_url: str
    description: str


class AlertingConfig:
    """
    Alerting configuration for search system.
    """
    
    ALERT_RULES = [
        # =======================================================================
        # Critical Alerts (PagerDuty)
        # =======================================================================
        AlertRule(
            name="cluster_red",
            condition="elasticsearch_cluster_health_status == 2",
            threshold=2,
            duration="1m",
            severity="critical",
            channels=[AlertChannel.PAGERDUTY, AlertChannel.SLACK],
            runbook_url="https://wiki/runbooks/es-cluster-red",
            description="Elasticsearch cluster is RED. Data loss possible."
        ),
        AlertRule(
            name="search_latency_critical",
            condition="elasticsearch_search_latency_p99 > 1000",
            threshold=1000,
            duration="5m",
            severity="critical",
            channels=[AlertChannel.PAGERDUTY, AlertChannel.SLACK],
            runbook_url="https://wiki/runbooks/es-high-latency",
            description="Search latency p99 > 1s for 5 minutes."
        ),
        AlertRule(
            name="search_errors_critical",
            condition="rate(elasticsearch_search_errors[5m]) > 0.05",
            threshold=0.05,
            duration="5m",
            severity="critical",
            channels=[AlertChannel.PAGERDUTY, AlertChannel.SLACK],
            runbook_url="https://wiki/runbooks/es-search-errors",
            description="Search error rate > 5% for 5 minutes."
        ),
        AlertRule(
            name="indexing_lag_critical",
            condition="search_indexing_lag_seconds > 300",
            threshold=300,
            duration="10m",
            severity="critical",
            channels=[AlertChannel.PAGERDUTY, AlertChannel.SLACK],
            runbook_url="https://wiki/runbooks/es-indexing-lag",
            description="Search index > 5 minutes behind database."
        ),
        AlertRule(
            name="node_down",
            condition="elasticsearch_cluster_nodes < expected_nodes",
            threshold=None,
            duration="2m",
            severity="critical",
            channels=[AlertChannel.PAGERDUTY, AlertChannel.SLACK],
            runbook_url="https://wiki/runbooks/es-node-down",
            description="Elasticsearch node is down."
        ),
        
        # =======================================================================
        # Warning Alerts (Slack)
        # =======================================================================
        AlertRule(
            name="cluster_yellow",
            condition="elasticsearch_cluster_health_status == 1",
            threshold=1,
            duration="10m",
            severity="warning",
            channels=[AlertChannel.SLACK],
            runbook_url="https://wiki/runbooks/es-cluster-yellow",
            description="Elasticsearch cluster is YELLOW. Replicas unavailable."
        ),
        AlertRule(
            name="search_latency_warning",
            condition="elasticsearch_search_latency_p99 > 500",
            threshold=500,
            duration="10m",
            severity="warning",
            channels=[AlertChannel.SLACK],
            runbook_url="https://wiki/runbooks/es-high-latency",
            description="Search latency p99 > 500ms for 10 minutes."
        ),
        AlertRule(
            name="heap_pressure",
            condition="elasticsearch_jvm_heap_used_percent > 75",
            threshold=75,
            duration="15m",
            severity="warning",
            channels=[AlertChannel.SLACK],
            runbook_url="https://wiki/runbooks/es-heap-pressure",
            description="JVM heap usage > 75% for 15 minutes."
        ),
        AlertRule(
            name="disk_space_warning",
            condition="elasticsearch_disk_used_percent > 75",
            threshold=75,
            duration="30m",
            severity="warning",
            channels=[AlertChannel.SLACK],
            runbook_url="https://wiki/runbooks/es-disk-space",
            description="Disk usage > 75%."
        ),
        AlertRule(
            name="search_queue_building",
            condition="elasticsearch_search_queue_size > 500",
            threshold=500,
            duration="5m",
            severity="warning",
            channels=[AlertChannel.SLACK],
            runbook_url="https://wiki/runbooks/es-queue-building",
            description="Search queue building up. May indicate overload."
        ),
        AlertRule(
            name="zero_result_rate_high",
            condition="search_zero_result_rate > 0.10",
            threshold=0.10,
            duration="30m",
            severity="warning",
            channels=[AlertChannel.SLACK],
            runbook_url="https://wiki/runbooks/search-zero-results",
            description="Zero result rate > 10%. Check indexing and synonyms."
        ),
    ]
    
    @classmethod
    def generate_prometheus_rules(cls) -> str:
        """Generate Prometheus alerting rules."""
        
        rules = []
        
        for alert in cls.ALERT_RULES:
            rule = f"""
  - alert: {alert.name}
    expr: {alert.condition}
    for: {alert.duration}
    labels:
      severity: {alert.severity}
    annotations:
      summary: "{alert.description}"
      runbook_url: "{alert.runbook_url}"
"""
            rules.append(rule)
        
        return f"""
groups:
  - name: elasticsearch
    rules:
{''.join(rules)}
"""

Part V: Disaster Recovery

Chapter 5: Backup and Recovery

5.1 Snapshot Strategy

# operations/disaster_recovery.py

"""
Disaster recovery for Elasticsearch.
"""

from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import List, Optional
import asyncio


@dataclass
class SnapshotPolicy:
    """Snapshot policy configuration."""
    name: str
    schedule: str  # Cron expression
    indices: List[str]
    retention_days: int
    repository: str


class DisasterRecovery:
    """
    Manages backups and disaster recovery.
    """
    
    def __init__(self, es_client, s3_client):
        self.es = es_client
        self.s3 = s3_client
        
        self.repository = "s3_backup"
        self.bucket = "elasticsearch-backups-prod"
    
    async def setup_repository(self):
        """Set up S3 snapshot repository."""
        
        await self.es.snapshot.create_repository(
            repository=self.repository,
            body={
                "type": "s3",
                "settings": {
                    "bucket": self.bucket,
                    "region": "us-east-1",
                    "base_path": "snapshots",
                    "compress": True,
                    "chunk_size": "1gb",
                    "max_restore_bytes_per_sec": "500mb",
                    "max_snapshot_bytes_per_sec": "200mb"
                }
            }
        )
    
    async def create_snapshot(
        self,
        name: Optional[str] = None,
        indices: Optional[List[str]] = None
    ) -> dict:
        """
        Create a snapshot.
        """
        
        if not name:
            name = f"snapshot_{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}"
        
        body = {
            "indices": indices or ["products*"],
            "include_global_state": False,
            "metadata": {
                "created_by": "automated_backup",
                "created_at": datetime.utcnow().isoformat()
            }
        }
        
        response = await self.es.snapshot.create(
            repository=self.repository,
            snapshot=name,
            body=body,
            wait_for_completion=False  # Don't block
        )
        
        return response
    
    async def restore_snapshot(
        self,
        snapshot_name: str,
        indices: Optional[List[str]] = None,
        rename_pattern: Optional[str] = None,
        rename_replacement: Optional[str] = None
    ) -> dict:
        """
        Restore from snapshot.
        
        Args:
            snapshot_name: Name of snapshot to restore
            indices: Specific indices to restore (None = all)
            rename_pattern: Regex pattern for renaming
            rename_replacement: Replacement string
        """
        
        body = {
            "indices": indices or ["products*"],
            "include_global_state": False,
            "include_aliases": True
        }
        
        # Optionally rename indices (for testing restores)
        if rename_pattern:
            body["rename_pattern"] = rename_pattern
            body["rename_replacement"] = rename_replacement
        
        response = await self.es.snapshot.restore(
            repository=self.repository,
            snapshot=snapshot_name,
            body=body,
            wait_for_completion=False
        )
        
        return response
    
    async def list_snapshots(
        self,
        limit: int = 10
    ) -> List[dict]:
        """List recent snapshots."""
        
        response = await self.es.snapshot.get(
            repository=self.repository,
            snapshot="_all"
        )
        
        snapshots = response.get("snapshots", [])
        
        # Sort by start time, newest first
        snapshots.sort(key=lambda s: s["start_time"], reverse=True)
        
        return snapshots[:limit]
    
    async def cleanup_old_snapshots(self, retention_days: int = 30):
        """Delete snapshots older than retention period."""
        
        cutoff = datetime.utcnow() - timedelta(days=retention_days)
        
        snapshots = await self.list_snapshots(limit=1000)
        
        deleted = 0
        for snapshot in snapshots:
            start_time = datetime.fromisoformat(
                snapshot["start_time"].replace("Z", "+00:00")
            )
            
            if start_time.replace(tzinfo=None) < cutoff:
                await self.es.snapshot.delete(
                    repository=self.repository,
                    snapshot=snapshot["snapshot"]
                )
                deleted += 1
        
        return deleted
    
    async def verify_snapshot(self, snapshot_name: str) -> dict:
        """Verify snapshot integrity."""
        
        # Get snapshot info
        response = await self.es.snapshot.get(
            repository=self.repository,
            snapshot=snapshot_name
        )
        
        snapshot = response["snapshots"][0]
        
        verification = {
            "snapshot": snapshot_name,
            "state": snapshot["state"],
            "indices": len(snapshot["indices"]),
            "shards": {
                "total": snapshot["shards"]["total"],
                "successful": snapshot["shards"]["successful"],
                "failed": snapshot["shards"]["failed"]
            },
            "duration_seconds": snapshot["duration_in_millis"] / 1000,
            "size_bytes": 0  # Would need to calculate from S3
        }
        
        verification["healthy"] = (
            snapshot["state"] == "SUCCESS" and
            snapshot["shards"]["failed"] == 0
        )
        
        return verification
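

# -----------------------------------------------------------------------------
# Illustrative sketch (not part of the original module): the SnapshotPolicy
# dataclass above can drive a simple scheduled backup routine built on
# DisasterRecovery. The policy values below are assumptions.
# -----------------------------------------------------------------------------

DAILY_POLICY = SnapshotPolicy(
    name="daily-products",
    schedule="0 2 * * *",        # 02:00 UTC daily (cron expression, illustrative)
    indices=["products*"],
    retention_days=30,
    repository="s3_backup",
)


async def run_daily_backup(dr: DisasterRecovery, policy: SnapshotPolicy = DAILY_POLICY) -> None:
    """Take a snapshot of the policy's indices, then enforce its retention."""
    await dr.create_snapshot(indices=policy.indices)
    await dr.cleanup_old_snapshots(retention_days=policy.retention_days)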


# =============================================================================
# DR Runbook
# =============================================================================

DR_RUNBOOK = """
DISASTER RECOVERY RUNBOOK

SCENARIO 1: Single Node Failure
─────────────────────────────────
Impact: Temporary yellow cluster, no data loss
Recovery: Automatic (Kubernetes restarts pod)
Action: Monitor for node to rejoin, shards to reallocate
ETA: 5-10 minutes

SCENARIO 2: Multiple Node Failure (< quorum)
─────────────────────────────────────────────
Impact: Yellow/Red cluster, potential query failures
Recovery: 
  1. Scale up replacement nodes
  2. Wait for shard recovery
  3. Verify cluster health
Action:
  kubectl scale statefulset elasticsearch-data --replicas=6
ETA: 15-30 minutes

SCENARIO 3: Complete Cluster Loss
─────────────────────────────────
Impact: Total search outage
Recovery:
  1. Provision new cluster
  2. Restore from latest snapshot
  3. Resume CDC pipeline
  4. Verify data integrity
Commands:
  # List available snapshots
  curl -X GET "localhost:9200/_snapshot/s3_backup/_all"
  
  # Restore latest snapshot
  curl -X POST "localhost:9200/_snapshot/s3_backup/latest/_restore"
  
  # Monitor restore progress
  curl -X GET "localhost:9200/_recovery"
ETA: 1-4 hours depending on data size

SCENARIO 4: Data Corruption
───────────────────────────
Impact: Incorrect search results
Recovery:
  1. Identify scope of corruption
  2. Restore specific indices from snapshot
  3. Or trigger full reindex from PostgreSQL
Commands:
  # Close corrupted index
  curl -X POST "localhost:9200/products/_close"
  
  # Restore from snapshot
  curl -X POST "localhost:9200/_snapshot/s3_backup/snapshot_20240101/_restore" -d '{
    "indices": "products",
    "rename_pattern": "(.+)",
    "rename_replacement": "restored_$1"
  }'
  
  # Verify and swap
  # ... verification steps ...
  
  # Swap aliases
  curl -X POST "localhost:9200/_aliases" -d '{
    "actions": [
      {"remove": {"index": "products", "alias": "products_live"}},
      {"add": {"index": "restored_products", "alias": "products_live"}}
    ]
  }'
ETA: 30 minutes - 2 hours

SCENARIO 5: Region Failure
──────────────────────────
Impact: Total search outage in affected region
Recovery:
  1. Failover to secondary region
  2. Update DNS/routing
  3. Resume indexing in secondary
Prerequisites:
  - Cross-region replication enabled
  - Secondary cluster on standby
ETA: 15-30 minutes (if prepared)
"""

Part VI: Performance Tuning

Chapter 6: Optimization

6.1 Query Optimization

# optimization/query_tuning.py

"""
Query optimization techniques.
"""

from typing import Dict, List, Any


class QueryOptimizer:
    """
    Optimizes Elasticsearch queries for performance.
    """
    
    def __init__(self, es_client):
        self.es = es_client
    
    async def analyze_slow_queries(
        self,
        threshold_ms: int = 500
    ) -> List[dict]:
        """
        Find and analyze slow queries from slow log.
        """
        
        # Enable slow log (if not already)
        await self.es.indices.put_settings(
            index="products",
            body={
                "index.search.slowlog.threshold.query.warn": f"{threshold_ms}ms",
                "index.search.slowlog.threshold.query.info": f"{threshold_ms // 2}ms",
                "index.search.slowlog.level": "info"
            }
        )
        
        # In production, slow queries are written to the index slowlog files;
        # ship those to your logging pipeline and analyze them there.
        # This sketch only enables the slowlog, so return an empty list.
        return []
    
    def optimize_query(self, query: dict) -> dict:
        """
        Apply query optimizations.
        """
        
        optimizations_applied = []
        
        # 1. Move non-scoring clauses to filter context
        query, moved = self._move_to_filter_context(query)
        if moved:
            optimizations_applied.append("moved_to_filter")
        
        # 2. Use keyword fields for exact matches
        query, fixed = self._use_keyword_fields(query)
        if fixed:
            optimizations_applied.append("keyword_fields")
        
        # 3. Limit source fields
        if "_source" not in query:
            query["_source"] = ["name", "price", "brand", "category", "rating"]
            optimizations_applied.append("limited_source")
        
        # 4. Request cache hint (request_cache is a URL parameter, not a body field)
        optimizations_applied.append("request_cache")
        
        return {
            "query": query,
            "params": {"request_cache": True},
            "optimizations": optimizations_applied
        }
    
    def _move_to_filter_context(self, query: dict) -> tuple:
        """Move non-scoring clauses to filter context."""
        
        moved = False
        bool_query = query.get("query", {}).get("bool", {})
        
        if not bool_query:
            return query, moved
        
        must = bool_query.get("must", [])
        filters = bool_query.get("filter", [])
        
        new_must = []
        for clause in must:
            # Term queries don't need scoring
            if "term" in clause or "terms" in clause or "range" in clause:
                filters.append(clause)
                moved = True
            else:
                new_must.append(clause)
        
        if moved:
            bool_query["must"] = new_must if new_must else [{"match_all": {}}]
            bool_query["filter"] = filters
        
        return query, moved
    
    def _use_keyword_fields(self, query: dict) -> tuple:
        """Use keyword subfields for exact matches."""
        
        fixed = False
        
        def fix_terms(obj):
            nonlocal fixed
            if isinstance(obj, dict):
                for key, value in list(obj.items()):
                    if key in ("term", "terms"):
                        for field, val in list(value.items()):
                            # If querying text field for exact match, use keyword
                            if field in ("brand", "category", "color") and ".keyword" not in field:
                                value[f"{field}.keyword"] = val
                                del value[field]
                                fixed = True
                    else:
                        fix_terms(value)
            elif isinstance(obj, list):
                for item in obj:
                    fix_terms(item)
        
        fix_terms(query)
        return query, fixed


# =============================================================================
# Index Optimization
# =============================================================================

class IndexOptimizer:
    """
    Index-level optimizations.
    """
    
    def __init__(self, es_client):
        self.es = es_client
    
    async def optimize_for_search(self, index: str):
        """
        Optimize index for search performance.
        
        Call during low-traffic periods.
        """
        
        # Force merge to reduce segment count
        await self.es.indices.forcemerge(
            index=index,
            max_num_segments=1,  # Single segment per shard
            only_expunge_deletes=False
        )
        
        # Refresh to make all docs searchable
        await self.es.indices.refresh(index=index)
        
        # Clear caches (they'll warm up with new queries)
        await self.es.indices.clear_cache(index=index)
    
    async def get_optimization_recommendations(
        self,
        index: str
    ) -> List[dict]:
        """
        Analyze index and provide recommendations.
        """
        
        recommendations = []
        
        # Get index stats
        stats = await self.es.indices.stats(index=index)
        index_stats = stats["indices"][index]
        
        # Check segment count
        primaries = index_stats["primaries"]
        segment_count = primaries["segments"]["count"]
        doc_count = primaries["docs"]["count"]
        
        if segment_count > 50:
            recommendations.append({
                "type": "segments",
                "severity": "medium",
                "message": f"High segment count ({segment_count}). Consider force merge.",
                "action": f"POST /{index}/_forcemerge?max_num_segments=5"
            })
        
        # Check deleted docs
        deleted_docs = primaries["docs"]["deleted"]
        if deleted_docs > doc_count * 0.1:  # > 10% deleted
            recommendations.append({
                "type": "deleted_docs",
                "severity": "low",
                "message": f"High deleted doc count ({deleted_docs}). Consider force merge.",
                "action": f"POST /{index}/_forcemerge?only_expunge_deletes=true"
            })
        
        # Check field data usage
        fielddata = primaries.get("fielddata", {}).get("memory_size_in_bytes", 0)
        if fielddata > 1024 * 1024 * 1024:  # > 1GB
            recommendations.append({
                "type": "fielddata",
                "severity": "high",
                "message": f"High fielddata usage ({fielddata / 1024 / 1024:.0f}MB). "
                          "Check for text field aggregations.",
                "action": "Use keyword fields for aggregations instead of text"
            })
        
        # Check refresh interval
        settings = await self.es.indices.get_settings(index=index)
        refresh_interval = settings[index]["settings"]["index"].get(
            "refresh_interval", "1s"
        )
        if refresh_interval == "1s":
            recommendations.append({
                "type": "refresh_interval",
                "severity": "low",
                "message": "Default refresh interval (1s). Consider increasing for better indexing.",
                "action": f'PUT /{index}/_settings {{"index.refresh_interval": "30s"}}'
            })
        
        return recommendations
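
To make the query rewrite concrete, here is a small before/after sketch using the QueryOptimizer above (the query, field names, and es_client are illustrative):

# optimization/query_tuning_example.py

optimizer = QueryOptimizer(es_client)

original = {
    "query": {
        "bool": {
            "must": [
                {"match": {"name": "wireless headphones"}},  # needs relevance scoring
                {"term": {"brand": "Sony"}},                  # exact match, no scoring needed
                {"range": {"price": {"lte": 200}}}            # no scoring needed
            ]
        }
    }
}

result = optimizer.optimize_query(original)
# - The term and range clauses move to the (cacheable, non-scoring) filter context
# - brand becomes brand.keyword for the exact match
# - _source is trimmed to the fields the result page actually needs
# - result["params"]["request_cache"] is meant to be sent as a URL parameter on the search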

6.2 Performance Checklist

# optimization/performance_checklist.py

"""
Performance optimization checklist.
"""

PERFORMANCE_CHECKLIST = """
ELASTICSEARCH PERFORMANCE CHECKLIST

═══════════════════════════════════════════════════════════════════════════════
HARDWARE & INFRASTRUCTURE
═══════════════════════════════════════════════════════════════════════════════

[ ] SSD storage (not spinning disks)
[ ] Sufficient RAM for OS cache (index size × 0.5)
[ ] Heap size = min(31GB, RAM/2)
[ ] Dedicated master nodes (at least 3)
[ ] Network optimized for low latency between nodes

═══════════════════════════════════════════════════════════════════════════════
INDEX CONFIGURATION
═══════════════════════════════════════════════════════════════════════════════

[ ] Appropriate shard count (20-50GB per shard)
[ ] Replica count based on read load
[ ] Refresh interval tuned (not default 1s for high-write)
[ ] Mapping optimized:
    [ ] keyword for exact match/aggregations
    [ ] text for full-text search
    [ ] Disabled _source if not needed
    [ ] No dynamic mapping in production

═══════════════════════════════════════════════════════════════════════════════
QUERY OPTIMIZATION
═══════════════════════════════════════════════════════════════════════════════

[ ] Use filter context for non-scoring clauses
[ ] Use keyword fields for exact matches
[ ] Limit returned fields (_source filtering)
[ ] Use request cache for repeated queries
[ ] Avoid script queries where possible
[ ] Use doc_values for sorting/aggregations
[ ] Pagination with search_after, not deep offset (see the sketch after this checklist)

═══════════════════════════════════════════════════════════════════════════════
INDEXING OPTIMIZATION
═══════════════════════════════════════════════════════════════════════════════

[ ] Bulk indexing (500-5000 docs per request)
[ ] Index refresh interval increased during bulk
[ ] Async translog for high-throughput indexing
[ ] Appropriate number of indexing threads

═══════════════════════════════════════════════════════════════════════════════
CLUSTER HEALTH
═══════════════════════════════════════════════════════════════════════════════

[ ] All nodes healthy
[ ] No unassigned shards
[ ] Shard balance across nodes
[ ] Circuit breakers configured
[ ] Thread pools sized appropriately

═══════════════════════════════════════════════════════════════════════════════
CACHING
═══════════════════════════════════════════════════════════════════════════════

[ ] Query cache enabled and sized
[ ] Request cache for common queries
[ ] Field data cache limited
[ ] Application-level caching for hot queries

═══════════════════════════════════════════════════════════════════════════════
MONITORING
═══════════════════════════════════════════════════════════════════════════════

[ ] Slow log enabled
[ ] Key metrics tracked (latency, throughput, errors)
[ ] Alerting configured
[ ] Dashboards available
"""

Summary

What We Learned Today

DAY 5 SUMMARY: SEARCH OPERATIONS & SCALE

CLUSTER ARCHITECTURE
├── Master nodes: 3 dedicated, small instances
├── Data nodes: Memory optimized, SSD required
├── Coordinating nodes: Query routing and aggregation
├── Shard strategy: 20-50GB per shard
└── Replicas for read scaling and HA

CAPACITY PLANNING
├── Storage: Raw data × 1.3 (overhead) × replicas × growth
├── Memory: Index size × 0.5 for OS cache + 31GB heap
├── CPU: QPS / 150 for simple queries
└── Plan for peak (Black Friday = 5x normal)

TRAFFIC SPIKES
├── Pre-scale 24 hours before
├── Add replicas for read capacity
├── Graceful degradation under load
├── Disable expensive features when critical
└── Checklist and runbook preparation

MONITORING
├── Cluster health: green/yellow/red
├── Performance: latency p50/p99, QPS, errors
├── Resources: CPU, heap, disk, queue sizes
├── Business: zero result rate, CTR
└── Alerting: Critical → PagerDuty, Warning → Slack

DISASTER RECOVERY
├── S3 snapshots (daily + before changes)
├── Retention policy (30 days)
├── Restore testing (monthly)
├── Cross-region replication for critical systems
└── Documented runbooks for each scenario

PERFORMANCE TUNING
├── Filter context for non-scoring clauses
├── Keyword fields for exact matches
├── Force merge during low traffic
├── Limit returned fields
└── Request cache for hot queries

Key Takeaways

OPERATIONS KEY TAKEAWAYS

1. DEDICATED NODE ROLES
   Master, data, coordinating - separate concerns
   Don't run everything on same nodes

2. PLAN FOR 3X-5X PEAK
   Normal capacity won't handle Black Friday
   Pre-scale; don't scale reactively

3. DEGRADE GRACEFULLY
   Better slow search than no search
   Disable features progressively

4. MEASURE EVERYTHING
   Can't fix what you can't see
   Latency, errors, queue depth, business metrics

5. PRACTICE RECOVERY
   Untested backups are not backups
   Run DR drills regularly

GOLDEN RULES:
├── Never more than 50GB per shard
├── Heap ≤ 31GB (compressed OOPs)
├── OS cache ≥ 50% of index size
├── Filter context for non-scoring
└── Snapshot before any change

Interview Tip

WHEN ASKED "HOW WOULD YOU HANDLE 10X TRAFFIC?"

"For a 10x traffic spike like Black Friday, I'd prepare in three phases:

BEFORE (24-48 hours):
- Scale cluster horizontally (3x data nodes, 5x coordinating)
- Add replicas for read capacity
- Force merge indexes to optimize segments
- Warm caches with popular queries
- Increase refresh interval to reduce indexing overhead
- Review and test runbooks with the team

DURING:
- Monitor key metrics: latency p99, error rate, queue depth
- Have graceful degradation ready:
  Level 1: Disable spell check, personalization
  Level 2: Simplify queries, reduce result size
  Level 3: Serve from cache only for popular queries
- War room channel for real-time coordination

AFTER:
- Scale down gradually (not all at once)
- Review metrics and incidents
- Document lessons learned

The key is preparation - reactive scaling is too slow."

This shows you understand both the technical and operational aspects.

Week 7 Complete!

WEEK 7 SUMMARY: SEARCH SYSTEM

Day 1: Search Fundamentals
├── Inverted indexes
├── BM25 scoring
├── Text analysis
└── Document modeling

Day 2: Indexing Pipeline
├── CDC with Debezium
├── Kafka streaming
├── Bulk indexing
└── Zero-downtime reindex

Day 3: Query & Relevance
├── BM25 tuning
├── Query vs Filter
├── Boosting strategies
└── Function scores

Day 4: Advanced Features
├── Autocomplete
├── Faceted search
├── Synonyms
└── Personalization

Day 5: Operations & Scale
├── Cluster architecture
├── Capacity planning
├── Traffic spikes
├── Disaster recovery
└── Performance tuning

TOTAL: ~11,000 lines of educational content

You can now design, build, and operate a production search system!

Next Week Preview

Week 8: Analytics Pipeline
"From events to insights"

  • Event ingestion at scale
  • Streaming vs batch processing
  • Data modeling for analytics
  • Late-arriving data handling
  • Query optimization for OLAP

End of Week 7, Day 5

Next: Week 8 — Analytics Pipeline