Day 05
Week 7 — Day 5: Search Operations & Scale
System Design Mastery Series — Building Blocks Week
Introduction
We've built a complete search system this week: inverted indexes, CDC pipelines, relevance tuning, autocomplete, and personalization. But building is only half the battle.
THE OPERATIONS REALITY
3 AM, Black Friday:
├── Traffic: 50K queries/sec (5x normal)
├── Alert: "Elasticsearch cluster CPU at 95%"
├── Alert: "Search latency p99 at 2.5s" (target: 200ms)
├── Alert: "Indexing lag at 5 minutes"
└── PagerDuty: Your phone rings
Questions racing through your mind:
├── What's actually broken?
├── Can we shed load? How?
├── Should we add nodes? How fast?
├── Is there a rollback option?
└── Who else do I need to wake up?
This is the gap between designing a system and operating one.
Today's Theme: "Running search in production, at scale, under pressure"
We'll cover:
- Cluster architecture for high availability
- Capacity planning and scaling
- Handling traffic spikes
- Monitoring, alerting, and debugging
- Disaster recovery
- Performance tuning and optimization
Part I: Cluster Architecture
Chapter 1: Production Topology
1.1 Node Roles and Responsibilities
ELASTICSEARCH NODE ROLES
┌────────────────────────────────────────────────────────────────────────┐
│ PRODUCTION CLUSTER TOPOLOGY │
│ │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ MASTER NODES (3) │ │
│ │ Dedicated, Small │ │
│ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │
│ │ │Master 1 │ │Master 2 │ │Master 3 │ │ │
│ │ │(Active) │ │(Standby)│ │(Standby)│ │ │
│ │ └─────────┘ └─────────┘ └─────────┘ │ │
│ │ Cluster state, shard allocation, index management │ │
│ └─────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ COORDINATING NODES (2-4) │ │
│ │ CPU optimized, Medium │ │
│ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │
│ │ │Coord 1 │ │Coord 2 │ │Coord 3 │ │Coord 4 │ │ │
│ │ └─────────┘ └─────────┘ └─────────┘ └─────────┘ │ │
│ │ Query routing, result aggregation, reduce phase │ │
│ └─────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ DATA NODES (6+) │ │
│ │ Memory + Storage optimized │ │
│ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │
│ │ │ Data 1 │ │ Data 2 │ │ Data 3 │ │ Data 4 │ │ │
│ │ │ 64GB │ │ 64GB │ │ 64GB │ │ 64GB │ │ │
│ │ │ 1TB SSD │ │ 1TB SSD │ │ 1TB SSD │ │ 1TB SSD │ │ │
│ │ └─────────┘ └─────────┘ └─────────┘ └─────────┘ │ │
│ │ ┌─────────┐ ┌─────────┐ │ │
│ │ │ Data 5 │ │ Data 6 │ ... more as needed │ │
│ │ └─────────┘ └─────────┘ │ │
│ │ Stores shards, executes queries and indexing │ │
│ └─────────────────────────────────────────────────────────────────┘ │
│ │
└────────────────────────────────────────────────────────────────────────┘
NODE ROLE SUMMARY:
MASTER NODES
├── Manage cluster state (which shards where)
├── Handle index creation/deletion
├── Always odd number (3, 5) for quorum
├── Dedicated = not handling data or queries
└── Small instances (4 CPU, 8GB RAM sufficient)
COORDINATING NODES
├── Receive client requests
├── Route to appropriate data nodes
├── Aggregate results from shards
├── Shield data nodes from client connections
└── CPU-optimized instances
DATA NODES
├── Store index shards
├── Execute search and indexing
├── Need RAM for caching (heap + OS cache)
├── Need fast storage (SSD required)
└── Memory-optimized instances
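A quick way to confirm that a running cluster actually matches this topology is to list node roles through the _cat API. The sketch below is illustrative (the endpoint is hypothetical and the synchronous client is assumed):
# operations/verify_topology.py (illustrative sketch)
from elasticsearch import Elasticsearch

es = Elasticsearch(["https://search.internal:9200"])  # hypothetical endpoint

# node.role is a compact string such as "m" (dedicated master),
# "di" (data + ingest), or "-" (coordinating-only)
for node in es.cat.nodes(format="json", h="name,node.role,heap.percent,cpu"):
    print(f"{node['name']:<20} roles={node['node.role']:<6} "
          f"heap={node['heap.percent']}% cpu={node['cpu']}%")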
1.2 Shard Strategy
# operations/shard_strategy.py
"""
Shard planning and management.
"""
from dataclasses import dataclass
from typing import List
import math
@dataclass
class ShardPlan:
"""Shard configuration plan."""
primary_shards: int
replica_shards: int
total_shards: int
shards_per_node: float
estimated_shard_size_gb: float
class ShardPlanner:
"""
Plans shard configuration based on data size and query patterns.
Rules of thumb:
- Target 20-50GB per shard (sweet spot ~30GB)
- Max 20 shards per GB of heap per node
- More shards = more parallelism but more overhead
- Can't change primary shard count after creation
"""
def __init__(
self,
target_shard_size_gb: float = 30.0,
max_shards_per_heap_gb: int = 20
):
self.target_shard_size = target_shard_size_gb
self.max_shards_per_heap_gb = max_shards_per_heap_gb
def plan_shards(
self,
data_size_gb: float,
data_nodes: int,
heap_per_node_gb: int,
replica_count: int = 1,
growth_factor: float = 1.5
) -> ShardPlan:
"""
Plan shard configuration.
Args:
data_size_gb: Current data size
data_nodes: Number of data nodes
heap_per_node_gb: Heap per data node
replica_count: Number of replicas
growth_factor: Expected data growth multiplier
"""
# Account for growth
projected_size = data_size_gb * growth_factor
# Calculate primary shards based on size
primary_shards = max(1, math.ceil(projected_size / self.target_shard_size))
# Ensure enough parallelism (at least 1 shard per node)
primary_shards = max(primary_shards, data_nodes)
# Round to nice number
primary_shards = self._round_to_nice_number(primary_shards)
# Check shard limit per node
total_shards = primary_shards * (1 + replica_count)
shards_per_node = total_shards / data_nodes
max_shards_per_node = heap_per_node_gb * self.max_shards_per_heap_gb
if shards_per_node > max_shards_per_node:
# Need more nodes or fewer shards
recommended_nodes = math.ceil(total_shards / max_shards_per_node)
print(f"Warning: {shards_per_node:.1f} shards/node exceeds limit. "
f"Recommend {recommended_nodes} data nodes.")
return ShardPlan(
primary_shards=primary_shards,
replica_shards=primary_shards * replica_count,
total_shards=total_shards,
shards_per_node=shards_per_node,
estimated_shard_size_gb=projected_size / primary_shards
)
def _round_to_nice_number(self, n: int) -> int:
"""Round to a nice number for shard count."""
nice_numbers = [1, 2, 3, 4, 5, 6, 8, 10, 12, 15, 16, 20, 24, 30, 32, 40, 48, 50, 60, 64]
for nice in nice_numbers:
if nice >= n:
return nice
return n
# =============================================================================
# Example: Our Product Search System
# =============================================================================
def plan_product_search_shards():
"""Plan shards for 50M products."""
planner = ShardPlanner()
# Current state
current_products = 50_000_000
avg_doc_size_kb = 10
current_data_gb = (current_products * avg_doc_size_kb) / (1024 * 1024)
# With ES overhead (~30%)
index_size_gb = current_data_gb * 1.3
print(f"Current data: {current_data_gb:.1f} GB")
print(f"Index size (with overhead): {index_size_gb:.1f} GB")
# Plan shards
plan = planner.plan_shards(
data_size_gb=index_size_gb,
data_nodes=6,
heap_per_node_gb=31,
replica_count=1,
growth_factor=1.5
)
print(f"\nShard Plan:")
print(f" Primary shards: {plan.primary_shards}")
print(f" Replica shards: {plan.replica_shards}")
print(f" Total shards: {plan.total_shards}")
print(f" Shards per node: {plan.shards_per_node:.1f}")
print(f" Estimated shard size: {plan.estimated_shard_size_gb:.1f} GB")
return plan
# Output:
# Current data: 476.8 GB
# Index size (with overhead): 619.9 GB
#
# Shard Plan:
# Primary shards: 32
# Replica shards: 32
# Total shards: 64
# Shards per node: 10.7
# Estimated shard size: 29.1 GB
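Because the primary shard count is fixed at index creation, the plan has to be applied up front. A minimal sketch (assuming the async Elasticsearch client used elsewhere in this series and the ShardPlan class above):
# operations/apply_shard_plan.py (illustrative sketch)
async def create_index_from_plan(es, index_name: str, plan: ShardPlan):
    """Create an index with the planned primary/replica counts."""
    replicas_per_primary = plan.replica_shards // plan.primary_shards
    await es.indices.create(
        index=index_name,
        body={
            "settings": {
                "number_of_shards": plan.primary_shards,      # fixed after creation
                "number_of_replicas": replicas_per_primary,   # can be changed later
            }
        }
    )

# e.g. await create_index_from_plan(es, "products_v2", plan_product_search_shards())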
1.3 High Availability Configuration
# operations/cluster_config.py
"""
Production cluster configuration for high availability.
"""
PRODUCTION_CLUSTER_CONFIG = {
"cluster.name": "product-search-prod",
# ==========================================================================
# Node Discovery
# ==========================================================================
"discovery.seed_hosts": [
"master-1.es.internal",
"master-2.es.internal",
"master-3.es.internal"
],
"cluster.initial_master_nodes": [
"master-1",
"master-2",
"master-3"
],
# ==========================================================================
# Cluster Stability
# ==========================================================================
    # Master quorum (n/2 + 1) is managed automatically in ES 7+;
    # discovery.zen.minimum_master_nodes is only needed on 6.x clusters.
    # Delay recovery after a full-cluster restart until enough nodes rejoin
"gateway.recover_after_nodes": 4,
"gateway.expected_nodes": 6,
"gateway.recover_after_time": "5m",
# ==========================================================================
# Memory and Performance
# ==========================================================================
# Heap size (set via JVM options, not here)
# Rule: 50% of RAM, max 31GB (for compressed OOPs)
# Field data circuit breaker (prevent OOM)
"indices.breaker.fielddata.limit": "40%",
"indices.breaker.request.limit": "60%",
"indices.breaker.total.limit": "70%",
# Query cache
"indices.queries.cache.size": "10%",
# ==========================================================================
# Indexing Performance
# ==========================================================================
    # Note: index.* settings are per-index settings; in practice they are set
    # via the index settings API or an index template rather than
    # elasticsearch.yml (shown here alongside the cluster settings for clarity)
    # Refresh interval (trade freshness for indexing throughput)
    "index.refresh_interval": "5s",
    # Translog durability (async = small window of data loss on crash)
    "index.translog.durability": "async",
    "index.translog.sync_interval": "5s",
    # Merge policy
    "index.merge.scheduler.max_thread_count": 1,  # only needed for spinning disks; leave at default on SSDs
# ==========================================================================
# Search Performance
# ==========================================================================
# Thread pools
"thread_pool.search.size": 13, # (# of CPUs * 3) / 2 + 1
"thread_pool.search.queue_size": 1000,
"thread_pool.write.size": 9, # # of CPUs + 1
"thread_pool.write.queue_size": 200,
# ==========================================================================
# Shard Allocation
# ==========================================================================
    # Spread primary and replica copies across racks (allocation awareness)
"cluster.routing.allocation.awareness.attributes": "rack_id",
# Rebalancing
"cluster.routing.allocation.balance.shard": 0.45,
"cluster.routing.allocation.balance.index": 0.55,
# Allocation filtering (keep hot data on fast nodes)
"cluster.routing.allocation.include.data": "hot",
}
# =============================================================================
# Node-Specific Configuration
# =============================================================================
MASTER_NODE_CONFIG = {
"node.master": True,
"node.data": False,
"node.ingest": False,
"node.ml": False,
# Smaller heap for master nodes
# JVM: -Xms4g -Xmx4g
}
DATA_NODE_CONFIG = {
"node.master": False,
"node.data": True,
"node.ingest": True,
"node.ml": False,
# Large heap for data nodes
# JVM: -Xms31g -Xmx31g
# Path for data storage
"path.data": "/var/lib/elasticsearch/data",
# Node attributes for allocation awareness
"node.attr.rack_id": "rack1", # Set per node
"node.attr.data": "hot", # hot/warm/cold
}
COORDINATING_NODE_CONFIG = {
"node.master": False,
"node.data": False,
"node.ingest": False,
"node.ml": False,
# Moderate heap for coordinating nodes
# JVM: -Xms16g -Xmx16g
}
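The heap rule quoted in the JVM comments above (half of RAM, capped at 31GB so compressed object pointers stay enabled) is easy to encode. A small helper, added here as a sketch rather than part of the original configuration:
# operations/heap_sizing.py (illustrative sketch)
def recommended_heap_gb(ram_gb: int) -> int:
    """Heap = min(RAM / 2, 31 GB) to keep compressed OOPs enabled."""
    return min(31, ram_gb // 2)

assert recommended_heap_gb(64) == 31   # data node: -Xms31g -Xmx31g
assert recommended_heap_gb(8) == 4     # master node: -Xms4g -Xmx4g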
Part II: Capacity Planning
Chapter 2: Sizing for Scale
2.1 Capacity Calculator
# operations/capacity_planner.py
"""
Capacity planning for Elasticsearch clusters.
"""
from dataclasses import dataclass
from typing import Dict
import math
@dataclass
class CapacityRequirements:
"""Computed capacity requirements."""
# Data nodes
data_nodes: int
ram_per_node_gb: int
heap_per_node_gb: int
storage_per_node_gb: int
cpu_per_node: int
# Master nodes
master_nodes: int = 3
master_ram_gb: int = 8
# Coordinating nodes
    coordinating_nodes: int = 2  # always set explicitly by the planner; default keeps field ordering valid
coordinating_ram_gb: int = 16
# Totals
total_ram_gb: int = 0
total_storage_gb: int = 0
total_cpu: int = 0
def __post_init__(self):
self.total_ram_gb = (
self.data_nodes * self.ram_per_node_gb +
self.master_nodes * self.master_ram_gb +
self.coordinating_nodes * self.coordinating_ram_gb
)
self.total_storage_gb = self.data_nodes * self.storage_per_node_gb
self.total_cpu = (
self.data_nodes * self.cpu_per_node +
self.master_nodes * 4 +
self.coordinating_nodes * 8
)
class CapacityPlanner:
"""
Plans cluster capacity based on requirements.
"""
def plan_capacity(
self,
# Data requirements
document_count: int,
avg_document_size_kb: float,
replica_count: int = 1,
# Query requirements
queries_per_second: int,
indexing_per_second: int,
# Latency requirements
target_latency_ms: int = 200,
# Growth
growth_months: int = 12,
monthly_growth_rate: float = 0.05
) -> CapacityRequirements:
"""
Calculate capacity requirements.
"""
# =======================================================================
# Storage Calculation
# =======================================================================
# Raw data size
raw_data_gb = (document_count * avg_document_size_kb) / (1024 * 1024)
# Elasticsearch overhead (~30% for indexes, etc.)
index_size_gb = raw_data_gb * 1.3
# With replicas
total_storage_gb = index_size_gb * (1 + replica_count)
# Growth projection
growth_factor = (1 + monthly_growth_rate) ** growth_months
projected_storage_gb = total_storage_gb * growth_factor
# Add 30% headroom
required_storage_gb = projected_storage_gb * 1.3
# =======================================================================
# Memory Calculation
# =======================================================================
        # Rule: the OS page cache should hold at least ~50% of the index
        # (the closer to 100%, the better) for good search performance
        # Total RAM per node = heap (max 31GB) + OS cache (the rest)
# For query-heavy workloads: more RAM for caching
# For indexing-heavy workloads: more CPU
ram_for_cache_gb = index_size_gb * 0.5 # 50% of index in cache
heap_per_node_gb = 31 # Max with compressed OOPs
ram_per_node_gb = 64 # Standard size
# Nodes needed for storage
storage_per_node_gb = 1000 # 1TB per node
nodes_for_storage = math.ceil(required_storage_gb / storage_per_node_gb)
# Nodes needed for memory
cache_per_node_gb = ram_per_node_gb - heap_per_node_gb
nodes_for_memory = math.ceil(ram_for_cache_gb / cache_per_node_gb)
# =======================================================================
# CPU Calculation
# =======================================================================
# Rule of thumb: 1 core per 100-200 simple queries/sec
# Complex queries (aggregations, scripts): fewer per core
cores_for_queries = math.ceil(queries_per_second / 150)
cores_for_indexing = math.ceil(indexing_per_second / 1000)
total_cores = cores_for_queries + cores_for_indexing
cpu_per_node = 16
nodes_for_cpu = math.ceil(total_cores / cpu_per_node)
# =======================================================================
# Final Node Count
# =======================================================================
data_nodes = max(
nodes_for_storage,
nodes_for_memory,
nodes_for_cpu,
3 # Minimum for HA
)
# Round up to even number (for AZ distribution)
if data_nodes % 2 != 0:
data_nodes += 1
# =======================================================================
# Coordinating Nodes
# =======================================================================
# 1 coordinating node per 5K queries/sec (roughly)
coordinating_nodes = max(2, math.ceil(queries_per_second / 5000))
return CapacityRequirements(
data_nodes=data_nodes,
ram_per_node_gb=ram_per_node_gb,
heap_per_node_gb=heap_per_node_gb,
storage_per_node_gb=storage_per_node_gb,
cpu_per_node=cpu_per_node,
coordinating_nodes=coordinating_nodes
)
def estimate_cost(
self,
requirements: CapacityRequirements,
cloud_provider: str = "aws"
) -> Dict[str, float]:
"""
Estimate monthly cloud cost.
"""
# AWS pricing (approximate, varies by region)
aws_pricing = {
"r5.2xlarge": {"cpu": 8, "ram": 64, "cost_hour": 0.504},
"r5.4xlarge": {"cpu": 16, "ram": 128, "cost_hour": 1.008},
"m5.xlarge": {"cpu": 4, "ram": 16, "cost_hour": 0.192},
"c5.2xlarge": {"cpu": 8, "ram": 16, "cost_hour": 0.34},
"ebs_gp3_gb": 0.08, # Per GB per month
}
hours_per_month = 730
# Data nodes: r5.2xlarge (64GB RAM, 8 vCPU)
data_node_cost = (
requirements.data_nodes *
aws_pricing["r5.2xlarge"]["cost_hour"] *
hours_per_month
)
# Master nodes: m5.xlarge (16GB RAM)
master_node_cost = (
requirements.master_nodes *
aws_pricing["m5.xlarge"]["cost_hour"] *
hours_per_month
)
# Coordinating nodes: c5.2xlarge (CPU optimized)
coord_node_cost = (
requirements.coordinating_nodes *
aws_pricing["c5.2xlarge"]["cost_hour"] *
hours_per_month
)
# Storage: EBS gp3
storage_cost = (
requirements.data_nodes *
requirements.storage_per_node_gb *
aws_pricing["ebs_gp3_gb"]
)
total = data_node_cost + master_node_cost + coord_node_cost + storage_cost
return {
"data_nodes": round(data_node_cost, 2),
"master_nodes": round(master_node_cost, 2),
"coordinating_nodes": round(coord_node_cost, 2),
"storage": round(storage_cost, 2),
"total_monthly": round(total, 2)
}
# =============================================================================
# Example: Plan for Our System
# =============================================================================
def plan_for_product_search():
"""Plan capacity for product search system."""
planner = CapacityPlanner()
# Normal operation
normal = planner.plan_capacity(
document_count=50_000_000,
avg_document_size_kb=10,
replica_count=1,
queries_per_second=10_000,
indexing_per_second=100,
target_latency_ms=200,
growth_months=12,
monthly_growth_rate=0.05
)
print("=== NORMAL CAPACITY ===")
print(f"Data nodes: {normal.data_nodes}")
print(f"RAM per node: {normal.ram_per_node_gb} GB")
print(f"Storage per node: {normal.storage_per_node_gb} GB")
print(f"Coordinating nodes: {normal.coordinating_nodes}")
print(f"Total RAM: {normal.total_ram_gb} GB")
print(f"Total Storage: {normal.total_storage_gb} GB")
cost = planner.estimate_cost(normal)
print(f"\nEstimated monthly cost: ${cost['total_monthly']:,.2f}")
# Black Friday (5x traffic)
black_friday = planner.plan_capacity(
document_count=50_000_000,
avg_document_size_kb=10,
replica_count=1,
queries_per_second=50_000, # 5x
indexing_per_second=500, # 5x
target_latency_ms=200,
growth_months=0,
monthly_growth_rate=0
)
print("\n=== BLACK FRIDAY CAPACITY ===")
print(f"Data nodes: {black_friday.data_nodes}")
print(f"Coordinating nodes: {black_friday.coordinating_nodes}")
bf_cost = planner.estimate_cost(black_friday)
print(f"Estimated monthly cost: ${bf_cost['total_monthly']:,.2f}")
# Output:
# === NORMAL CAPACITY ===
# Data nodes: 10
# RAM per node: 64 GB
# Storage per node: 1000 GB
# Coordinating nodes: 2
# Total RAM: 696 GB
# Total Storage: 10000 GB
#
# Estimated monthly cost: $5,396.08
#
# === BLACK FRIDAY CAPACITY ===
# Data nodes: 22
# Coordinating nodes: 10
# Estimated monthly cost: $12,756.72
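Since the Black Friday footprint only needs to exist for a few days around the event, the monthly figures above overstate what the spike actually costs. A back-of-the-envelope sketch (reusing the planner and the two plans from the example; the five-day window is an assumption):
# operations/spike_cost.py (illustrative sketch)
def incremental_spike_cost(planner, normal, peak, days_at_peak: int = 5) -> float:
    """Approximate extra spend for running peak capacity for a few days."""
    normal_monthly = planner.estimate_cost(normal)["total_monthly"]
    peak_monthly = planner.estimate_cost(peak)["total_monthly"]
    return (peak_monthly - normal_monthly) * (days_at_peak / 30)

# e.g. incremental_spike_cost(planner, normal, black_friday)
#      -> roughly $1,200 extra with the estimates above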
Part III: Handling Traffic Spikes
Chapter 3: Black Friday Preparation
3.1 Scaling Strategy
# operations/scaling.py
"""
Strategies for handling traffic spikes.
"""
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import List, Optional
from enum import Enum
class ScaleDirection(Enum):
UP = "up"
DOWN = "down"
@dataclass
class ScaleEvent:
"""Scheduled scaling event."""
name: str
direction: ScaleDirection
timestamp: datetime
data_nodes: int
coordinating_nodes: int
replicas: int
class TrafficSpikeHandler:
"""
Manages scaling for known traffic spikes.
"""
def __init__(self, es_client, k8s_client):
self.es = es_client
self.k8s = k8s_client
# Normal configuration
self.normal_config = {
"data_nodes": 6,
"coordinating_nodes": 2,
"replicas": 1
}
def create_black_friday_plan(
self,
event_start: datetime,
event_end: datetime
) -> List[ScaleEvent]:
"""
Create scaling plan for Black Friday.
Strategy:
1. Pre-scale 24 hours before (warm up cluster)
2. Add replicas for read capacity
3. Scale down 24 hours after
"""
events = []
# Pre-scale: 24 hours before
events.append(ScaleEvent(
name="pre_scale",
direction=ScaleDirection.UP,
timestamp=event_start - timedelta(hours=24),
data_nodes=12, # 2x data nodes
coordinating_nodes=6, # 3x coordinating
replicas=2 # Extra replica
))
# Peak scale: At event start
events.append(ScaleEvent(
name="peak_scale",
direction=ScaleDirection.UP,
timestamp=event_start,
data_nodes=18, # 3x data nodes
coordinating_nodes=10, # 5x coordinating
replicas=2
))
# Post-event: Start scaling down
events.append(ScaleEvent(
name="scale_down_1",
direction=ScaleDirection.DOWN,
timestamp=event_end + timedelta(hours=6),
data_nodes=12,
coordinating_nodes=6,
replicas=2
))
# Full scale down: 24 hours after
events.append(ScaleEvent(
name="scale_down_final",
direction=ScaleDirection.DOWN,
timestamp=event_end + timedelta(hours=24),
data_nodes=6,
coordinating_nodes=2,
replicas=1
))
return events
async def execute_scale_event(self, event: ScaleEvent):
"""Execute a scaling event."""
print(f"Executing scale event: {event.name}")
# 1. Scale Kubernetes deployments
await self._scale_k8s_deployment(
"elasticsearch-data",
event.data_nodes
)
await self._scale_k8s_deployment(
"elasticsearch-coordinating",
event.coordinating_nodes
)
# 2. Wait for nodes to join cluster
await self._wait_for_cluster_health(
expected_nodes=event.data_nodes + 3 + event.coordinating_nodes
)
# 3. Adjust replicas
if event.replicas != self.normal_config["replicas"]:
await self._set_replica_count(event.replicas)
print(f"Scale event {event.name} completed")
async def _scale_k8s_deployment(self, deployment: str, replicas: int):
"""Scale Kubernetes deployment."""
await self.k8s.apps_v1.patch_namespaced_deployment_scale(
name=deployment,
namespace="search",
body={"spec": {"replicas": replicas}}
)
async def _wait_for_cluster_health(
self,
expected_nodes: int,
timeout_minutes: int = 30
):
"""Wait for cluster to stabilize."""
import asyncio
deadline = datetime.utcnow() + timedelta(minutes=timeout_minutes)
while datetime.utcnow() < deadline:
health = await self.es.cluster.health()
if (health["number_of_nodes"] >= expected_nodes and
health["status"] in ("green", "yellow")):
return
print(f"Waiting for cluster... "
f"nodes: {health['number_of_nodes']}/{expected_nodes}, "
f"status: {health['status']}")
await asyncio.sleep(30)
raise TimeoutError("Cluster did not stabilize in time")
async def _set_replica_count(self, replicas: int):
"""Adjust replica count for all indices."""
await self.es.indices.put_settings(
index="products*",
body={
"index": {
"number_of_replicas": replicas
}
}
)
# =============================================================================
# Graceful Degradation
# =============================================================================
class GracefulDegradation:
"""
Implements graceful degradation under load.
"""
def __init__(self, es_client, cache, config):
self.es = es_client
self.cache = cache
self.config = config
# Degradation thresholds
self.thresholds = {
"latency_warning_ms": 500,
"latency_critical_ms": 1000,
"cpu_warning_pct": 70,
"cpu_critical_pct": 85,
"queue_warning": 500,
"queue_critical": 1000
}
self.degradation_level = 0 # 0=normal, 1=warning, 2=critical
async def check_and_degrade(self) -> int:
"""Check cluster health and apply degradation if needed."""
metrics = await self._get_cluster_metrics()
new_level = self._calculate_degradation_level(metrics)
if new_level != self.degradation_level:
await self._apply_degradation(new_level)
self.degradation_level = new_level
return new_level
def _calculate_degradation_level(self, metrics: dict) -> int:
"""Determine degradation level from metrics."""
# Critical conditions
if (metrics["latency_p99_ms"] > self.thresholds["latency_critical_ms"] or
metrics["cpu_pct"] > self.thresholds["cpu_critical_pct"] or
metrics["search_queue"] > self.thresholds["queue_critical"]):
return 2
# Warning conditions
if (metrics["latency_p99_ms"] > self.thresholds["latency_warning_ms"] or
metrics["cpu_pct"] > self.thresholds["cpu_warning_pct"] or
metrics["search_queue"] > self.thresholds["queue_warning"]):
return 1
return 0
async def _apply_degradation(self, level: int):
"""Apply degradation measures."""
if level == 0:
# Normal operation
await self._restore_normal()
elif level == 1:
# Warning level
await self._apply_warning_degradation()
elif level == 2:
# Critical level
await self._apply_critical_degradation()
async def _apply_warning_degradation(self):
"""Apply warning-level degradation."""
print("Applying WARNING degradation")
# Increase cache TTL
self.cache.default_ttl = 300 # 5 minutes
# Reduce result size
self.config.max_results = 50 # Down from 100
# Disable expensive features
self.config.enable_spell_check = False
self.config.enable_personalization = False
async def _apply_critical_degradation(self):
"""Apply critical-level degradation."""
print("Applying CRITICAL degradation")
# Aggressive caching
self.cache.default_ttl = 600 # 10 minutes
# Minimal results
self.config.max_results = 20
# Disable all expensive features
self.config.enable_spell_check = False
self.config.enable_personalization = False
self.config.enable_facets = False # Only essential
self.config.enable_highlighting = False
# Simplified query (no function score)
self.config.use_simple_query = True
async def _restore_normal(self):
"""Restore normal operation."""
print("Restoring NORMAL operation")
self.cache.default_ttl = 60
self.config.max_results = 100
self.config.enable_spell_check = True
self.config.enable_personalization = True
self.config.enable_facets = True
self.config.enable_highlighting = True
self.config.use_simple_query = False
async def _get_cluster_metrics(self) -> dict:
"""Get current cluster metrics."""
stats = await self.es.nodes.stats(metric=["os", "thread_pool"])
# Aggregate across nodes
cpu_pcts = []
search_queues = []
for node_id, node_stats in stats["nodes"].items():
cpu_pcts.append(node_stats["os"]["cpu"]["percent"])
search_queues.append(
node_stats["thread_pool"]["search"]["queue"]
)
return {
"cpu_pct": max(cpu_pcts) if cpu_pcts else 0,
"search_queue": sum(search_queues),
"latency_p99_ms": await self._get_search_latency_p99()
}
async def _get_search_latency_p99(self) -> float:
"""Get p99 search latency from metrics."""
# In production, this would come from metrics system
return 150.0
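To actually drive the plan, something has to wake up at each ScaleEvent's timestamp and call execute_scale_event. A minimal scheduler sketch, an assumption layered on top of the classes above rather than part of them:
# operations/run_scale_plan.py (illustrative sketch)
import asyncio
from datetime import datetime

async def run_scale_plan(handler, events):
    """Sleep until each scheduled event, then execute it in order."""
    for event in sorted(events, key=lambda e: e.timestamp):
        delay = (event.timestamp - datetime.utcnow()).total_seconds()
        if delay > 0:
            await asyncio.sleep(delay)
        await handler.execute_scale_event(event)

# e.g.
# plan = handler.create_black_friday_plan(event_start, event_end)
# asyncio.run(run_scale_plan(handler, plan))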
3.2 Pre-Event Checklist
# operations/pre_event_checklist.py
"""
Pre-traffic-spike preparation checklist.
"""
from dataclasses import dataclass
from typing import List, Tuple
from datetime import datetime
@dataclass
class ChecklistItem:
"""A checklist item with verification."""
name: str
description: str
verification: str
automated: bool = False
class PreEventChecklist:
"""
Checklist for preparing for traffic spikes.
"""
CHECKLIST = [
# Infrastructure
ChecklistItem(
name="scale_cluster",
description="Scale cluster to peak capacity",
verification="kubectl get pods -l app=elasticsearch | grep Running",
automated=True
),
ChecklistItem(
name="increase_replicas",
description="Increase replica count for read capacity",
verification="GET /_cat/indices?v | check replicas column",
automated=True
),
ChecklistItem(
name="warm_cache",
description="Pre-warm caches with popular queries",
verification="Cache hit rate > 80%",
automated=True
),
# Configuration
ChecklistItem(
name="disable_reindex",
description="Pause any reindexing operations",
verification="No active reindex tasks",
automated=True
),
ChecklistItem(
name="increase_refresh_interval",
description="Increase refresh interval to 30s",
verification="GET /products/_settings | refresh_interval=30s",
automated=True
),
ChecklistItem(
name="optimize_indices",
description="Force merge to optimize segments",
verification="Segment count per shard < 5",
automated=True
),
# Monitoring
ChecklistItem(
name="alert_thresholds",
description="Adjust alert thresholds for peak",
verification="PagerDuty thresholds updated",
automated=False
),
ChecklistItem(
name="dashboard_ready",
description="Ensure dashboards accessible",
verification="Load Grafana dashboard",
automated=False
),
ChecklistItem(
name="runbooks_reviewed",
description="Team reviewed runbooks",
verification="Team confirmation",
automated=False
),
# Testing
ChecklistItem(
name="load_test",
description="Run load test at expected peak",
verification="Latency < 200ms at 50K QPS",
automated=True
),
ChecklistItem(
name="failover_test",
description="Test node failure recovery",
verification="Service continues after node kill",
automated=False
),
# Team
ChecklistItem(
name="oncall_schedule",
description="Oncall schedule confirmed",
verification="PagerDuty schedule checked",
automated=False
),
ChecklistItem(
name="war_room",
description="War room channel created",
verification="Slack channel exists",
automated=False
),
]
def __init__(self, es_client, k8s_client, metrics_client):
self.es = es_client
self.k8s = k8s_client
self.metrics = metrics_client
async def run_checklist(self) -> List[Tuple[str, bool, str]]:
"""
Run through all checklist items.
Returns list of (name, passed, message) tuples.
"""
results = []
for item in self.CHECKLIST:
if item.automated:
passed, message = await self._verify_automated(item)
else:
passed, message = False, "Manual verification required"
results.append((item.name, passed, message))
status = "✓" if passed else "✗"
print(f"[{status}] {item.name}: {message}")
return results
async def _verify_automated(
self,
item: ChecklistItem
) -> Tuple[bool, str]:
"""Verify an automated checklist item."""
try:
if item.name == "scale_cluster":
return await self._verify_cluster_scaled()
elif item.name == "increase_replicas":
return await self._verify_replicas()
elif item.name == "warm_cache":
return await self._verify_cache_warm()
elif item.name == "disable_reindex":
return await self._verify_no_reindex()
elif item.name == "increase_refresh_interval":
return await self._verify_refresh_interval()
elif item.name == "optimize_indices":
return await self._verify_segments()
elif item.name == "load_test":
return await self._verify_load_test()
else:
return False, "Unknown automated check"
except Exception as e:
return False, f"Error: {str(e)}"
async def _verify_cluster_scaled(self) -> Tuple[bool, str]:
"""Verify cluster is scaled."""
health = await self.es.cluster.health()
nodes = health["number_of_nodes"]
expected = 18 + 3 + 10 # data + master + coord
if nodes >= expected:
return True, f"{nodes} nodes ready"
return False, f"Only {nodes}/{expected} nodes"
async def _verify_replicas(self) -> Tuple[bool, str]:
"""Verify replica count."""
settings = await self.es.indices.get_settings(index="products")
replicas = int(
settings["products"]["settings"]["index"]["number_of_replicas"]
)
if replicas >= 2:
return True, f"{replicas} replicas configured"
return False, f"Only {replicas} replicas"
async def _verify_cache_warm(self) -> Tuple[bool, str]:
"""Verify cache is warm."""
# Check cache hit rate from metrics
hit_rate = 0.85 # Would come from metrics system
if hit_rate >= 0.8:
return True, f"Cache hit rate: {hit_rate:.1%}"
return False, f"Cache hit rate only {hit_rate:.1%}"
async def _verify_no_reindex(self) -> Tuple[bool, str]:
"""Verify no reindex in progress."""
tasks = await self.es.tasks.list(actions="*reindex*")
if not tasks.get("nodes"):
return True, "No reindex tasks"
return False, "Reindex in progress"
async def _verify_refresh_interval(self) -> Tuple[bool, str]:
"""Verify refresh interval increased."""
settings = await self.es.indices.get_settings(index="products")
interval = settings["products"]["settings"]["index"].get(
"refresh_interval", "1s"
)
if "30" in interval or "60" in interval:
return True, f"Refresh interval: {interval}"
return False, f"Refresh interval: {interval} (should be 30s+)"
async def _verify_segments(self) -> Tuple[bool, str]:
"""Verify segment count is low."""
stats = await self.es.indices.stats(index="products", metric="segments")
segment_count = stats["_all"]["primaries"]["segments"]["count"]
shard_count = 32 # Our primary shard count
avg_segments = segment_count / shard_count
if avg_segments <= 5:
return True, f"Avg segments/shard: {avg_segments:.1f}"
return False, f"Avg segments/shard: {avg_segments:.1f} (should be ≤5)"
async def _verify_load_test(self) -> Tuple[bool, str]:
"""Verify load test passed."""
# Would check recent load test results
return True, "Load test passed: 48K QPS at 180ms p99"
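The warm_cache item above assumes something actually replays popular queries before the event. A minimal warming sketch (the helper name and query shape are assumptions, not part of the original checklist):
# operations/cache_warmer.py (illustrative sketch)
async def warm_caches(es, popular_queries, index="products"):
    """Replay top queries so shard request caches and the OS page cache are hot."""
    for q in popular_queries:
        await es.search(
            index=index,
            body={"query": {"match": {"name": q}}, "size": 20},
            request_cache=True,  # populate the shard request cache
        )

# e.g. await warm_caches(es, ["iphone case", "running shoes", "led tv"])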
Part IV: Monitoring and Alerting
Chapter 4: Observability
4.1 Key Metrics
# monitoring/search_metrics.py
"""
Search system monitoring metrics.
"""
from dataclasses import dataclass
from typing import Dict, List, Optional
from enum import Enum
class MetricSeverity(Enum):
INFO = "info"
WARNING = "warning"
CRITICAL = "critical"
@dataclass
class MetricDefinition:
"""Definition of a monitoring metric."""
name: str
description: str
unit: str
    warning_threshold: Optional[float]
    critical_threshold: Optional[float]
    comparison: str  # "gt", "lt", or "info" (no alerting)
class SearchMetrics:
"""
Defines and collects search system metrics.
"""
# ==========================================================================
# Metric Definitions
# ==========================================================================
CLUSTER_METRICS = [
MetricDefinition(
name="cluster_status",
description="Cluster health status (0=green, 1=yellow, 2=red)",
unit="status",
warning_threshold=1,
critical_threshold=2,
comparison="gt"
),
MetricDefinition(
name="active_shards_percent",
description="Percentage of active shards",
unit="percent",
warning_threshold=95,
critical_threshold=90,
comparison="lt"
),
MetricDefinition(
name="relocating_shards",
description="Number of relocating shards",
unit="count",
warning_threshold=5,
critical_threshold=20,
comparison="gt"
),
MetricDefinition(
name="unassigned_shards",
description="Number of unassigned shards",
unit="count",
warning_threshold=1,
critical_threshold=5,
comparison="gt"
),
]
PERFORMANCE_METRICS = [
MetricDefinition(
name="search_latency_p50",
description="Search latency 50th percentile",
unit="ms",
warning_threshold=100,
critical_threshold=200,
comparison="gt"
),
MetricDefinition(
name="search_latency_p99",
description="Search latency 99th percentile",
unit="ms",
warning_threshold=500,
critical_threshold=1000,
comparison="gt"
),
MetricDefinition(
name="search_rate",
description="Queries per second",
unit="qps",
warning_threshold=None, # No upper limit warning
critical_threshold=None,
comparison="info"
),
MetricDefinition(
name="indexing_latency_p99",
description="Indexing latency 99th percentile",
unit="ms",
warning_threshold=1000,
critical_threshold=5000,
comparison="gt"
),
]
RESOURCE_METRICS = [
MetricDefinition(
name="cpu_percent",
description="CPU utilization",
unit="percent",
warning_threshold=70,
critical_threshold=85,
comparison="gt"
),
MetricDefinition(
name="heap_used_percent",
description="JVM heap utilization",
unit="percent",
warning_threshold=75,
critical_threshold=85,
comparison="gt"
),
MetricDefinition(
name="disk_used_percent",
description="Disk utilization",
unit="percent",
warning_threshold=75,
critical_threshold=85,
comparison="gt"
),
MetricDefinition(
name="search_queue_size",
description="Search thread pool queue size",
unit="count",
warning_threshold=500,
critical_threshold=1000,
comparison="gt"
),
MetricDefinition(
name="search_rejected",
description="Search requests rejected",
unit="count",
warning_threshold=1,
critical_threshold=10,
comparison="gt"
),
]
INDEXING_METRICS = [
MetricDefinition(
name="indexing_lag_seconds",
description="CDC indexing lag",
unit="seconds",
warning_threshold=60,
critical_threshold=300,
comparison="gt"
),
MetricDefinition(
name="indexing_error_rate",
description="Indexing error rate",
unit="percent",
warning_threshold=1,
critical_threshold=5,
comparison="gt"
),
MetricDefinition(
name="dlq_size",
description="Dead letter queue size",
unit="count",
warning_threshold=100,
critical_threshold=1000,
comparison="gt"
),
]
BUSINESS_METRICS = [
MetricDefinition(
name="zero_result_rate",
description="Searches returning zero results",
unit="percent",
warning_threshold=10,
critical_threshold=20,
comparison="gt"
),
MetricDefinition(
name="click_through_rate",
description="Search result click-through rate",
unit="percent",
warning_threshold=20,
critical_threshold=10,
comparison="lt"
),
]
def __init__(self, es_client, prometheus_client):
self.es = es_client
self.prom = prometheus_client
async def collect_all_metrics(self) -> Dict[str, float]:
"""Collect all metrics."""
metrics = {}
# Cluster metrics
cluster_health = await self.es.cluster.health()
metrics["cluster_status"] = {"green": 0, "yellow": 1, "red": 2}[
cluster_health["status"]
]
metrics["active_shards_percent"] = cluster_health["active_shards_percent_as_number"]
metrics["relocating_shards"] = cluster_health["relocating_shards"]
metrics["unassigned_shards"] = cluster_health["unassigned_shards"]
# Node stats
node_stats = await self.es.nodes.stats(
metric=["os", "jvm", "thread_pool", "fs"]
)
cpu_pcts = []
heap_pcts = []
disk_pcts = []
search_queues = []
search_rejected = []
for node_id, stats in node_stats["nodes"].items():
cpu_pcts.append(stats["os"]["cpu"]["percent"])
heap_pcts.append(stats["jvm"]["mem"]["heap_used_percent"])
total_disk = stats["fs"]["total"]["total_in_bytes"]
free_disk = stats["fs"]["total"]["free_in_bytes"]
disk_pcts.append(100 * (total_disk - free_disk) / total_disk)
search_queues.append(stats["thread_pool"]["search"]["queue"])
search_rejected.append(stats["thread_pool"]["search"]["rejected"])
metrics["cpu_percent"] = max(cpu_pcts)
metrics["heap_used_percent"] = max(heap_pcts)
metrics["disk_used_percent"] = max(disk_pcts)
metrics["search_queue_size"] = sum(search_queues)
metrics["search_rejected"] = sum(search_rejected)
return metrics
def evaluate_alerts(self, metrics: Dict[str, float]) -> List[dict]:
"""Evaluate metrics against thresholds."""
alerts = []
all_definitions = (
self.CLUSTER_METRICS +
self.PERFORMANCE_METRICS +
self.RESOURCE_METRICS +
self.INDEXING_METRICS +
self.BUSINESS_METRICS
)
for defn in all_definitions:
if defn.name not in metrics:
continue
value = metrics[defn.name]
severity = self._evaluate_threshold(value, defn)
if severity:
alerts.append({
"metric": defn.name,
"value": value,
"threshold": (
defn.critical_threshold
if severity == MetricSeverity.CRITICAL
else defn.warning_threshold
),
"severity": severity.value,
"description": defn.description
})
return alerts
def _evaluate_threshold(
self,
value: float,
defn: MetricDefinition
    ) -> Optional[MetricSeverity]:
"""Evaluate value against thresholds."""
if defn.comparison == "info":
return None
if defn.comparison == "gt":
if defn.critical_threshold and value > defn.critical_threshold:
return MetricSeverity.CRITICAL
if defn.warning_threshold and value > defn.warning_threshold:
return MetricSeverity.WARNING
elif defn.comparison == "lt":
if defn.critical_threshold and value < defn.critical_threshold:
return MetricSeverity.CRITICAL
if defn.warning_threshold and value < defn.warning_threshold:
return MetricSeverity.WARNING
return None
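
# =============================================================================
# Exposing the metrics to Prometheus (illustrative addition)
# =============================================================================
# A minimal exporter sketch, not part of the original module: mirror the
# values returned by SearchMetrics.collect_all_metrics() into gauges so the
# dashboard below and the alert rules in 4.2 have data to scrape. Assumes the
# `prometheus_client` library; the "search_" prefix and port are assumptions.
import asyncio

from prometheus_client import Gauge, start_http_server

_GAUGES: Dict[str, Gauge] = {}

def publish_to_prometheus(metrics: Dict[str, float]):
    """Mirror collected metric values into Prometheus gauges."""
    for name, value in metrics.items():
        gauge = _GAUGES.setdefault(
            name, Gauge(f"search_{name}", f"Search system metric: {name}")
        )
        gauge.set(value)

async def run_exporter(search_metrics: SearchMetrics, port: int = 9108, interval_s: int = 15):
    """Expose /metrics on `port` and refresh gauge values every `interval_s` seconds."""
    start_http_server(port)
    while True:
        publish_to_prometheus(await search_metrics.collect_all_metrics())
        await asyncio.sleep(interval_s)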
# =============================================================================
# Grafana Dashboard Layout (ASCII mock-up)
# =============================================================================
GRAFANA_DASHBOARD = """
SEARCH SYSTEM DASHBOARD
┌────────────────────────────────────────────────────────────────────────────┐
│ CLUSTER HEALTH │
│ ┌───────────────┐ ┌───────────────┐ ┌───────────────┐ ┌──────────────┐ │
│ │ Status: GREEN │ │ Nodes: 11/11 │ │ Shards: 100% │ │ Disk: 45% │ │
│ └───────────────┘ └───────────────┘ └───────────────┘ └──────────────┘ │
├────────────────────────────────────────────────────────────────────────────┤
│ SEARCH PERFORMANCE │
│ │
│ Queries/sec Latency p99 Error Rate │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ 12,345 │ │ 145ms │ │ 0.02% │ │
│ │ ▄▄▄▆▇█▇▆▄ │ │ ▂▂▃▃▂▂▃▂▂ │ │ ▁▁▁▁▁▁▁▁▁ │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │
│ Latency Distribution (last hour) │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ p50: 45ms │████████████████ │ │
│ │ p90: 98ms │████████████████████████████████ │ │
│ │ p99: 145ms │████████████████████████████████████████████ │ │
│ │ max: 890ms │████████████████████████████████████████████████████████│ │
│ └─────────────────────────────────────────────────────────────────────┘ │
├────────────────────────────────────────────────────────────────────────────┤
│ RESOURCE UTILIZATION │
│ │
│ CPU by Node Heap by Node │
│ ┌───────────────────────┐ ┌───────────────────────┐ │
│ │ data-1: ████████░░ 78%│ │ data-1: ██████░░░░ 62%│ │
│ │ data-2: ███████░░░ 68%│ │ data-2: ██████░░░░ 58%│ │
│ │ data-3: ████████░░ 75%│ │ data-3: ███████░░░ 67%│ │
│ │ data-4: ███████░░░ 72%│ │ data-4: ██████░░░░ 61%│ │
│ │ data-5: ████████░░ 76%│ │ data-5: ███████░░░ 65%│ │
│ │ data-6: ███████░░░ 70%│ │ data-6: ██████░░░░ 59%│ │
│ └───────────────────────┘ └───────────────────────┘ │
├────────────────────────────────────────────────────────────────────────────┤
│ INDEXING PIPELINE │
│ │
│ Indexing Lag Events/sec DLQ Size │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ 12s │ │ 892 │ │ 3 │ │
│ │ ▂▂▃▂▂▂▂▂▂ │ │ ▄▅▆▇▆▅▄▅▆ │ │ ▁▁▁▁▁▁▁▁▁ │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
├────────────────────────────────────────────────────────────────────────────┤
│ SEARCH QUALITY │
│ │
│ Zero Result Rate Click-Through Rate Conversion Rate │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ 4.2% │ │ 42.3% │ │ 3.8% │ │
│ │ ▃▃▃▂▂▂▂▂▃ │ │ ▅▅▆▆▆▆▆▅▅ │ │ ▃▃▄▄▄▄▄▃▃ │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
└────────────────────────────────────────────────────────────────────────────┘
"""
4.2 Alerting Configuration
# monitoring/alerting.py
"""
Alerting configuration for search system.
"""
from dataclasses import dataclass
from typing import List, Dict, Optional
from enum import Enum
class AlertChannel(Enum):
SLACK = "slack"
PAGERDUTY = "pagerduty"
EMAIL = "email"
@dataclass
class AlertRule:
"""Alert rule definition."""
name: str
condition: str
    threshold: Optional[float]
duration: str # How long condition must be true
severity: str
channels: List[AlertChannel]
runbook_url: str
description: str
class AlertingConfig:
"""
Alerting configuration for search system.
"""
ALERT_RULES = [
# =======================================================================
# Critical Alerts (PagerDuty)
# =======================================================================
AlertRule(
name="cluster_red",
condition="elasticsearch_cluster_health_status == 2",
threshold=2,
duration="1m",
severity="critical",
channels=[AlertChannel.PAGERDUTY, AlertChannel.SLACK],
runbook_url="https://wiki/runbooks/es-cluster-red",
description="Elasticsearch cluster is RED. Data loss possible."
),
AlertRule(
name="search_latency_critical",
condition="elasticsearch_search_latency_p99 > 1000",
threshold=1000,
duration="5m",
severity="critical",
channels=[AlertChannel.PAGERDUTY, AlertChannel.SLACK],
runbook_url="https://wiki/runbooks/es-high-latency",
description="Search latency p99 > 1s for 5 minutes."
),
AlertRule(
name="search_errors_critical",
condition="rate(elasticsearch_search_errors[5m]) > 0.05",
threshold=0.05,
duration="5m",
severity="critical",
channels=[AlertChannel.PAGERDUTY, AlertChannel.SLACK],
runbook_url="https://wiki/runbooks/es-search-errors",
description="Search error rate > 5% for 5 minutes."
),
AlertRule(
name="indexing_lag_critical",
condition="search_indexing_lag_seconds > 300",
threshold=300,
duration="10m",
severity="critical",
channels=[AlertChannel.PAGERDUTY, AlertChannel.SLACK],
runbook_url="https://wiki/runbooks/es-indexing-lag",
description="Search index > 5 minutes behind database."
),
AlertRule(
name="node_down",
condition="elasticsearch_cluster_nodes < expected_nodes",
threshold=None,
duration="2m",
severity="critical",
channels=[AlertChannel.PAGERDUTY, AlertChannel.SLACK],
runbook_url="https://wiki/runbooks/es-node-down",
description="Elasticsearch node is down."
),
# =======================================================================
# Warning Alerts (Slack)
# =======================================================================
AlertRule(
name="cluster_yellow",
condition="elasticsearch_cluster_health_status == 1",
threshold=1,
duration="10m",
severity="warning",
channels=[AlertChannel.SLACK],
runbook_url="https://wiki/runbooks/es-cluster-yellow",
description="Elasticsearch cluster is YELLOW. Replicas unavailable."
),
AlertRule(
name="search_latency_warning",
condition="elasticsearch_search_latency_p99 > 500",
threshold=500,
duration="10m",
severity="warning",
channels=[AlertChannel.SLACK],
runbook_url="https://wiki/runbooks/es-high-latency",
description="Search latency p99 > 500ms for 10 minutes."
),
AlertRule(
name="heap_pressure",
condition="elasticsearch_jvm_heap_used_percent > 75",
threshold=75,
duration="15m",
severity="warning",
channels=[AlertChannel.SLACK],
runbook_url="https://wiki/runbooks/es-heap-pressure",
description="JVM heap usage > 75% for 15 minutes."
),
AlertRule(
name="disk_space_warning",
condition="elasticsearch_disk_used_percent > 75",
threshold=75,
duration="30m",
severity="warning",
channels=[AlertChannel.SLACK],
runbook_url="https://wiki/runbooks/es-disk-space",
description="Disk usage > 75%."
),
AlertRule(
name="search_queue_building",
condition="elasticsearch_search_queue_size > 500",
threshold=500,
duration="5m",
severity="warning",
channels=[AlertChannel.SLACK],
runbook_url="https://wiki/runbooks/es-queue-building",
description="Search queue building up. May indicate overload."
),
AlertRule(
name="zero_result_rate_high",
condition="search_zero_result_rate > 0.10",
threshold=0.10,
duration="30m",
severity="warning",
channels=[AlertChannel.SLACK],
runbook_url="https://wiki/runbooks/search-zero-results",
description="Zero result rate > 10%. Check indexing and synonyms."
),
]
@classmethod
def generate_prometheus_rules(cls) -> str:
"""Generate Prometheus alerting rules."""
rules = []
for alert in cls.ALERT_RULES:
rule = f"""
- alert: {alert.name}
expr: {alert.condition}
for: {alert.duration}
labels:
severity: {alert.severity}
annotations:
summary: "{alert.description}"
runbook_url: "{alert.runbook_url}"
"""
rules.append(rule)
return f"""
groups:
- name: elasticsearch
rules:
{''.join(rules)}
"""
Part V: Disaster Recovery
Chapter 5: Backup and Recovery
5.1 Snapshot Strategy
# operations/disaster_recovery.py
"""
Disaster recovery for Elasticsearch.
"""
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import List, Optional
import asyncio
@dataclass
class SnapshotPolicy:
"""Snapshot policy configuration."""
name: str
schedule: str # Cron expression
indices: List[str]
retention_days: int
repository: str
class DisasterRecovery:
"""
Manages backups and disaster recovery.
"""
def __init__(self, es_client, s3_client):
self.es = es_client
self.s3 = s3_client
self.repository = "s3_backup"
self.bucket = "elasticsearch-backups-prod"
async def setup_repository(self):
"""Set up S3 snapshot repository."""
await self.es.snapshot.create_repository(
repository=self.repository,
body={
"type": "s3",
"settings": {
"bucket": self.bucket,
"region": "us-east-1",
"base_path": "snapshots",
"compress": True,
"chunk_size": "1gb",
"max_restore_bytes_per_sec": "500mb",
"max_snapshot_bytes_per_sec": "200mb"
}
}
)
async def create_snapshot(
self,
name: str = None,
indices: List[str] = None
) -> dict:
"""
Create a snapshot.
"""
if not name:
name = f"snapshot_{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}"
body = {
"indices": indices or ["products*"],
"include_global_state": False,
"metadata": {
"created_by": "automated_backup",
"created_at": datetime.utcnow().isoformat()
}
}
response = await self.es.snapshot.create(
repository=self.repository,
snapshot=name,
body=body,
wait_for_completion=False # Don't block
)
return response
async def restore_snapshot(
self,
snapshot_name: str,
indices: List[str] = None,
rename_pattern: str = None,
rename_replacement: str = None
) -> dict:
"""
Restore from snapshot.
Args:
snapshot_name: Name of snapshot to restore
indices: Specific indices to restore (None = all)
rename_pattern: Regex pattern for renaming
rename_replacement: Replacement string
"""
body = {
"indices": indices or ["products*"],
"include_global_state": False,
"include_aliases": True
}
# Optionally rename indices (for testing restores)
if rename_pattern:
body["rename_pattern"] = rename_pattern
body["rename_replacement"] = rename_replacement
response = await self.es.snapshot.restore(
repository=self.repository,
snapshot=snapshot_name,
body=body,
wait_for_completion=False
)
return response
async def list_snapshots(
self,
limit: int = 10
) -> List[dict]:
"""List recent snapshots."""
response = await self.es.snapshot.get(
repository=self.repository,
snapshot="_all"
)
snapshots = response.get("snapshots", [])
# Sort by start time, newest first
snapshots.sort(key=lambda s: s["start_time"], reverse=True)
return snapshots[:limit]
async def cleanup_old_snapshots(self, retention_days: int = 30):
"""Delete snapshots older than retention period."""
cutoff = datetime.utcnow() - timedelta(days=retention_days)
snapshots = await self.list_snapshots(limit=1000)
deleted = 0
for snapshot in snapshots:
start_time = datetime.fromisoformat(
snapshot["start_time"].replace("Z", "+00:00")
)
if start_time.replace(tzinfo=None) < cutoff:
await self.es.snapshot.delete(
repository=self.repository,
snapshot=snapshot["snapshot"]
)
deleted += 1
return deleted
async def verify_snapshot(self, snapshot_name: str) -> dict:
"""Verify snapshot integrity."""
# Get snapshot info
response = await self.es.snapshot.get(
repository=self.repository,
snapshot=snapshot_name
)
snapshot = response["snapshots"][0]
verification = {
"snapshot": snapshot_name,
"state": snapshot["state"],
"indices": len(snapshot["indices"]),
"shards": {
"total": snapshot["shards"]["total"],
"successful": snapshot["shards"]["successful"],
"failed": snapshot["shards"]["failed"]
},
"duration_seconds": snapshot["duration_in_millis"] / 1000,
"size_bytes": 0 # Would need to calculate from S3
}
verification["healthy"] = (
snapshot["state"] == "SUCCESS" and
snapshot["shards"]["failed"] == 0
)
return verification
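
# =============================================================================
# Snapshot Lifecycle Management (illustrative addition)
# =============================================================================
# A sketch, not part of the original class: register the SnapshotPolicy
# defined above with Elasticsearch's snapshot lifecycle management API
# (available since 7.4 through the Python client) so daily snapshots and
# retention run inside the cluster instead of an external cron job.
async def register_snapshot_policy(es, policy: SnapshotPolicy):
    await es.slm.put_lifecycle(
        policy_id=policy.name,
        body={
            "schedule": policy.schedule,            # e.g. "0 30 2 * * ?" = daily at 02:30
            "name": f"<{policy.name}-{{now/d}}>",   # date-math snapshot names
            "repository": policy.repository,
            "config": {
                "indices": policy.indices,
                "include_global_state": False,
            },
            "retention": {"expire_after": f"{policy.retention_days}d"},
        },
    )

# Example:
# await register_snapshot_policy(es, SnapshotPolicy(
#     name="daily-products", schedule="0 30 2 * * ?",
#     indices=["products*"], retention_days=30, repository="s3_backup"))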
# =============================================================================
# DR Runbook
# =============================================================================
DR_RUNBOOK = """
DISASTER RECOVERY RUNBOOK
SCENARIO 1: Single Node Failure
─────────────────────────────────
Impact: Temporary yellow cluster, no data loss
Recovery: Automatic (Kubernetes restarts pod)
Action: Monitor for node to rejoin, shards to reallocate
ETA: 5-10 minutes
SCENARIO 2: Multiple Node Failure (< quorum)
─────────────────────────────────────────────
Impact: Yellow/Red cluster, potential query failures
Recovery:
1. Scale up replacement nodes
2. Wait for shard recovery
3. Verify cluster health
Action:
kubectl scale statefulset elasticsearch-data --replicas=6
ETA: 15-30 minutes
SCENARIO 3: Complete Cluster Loss
─────────────────────────────────
Impact: Total search outage
Recovery:
1. Provision new cluster
2. Restore from latest snapshot
3. Resume CDC pipeline
4. Verify data integrity
Commands:
# List available snapshots
curl -X GET "localhost:9200/_snapshot/s3_backup/_all"
# Restore latest snapshot
curl -X POST "localhost:9200/_snapshot/s3_backup/latest/_restore"
# Monitor restore progress
curl -X GET "localhost:9200/_recovery"
ETA: 1-4 hours depending on data size
SCENARIO 4: Data Corruption
───────────────────────────
Impact: Incorrect search results
Recovery:
1. Identify scope of corruption
2. Restore specific indices from snapshot
3. Or trigger full reindex from PostgreSQL
Commands:
# Close corrupted index
curl -X POST "localhost:9200/products/_close"
# Restore from snapshot
curl -X POST "localhost:9200/_snapshot/s3_backup/snapshot_20240101/_restore" -H "Content-Type: application/json" -d '{
"indices": "products",
"rename_pattern": "(.+)",
"rename_replacement": "restored_$1"
}'
# Verify and swap
# ... verification steps ...
# Swap aliases
curl -X POST "localhost:9200/_aliases" -H "Content-Type: application/json" -d '{
"actions": [
{"remove": {"index": "products", "alias": "products_live"}},
{"add": {"index": "restored_products", "alias": "products_live"}}
]
}'
ETA: 30 minutes - 2 hours
SCENARIO 5: Region Failure
──────────────────────────
Impact: Total search outage in affected region
Recovery:
1. Failover to secondary region
2. Update DNS/routing
3. Resume indexing in secondary
Prerequisites:
- Cross-region replication enabled
- Secondary cluster on standby
ETA: 15-30 minutes (if prepared)
"""
Part VI: Performance Tuning
Chapter 6: Optimization
6.1 Query Optimization
# optimization/query_tuning.py
"""
Query optimization techniques.
"""
from typing import Dict, List, Any
class QueryOptimizer:
"""
Optimizes Elasticsearch queries for performance.
"""
def __init__(self, es_client):
self.es = es_client
async def analyze_slow_queries(
self,
threshold_ms: int = 500
) -> List[dict]:
"""
Find and analyze slow queries from slow log.
"""
# Enable slow log (if not already)
await self.es.indices.put_settings(
index="products",
body={
"index.search.slowlog.threshold.query.warn": f"{threshold_ms}ms",
"index.search.slowlog.threshold.query.info": f"{threshold_ms // 2}ms",
"index.search.slowlog.level": "info"
}
)
# In production, slow queries would be in the slowlog
# Here we return optimization suggestions
return []
def optimize_query(self, query: dict) -> dict:
"""
Apply query optimizations.
"""
optimizations_applied = []
# 1. Move non-scoring clauses to filter context
query, moved = self._move_to_filter_context(query)
if moved:
optimizations_applied.append("moved_to_filter")
# 2. Use keyword fields for exact matches
query, fixed = self._use_keyword_fields(query)
if fixed:
optimizations_applied.append("keyword_fields")
# 3. Limit source fields
if "_source" not in query:
query["_source"] = ["name", "price", "brand", "category", "rating"]
optimizations_applied.append("limited_source")
        # 4. Request-cache hint: request_cache is a query-string parameter on
        #    the search API (not part of the request body), so return it
        #    separately for the caller to pass through
        optimizations_applied.append("request_cache")
        return {
            "query": query,
            "request_cache": True,
            "optimizations": optimizations_applied
        }
def _move_to_filter_context(self, query: dict) -> tuple:
"""Move non-scoring clauses to filter context."""
moved = False
bool_query = query.get("query", {}).get("bool", {})
if not bool_query:
return query, moved
must = bool_query.get("must", [])
filters = bool_query.get("filter", [])
new_must = []
for clause in must:
# Term queries don't need scoring
if "term" in clause or "terms" in clause or "range" in clause:
filters.append(clause)
moved = True
else:
new_must.append(clause)
if moved:
bool_query["must"] = new_must if new_must else [{"match_all": {}}]
bool_query["filter"] = filters
return query, moved
def _use_keyword_fields(self, query: dict) -> tuple:
"""Use keyword subfields for exact matches."""
fixed = False
def fix_terms(obj):
nonlocal fixed
if isinstance(obj, dict):
for key, value in list(obj.items()):
if key in ("term", "terms"):
for field, val in list(value.items()):
# If querying text field for exact match, use keyword
if field in ("brand", "category", "color") and ".keyword" not in field:
value[f"{field}.keyword"] = val
del value[field]
fixed = True
else:
fix_terms(value)
elif isinstance(obj, list):
for item in obj:
fix_terms(item)
fix_terms(query)
return query, fixed
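
# =============================================================================
# Deep pagination helper (illustrative addition)
# =============================================================================
# A sketch, not part of the original optimizer: page through results with a
# search_after cursor instead of from/size so deep pages stay cheap on the
# data nodes. Assumes a unique keyword field (here `product_id`) as the sort
# tiebreaker; see also the "search_after, not deep offset" item in 6.2.
async def scan_products(es, query: dict, page_size: int = 100):
    """Yield all hits for `query` using search_after pagination."""
    search_after = None
    while True:
        body = {
            "query": query,
            "size": page_size,
            # a unique tiebreaker keeps the cursor stable across pages
            "sort": [{"_score": "desc"}, {"product_id": "asc"}],
        }
        if search_after is not None:
            body["search_after"] = search_after
        resp = await es.search(index="products", body=body)
        hits = resp["hits"]["hits"]
        if not hits:
            break
        for hit in hits:
            yield hit
        search_after = hits[-1]["sort"]  # cursor for the next page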
# =============================================================================
# Index Optimization
# =============================================================================
class IndexOptimizer:
"""
Index-level optimizations.
"""
def __init__(self, es_client):
self.es = es_client
async def optimize_for_search(self, index: str):
"""
Optimize index for search performance.
Call during low-traffic periods.
"""
# Force merge to reduce segment count
await self.es.indices.forcemerge(
index=index,
max_num_segments=1, # Single segment per shard
only_expunge_deletes=False
)
# Refresh to make all docs searchable
await self.es.indices.refresh(index=index)
# Clear caches (they'll warm up with new queries)
await self.es.indices.clear_cache(index=index)
async def get_optimization_recommendations(
self,
index: str
) -> List[dict]:
"""
Analyze index and provide recommendations.
"""
recommendations = []
# Get index stats
stats = await self.es.indices.stats(index=index)
index_stats = stats["indices"][index]
# Check segment count
primaries = index_stats["primaries"]
segment_count = primaries["segments"]["count"]
doc_count = primaries["docs"]["count"]
if segment_count > 50:
recommendations.append({
"type": "segments",
"severity": "medium",
"message": f"High segment count ({segment_count}). Consider force merge.",
"action": f"POST /{index}/_forcemerge?max_num_segments=5"
})
# Check deleted docs
deleted_docs = primaries["docs"]["deleted"]
if deleted_docs > doc_count * 0.1: # > 10% deleted
recommendations.append({
"type": "deleted_docs",
"severity": "low",
"message": f"High deleted doc count ({deleted_docs}). Consider force merge.",
"action": f"POST /{index}/_forcemerge?only_expunge_deletes=true"
})
# Check field data usage
fielddata = primaries.get("fielddata", {}).get("memory_size_in_bytes", 0)
if fielddata > 1024 * 1024 * 1024: # > 1GB
recommendations.append({
"type": "fielddata",
"severity": "high",
"message": f"High fielddata usage ({fielddata / 1024 / 1024:.0f}MB). "
"Check for text field aggregations.",
"action": "Use keyword fields for aggregations instead of text"
})
# Check refresh interval
settings = await self.es.indices.get_settings(index=index)
refresh_interval = settings[index]["settings"]["index"].get(
"refresh_interval", "1s"
)
if refresh_interval == "1s":
recommendations.append({
"type": "refresh_interval",
"severity": "low",
"message": "Default refresh interval (1s). Consider increasing for better indexing.",
"action": f'PUT /{index}/_settings {{"index.refresh_interval": "30s"}}'
})
return recommendations
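A short usage example (the input query is invented for illustration, and QueryOptimizer is the class defined above) showing what optimize_query does to a naive bool query:
# optimization/example_usage.py (illustrative sketch)
optimizer = QueryOptimizer(es_client=None)  # client only needed for slow-log analysis

naive_body = {
    "query": {
        "bool": {
            "must": [
                {"match": {"name": "running shoes"}},   # needs scoring, stays in must
                {"term": {"brand": "nike"}},            # exact match, no scoring needed
                {"range": {"price": {"lte": 100}}},
            ]
        }
    }
}

result = optimizer.optimize_query(naive_body)
# result["optimizations"]
#   -> ["moved_to_filter", "keyword_fields", "limited_source", "request_cache"]
# The term/range clauses now sit under bool.filter, the term clause targets
# brand.keyword, _source is limited, and result["request_cache"] is True.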
6.2 Performance Checklist
# optimization/performance_checklist.py
"""
Performance optimization checklist.
"""
PERFORMANCE_CHECKLIST = """
ELASTICSEARCH PERFORMANCE CHECKLIST
═══════════════════════════════════════════════════════════════════════════════
HARDWARE & INFRASTRUCTURE
═══════════════════════════════════════════════════════════════════════════════
[ ] SSD storage (not spinning disks)
[ ] Sufficient RAM for OS cache (index size × 0.5)
[ ] Heap size = min(31GB, RAM/2)
[ ] Dedicated master nodes (at least 3)
[ ] Network optimized for low latency between nodes
═══════════════════════════════════════════════════════════════════════════════
INDEX CONFIGURATION
═══════════════════════════════════════════════════════════════════════════════
[ ] Appropriate shard count (20-50GB per shard)
[ ] Replica count based on read load
[ ] Refresh interval tuned (not default 1s for high-write)
[ ] Mapping optimized:
[ ] keyword for exact match/aggregations
[ ] text for full-text search
[ ] Disabled _source if not needed
[ ] No dynamic mapping in production
═══════════════════════════════════════════════════════════════════════════════
QUERY OPTIMIZATION
═══════════════════════════════════════════════════════════════════════════════
[ ] Use filter context for non-scoring clauses
[ ] Use keyword fields for exact matches
[ ] Limit returned fields (_source filtering)
[ ] Use request cache for repeated queries
[ ] Avoid script queries where possible
[ ] Use doc_values for sorting/aggregations
[ ] Pagination with search_after, not deep offset
═══════════════════════════════════════════════════════════════════════════════
INDEXING OPTIMIZATION
═══════════════════════════════════════════════════════════════════════════════
[ ] Bulk indexing (500-5000 docs per request)
[ ] Index refresh interval increased during bulk
[ ] Async translog for high-throughput indexing
[ ] Appropriate number of indexing threads
═══════════════════════════════════════════════════════════════════════════════
CLUSTER HEALTH
═══════════════════════════════════════════════════════════════════════════════
[ ] All nodes healthy
[ ] No unassigned shards
[ ] Shard balance across nodes
[ ] Circuit breakers configured
[ ] Thread pools sized appropriately
═══════════════════════════════════════════════════════════════════════════════
CACHING
═══════════════════════════════════════════════════════════════════════════════
[ ] Query cache enabled and sized
[ ] Request cache for common queries
[ ] Field data cache limited
[ ] Application-level caching for hot queries
═══════════════════════════════════════════════════════════════════════════════
MONITORING
═══════════════════════════════════════════════════════════════════════════════
[ ] Slow log enabled
[ ] Key metrics tracked (latency, throughput, errors)
[ ] Alerting configured
[ ] Dashboards available
"""
Summary
What We Learned Today
DAY 5 SUMMARY: SEARCH OPERATIONS & SCALE
CLUSTER ARCHITECTURE
├── Master nodes: 3 dedicated, small instances
├── Data nodes: Memory optimized, SSD required
├── Coordinating nodes: Query routing and aggregation
├── Shard strategy: 20-50GB per shard
└── Replicas for read scaling and HA
CAPACITY PLANNING
├── Storage: Raw data × 1.3 (overhead) × replicas × growth
├── Memory: Index size × 0.5 for OS cache + 31GB heap
├── CPU: QPS / 150 for simple queries
└── Plan for peak (Black Friday = 5x normal)
TRAFFIC SPIKES
├── Pre-scale 24 hours before
├── Add replicas for read capacity
├── Graceful degradation under load
├── Disable expensive features when critical
└── Checklist and runbook preparation
MONITORING
├── Cluster health: green/yellow/red
├── Performance: latency p50/p99, QPS, errors
├── Resources: CPU, heap, disk, queue sizes
├── Business: zero result rate, CTR
└── Alerting: Critical → PagerDuty, Warning → Slack
DISASTER RECOVERY
├── S3 snapshots (daily + before changes)
├── Retention policy (30 days)
├── Restore testing (monthly)
├── Cross-region replication for critical systems
└── Documented runbooks for each scenario
PERFORMANCE TUNING
├── Filter context for non-scoring clauses
├── Keyword fields for exact matches
├── Force merge during low traffic
├── Limit returned fields
└── Request cache for hot queries
Key Takeaways
OPERATIONS KEY TAKEAWAYS
1. DEDICATED NODE ROLES
Master, data, coordinating - separate concerns
Don't run everything on the same nodes
2. PLAN FOR 3X-5X PEAK
Normal capacity won't handle Black Friday
Pre-scale; don't scale reactively
3. DEGRADE GRACEFULLY
Better slow search than no search
Disable features progressively
4. MEASURE EVERYTHING
Can't fix what you can't see
Latency, errors, queue depth, business metrics
5. PRACTICE RECOVERY
Untested backups are not backups
Run DR drills regularly
GOLDEN RULES:
├── Never more than 50GB per shard
├── Heap ≤ 31GB (compressed OOPs)
├── OS cache ≥ 50% of index size
├── Filter context for non-scoring
└── Snapshot before any change
Interview Tip
WHEN ASKED "HOW WOULD YOU HANDLE 10X TRAFFIC?"
"For a 10x traffic spike like Black Friday, I'd prepare in three phases:
BEFORE (24-48 hours):
- Scale cluster horizontally (3x data nodes, 5x coordinating)
- Add replicas for read capacity
- Force merge indexes to optimize segments
- Warm caches with popular queries
- Increase refresh interval to reduce indexing overhead
- Review and test runbooks with the team
DURING:
- Monitor key metrics: latency p99, error rate, queue depth
- Have graceful degradation ready:
Level 1: Disable spell check, personalization
Level 2: Simplify queries, reduce result size
Level 3: Serve from cache only for popular queries
- War room channel for real-time coordination
AFTER:
- Scale down gradually (not all at once)
- Review metrics and incidents
- Document lessons learned
The key is preparation - reactive scaling is too slow."
This shows you understand both the technical and operational aspects.
Week 7 Complete!
WEEK 7 SUMMARY: SEARCH SYSTEM
Day 1: Search Fundamentals
├── Inverted indexes
├── BM25 scoring
├── Text analysis
└── Document modeling
Day 2: Indexing Pipeline
├── CDC with Debezium
├── Kafka streaming
├── Bulk indexing
└── Zero-downtime reindex
Day 3: Query & Relevance
├── BM25 tuning
├── Query vs Filter
├── Boosting strategies
└── Function scores
Day 4: Advanced Features
├── Autocomplete
├── Faceted search
├── Synonyms
└── Personalization
Day 5: Operations & Scale
├── Cluster architecture
├── Capacity planning
├── Traffic spikes
├── Disaster recovery
└── Monitoring & performance tuning
TOTAL: ~11,000 lines of educational content
You can now design, build, and operate a production search system!
Next Week Preview
Week 8: Analytics Pipeline — "From events to insights"
- Event ingestion at scale
- Streaming vs batch processing
- Data modeling for analytics
- Late-arriving data handling
- Query optimization for OLAP
End of Week 7, Day 5
Next: Week 8 — Analytics Pipeline