Himanshu Kukreja

Week 7 — Day 2: Indexing Pipeline

System Design Mastery Series — Building Blocks Week


Introduction

Yesterday we learned how search engines think — inverted indexes, text analysis, and document modeling. But we left a critical question unanswered:

THE FRESHNESS PROBLEM

PostgreSQL (Source of Truth):
  Product "Nike Air Max" price: $150
  Updated at: 10:00:00

Elasticsearch (Search Index):
  Product "Nike Air Max" price: $120 (stale!)
  Last indexed: 09:00:00

Customer searches at 10:00:01:
  Sees price: $120
  Adds to cart
  Checkout shows: $150
  
Customer: "This is a scam!"

Today's Theme: "Keeping search fresh without killing the database"

We'll cover:

  • Change Data Capture (CDC) fundamentals
  • Debezium for PostgreSQL → Kafka streaming
  • Streaming vs batch indexing trade-offs
  • Bulk indexing without overwhelming Elasticsearch
  • Zero-downtime reindexing strategies
  • Handling indexing failures and lag

Part I: The Indexing Challenge

Chapter 1: Why Indexing Is Hard

1.1 The Naive Approach (Don't Do This)

# BAD: Synchronous dual-write

async def update_product(product_id: str, data: dict):
    # Update PostgreSQL
    await db.execute(
        "UPDATE products SET ... WHERE id = $1",
        product_id
    )
    
    # Update Elasticsearch
    await es.index(
        index="products",
        id=product_id,
        document=data
    )

Problems with dual-write:

DUAL-WRITE FAILURES

Scenario 1: ES write fails
├── PostgreSQL: Updated ✓
├── Elasticsearch: Failed ✗
└── Result: Data inconsistent, search shows old data

Scenario 2: Process crashes between writes
├── PostgreSQL: Updated ✓
├── Elasticsearch: Never reached
└── Result: Data inconsistent

Scenario 3: Network partition
├── PostgreSQL: Updated ✓
├── Elasticsearch: Times out, retries...
├── Transaction held open
└── Result: Database connection exhaustion

Scenario 4: Order of operations
├── Request A: Update price to $100
├── Request B: Update price to $120
├── PostgreSQL order: A then B → $120 ✓
├── ES order (due to retries): B then A → $100 ✗
└── Result: Elasticsearch has wrong value!

1.2 What We Need

REQUIREMENTS FOR RELIABLE INDEXING

1. CONSISTENCY
   └── Eventually consistent with source of truth (PostgreSQL)

2. RELIABILITY  
   └── Never lose an update, even during failures

3. ORDERING
   └── Updates applied in correct order

4. PERFORMANCE
   └── Don't slow down the main application

5. SCALABILITY
   └── Handle bulk imports (millions of products)

6. OBSERVABILITY
   └── Know when indexing falls behind

1.3 The Solution: Change Data Capture

CHANGE DATA CAPTURE (CDC) PATTERN

Instead of:
  Application → PostgreSQL
            → Elasticsearch (dual-write)

Use:
  Application → PostgreSQL → CDC → Kafka → Elasticsearch
                    │
                    └── Single source of truth
                              │
                              └── Changes captured as events
                                        │
                                        └── Events processed into index

Benefits:
├── Decoupled: App doesn't know about ES
├── Reliable: Events persisted in Kafka
├── Ordered: Kafka maintains order per partition
├── Scalable: Can replay events, add consumers
└── Observable: Monitor Kafka lag

Chapter 2: Change Data Capture with Debezium

2.1 How CDC Works

CDC ARCHITECTURE

┌─────────────────────────────────────────────────────────────────────────┐
│                    CHANGE DATA CAPTURE FLOW                             │
│                                                                         │
│  ┌──────────────┐                                                       │
│  │  Application │                                                       │
│  └──────┬───────┘                                                       │
│         │ INSERT/UPDATE/DELETE                                          │
│         ▼                                                               │
│  ┌──────────────┐     ┌──────────────┐                                  │
│  │  PostgreSQL  │────▶│  WAL (Write  │                                  │
│  │              │     │  Ahead Log)  │                                  │
│  └──────────────┘     └──────┬───────┘                                  │
│                              │                                          │
│                              │ Logical Replication                      │
│                              ▼                                          │
│                       ┌──────────────┐                                  │
│                       │   Debezium   │                                  │
│                       │  Connector   │                                  │
│                       └──────┬───────┘                                  │
│                              │                                          │
│                              │ CDC Events                               │
│                              ▼                                          │
│                       ┌──────────────┐                                  │
│                       │    Kafka     │                                  │
│                       │   Topics     │                                  │
│                       └──────┬───────┘                                  │
│                              │                                          │
│                              │ Consume                                  │
│                              ▼                                          │
│                       ┌──────────────┐                                  │
│                       │   Indexing   │                                  │
│                       │   Pipeline   │                                  │
│                       └──────┬───────┘                                  │
│                              │                                          │
│                              ▼                                          │
│                       ┌──────────────┐                                  │
│                       │Elasticsearch │                                  │
│                       └──────────────┘                                  │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘

KEY INSIGHT:
CDC reads from database's transaction log (WAL)
├── Captures ALL changes (INSERT, UPDATE, DELETE)
├── Maintains transaction order
├── Doesn't impact application performance
└── Can replay from any point in time

2.2 Debezium Configuration

# config/debezium_connector.py

"""
Debezium PostgreSQL connector configuration.

Debezium reads PostgreSQL's Write-Ahead Log (WAL)
and publishes changes to Kafka topics.
"""

DEBEZIUM_CONNECTOR_CONFIG = {
    "name": "products-connector",
    "config": {
        # Connector class
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        
        # Database connection
        "database.hostname": "postgres.internal",
        "database.port": "5432",
        "database.user": "debezium",
        "database.password": "${secrets:debezium-password}",
        "database.dbname": "ecommerce",
        
        # Logical replication slot
        "slot.name": "debezium_products",
        "plugin.name": "pgoutput",  # PostgreSQL 10+ native
        
        # Tables to capture
        "table.include.list": "public.products,public.product_variants,public.product_images",
        
        # Topic configuration
        "topic.prefix": "ecommerce",
        # Results in topics like: ecommerce.public.products
        
        # Message transformations
        "transforms": "unwrap,extractKey",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.drop.tombstones": "false",
        "transforms.unwrap.delete.handling.mode": "rewrite",
        "transforms.extractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
        "transforms.extractKey.field": "id",
        
        # Snapshot configuration
        "snapshot.mode": "initial",  # Full snapshot on first run
        
        # Error handling
        "errors.tolerance": "all",
        "errors.log.enable": "true",
        "errors.deadletterqueue.topic.name": "dlq.products",
        
        # Performance tuning
        "max.batch.size": 2048,
        "max.queue.size": 8192,
        "poll.interval.ms": 100,
    }
}


# PostgreSQL setup required for CDC
POSTGRESQL_CDC_SETUP = """
-- Enable logical replication (requires a PostgreSQL restart to take effect)
ALTER SYSTEM SET wal_level = logical;

-- Create replication slot (if not using Debezium's auto-creation)
SELECT pg_create_logical_replication_slot('debezium_products', 'pgoutput');

-- Create publication for CDC
CREATE PUBLICATION debezium_publication FOR TABLE 
    products, 
    product_variants, 
    product_images;

-- Grant permissions to Debezium user
CREATE USER debezium WITH REPLICATION PASSWORD 'secure_password';
GRANT SELECT ON ALL TABLES IN SCHEMA public TO debezium;
GRANT USAGE ON SCHEMA public TO debezium;
"""

2.3 CDC Event Structure

# models/cdc_events.py

"""
CDC event structures from Debezium.

Debezium produces events with before/after state,
operation type, and metadata.
"""

from dataclasses import dataclass
from datetime import datetime
from typing import Optional, Dict, Any
from enum import Enum


class Operation(Enum):
    CREATE = "c"  # Insert
    UPDATE = "u"  # Update
    DELETE = "d"  # Delete
    READ = "r"    # Snapshot read


@dataclass
class CDCEvent:
    """
    Debezium CDC event.
    
    Example Kafka message:
    {
        "before": {"id": 123, "name": "Old Name", "price": 100},
        "after": {"id": 123, "name": "New Name", "price": 120},
        "source": {
            "version": "2.0.0",
            "connector": "postgresql",
            "name": "ecommerce",
            "ts_ms": 1672531200000,
            "db": "ecommerce",
            "schema": "public",
            "table": "products",
            "txId": 12345,
            "lsn": 123456789
        },
        "op": "u",
        "ts_ms": 1672531200100
    }
    """
    
    operation: Operation
    before: Optional[Dict[str, Any]]  # State before change (null for inserts)
    after: Optional[Dict[str, Any]]   # State after change (null for deletes)
    table: str
    timestamp: datetime
    transaction_id: int
    lsn: int  # Log Sequence Number (for ordering)
    
    @classmethod
    def from_kafka_message(cls, message: dict) -> "CDCEvent":
        """Parse Debezium Kafka message."""
        
        source = message.get("source", {})
        
        return cls(
            operation=Operation(message["op"]),
            before=message.get("before"),
            after=message.get("after"),
            table=source.get("table", "unknown"),
            timestamp=datetime.utcfromtimestamp(message["ts_ms"] / 1000),  # UTC, to match the lag math downstream
            transaction_id=source.get("txId", 0),
            lsn=source.get("lsn", 0),
        )
    
    @property
    def is_delete(self) -> bool:
        return self.operation == Operation.DELETE
    
    @property
    def document_id(self) -> Optional[str]:
        """Get the document ID from before or after state."""
        state = self.after or self.before
        return str(state.get("id")) if state else None
    
    @property
    def current_state(self) -> Optional[Dict[str, Any]]:
        """Get current state (after for create/update, before for delete)."""
        return self.after if not self.is_delete else self.before

Part II: Indexing Pipeline Implementation

Chapter 3: Building the Pipeline

3.1 Core Pipeline Architecture

# pipeline/indexing_pipeline.py

"""
Indexing pipeline that consumes CDC events and updates Elasticsearch.

Key responsibilities:
1. Consume CDC events from Kafka
2. Transform to Elasticsearch documents
3. Batch updates for efficiency
4. Handle errors and retries
5. Track indexing progress
"""

import asyncio
import logging
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import List, Dict, Any, Optional

from models.cdc_events import CDCEvent

logger = logging.getLogger(__name__)


@dataclass
class IndexingConfig:
    """Indexing pipeline configuration."""
    
    # Kafka settings
    kafka_brokers: str = "kafka:9092"
    consumer_group: str = "product-indexer"
    topics: List[str] = field(default_factory=lambda: ["ecommerce.public.products"])
    
    # Batching settings
    batch_size: int = 500  # Documents per bulk request
    batch_timeout_seconds: float = 5.0  # Max wait before flushing
    
    # Elasticsearch settings
    es_index: str = "products"
    es_timeout_seconds: int = 30
    
    # Error handling
    max_retries: int = 3
    retry_backoff_seconds: float = 1.0
    
    # Performance
    max_concurrent_batches: int = 3


@dataclass
class IndexingMetrics:
    """Metrics for monitoring indexing pipeline."""
    
    events_received: int = 0
    documents_indexed: int = 0
    documents_deleted: int = 0
    errors: int = 0
    last_event_timestamp: Optional[datetime] = None
    lag_seconds: float = 0.0
    
    def to_dict(self) -> dict:
        return {
            "events_received": self.events_received,
            "documents_indexed": self.documents_indexed,
            "documents_deleted": self.documents_deleted,
            "errors": self.errors,
            "lag_seconds": self.lag_seconds,
        }


class ProductIndexingPipeline:
    """
    Main indexing pipeline for products.
    
    Consumes CDC events and maintains Elasticsearch index.
    """
    
    def __init__(
        self,
        config: IndexingConfig,
        kafka_consumer,
        es_client,
        document_transformer,
    ):
        self.config = config
        self.kafka = kafka_consumer
        self.es = es_client
        self.transformer = document_transformer
        
        self.metrics = IndexingMetrics()
        self._batch: List[Dict[str, Any]] = []
        self._batch_lock = asyncio.Lock()
        self._last_flush = datetime.utcnow()
        self._running = False
    
    async def start(self):
        """Start the indexing pipeline."""
        
        logger.info("Starting indexing pipeline")
        self._running = True
        
        # Start background tasks
        await asyncio.gather(
            self._consume_events(),
            self._flush_periodically(),
            self._report_metrics(),
        )
    
    async def stop(self):
        """Stop the pipeline gracefully."""
        
        logger.info("Stopping indexing pipeline")
        self._running = False
        
        # Flush remaining batch
        await self._flush_batch()
    
    async def _consume_events(self):
        """Consume CDC events from Kafka."""
        
        async for message in self.kafka.consume(self.config.topics):
            if not self._running:
                break
            
            try:
                await self._process_message(message)
            except Exception as e:
                logger.error(f"Error processing message: {e}", exc_info=True)
                self.metrics.errors += 1
    
    async def _process_message(self, message):
        """Process a single CDC event."""
        
        # Parse CDC event
        event = CDCEvent.from_kafka_message(message.value)
        self.metrics.events_received += 1
        self.metrics.last_event_timestamp = event.timestamp
        
        # Calculate lag
        self.metrics.lag_seconds = (
            datetime.utcnow() - event.timestamp
        ).total_seconds()
        
        # Transform to index operation
        operation = await self._event_to_operation(event)
        
        if operation:
            async with self._batch_lock:
                self._batch.append(operation)
                should_flush = len(self._batch) >= self.config.batch_size
            
            # Flush outside the lock: _flush_batch acquires the lock itself,
            # and asyncio.Lock is not reentrant.
            if should_flush:
                await self._flush_batch()
    
    async def _event_to_operation(self, event: CDCEvent) -> Optional[dict]:
        """Transform CDC event to Elasticsearch bulk operation."""
        
        if event.is_delete:
            return {
                "_op_type": "delete",
                "_index": self.config.es_index,
                "_id": event.document_id,
            }
        
        # Transform to search document
        document = await self.transformer.transform(event.after)
        
        if document:
            return {
                "_op_type": "index",
                "_index": self.config.es_index,
                "_id": event.document_id,
                "_source": document,
            }
        
        return None
    
    async def _flush_batch(self):
        """Flush accumulated batch to Elasticsearch."""
        
        async with self._batch_lock:
            if not self._batch:
                return
            
            batch = self._batch
            self._batch = []
            self._last_flush = datetime.utcnow()
        
        # Execute bulk request with retry
        success = await self._bulk_index_with_retry(batch)
        
        if success:
            # Update metrics
            for op in batch:
                if op["_op_type"] == "delete":
                    self.metrics.documents_deleted += 1
                else:
                    self.metrics.documents_indexed += 1
    
    async def _bulk_index_with_retry(self, operations: List[dict]) -> bool:
        """Execute bulk index with exponential backoff retry."""
        
        for attempt in range(self.config.max_retries):
            try:
                response = await self.es.bulk(
                    operations=operations,
                    timeout=f"{self.config.es_timeout_seconds}s"
                )
                
                # Check for errors
                if response.get("errors"):
                    failed = [
                        item for item in response["items"]
                        if "error" in item.get("index", item.get("delete", {}))
                    ]
                    logger.warning(f"Bulk request had {len(failed)} failures")
                    # Could implement partial retry here
                
                return True
                
            except Exception as e:
                logger.warning(
                    f"Bulk index attempt {attempt + 1} failed: {e}"
                )
                
                if attempt < self.config.max_retries - 1:
                    wait_time = self.config.retry_backoff_seconds * (2 ** attempt)
                    await asyncio.sleep(wait_time)
        
        logger.error(f"Bulk index failed after {self.config.max_retries} attempts")
        self.metrics.errors += len(operations)
        return False
    
    async def _flush_periodically(self):
        """Background task to flush batches on timeout."""
        
        while self._running:
            await asyncio.sleep(1)
            
            time_since_flush = (
                datetime.utcnow() - self._last_flush
            ).total_seconds()
            
            # Flush outside the lock: _flush_batch acquires the lock itself,
            # and asyncio.Lock is not reentrant.
            if (self._batch and
                    time_since_flush >= self.config.batch_timeout_seconds):
                await self._flush_batch()
    
    async def _report_metrics(self):
        """Periodically report metrics."""
        
        while self._running:
            await asyncio.sleep(10)
            logger.info(f"Indexing metrics: {self.metrics.to_dict()}")


class DocumentTransformer:
    """
    Transforms database rows to Elasticsearch documents.
    
    Handles:
    - Field mapping
    - Data enrichment
    - Denormalization
    """
    
    def __init__(self, db_pool, cache):
        self.db = db_pool
        self.cache = cache
    
    async def transform(self, row: dict) -> Optional[dict]:
        """Transform database row to search document."""
        
        if not row:
            return None
        
        # Basic field mapping
        document = {
            "product_id": str(row["id"]),
            "name": row["name"],
            "description": row.get("description", ""),
            "brand": row.get("brand", ""),
            "category": row.get("category", ""),
            "price": float(row.get("price", 0)),
            "sale_price": float(row.get("sale_price", 0)) if row.get("sale_price") else None,
            "in_stock": row.get("stock_quantity", 0) > 0,
            "stock_quantity": row.get("stock_quantity", 0),
            "rating": float(row.get("rating", 0)),
            "review_count": row.get("review_count", 0),
            "created_at": row.get("created_at"),
            "updated_at": row.get("updated_at"),
        }
        
        # Enrich with related data (denormalization)
        document = await self._enrich_document(document, row)
        
        # Calculate derived fields
        document = self._calculate_derived_fields(document)
        
        return document
    
    async def _enrich_document(self, document: dict, row: dict) -> dict:
        """Enrich document with related data."""
        
        product_id = row["id"]
        
        # Get categories path (for hierarchical faceting)
        categories = await self._get_category_path(row.get("category_id"))
        if categories:
            document["categories"] = categories
        
        # Get product tags
        tags = await self._get_product_tags(product_id)
        if tags:
            document["tags"] = tags
        
        # Get variant information (colors, sizes)
        variants = await self._get_product_variants(product_id)
        if variants:
            document["colors"] = list(set(v["color"] for v in variants if v.get("color")))
            document["sizes"] = list(set(v["size"] for v in variants if v.get("size")))
        
        return document
    
    def _calculate_derived_fields(self, document: dict) -> dict:
        """Calculate derived/computed fields."""
        
        # Discount percentage
        if document.get("sale_price") and document["price"] > 0:
            discount = (document["price"] - document["sale_price"]) / document["price"]
            document["discount_percent"] = int(discount * 100)
        else:
            document["discount_percent"] = 0
        
        # Popularity score (for boosting)
        document["popularity_score"] = self._calculate_popularity(document)
        
        # Search boost field (combined text for searching)
        document["search_text"] = " ".join([
            document.get("name", ""),
            document.get("brand", ""),
            document.get("description", "")[:500],
            " ".join(document.get("tags", [])),
        ])
        
        return document
    
    def _calculate_popularity(self, document: dict) -> float:
        """Calculate popularity score for ranking."""
        
        # Weighted combination of signals
        rating_score = document.get("rating", 0) / 5.0  # Normalize to 0-1
        review_score = min(1.0, document.get("review_count", 0) / 1000)  # Cap at 1000
        
        # Recent products get a boost
        # (would need actual date comparison in production)
        recency_score = 0.5
        
        return (rating_score * 0.4) + (review_score * 0.4) + (recency_score * 0.2)
    
    async def _get_category_path(self, category_id: int) -> List[str]:
        """Get full category path for hierarchical faceting."""
        
        if not category_id:
            return []
        
        # Check cache first
        cache_key = f"category_path:{category_id}"
        cached = await self.cache.get(cache_key)
        if cached:
            return cached
        
        # Query database
        path = await self.db.fetchval("""
            WITH RECURSIVE cat_path AS (
                SELECT id, name, parent_id, ARRAY[name] as path
                FROM categories WHERE id = $1
                
                UNION ALL
                
                SELECT c.id, c.name, c.parent_id, c.name || cp.path
                FROM categories c
                JOIN cat_path cp ON c.id = cp.parent_id
            )
            SELECT path FROM cat_path WHERE parent_id IS NULL
        """, category_id)
        
        if path:
            await self.cache.set(cache_key, path, ttl=3600)
        
        return path or []
    
    async def _get_product_tags(self, product_id: int) -> List[str]:
        """Get product tags."""
        
        rows = await self.db.fetch("""
            SELECT t.name FROM tags t
            JOIN product_tags pt ON t.id = pt.tag_id
            WHERE pt.product_id = $1
        """, product_id)
        
        return [r["name"] for r in rows]
    
    async def _get_product_variants(self, product_id: int) -> List[dict]:
        """Get product variants (colors, sizes)."""
        
        rows = await self.db.fetch("""
            SELECT color, size, sku, stock_quantity
            FROM product_variants
            WHERE product_id = $1 AND stock_quantity > 0
        """, product_id)
        
        return [dict(r) for r in rows]
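
Here is a minimal wiring sketch for the pipeline above. The Kafka consumer wrapper, Elasticsearch client, database pool, and cache are assumed to be constructed elsewhere; only the interfaces the pipeline actually calls matter here:

# A wiring sketch (the four dependencies are assumptions supplied by the caller).
async def run_product_indexer(kafka_consumer, es_client, db_pool, cache):
    transformer = DocumentTransformer(db_pool, cache)
    
    pipeline = ProductIndexingPipeline(
        config=IndexingConfig(
            topics=["ecommerce.public.products"],
            batch_size=500,
            batch_timeout_seconds=5.0,
        ),
        kafka_consumer=kafka_consumer,
        es_client=es_client,
        document_transformer=transformer,
    )
    
    try:
        # Runs the consumer, periodic flusher, and metrics reporter together
        await pipeline.start()
    finally:
        await pipeline.stop()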

3.2 Handling Different Update Patterns

# pipeline/update_handlers.py

"""
Specialized handlers for different types of updates.

Some updates require different handling:
- Price changes: High priority, need fast indexing
- Stock changes: Critical for availability
- Description changes: Lower priority, can batch more
- Bulk imports: Need special handling to not overwhelm
"""

import logging
from enum import Enum
from dataclasses import dataclass
from typing import Optional

from models.cdc_events import CDCEvent
from pipeline.indexing_pipeline import ProductIndexingPipeline

logger = logging.getLogger(__name__)


class UpdatePriority(Enum):
    CRITICAL = 1   # Stock out, price drops
    HIGH = 2       # General updates
    NORMAL = 3     # Description changes
    LOW = 4        # Bulk imports


@dataclass
class PrioritizedUpdate:
    """Update with priority for queue ordering."""
    
    product_id: str
    operation: str  # index, delete
    document: Optional[dict]
    priority: UpdatePriority
    timestamp: float


class UpdateClassifier:
    """
    Classifies updates by priority based on what changed.
    """
    
    def classify(self, event: CDCEvent) -> UpdatePriority:
        """Determine priority of update."""
        
        if event.is_delete:
            return UpdatePriority.HIGH
        
        before = event.before or {}
        after = event.after or {}
        
        # Stock went to zero - critical!
        if before.get("stock_quantity", 0) > 0 and after.get("stock_quantity", 0) == 0:
            return UpdatePriority.CRITICAL
        
        # Stock restored - critical!
        if before.get("stock_quantity", 0) == 0 and after.get("stock_quantity", 0) > 0:
            return UpdatePriority.CRITICAL
        
        # Price changed
        if before.get("price") != after.get("price"):
            return UpdatePriority.HIGH
        
        # Sale price changed
        if before.get("sale_price") != after.get("sale_price"):
            return UpdatePriority.HIGH
        
        # Only metadata changed (description, images, etc.)
        return UpdatePriority.NORMAL
    
    def is_significant_change(self, event: CDCEvent) -> bool:
        """Check if change affects search results."""
        
        if event.is_delete:
            return True
        
        before = event.before or {}
        after = event.after or {}
        
        # Fields that affect search
        significant_fields = {
            "name", "description", "brand", "category",
            "price", "sale_price", "stock_quantity",
            "rating", "review_count"
        }
        
        for field in significant_fields:
            if before.get(field) != after.get(field):
                return True
        
        return False


class PriorityIndexingPipeline(ProductIndexingPipeline):
    """
    Indexing pipeline with priority queues.
    
    Critical updates processed immediately.
    Lower priority updates batched more aggressively.
    """
    
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.classifier = UpdateClassifier()
        
        # Separate batches by priority
        self._priority_batches = {
            UpdatePriority.CRITICAL: [],
            UpdatePriority.HIGH: [],
            UpdatePriority.NORMAL: [],
            UpdatePriority.LOW: [],
        }
        
        # Different batch sizes per priority
        self._batch_sizes = {
            UpdatePriority.CRITICAL: 10,   # Small batches, fast flush
            UpdatePriority.HIGH: 100,
            UpdatePriority.NORMAL: 500,
            UpdatePriority.LOW: 1000,      # Large batches for bulk
        }
    
    async def _process_message(self, message):
        """Process with priority classification."""
        
        event = CDCEvent.from_kafka_message(message.value)
        self.metrics.events_received += 1
        
        # Skip non-significant changes
        if not self.classifier.is_significant_change(event):
            return
        
        # Classify priority
        priority = self.classifier.classify(event)
        
        # Transform and add to appropriate batch
        operation = await self._event_to_operation(event)
        
        if operation:
            async with self._batch_lock:
                self._priority_batches[priority].append(operation)
                
                # Check if this priority batch should flush
                should_flush = (
                    len(self._priority_batches[priority]) >= self._batch_sizes[priority]
                )
            
            # Flush outside the lock so a slow bulk request doesn't block
            # batching of other events.
            if should_flush:
                await self._flush_priority_batch(priority)
    
    async def _flush_priority_batch(self, priority: UpdatePriority):
        """Flush a specific priority batch."""
        
        batch = self._priority_batches[priority]
        if not batch:
            return
        
        self._priority_batches[priority] = []
        
        logger.info(f"Flushing {len(batch)} {priority.name} priority updates")
        await self._bulk_index_with_retry(batch)

Part III: Bulk Indexing Strategies

Chapter 4: Initial Load and Bulk Imports

4.1 The Bulk Import Challenge

BULK IMPORT SCENARIO

Situation:
├── 1 million new products from vendor import
├── Normal indexing: ~100 products/sec from CDC
├── Time to index: 1M / 100 = 10,000 seconds ≈ 3 hours
└── Users searching for products that don't exist yet!

Challenges:
├── Don't overwhelm Elasticsearch cluster
├── Don't starve normal CDC updates
├── Don't cause search latency spikes
└── Handle failures gracefully

4.2 Bulk Indexing Implementation

# pipeline/bulk_indexer.py

"""
Bulk indexing for initial loads and large imports.

Strategies:
1. Rate limiting to protect cluster
2. Parallel workers with coordination
3. Progress tracking and resumability
4. Separate from real-time pipeline
"""

import asyncio
import logging
from dataclasses import dataclass
from datetime import datetime
from typing import Optional, AsyncIterator
import hashlib

logger = logging.getLogger(__name__)


@dataclass
class BulkIndexJob:
    """Represents a bulk indexing job."""
    
    job_id: str
    source: str  # "postgres", "csv", "api"
    query: Optional[str]  # SQL query for postgres source
    total_documents: int
    indexed_documents: int = 0
    failed_documents: int = 0
    status: str = "pending"  # pending, running, completed, failed
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None
    last_processed_id: Optional[str] = None  # For resumability


class BulkIndexer:
    """
    Handles bulk indexing with rate limiting and progress tracking.
    """
    
    def __init__(
        self,
        es_client,
        db_pool,
        transformer,
        job_store,
        max_rate_per_second: int = 5000,
        batch_size: int = 1000,
        parallel_workers: int = 4,
    ):
        self.es = es_client
        self.db = db_pool
        self.transformer = transformer
        self.jobs = job_store
        
        self.max_rate = max_rate_per_second
        self.batch_size = batch_size
        self.parallel_workers = parallel_workers
        
        # Rate limiter
        self._rate_limiter = asyncio.Semaphore(parallel_workers)
        self._tokens = max_rate_per_second
        self._last_refill = datetime.utcnow()
    
    async def create_job(
        self,
        source: str,
        query: str = None,
        total: int = None
    ) -> BulkIndexJob:
        """Create a new bulk indexing job."""
        
        job_id = hashlib.md5(
            f"{source}:{query}:{datetime.utcnow().isoformat()}".encode()
        ).hexdigest()[:12]
        
        # Count total if not provided
        if total is None and query:
            count_query = f"SELECT COUNT(*) FROM ({query}) AS subq"
            total = await self.db.fetchval(count_query)
        
        job = BulkIndexJob(
            job_id=job_id,
            source=source,
            query=query,
            total_documents=total or 0
        )
        
        await self.jobs.save(job)
        return job
    
    async def run_job(self, job: BulkIndexJob):
        """Execute a bulk indexing job."""
        
        job.status = "running"
        job.started_at = datetime.utcnow()
        await self.jobs.save(job)
        
        logger.info(f"Starting bulk index job {job.job_id}: {job.total_documents} documents")
        
        try:
            # Stream documents and index in batches
            batch = []
            
            async for document in self._stream_documents(job):
                batch.append(document)
                
                if len(batch) >= self.batch_size:
                    await self._index_batch_with_rate_limit(batch)
                    job.indexed_documents += len(batch)
                    job.last_processed_id = batch[-1].get("product_id")
                    await self.jobs.save(job)
                    batch = []
                    
                    # Log progress
                    progress = (
                        (job.indexed_documents / job.total_documents) * 100
                        if job.total_documents else 0.0
                    )
                    if job.indexed_documents % 10000 == 0:
                        logger.info(
                            f"Job {job.job_id}: {job.indexed_documents}/{job.total_documents} "
                            f"({progress:.1f}%)"
                        )
            
            # Final batch
            if batch:
                await self._index_batch_with_rate_limit(batch)
                job.indexed_documents += len(batch)
            
            job.status = "completed"
            job.completed_at = datetime.utcnow()
            
            duration = (job.completed_at - job.started_at).total_seconds()
            rate = job.indexed_documents / duration
            
            logger.info(
                f"Job {job.job_id} completed: {job.indexed_documents} documents "
                f"in {duration:.1f}s ({rate:.0f} docs/sec)"
            )
            
        except Exception as e:
            logger.error(f"Job {job.job_id} failed: {e}", exc_info=True)
            job.status = "failed"
            raise
        
        finally:
            await self.jobs.save(job)
    
    async def resume_job(self, job: BulkIndexJob):
        """Resume a failed or interrupted job."""
        
        if job.status not in ("failed", "running"):
            raise ValueError(f"Cannot resume job with status {job.status}")
        
        logger.info(
            f"Resuming job {job.job_id} from {job.last_processed_id}, "
            f"{job.indexed_documents}/{job.total_documents} done"
        )
        
        await self.run_job(job)
    
    async def _stream_documents(
        self,
        job: BulkIndexJob
    ) -> AsyncIterator[dict]:
        """Stream documents from source."""
        
        if job.source == "postgres":
            async for doc in self._stream_from_postgres(job):
                yield doc
        elif job.source == "csv":
            async for doc in self._stream_from_csv(job):
                yield doc
        else:
            raise ValueError(f"Unknown source: {job.source}")
    
    async def _stream_from_postgres(
        self,
        job: BulkIndexJob
    ) -> AsyncIterator[dict]:
        """Stream documents from PostgreSQL using cursor."""
        
        query = job.query
        
        # Add resume condition if needed
        if job.last_processed_id:
            # Assume query is ordered by id
            query = f"""
                SELECT * FROM ({query}) AS subq 
                WHERE id > {job.last_processed_id}
                ORDER BY id
            """
        
        # Use server-side cursor for memory efficiency
        async with self.db.transaction():
            async for row in self.db.cursor(query, prefetch=self.batch_size):
                document = await self.transformer.transform(dict(row))
                if document:
                    yield document
    
    async def _index_batch_with_rate_limit(self, batch: list):
        """Index batch with rate limiting to protect cluster."""
        
        # Acquire rate limit tokens
        await self._acquire_tokens(len(batch))
        
        # Index batch
        async with self._rate_limiter:
            operations = [
                {
                    "_op_type": "index",
                    "_index": "products",
                    "_id": doc["product_id"],
                    "_source": doc
                }
                for doc in batch
            ]
            
            response = await self.es.bulk(operations=operations)
            
            if response.get("errors"):
                error_count = sum(
                    1 for item in response["items"]
                    if "error" in item.get("index", {})
                )
                logger.warning(f"Batch had {error_count} errors")
    
    async def _acquire_tokens(self, count: int):
        """Token bucket rate limiting."""
        
        while True:
            now = datetime.utcnow()
            elapsed = (now - self._last_refill).total_seconds()
            
            # Refill tokens
            self._tokens = min(
                self.max_rate,
                self._tokens + elapsed * self.max_rate
            )
            self._last_refill = now
            
            if self._tokens >= count:
                self._tokens -= count
                return
            
            # Wait for more tokens
            wait_time = (count - self._tokens) / self.max_rate
            await asyncio.sleep(wait_time)


# =============================================================================
# Bulk Index API
# =============================================================================

async def reindex_all_products(bulk_indexer: BulkIndexer):
    """Full reindex of all products."""
    
    job = await bulk_indexer.create_job(
        source="postgres",
        query="""
            SELECT p.*, 
                   array_agg(DISTINCT pv.color) as colors,
                   array_agg(DISTINCT pv.size) as sizes
            FROM products p
            LEFT JOIN product_variants pv ON p.id = pv.product_id
            WHERE p.deleted_at IS NULL
            GROUP BY p.id
            ORDER BY p.id
        """
    )
    
    await bulk_indexer.run_job(job)
    return job


async def index_vendor_import(
    bulk_indexer: BulkIndexer,
    vendor_id: str,
    import_batch_id: str
):
    """Index products from a vendor import."""
    
    # NOTE: vendor_id and import_batch_id are interpolated directly into the
    # SQL for brevity; in production, validate or parameterize these values.
    job = await bulk_indexer.create_job(
        source="postgres",
        query=f"""
            SELECT * FROM products
            WHERE vendor_id = '{vendor_id}'
              AND import_batch_id = '{import_batch_id}'
            ORDER BY id
        """
    )
    
    await bulk_indexer.run_job(job)
    return job

Part IV: Zero-Downtime Reindexing

Chapter 5: The Reindexing Problem

5.1 Why Reindex?

SCENARIOS REQUIRING REINDEX

1. MAPPING CHANGES
   ├── Adding new fields
   ├── Changing field types
   └── Modifying analyzers

2. ANALYSIS CHANGES
   ├── Adding new synonyms
   ├── Changing tokenizer
   └── Adding new filters

3. SHARD RESTRUCTURING
   ├── Increasing shard count for growth
   └── Rebalancing data

4. DATA FIXES
   ├── Fixing corrupted documents
   └── Re-enriching with new logic

PROBLEM:
You can't modify mappings of existing fields.
You must create a new index and migrate data.
But search must keep working during migration!

5.2 Index Alias Pattern

INDEX ALIAS PATTERN

Instead of:
  Application → "products" index

Use:
  Application → "products" alias → "products_v1" index

Benefits:
├── Alias is a pointer, can be changed atomically
├── Can point to multiple indexes (for migration)
├── Application doesn't know actual index name
└── Zero-downtime switches

Migration flow:
┌────────────────────────────────────────────────────────────────────────┐
│                                                                        │
│  BEFORE:                                                               │
│    "products" alias → "products_v1" (current)                          │
│                                                                        │
│  DURING MIGRATION:                                                     │
│    "products" alias → "products_v1" (current, still serving reads)     │
│    "products_write" alias → "products_v2" (new, receiving updates)     │
│                                                                        │
│  AFTER SWITCH:                                                         │
│    "products" alias → "products_v2" (new)                              │
│    "products_v1" → (deleted after verification)                        │
│                                                                        │
└────────────────────────────────────────────────────────────────────────┘
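
A minimal sketch of bootstrapping this pattern with the same async Elasticsearch client as elsewhere (the mapping body is a placeholder): create the first versioned index and point the alias at it, so the application never references the versioned name directly.

# Sketch: create products_v1 and point the "products" alias at it.
async def bootstrap_products_alias(es, alias: str = "products"):
    first_index = f"{alias}_v1"
    
    await es.indices.create(
        index=first_index,
        body={"mappings": {"properties": {"name": {"type": "text"}}}},
    )
    
    # The application only ever talks to the alias, never the versioned index.
    await es.indices.put_alias(index=first_index, name=alias)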

5.3 Zero-Downtime Reindex Implementation

# pipeline/reindex.py

"""
Zero-downtime reindexing with index aliases.

Process:
1. Create new index with updated mapping
2. Set up dual-write (CDC writes to both)
3. Bulk copy existing data
4. Verify counts match
5. Atomic alias switch
6. Clean up old index
"""

import asyncio
import logging
from dataclasses import dataclass
from datetime import datetime
from typing import Optional

logger = logging.getLogger(__name__)


@dataclass
class ReindexPlan:
    """Plan for reindex operation."""
    
    plan_id: str
    old_index: str
    new_index: str
    alias: str
    new_mapping: dict
    new_settings: dict
    status: str = "planned"
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None
    
    # Progress tracking
    documents_copied: int = 0
    documents_total: int = 0


class ZeroDowntimeReindexer:
    """
    Handles zero-downtime reindexing.
    """
    
    def __init__(
        self,
        es_client,
        indexing_pipeline,
        bulk_indexer,
    ):
        self.es = es_client
        self.pipeline = indexing_pipeline
        self.bulk = bulk_indexer
    
    async def create_plan(
        self,
        alias: str,
        new_mapping: dict,
        new_settings: dict = None
    ) -> ReindexPlan:
        """Create a reindex plan."""
        
        # Get current index behind alias
        alias_info = await self.es.indices.get_alias(name=alias)
        old_index = list(alias_info.keys())[0]
        
        # Generate new index name
        version = int(old_index.split("_v")[-1]) if "_v" in old_index else 1
        new_index = f"{alias}_v{version + 1}"
        
        # Get document count
        count = await self.es.count(index=old_index)
        
        return ReindexPlan(
            plan_id=f"reindex_{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}",
            old_index=old_index,
            new_index=new_index,
            alias=alias,
            new_mapping=new_mapping,
            new_settings=new_settings or {},
            documents_total=count["count"]
        )
    
    async def execute(self, plan: ReindexPlan) -> ReindexPlan:
        """
        Execute reindex plan with zero downtime.
        """
        
        plan.status = "running"
        plan.started_at = datetime.utcnow()
        
        try:
            # Step 1: Create new index
            logger.info(f"Creating new index: {plan.new_index}")
            await self._create_new_index(plan)
            
            # Step 2: Set up dual write
            logger.info("Setting up dual write")
            await self._setup_dual_write(plan)
            
            # Step 3: Copy existing data
            logger.info(f"Copying {plan.documents_total} documents")
            await self._copy_data(plan)
            
            # Step 4: Verify counts
            logger.info("Verifying document counts")
            await self._verify_counts(plan)
            
            # Step 5: Atomic alias switch
            logger.info("Switching alias")
            await self._switch_alias(plan)
            
            # Step 6: Clean up
            logger.info("Cleaning up old index")
            await self._cleanup(plan)
            
            plan.status = "completed"
            plan.completed_at = datetime.utcnow()
            
            duration = (plan.completed_at - plan.started_at).total_seconds()
            logger.info(
                f"Reindex completed in {duration:.1f}s: "
                f"{plan.documents_copied} documents migrated"
            )
            
        except Exception as e:
            plan.status = "failed"
            logger.error(f"Reindex failed: {e}", exc_info=True)
            
            # Attempt rollback
            await self._rollback(plan)
            raise
        
        return plan
    
    async def _create_new_index(self, plan: ReindexPlan):
        """Create new index with updated mapping."""
        
        body = {
            "mappings": plan.new_mapping,
            "settings": plan.new_settings
        }
        
        await self.es.indices.create(index=plan.new_index, body=body)
    
    async def _setup_dual_write(self, plan: ReindexPlan):
        """Configure pipeline to write to both indexes."""
        
        # Add write alias pointing to new index
        await self.es.indices.put_alias(
            index=plan.new_index,
            name=f"{plan.alias}_write"
        )
        
        # Configure indexing pipeline to use write alias
        # This ensures new updates go to both old and new index
        self.pipeline.set_write_indexes([plan.old_index, plan.new_index])
    
    async def _copy_data(self, plan: ReindexPlan):
        """Copy data from old index to new index."""
        
        # Use Elasticsearch's reindex API with scroll
        body = {
            "source": {
                "index": plan.old_index,
                "size": 1000  # Batch size
            },
            "dest": {
                "index": plan.new_index
            }
        }
        
        # Start reindex task
        task = await self.es.reindex(
            body=body,
            wait_for_completion=False,
            refresh=True
        )
        
        task_id = task["task"]
        
        # Monitor progress
        while True:
            status = await self.es.tasks.get(task_id=task_id)
            
            if status["completed"]:
                plan.documents_copied = status["task"]["status"]["created"]
                break
            
            plan.documents_copied = status["task"]["status"].get("created", 0)
            progress = (plan.documents_copied / plan.documents_total) * 100
            logger.info(f"Reindex progress: {progress:.1f}%")
            
            await asyncio.sleep(5)
    
    async def _verify_counts(self, plan: ReindexPlan):
        """Verify new index has all documents."""
        
        # Refresh to ensure all documents are searchable
        await self.es.indices.refresh(index=plan.new_index)
        
        old_count = (await self.es.count(index=plan.old_index))["count"]
        new_count = (await self.es.count(index=plan.new_index))["count"]
        
        # Allow small difference due to concurrent writes
        diff_pct = abs(old_count - new_count) / old_count * 100
        
        if diff_pct > 0.1:  # More than 0.1% difference
            raise ValueError(
                f"Count mismatch: old={old_count}, new={new_count} "
                f"({diff_pct:.2f}% difference)"
            )
        
        logger.info(f"Count verified: old={old_count}, new={new_count}")
    
    async def _switch_alias(self, plan: ReindexPlan):
        """Atomic alias switch."""
        
        # Atomic operation: remove from old, add to new
        await self.es.indices.update_aliases(body={
            "actions": [
                {
                    "remove": {
                        "index": plan.old_index,
                        "alias": plan.alias
                    }
                },
                {
                    "add": {
                        "index": plan.new_index,
                        "alias": plan.alias
                    }
                }
            ]
        })
        
        # Remove write alias
        await self.es.indices.delete_alias(
            index=plan.new_index,
            name=f"{plan.alias}_write"
        )
        
        # Update pipeline to write only to new index
        self.pipeline.set_write_indexes([plan.new_index])
    
    async def _cleanup(self, plan: ReindexPlan):
        """Clean up old index after successful migration."""
        
        # Wait a bit to ensure no in-flight requests
        await asyncio.sleep(30)
        
        # Delete old index
        await self.es.indices.delete(index=plan.old_index)
        
        logger.info(f"Deleted old index: {plan.old_index}")
    
    async def _rollback(self, plan: ReindexPlan):
        """Rollback on failure."""
        
        logger.warning("Rolling back reindex")
        
        try:
            # Remove write alias if it exists
            try:
                await self.es.indices.delete_alias(
                    index=plan.new_index,
                    name=f"{plan.alias}_write"
                )
            except Exception:
                # The write alias may not have been created yet
                pass
            
            # Delete new index
            if await self.es.indices.exists(index=plan.new_index):
                await self.es.indices.delete(index=plan.new_index)
            
            # Reset pipeline to only write to old index
            self.pipeline.set_write_indexes([plan.old_index])
            
            logger.info("Rollback completed")
            
        except Exception as e:
            logger.error(f"Rollback failed: {e}")


# =============================================================================
# Usage Example
# =============================================================================

async def add_new_field_to_products():
    """Example: Adding a new 'brand_id' field to products."""
    
    # New mapping with additional field
    new_mapping = {
        "properties": {
            # ... existing fields ...
            "brand_id": {
                "type": "keyword"
            },
            "brand_popularity": {
                "type": "float"
            }
        }
    }
    
    reindexer = ZeroDowntimeReindexer(es, pipeline, bulk_indexer)
    
    # Create plan
    plan = await reindexer.create_plan(
        alias="products",
        new_mapping=new_mapping
    )
    
    print(f"Reindex plan: {plan.old_index} → {plan.new_index}")
    print(f"Documents to migrate: {plan.documents_total}")
    
    # Execute
    await reindexer.execute(plan)

Part V: Monitoring and Troubleshooting

Chapter 6: Indexing Observability

6.1 Key Metrics

# monitoring/indexing_metrics.py

"""
Metrics for monitoring indexing pipeline health.
"""

from dataclasses import dataclass
from datetime import datetime
from typing import Dict, List


@dataclass
class IndexingHealthMetrics:
    """Core metrics for indexing health."""
    
    # Lag metrics
    kafka_consumer_lag: int  # Messages behind
    indexing_lag_seconds: float  # Time behind real-time
    
    # Throughput metrics
    events_per_second: float
    documents_per_second: float
    
    # Error metrics
    error_rate: float
    dlq_size: int
    
    # Resource metrics
    batch_queue_depth: int
    es_bulk_queue_size: int
    
    def is_healthy(self) -> bool:
        """Check if indexing is healthy."""
        return (
            self.indexing_lag_seconds < 60 and  # Less than 1 minute behind
            self.error_rate < 0.01 and           # Less than 1% errors
            self.dlq_size < 1000                  # DLQ not growing
        )


class IndexingMonitor:
    """
    Monitors indexing pipeline health.
    """
    
    def __init__(
        self,
        kafka_admin,
        es_client,
        pipeline,
        alerter
    ):
        self.kafka = kafka_admin
        self.es = es_client
        self.pipeline = pipeline
        self.alerter = alerter
        
        # Alert thresholds
        self.lag_warning_seconds = 60
        self.lag_critical_seconds = 300
        self.error_rate_warning = 0.01
        self.error_rate_critical = 0.05
    
    async def collect_metrics(self) -> IndexingHealthMetrics:
        """Collect current metrics."""
        
        # Get Kafka consumer lag
        lag = await self._get_consumer_lag()
        
        # Get pipeline metrics
        pipeline_metrics = self.pipeline.metrics
        
        # Calculate rates
        events_per_second = self._calculate_rate(
            pipeline_metrics.events_received
        )
        
        # Get ES metrics (bulk requests run on the "write" thread pool on
        # Elasticsearch 7+; older clusters expose it as "bulk")
        es_stats = await self.es.nodes.stats(metric="thread_pool")
        node_stats = next(iter(es_stats["nodes"].values()))
        pools = node_stats["thread_pool"]
        bulk_queue = pools.get("write", pools.get("bulk", {})).get("queue", 0)
        
        return IndexingHealthMetrics(
            kafka_consumer_lag=lag,
            indexing_lag_seconds=pipeline_metrics.lag_seconds,
            events_per_second=events_per_second,
            documents_per_second=self._calculate_rate(
                pipeline_metrics.documents_indexed
            ),
            error_rate=pipeline_metrics.errors / max(1, pipeline_metrics.events_received),
            dlq_size=await self._get_dlq_size(),
            batch_queue_depth=len(self.pipeline._batch),
            es_bulk_queue_size=bulk_queue,
        )
    
    async def check_health(self):
        """Check health and alert if needed."""
        
        metrics = await self.collect_metrics()
        
        # Check lag
        if metrics.indexing_lag_seconds > self.lag_critical_seconds:
            await self.alerter.critical(
                "Indexing lag critical",
                f"Lag: {metrics.indexing_lag_seconds:.0f}s"
            )
        elif metrics.indexing_lag_seconds > self.lag_warning_seconds:
            await self.alerter.warning(
                "Indexing lag warning",
                f"Lag: {metrics.indexing_lag_seconds:.0f}s"
            )
        
        # Check error rate
        if metrics.error_rate > self.error_rate_critical:
            await self.alerter.critical(
                "Indexing error rate critical",
                f"Error rate: {metrics.error_rate:.2%}"
            )
        elif metrics.error_rate > self.error_rate_warning:
            await self.alerter.warning(
                "Indexing error rate elevated",
                f"Error rate: {metrics.error_rate:.2%}"
            )
        
        return metrics
    
    async def _get_consumer_lag(self) -> int:
        """Get Kafka consumer lag."""
        
        consumer_group = self.pipeline.config.consumer_group
        
        # Get consumer group offsets
        offsets = await self.kafka.list_consumer_group_offsets(consumer_group)
        
        total_lag = 0
        for tp, offset_meta in offsets.items():
            # Get end offset for partition
            end_offsets = await self.kafka.list_offsets({tp: -1})
            end_offset = end_offsets[tp].offset
            
            lag = end_offset - offset_meta.offset
            total_lag += lag
        
        return total_lag
    
    async def _get_dlq_size(self) -> int:
        """Get dead letter queue size."""
        
        dlq_topic = "dlq.products"
        
        offsets = await self.kafka.list_offsets({
            (dlq_topic, 0): -1  # Get end offset
        })
        
        return offsets[(dlq_topic, 0)].offset


# =============================================================================
# Dashboard Metrics
# =============================================================================

GRAFANA_DASHBOARD = """
INDEXING PIPELINE DASHBOARD

┌────────────────────────────────────────────────────────────────────────┐
│                    INDEXING HEALTH                                     │
│                                                                        │
│  LATENCY                                                               │
│  ├── Indexing lag:        [████░░░░░░] 45s                             │
│  ├── Kafka consumer lag:  [██░░░░░░░░] 1,234 messages                  │
│  └── ES bulk latency:     [███░░░░░░░] 120ms p99                       │
│                                                                        │
│  THROUGHPUT                                                            │
│  ├── CDC events/sec:      [████████░░] 850/sec                         │
│  ├── Docs indexed/sec:    [████████░░] 820/sec                         │
│  └── Bulk requests/sec:   [██████░░░░] 12/sec                          │
│                                                                        │
│  ERRORS                                                                │
│  ├── Error rate:          [█░░░░░░░░░] 0.2%                            │
│  ├── DLQ size:            47 messages                                  │
│  └── ES rejections:       0/sec                                        │
│                                                                        │
│  RESOURCES                                                             │
│  ├── Pipeline batch:      [████░░░░░░] 234 pending                     │
│  ├── ES bulk queue:       [██░░░░░░░░] 45 pending                      │
│  └── Memory usage:        [██████░░░░] 2.1 GB                          │
│                                                                        │
└────────────────────────────────────────────────────────────────────────┘
"""

6.2 Troubleshooting Common Issues

# troubleshooting/runbooks.py

"""
Runbooks for common indexing issues.
"""

RUNBOOKS = {
    "high_indexing_lag": """
RUNBOOK: High Indexing Lag

SYMPTOMS:
- Indexing lag > 60 seconds
- Search results stale
- Kafka consumer lag growing

DIAGNOSIS:
1. Check Elasticsearch cluster health:
   GET /_cluster/health
   
2. Check for bulk rejections:
   GET /_nodes/stats/thread_pool/write
   (the bulk pool is named "write" on ES 7+; "bulk" on older clusters)
   
3. Check indexing pipeline logs:
   kubectl logs -l app=indexing-pipeline
   
4. Check Kafka consumer lag:
   kafka-consumer-groups --describe --group product-indexer

COMMON CAUSES:

1. Elasticsearch overloaded:
   - Check cluster CPU/memory
   - Check bulk queue size
   - Solution: Scale data nodes or reduce indexing rate

2. Slow document transformation:
   - Check enrichment query latency
   - Solution: Add caching, optimize queries

3. Network issues:
   - Check ES connection errors
   - Solution: Check network, increase timeouts

4. Kafka partition imbalance:
   - Check partition assignment
   - Solution: Rebalance consumers

RESOLUTION:
1. If ES overloaded: Reduce batch size, add rate limiting
2. If transformation slow: Enable caching, skip enrichment temporarily
3. If Kafka issues: Restart consumers, check broker health
    """,
    
    "indexing_errors": """
RUNBOOK: High Indexing Error Rate

SYMPTOMS:
- Error rate > 1%
- DLQ growing
- Documents missing from search

DIAGNOSIS:
1. Check error types in logs:
   grep -i error indexing-pipeline.log | tail -100
   
2. Sample DLQ messages:
   kafka-console-consumer --topic dlq.products --from-beginning --max-messages 10
   
3. Check ES mapping conflicts:
   GET /products/_mapping

COMMON CAUSES:

1. Mapping conflicts:
   - New field has wrong type
   - Solution: Fix source data or update mapping

2. Document too large:
   - Document exceeds http.max_content_length
   - Solution: Truncate fields, increase limit

3. Invalid data:
   - Null values in required fields
   - Solution: Add validation in transformer

RESOLUTION:
1. Fix the root cause first (otherwise replayed messages will fail again)
2. Replay DLQ messages back onto the source topic
   (see the scripted replay sketch after these runbooks), or pipe via the CLI:
   kafka-console-consumer --topic dlq.products --from-beginning |
   kafka-console-producer --topic ecommerce.public.products
    """,
    
    "stale_data": """
RUNBOOK: Stale Data in Search Results

SYMPTOMS:
- Recently updated products show old data
- Customer complaints about wrong prices

DIAGNOSIS:
1. Check indexing lag:
   GET /metrics/indexing_lag
   
2. Verify document in ES:
   GET /products/_doc/{product_id}
   
3. Check CDC events:
   kafka-console-consumer --topic ecommerce.public.products 
   --partition 0 --offset latest --max-messages 10

4. Check PostgreSQL for recent changes:
   SELECT * FROM products WHERE id = {id}

COMMON CAUSES:

1. Indexing lag:
   - Pipeline behind
   - Solution: See "high_indexing_lag" runbook

2. CDC not capturing changes:
   - Replication slot behind
   - Solution: Check Debezium connector status

3. Cache serving stale data:
   - Search result cache not invalidated
   - Solution: Reduce cache TTL, flush cache

RESOLUTION:
1. For an individual document: force a reindex by re-fetching it from
   PostgreSQL and writing it back:
   PUT /products/_doc/{product_id}
   (or touch the row in PostgreSQL so CDC emits a fresh event)

2. For many documents: trigger a bulk reindex
    """
}
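
The DLQ replay step in the indexing_errors runbook can also be scripted once the root cause is fixed. Below is a minimal sketch, assuming aiokafka and the topic names used above; stop it once the consumer has drained the DLQ.

# troubleshooting/replay_dlq.py (illustrative sketch)

import asyncio
from aiokafka import AIOKafkaConsumer, AIOKafkaProducer

async def replay_dlq(bootstrap: str = "localhost:9092",
                     dlq_topic: str = "dlq.products",
                     target_topic: str = "ecommerce.public.products"):
    """Re-publish DLQ messages onto the source topic so the pipeline retries them."""
    consumer = AIOKafkaConsumer(
        dlq_topic,
        bootstrap_servers=bootstrap,
        group_id="dlq-replayer",
        auto_offset_reset="earliest",
        enable_auto_commit=False,
    )
    producer = AIOKafkaProducer(bootstrap_servers=bootstrap)
    await consumer.start()
    await producer.start()
    try:
        async for msg in consumer:
            # Keep the original key so partitioning (and therefore ordering) is preserved
            await producer.send_and_wait(target_topic, value=msg.value, key=msg.key)
            await consumer.commit()
    finally:
        await producer.stop()
        await consumer.stop()

if __name__ == "__main__":
    asyncio.run(replay_dlq())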

Part VI: Real-World Patterns

Chapter 7: Production Patterns

7.1 Multi-Table CDC

# pipeline/multi_table_cdc.py

"""
Handling CDC from multiple related tables.

Challenge:
- products, product_variants, product_images are separate tables
- Change to any table should update the search document
- Need to denormalize into single document
"""

class MultiTableIndexer:
    """
    Handles CDC events from multiple tables.
    
    When any related table changes, re-indexes the parent product.
    """
    
    def __init__(self, es_client, db_pool, transformer):
        self.es = es_client
        self.db = db_pool
        self.transformer = transformer
        
        # Map table to parent relationship
        self.table_relationships = {
            "products": {"type": "parent", "id_field": "id"},
            "product_variants": {"type": "child", "parent_field": "product_id"},
            "product_images": {"type": "child", "parent_field": "product_id"},
            "product_tags": {"type": "child", "parent_field": "product_id"},
        }
    
    async def process_event(self, event: CDCEvent):
        """Process CDC event and determine what to reindex."""
        
        table = event.table
        relationship = self.table_relationships.get(table)
        
        if not relationship:
            return  # Unknown table
        
        # Determine product ID to reindex
        if relationship["type"] == "parent":
            product_id = event.document_id
        else:
            # Get parent ID from the child record (after for insert/update, before for delete)
            state = event.after or event.before or {}
            parent_id = state.get(relationship["parent_field"])
            product_id = str(parent_id) if parent_id is not None else None
        
        if not product_id:
            return
        
        # Fetch and reindex the complete product
        await self._reindex_product(product_id)
    
    async def _reindex_product(self, product_id: str):
        """Fetch full product data and reindex."""
        
        # Fetch complete product with all related data
        row = await self.db.fetchrow("""
            SELECT p.*,
                   COALESCE(json_agg(DISTINCT pv.*) FILTER (WHERE pv.id IS NOT NULL), '[]') as variants,
                   COALESCE(json_agg(DISTINCT pi.*) FILTER (WHERE pi.id IS NOT NULL), '[]') as images,
                   COALESCE(array_agg(DISTINCT t.name) FILTER (WHERE t.id IS NOT NULL), '{}') as tags
            FROM products p
            LEFT JOIN product_variants pv ON p.id = pv.product_id
            LEFT JOIN product_images pi ON p.id = pi.product_id
            LEFT JOIN product_tags pt ON p.id = pt.product_id
            LEFT JOIN tags t ON pt.tag_id = t.id
            WHERE p.id = $1
            GROUP BY p.id
        """, int(product_id))
        
        if not row:
            # Product deleted - remove from index
            await self.es.delete(index="products", id=product_id, ignore=[404])
            return
        
        # Transform and index
        document = await self.transformer.transform(dict(row))
        
        await self.es.index(
            index="products",
            id=product_id,
            document=document
        )
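
One refinement worth considering on top of this: child tables such as variants and images often change in bursts, and each change above triggers a full fetch-and-reindex of the parent. A small per-product debounce collapses those bursts into one reindex. Below is a minimal sketch, assuming asyncio; the 2-second window is an illustrative choice.

# pipeline/reindex_debouncer.py (illustrative sketch)

import asyncio

class ReindexDebouncer:
    """Collapses bursts of reindex requests for the same product into one."""

    def __init__(self, indexer, window_seconds: float = 2.0):
        self.indexer = indexer          # e.g. the MultiTableIndexer above
        self.window = window_seconds
        self._pending: dict[str, asyncio.Task] = {}

    def request(self, product_id: str) -> None:
        """Schedule a reindex; a newer request for the same product resets the timer."""
        existing = self._pending.get(product_id)
        if existing and not existing.done():
            existing.cancel()
        self._pending[product_id] = asyncio.create_task(self._delayed(product_id))

    async def _delayed(self, product_id: str) -> None:
        try:
            await asyncio.sleep(self.window)
            await self.indexer._reindex_product(product_id)
        except asyncio.CancelledError:
            pass  # superseded by a newer change to the same product
        finally:
            # Only clear the entry if it still points at this task
            if self._pending.get(product_id) is asyncio.current_task():
                del self._pending[product_id]

With this in place, process_event would call debouncer.request(product_id) instead of awaiting _reindex_product directly.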

7.2 Handling Deletes Correctly

# pipeline/delete_handler.py

"""
Proper handling of deleted documents.

Challenges:
- Soft deletes vs hard deletes
- Cascading deletes
- Tombstone events
"""

class DeleteHandler:
    """
    Handles document deletion in search index.
    """
    
    def __init__(self, es_client, config):
        self.es = es_client
        
        # Soft delete: Keep in index but mark as deleted
        # Hard delete: Remove from index
        self.soft_delete_enabled = config.get("soft_delete", False)
    
    async def handle_delete(self, event: CDCEvent):
        """Process delete event."""
        
        product_id = event.document_id
        
        if self.soft_delete_enabled:
            await self._soft_delete(product_id)
        else:
            await self._hard_delete(product_id)
    
    async def _hard_delete(self, product_id: str):
        """Remove document from index."""
        
        await self.es.delete(
            index="products",
            id=product_id,
            ignore=[404]  # Ignore if already deleted
        )
    
    async def _soft_delete(self, product_id: str):
        """Mark document as deleted but keep in index."""
        
        # Update document to mark as deleted
        await self.es.update(
            index="products",
            id=product_id,
            body={
                "doc": {
                    "is_deleted": True,
                    "deleted_at": datetime.utcnow().isoformat(),
                    "in_stock": False,  # Also mark out of stock
                }
            },
            ignore=[404]
        )
    
    async def purge_deleted(self, older_than_days: int = 30):
        """Permanently remove soft-deleted documents."""
        
        await self.es.delete_by_query(
            index="products",
            body={
                "query": {
                    "bool": {
                        "must": [
                            {"term": {"is_deleted": True}},
                            {
                                "range": {
                                    "deleted_at": {
                                        "lt": f"now-{older_than_days}d"
                                    }
                                }
                            }
                        ]
                    }
                }
            }
        )
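
For completeness, here is where delete handling plugs into the event stream. Debezium marks a delete with op = "d" (only the before image is populated) and, with default connector settings, follows it with a Kafka tombstone: a message whose value is null. The routing sketch below works at the raw-payload level; the handler arguments are placeholders for the pipeline components shown earlier.

# pipeline/event_router.py (illustrative sketch)

async def route_debezium_message(msg, on_delete, on_upsert):
    """Route one decoded Debezium message to delete vs upsert handling.

    Assumes msg.value is the deserialized Debezium envelope, i.e. a dict
    shaped like {"payload": {"op": ..., "before": ..., "after": ...}}.
    on_delete / on_upsert are async callables taking the payload dict.
    """
    if msg.value is None:
        # Tombstone emitted after the delete (used for log compaction): nothing to index
        return

    payload = msg.value.get("payload", msg.value)
    op = payload.get("op")

    if op == "d":
        await on_delete(payload)            # only "before" is populated
    elif op in ("c", "u", "r"):             # create / update / snapshot read
        await on_upsert(payload)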

Summary

What We Learned Today

DAY 2 SUMMARY: INDEXING PIPELINE

THE PROBLEM
├── Dual-write is dangerous (inconsistency, failures)
├── Search index can get out of sync
├── Need reliable, ordered updates
└── Must handle bulk imports too

CHANGE DATA CAPTURE
├── Read from database WAL (transaction log)
├── Debezium connector for PostgreSQL
├── Publish changes to Kafka
├── Maintains order, handles failures
└── Decouples app from search

INDEXING PIPELINE
├── Consume CDC events from Kafka
├── Transform to search documents
├── Batch for efficiency
├── Handle errors with DLQ
└── Monitor lag and health

BULK INDEXING
├── Rate limiting to protect cluster
├── Progress tracking
├── Resumability
└── Separate from real-time pipeline

ZERO-DOWNTIME REINDEX
├── Index alias pattern
├── Create new index
├── Dual write during migration
├── Atomic alias switch
└── Clean up old index

MONITORING
├── Indexing lag (seconds behind)
├── Consumer lag (messages behind)
├── Error rate
├── DLQ size
└── ES bulk queue

Key Takeaways

INDEXING PIPELINE KEY TAKEAWAYS

1. NEVER DUAL-WRITE
   Use CDC instead — it's safer and more reliable

2. KAFKA PROVIDES DURABILITY
   Events persisted, can replay, maintains order

3. BATCH FOR EFFICIENCY
   Don't index one at a time — use bulk API

4. MONITOR LAG
   Know how far behind you are at all times

5. PLAN FOR REINDEX
   Use aliases from day one — you WILL need to reindex

DEFAULT PATTERN:
PostgreSQL → Debezium → Kafka → Indexing Pipeline → Elasticsearch

Interview Tip

WHEN ASKED "HOW DO YOU KEEP SEARCH IN SYNC?"

"I would use Change Data Capture rather than dual-write.
Dual-write has consistency issues — if either write fails,
data gets out of sync, and ordering is hard to guarantee.

With CDC, we read from the database's transaction log,
publish changes to Kafka, and have a separate indexing
pipeline consume and update Elasticsearch.

Benefits:
- Single source of truth (PostgreSQL)
- Events persisted and ordered (Kafka)
- Decoupled and scalable
- Can replay from any point

For bulk imports, I'd use a separate rate-limited process
to avoid overwhelming the cluster. And I'd always use
index aliases so we can do zero-downtime reindexing."

This shows you understand production realities.

Tomorrow's Preview

Day 3: Query Processing & Relevance
"Finding the needle and ranking the haystack"

We'll cover:

  • How BM25 scoring works
  • Query vs Filter performance
  • Boosting strategies for business rules
  • Function score queries
  • Handling "no results" gracefully

PREVIEW: THE RELEVANCE PROBLEM

User searches: "apple"

Results returned:
1. Apple iPhone 14 Pro (Score: 8.5)
2. Fresh Red Apples (Score: 8.3)
3. Apple Watch Series 8 (Score: 8.1)
4. Apple Pie Recipe Book (Score: 7.9)

Is this the right order?
How do we know?
How do we tune it?

End of Week 7, Day 2

Tomorrow: Day 3 — Query Processing & Relevance: Finding the needle and ranking the haystack