Week 7 — Day 2: Indexing Pipeline
System Design Mastery Series — Building Blocks Week
Introduction
Yesterday we learned how search engines think — inverted indexes, text analysis, and document modeling. But we left a critical question unanswered:
THE FRESHNESS PROBLEM
PostgreSQL (Source of Truth):
Product "Nike Air Max" price: $150
Updated at: 10:00:00
Elasticsearch (Search Index):
Product "Nike Air Max" price: $120 (stale!)
Last indexed: 09:00:00
Customer searches at 10:00:01:
Sees price: $120
Adds to cart
Checkout shows: $150
Customer: "This is a scam!"
Today's Theme: "Keeping search fresh without killing the database"
We'll cover:
- Change Data Capture (CDC) fundamentals
- Debezium for PostgreSQL → Kafka streaming
- Streaming vs batch indexing trade-offs
- Bulk indexing without overwhelming Elasticsearch
- Zero-downtime reindexing strategies
- Handling indexing failures and lag
Part I: The Indexing Challenge
Chapter 1: Why Indexing Is Hard
1.1 The Naive Approach (Don't Do This)
# BAD: Synchronous dual-write
async def update_product(product_id: str, data: dict):
# Update PostgreSQL
await db.execute(
"UPDATE products SET ... WHERE id = $1",
product_id
)
# Update Elasticsearch
await es.index(
index="products",
id=product_id,
document=data
)
Problems with dual-write:
DUAL-WRITE FAILURES
Scenario 1: ES write fails
├── PostgreSQL: Updated ✓
├── Elasticsearch: Failed ✗
└── Result: Data inconsistent, search shows old data
Scenario 2: Process crashes between writes
├── PostgreSQL: Updated ✓
├── Elasticsearch: Never reached
└── Result: Data inconsistent
Scenario 3: Network partition
├── PostgreSQL: Updated ✓
├── Elasticsearch: Times out, retries...
├── Transaction held open
└── Result: Database connection exhaustion
Scenario 4: Order of operations
├── Request A: Update price to $100
├── Request B: Update price to $120
├── PostgreSQL order: A then B → $120 ✓
├── ES order (due to retries): B then A → $100 ✗
└── Result: Elasticsearch has wrong value!
1.2 What We Need
REQUIREMENTS FOR RELIABLE INDEXING
1. CONSISTENCY
└── Eventually consistent with source of truth (PostgreSQL)
2. RELIABILITY
└── Never lose an update, even during failures
3. ORDERING
└── Updates applied in correct order
4. PERFORMANCE
└── Don't slow down the main application
5. SCALABILITY
└── Handle bulk imports (millions of products)
6. OBSERVABILITY
└── Know when indexing falls behind
1.3 The Solution: Change Data Capture
CHANGE DATA CAPTURE (CDC) PATTERN
Instead of:
Application → PostgreSQL
→ Elasticsearch (dual-write)
Use:
Application → PostgreSQL → CDC → Kafka → Elasticsearch
│
└── Single source of truth
│
└── Changes captured as events
│
└── Events processed into index
Benefits:
├── Decoupled: App doesn't know about ES
├── Reliable: Events persisted in Kafka
├── Ordered: Kafka maintains order per partition
├── Scalable: Can replay events, add consumers
└── Observable: Monitor Kafka lag
Chapter 2: Change Data Capture with Debezium
2.1 How CDC Works
CDC ARCHITECTURE
┌─────────────────────────────────────────────────────────────────────────┐
│ CHANGE DATA CAPTURE FLOW │
│ │
│ ┌──────────────┐ │
│ │ Application │ │
│ └──────┬───────┘ │
│ │ INSERT/UPDATE/DELETE │
│ ▼ │
│ ┌──────────────┐ ┌──────────────┐ │
│ │ PostgreSQL │────▶│ WAL (Write │ │
│ │ │ │ Ahead Log) │ │
│ └──────────────┘ └──────┬───────┘ │
│ │ │
│ │ Logical Replication │
│ ▼ │
│ ┌──────────────┐ │
│ │ Debezium │ │
│ │ Connector │ │
│ └──────┬───────┘ │
│ │ │
│ │ CDC Events │
│ ▼ │
│ ┌──────────────┐ │
│ │ Kafka │ │
│ │ Topics │ │
│ └──────┬───────┘ │
│ │ │
│ │ Consume │
│ ▼ │
│ ┌──────────────┐ │
│ │ Indexing │ │
│ │ Pipeline │ │
│ └──────┬───────┘ │
│ │ │
│ ▼ │
│ ┌──────────────┐ │
│ │Elasticsearch │ │
│ └──────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────┘
KEY INSIGHT:
CDC reads from the database's transaction log (WAL)
├── Captures ALL changes (INSERT, UPDATE, DELETE)
├── Maintains transaction order
├── Minimal impact on the application (no triggers, no dual-writes)
└── Can replay from any point still retained in the log
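To see what the WAL actually exposes, you can peek at a logical replication slot directly. Here is a minimal sketch assuming asyncpg; it uses the human-readable test_decoding plugin rather than pgoutput (which emits a binary protocol that Debezium decodes for you):

# Minimal sketch: peek at pending logical-decoding changes without consuming them.
# Assumes asyncpg is installed and wal_level=logical; uses a throwaway slot with
# the 'test_decoding' plugin purely for illustration.
import asyncio
import asyncpg

async def peek_wal_changes(dsn: str, slot: str = "wal_peek_demo") -> None:
    conn = await asyncpg.connect(dsn)
    try:
        await conn.execute(
            "SELECT pg_create_logical_replication_slot($1, 'test_decoding')", slot
        )
        # Non-destructive read of up to 10 pending changes (lsn, xid, data)
        rows = await conn.fetch(
            "SELECT * FROM pg_logical_slot_peek_changes($1, NULL, 10)", slot
        )
        for row in rows:
            print(row["lsn"], row["xid"], row["data"])
    finally:
        await conn.execute("SELECT pg_drop_replication_slot($1)", slot)
        await conn.close()

# asyncio.run(peek_wal_changes("postgresql://debezium@postgres.internal/ecommerce"))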
2.2 Debezium Configuration
# config/debezium_connector.py
"""
Debezium PostgreSQL connector configuration.
Debezium reads PostgreSQL's Write-Ahead Log (WAL)
and publishes changes to Kafka topics.
"""
DEBEZIUM_CONNECTOR_CONFIG = {
"name": "products-connector",
"config": {
# Connector class
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
# Database connection
"database.hostname": "postgres.internal",
"database.port": "5432",
"database.user": "debezium",
"database.password": "${secrets:debezium-password}",
"database.dbname": "ecommerce",
        # Logical replication slot and publication
        "slot.name": "debezium_products",
        "plugin.name": "pgoutput",  # PostgreSQL 10+ native logical decoding
        "publication.name": "debezium_publication",  # matches the publication created below
# Tables to capture
"table.include.list": "public.products,public.product_variants,public.product_images",
# Topic configuration
"topic.prefix": "ecommerce",
# Results in topics like: ecommerce.public.products
# Message transformations
"transforms": "unwrap,extractKey",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"transforms.unwrap.delete.handling.mode": "rewrite",
"transforms.extractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractKey.field": "id",
# Snapshot configuration
"snapshot.mode": "initial", # Full snapshot on first run
# Error handling
"errors.tolerance": "all",
"errors.log.enable": "true",
"errors.deadletterqueue.topic.name": "dlq.products",
# Performance tuning
"max.batch.size": 2048,
"max.queue.size": 8192,
"poll.interval.ms": 100,
}
}
# PostgreSQL setup required for CDC
POSTGRESQL_CDC_SETUP = """
-- Enable logical replication (requires a PostgreSQL restart to take effect)
ALTER SYSTEM SET wal_level = logical;
-- Create replication slot (if not using Debezium's auto-creation)
SELECT pg_create_logical_replication_slot('debezium_products', 'pgoutput');
-- Create publication for CDC
CREATE PUBLICATION debezium_publication FOR TABLE
products,
product_variants,
product_images;
-- Grant permissions to Debezium user
CREATE USER debezium WITH REPLICATION PASSWORD 'secure_password';
GRANT SELECT ON ALL TABLES IN SCHEMA public TO debezium;
GRANT USAGE ON SCHEMA public TO debezium;
"""
2.3 CDC Event Structure
# models/cdc_events.py
"""
CDC event structures from Debezium.
Debezium produces events with before/after state,
operation type, and metadata.
"""
from dataclasses import dataclass
from datetime import datetime
from typing import Optional, Dict, Any
from enum import Enum
class Operation(Enum):
CREATE = "c" # Insert
UPDATE = "u" # Update
DELETE = "d" # Delete
READ = "r" # Snapshot read
@dataclass
class CDCEvent:
"""
Debezium CDC event.
Example Kafka message:
{
"before": {"id": 123, "name": "Old Name", "price": 100},
"after": {"id": 123, "name": "New Name", "price": 120},
"source": {
"version": "2.0.0",
"connector": "postgresql",
"name": "ecommerce",
"ts_ms": 1672531200000,
"db": "ecommerce",
"schema": "public",
"table": "products",
"txId": 12345,
"lsn": 123456789
},
"op": "u",
"ts_ms": 1672531200100
}
"""
operation: Operation
before: Optional[Dict[str, Any]] # State before change (null for inserts)
after: Optional[Dict[str, Any]] # State after change (null for deletes)
table: str
timestamp: datetime
transaction_id: int
lsn: int # Log Sequence Number (for ordering)
@classmethod
def from_kafka_message(cls, message: dict) -> "CDCEvent":
"""Parse Debezium Kafka message."""
source = message.get("source", {})
return cls(
operation=Operation(message["op"]),
before=message.get("before"),
after=message.get("after"),
table=source.get("table", "unknown"),
timestamp=datetime.fromtimestamp(message["ts_ms"] / 1000),
transaction_id=source.get("txId", 0),
lsn=source.get("lsn", 0),
)
@property
def is_delete(self) -> bool:
return self.operation == Operation.DELETE
@property
def document_id(self) -> Optional[str]:
"""Get the document ID from before or after state."""
state = self.after or self.before
return str(state.get("id")) if state else None
@property
def current_state(self) -> Optional[Dict[str, Any]]:
"""Get current state (after for create/update, before for delete)."""
return self.after if not self.is_delete else self.before
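A quick sanity check against the sample payload from the docstring above:

# Parsing the example payload shown in the CDCEvent docstring.
sample_message = {
    "before": {"id": 123, "name": "Old Name", "price": 100},
    "after": {"id": 123, "name": "New Name", "price": 120},
    "source": {"table": "products", "txId": 12345, "lsn": 123456789},
    "op": "u",
    "ts_ms": 1672531200100,
}

event = CDCEvent.from_kafka_message(sample_message)
assert event.operation is Operation.UPDATE
assert event.document_id == "123"
assert event.current_state["price"] == 120
assert not event.is_delete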
Part II: Indexing Pipeline Implementation
Chapter 3: Building the Pipeline
3.1 Core Pipeline Architecture
# pipeline/indexing_pipeline.py
"""
Indexing pipeline that consumes CDC events and updates Elasticsearch.
Key responsibilities:
1. Consume CDC events from Kafka
2. Transform to Elasticsearch documents
3. Batch updates for efficiency
4. Handle errors and retries
5. Track indexing progress
"""
import asyncio
import logging
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import List, Dict, Any, Optional
from collections import defaultdict

from models.cdc_events import CDCEvent
logger = logging.getLogger(__name__)
@dataclass
class IndexingConfig:
"""Indexing pipeline configuration."""
# Kafka settings
kafka_brokers: str = "kafka:9092"
consumer_group: str = "product-indexer"
topics: List[str] = field(default_factory=lambda: ["ecommerce.public.products"])
# Batching settings
batch_size: int = 500 # Documents per bulk request
batch_timeout_seconds: float = 5.0 # Max wait before flushing
# Elasticsearch settings
es_index: str = "products"
es_timeout_seconds: int = 30
# Error handling
max_retries: int = 3
retry_backoff_seconds: float = 1.0
# Performance
max_concurrent_batches: int = 3
@dataclass
class IndexingMetrics:
"""Metrics for monitoring indexing pipeline."""
events_received: int = 0
documents_indexed: int = 0
documents_deleted: int = 0
errors: int = 0
last_event_timestamp: Optional[datetime] = None
lag_seconds: float = 0.0
def to_dict(self) -> dict:
return {
"events_received": self.events_received,
"documents_indexed": self.documents_indexed,
"documents_deleted": self.documents_deleted,
"errors": self.errors,
"lag_seconds": self.lag_seconds,
}
class ProductIndexingPipeline:
"""
Main indexing pipeline for products.
Consumes CDC events and maintains Elasticsearch index.
"""
def __init__(
self,
config: IndexingConfig,
kafka_consumer,
es_client,
document_transformer,
):
self.config = config
self.kafka = kafka_consumer
self.es = es_client
self.transformer = document_transformer
self.metrics = IndexingMetrics()
self._batch: List[Dict[str, Any]] = []
self._batch_lock = asyncio.Lock()
self._last_flush = datetime.utcnow()
self._running = False
async def start(self):
"""Start the indexing pipeline."""
logger.info("Starting indexing pipeline")
self._running = True
# Start background tasks
await asyncio.gather(
self._consume_events(),
self._flush_periodically(),
self._report_metrics(),
)
async def stop(self):
"""Stop the pipeline gracefully."""
logger.info("Stopping indexing pipeline")
self._running = False
# Flush remaining batch
await self._flush_batch()
async def _consume_events(self):
"""Consume CDC events from Kafka."""
async for message in self.kafka.consume(self.config.topics):
if not self._running:
break
try:
await self._process_message(message)
except Exception as e:
logger.error(f"Error processing message: {e}", exc_info=True)
self.metrics.errors += 1
async def _process_message(self, message):
"""Process a single CDC event."""
# Parse CDC event
event = CDCEvent.from_kafka_message(message.value)
self.metrics.events_received += 1
self.metrics.last_event_timestamp = event.timestamp
# Calculate lag
self.metrics.lag_seconds = (
datetime.utcnow() - event.timestamp
).total_seconds()
# Transform to index operation
operation = await self._event_to_operation(event)
        if operation:
            async with self._batch_lock:
                self._batch.append(operation)
                batch_full = len(self._batch) >= self.config.batch_size
            # Flush outside the lock: _flush_batch acquires it itself, and
            # asyncio.Lock is not reentrant, so flushing here would deadlock
            if batch_full:
                await self._flush_batch()
async def _event_to_operation(self, event: CDCEvent) -> Optional[dict]:
"""Transform CDC event to Elasticsearch bulk operation."""
if event.is_delete:
return {
"_op_type": "delete",
"_index": self.config.es_index,
"_id": event.document_id,
}
# Transform to search document
document = await self.transformer.transform(event.after)
if document:
return {
"_op_type": "index",
"_index": self.config.es_index,
"_id": event.document_id,
"_source": document,
}
return None
async def _flush_batch(self):
"""Flush accumulated batch to Elasticsearch."""
async with self._batch_lock:
if not self._batch:
return
batch = self._batch
self._batch = []
self._last_flush = datetime.utcnow()
# Execute bulk request with retry
success = await self._bulk_index_with_retry(batch)
if success:
# Update metrics
for op in batch:
if op["_op_type"] == "delete":
self.metrics.documents_deleted += 1
else:
self.metrics.documents_indexed += 1
async def _bulk_index_with_retry(self, operations: List[dict]) -> bool:
"""Execute bulk index with exponential backoff retry."""
for attempt in range(self.config.max_retries):
try:
response = await self.es.bulk(
operations=operations,
timeout=f"{self.config.es_timeout_seconds}s"
)
# Check for errors
if response.get("errors"):
failed = [
item for item in response["items"]
if "error" in item.get("index", item.get("delete", {}))
]
logger.warning(f"Bulk request had {len(failed)} failures")
# Could implement partial retry here
return True
except Exception as e:
logger.warning(
f"Bulk index attempt {attempt + 1} failed: {e}"
)
if attempt < self.config.max_retries - 1:
wait_time = self.config.retry_backoff_seconds * (2 ** attempt)
await asyncio.sleep(wait_time)
logger.error(f"Bulk index failed after {self.config.max_retries} attempts")
self.metrics.errors += len(operations)
return False
    async def _flush_periodically(self):
        """Background task to flush batches on timeout."""
        while self._running:
            await asyncio.sleep(1)
            time_since_flush = (
                datetime.utcnow() - self._last_flush
            ).total_seconds()
            # _flush_batch takes the batch lock itself, so don't hold it here
            if self._batch and time_since_flush >= self.config.batch_timeout_seconds:
                await self._flush_batch()
async def _report_metrics(self):
"""Periodically report metrics."""
while self._running:
await asyncio.sleep(10)
logger.info(f"Indexing metrics: {self.metrics.to_dict()}")
class DocumentTransformer:
"""
Transforms database rows to Elasticsearch documents.
Handles:
- Field mapping
- Data enrichment
- Denormalization
"""
def __init__(self, db_pool, cache):
self.db = db_pool
self.cache = cache
async def transform(self, row: dict) -> Optional[dict]:
"""Transform database row to search document."""
if not row:
return None
# Basic field mapping
document = {
"product_id": str(row["id"]),
"name": row["name"],
"description": row.get("description", ""),
"brand": row.get("brand", ""),
"category": row.get("category", ""),
"price": float(row.get("price", 0)),
"sale_price": float(row.get("sale_price", 0)) if row.get("sale_price") else None,
"in_stock": row.get("stock_quantity", 0) > 0,
"stock_quantity": row.get("stock_quantity", 0),
"rating": float(row.get("rating", 0)),
"review_count": row.get("review_count", 0),
"created_at": row.get("created_at"),
"updated_at": row.get("updated_at"),
}
# Enrich with related data (denormalization)
document = await self._enrich_document(document, row)
# Calculate derived fields
document = self._calculate_derived_fields(document)
return document
async def _enrich_document(self, document: dict, row: dict) -> dict:
"""Enrich document with related data."""
product_id = row["id"]
# Get categories path (for hierarchical faceting)
categories = await self._get_category_path(row.get("category_id"))
if categories:
document["categories"] = categories
# Get product tags
tags = await self._get_product_tags(product_id)
if tags:
document["tags"] = tags
# Get variant information (colors, sizes)
variants = await self._get_product_variants(product_id)
if variants:
document["colors"] = list(set(v["color"] for v in variants if v.get("color")))
document["sizes"] = list(set(v["size"] for v in variants if v.get("size")))
return document
def _calculate_derived_fields(self, document: dict) -> dict:
"""Calculate derived/computed fields."""
# Discount percentage
if document.get("sale_price") and document["price"] > 0:
discount = (document["price"] - document["sale_price"]) / document["price"]
document["discount_percent"] = int(discount * 100)
else:
document["discount_percent"] = 0
# Popularity score (for boosting)
document["popularity_score"] = self._calculate_popularity(document)
# Search boost field (combined text for searching)
document["search_text"] = " ".join([
document.get("name", ""),
document.get("brand", ""),
document.get("description", "")[:500],
" ".join(document.get("tags", [])),
])
return document
def _calculate_popularity(self, document: dict) -> float:
"""Calculate popularity score for ranking."""
# Weighted combination of signals
rating_score = document.get("rating", 0) / 5.0 # Normalize to 0-1
review_score = min(1.0, document.get("review_count", 0) / 1000) # Cap at 1000
# Recent products get a boost
# (would need actual date comparison in production)
recency_score = 0.5
return (rating_score * 0.4) + (review_score * 0.4) + (recency_score * 0.2)
async def _get_category_path(self, category_id: int) -> List[str]:
"""Get full category path for hierarchical faceting."""
if not category_id:
return []
# Check cache first
cache_key = f"category_path:{category_id}"
cached = await self.cache.get(cache_key)
if cached:
return cached
# Query database
path = await self.db.fetchval("""
WITH RECURSIVE cat_path AS (
SELECT id, name, parent_id, ARRAY[name] as path
FROM categories WHERE id = $1
UNION ALL
SELECT c.id, c.name, c.parent_id, c.name || cp.path
FROM categories c
JOIN cat_path cp ON c.id = cp.parent_id
)
SELECT path FROM cat_path WHERE parent_id IS NULL
""", category_id)
if path:
await self.cache.set(cache_key, path, ttl=3600)
return path or []
async def _get_product_tags(self, product_id: int) -> List[str]:
"""Get product tags."""
rows = await self.db.fetch("""
SELECT t.name FROM tags t
JOIN product_tags pt ON t.id = pt.tag_id
WHERE pt.product_id = $1
""", product_id)
return [r["name"] for r in rows]
async def _get_product_variants(self, product_id: int) -> List[dict]:
"""Get product variants (colors, sizes)."""
rows = await self.db.fetch("""
SELECT color, size, sku, stock_quantity
FROM product_variants
WHERE product_id = $1 AND stock_quantity > 0
""", product_id)
return [dict(r) for r in rows]
3.2 Handling Different Update Patterns
# pipeline/update_handlers.py
"""
Specialized handlers for different types of updates.
Some updates require different handling:
- Price changes: High priority, need fast indexing
- Stock changes: Critical for availability
- Description changes: Lower priority, can batch more
- Bulk imports: Need special handling to not overwhelm
"""
import asyncio
import logging
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import Optional

from models.cdc_events import CDCEvent
from pipeline.indexing_pipeline import ProductIndexingPipeline

logger = logging.getLogger(__name__)
class UpdatePriority(Enum):
CRITICAL = 1 # Stock out, price drops
HIGH = 2 # General updates
NORMAL = 3 # Description changes
LOW = 4 # Bulk imports
@dataclass
class PrioritizedUpdate:
"""Update with priority for queue ordering."""
product_id: str
operation: str # index, delete
document: Optional[dict]
priority: UpdatePriority
timestamp: float
class UpdateClassifier:
"""
Classifies updates by priority based on what changed.
"""
def classify(self, event: CDCEvent) -> UpdatePriority:
"""Determine priority of update."""
if event.is_delete:
return UpdatePriority.HIGH
before = event.before or {}
after = event.after or {}
# Stock went to zero - critical!
if before.get("stock_quantity", 0) > 0 and after.get("stock_quantity", 0) == 0:
return UpdatePriority.CRITICAL
# Stock restored - critical!
if before.get("stock_quantity", 0) == 0 and after.get("stock_quantity", 0) > 0:
return UpdatePriority.CRITICAL
# Price changed
if before.get("price") != after.get("price"):
return UpdatePriority.HIGH
# Sale price changed
if before.get("sale_price") != after.get("sale_price"):
return UpdatePriority.HIGH
# Only metadata changed (description, images, etc.)
return UpdatePriority.NORMAL
def is_significant_change(self, event: CDCEvent) -> bool:
"""Check if change affects search results."""
if event.is_delete:
return True
before = event.before or {}
after = event.after or {}
# Fields that affect search
significant_fields = {
"name", "description", "brand", "category",
"price", "sale_price", "stock_quantity",
"rating", "review_count"
}
for field in significant_fields:
if before.get(field) != after.get(field):
return True
return False
class PriorityIndexingPipeline(ProductIndexingPipeline):
"""
Indexing pipeline with priority queues.
Critical updates processed immediately.
Lower priority updates batched more aggressively.
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.classifier = UpdateClassifier()
# Separate batches by priority
self._priority_batches = {
UpdatePriority.CRITICAL: [],
UpdatePriority.HIGH: [],
UpdatePriority.NORMAL: [],
UpdatePriority.LOW: [],
}
# Different batch sizes per priority
self._batch_sizes = {
UpdatePriority.CRITICAL: 10, # Small batches, fast flush
UpdatePriority.HIGH: 100,
UpdatePriority.NORMAL: 500,
UpdatePriority.LOW: 1000, # Large batches for bulk
}
async def _process_message(self, message):
"""Process with priority classification."""
event = CDCEvent.from_kafka_message(message.value)
self.metrics.events_received += 1
# Skip non-significant changes
if not self.classifier.is_significant_change(event):
return
# Classify priority
priority = self.classifier.classify(event)
# Transform and add to appropriate batch
operation = await self._event_to_operation(event)
if operation:
async with self._batch_lock:
self._priority_batches[priority].append(operation)
# Check if this priority batch should flush
if len(self._priority_batches[priority]) >= self._batch_sizes[priority]:
await self._flush_priority_batch(priority)
async def _flush_priority_batch(self, priority: UpdatePriority):
"""Flush a specific priority batch."""
batch = self._priority_batches[priority]
if not batch:
return
self._priority_batches[priority] = []
logger.info(f"Flushing {len(batch)} {priority.name} priority updates")
await self._bulk_index_with_retry(batch)
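One gap to close: the inherited _flush_periodically task only drains self._batch, which this subclass no longer fills, so small priority batches could sit indefinitely under low traffic. A sketch of an override, continuing the PriorityIndexingPipeline class (the interval values are illustrative assumptions):

    # Per-priority flush intervals in seconds (illustrative values, not tuned)
    FLUSH_INTERVALS = {
        UpdatePriority.CRITICAL: 0.5,
        UpdatePriority.HIGH: 2.0,
        UpdatePriority.NORMAL: 5.0,
        UpdatePriority.LOW: 30.0,
    }

    async def _flush_periodically(self):
        """Flush each priority batch once it has waited longer than its interval."""
        last_flush = {p: datetime.utcnow() for p in self._priority_batches}
        while self._running:
            await asyncio.sleep(0.5)
            now = datetime.utcnow()
            for priority, interval in self.FLUSH_INTERVALS.items():
                if (now - last_flush[priority]).total_seconds() < interval:
                    continue
                async with self._batch_lock:
                    if self._priority_batches[priority]:
                        await self._flush_priority_batch(priority)
                last_flush[priority] = now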
Part III: Bulk Indexing Strategies
Chapter 4: Initial Load and Bulk Imports
4.1 The Bulk Import Challenge
BULK IMPORT SCENARIO
Situation:
├── 1 million new products from vendor import
├── Normal indexing: ~100 products/sec from CDC
├── Time to index: 1M / 100 = 10,000 seconds ≈ 3 hours
└── Users searching for products that don't exist yet!
Challenges:
├── Don't overwhelm Elasticsearch cluster
├── Don't starve normal CDC updates
├── Don't cause search latency spikes
└── Handle failures gracefully
4.2 Bulk Indexing Implementation
# pipeline/bulk_indexer.py
"""
Bulk indexing for initial loads and large imports.
Strategies:
1. Rate limiting to protect cluster
2. Parallel workers with coordination
3. Progress tracking and resumability
4. Separate from real-time pipeline
"""
import asyncio
import logging
from dataclasses import dataclass
from datetime import datetime
from typing import Optional, AsyncIterator
import hashlib
logger = logging.getLogger(__name__)
@dataclass
class BulkIndexJob:
"""Represents a bulk indexing job."""
job_id: str
source: str # "postgres", "csv", "api"
query: Optional[str] # SQL query for postgres source
total_documents: int
indexed_documents: int = 0
failed_documents: int = 0
status: str = "pending" # pending, running, completed, failed
started_at: Optional[datetime] = None
completed_at: Optional[datetime] = None
last_processed_id: Optional[str] = None # For resumability
class BulkIndexer:
"""
Handles bulk indexing with rate limiting and progress tracking.
"""
def __init__(
self,
es_client,
db_pool,
transformer,
job_store,
max_rate_per_second: int = 5000,
batch_size: int = 1000,
parallel_workers: int = 4,
):
self.es = es_client
self.db = db_pool
self.transformer = transformer
self.jobs = job_store
self.max_rate = max_rate_per_second
self.batch_size = batch_size
self.parallel_workers = parallel_workers
# Rate limiter
self._rate_limiter = asyncio.Semaphore(parallel_workers)
self._tokens = max_rate_per_second
self._last_refill = datetime.utcnow()
async def create_job(
self,
source: str,
query: str = None,
total: int = None
) -> BulkIndexJob:
"""Create a new bulk indexing job."""
job_id = hashlib.md5(
f"{source}:{query}:{datetime.utcnow().isoformat()}".encode()
).hexdigest()[:12]
# Count total if not provided
if total is None and query:
count_query = f"SELECT COUNT(*) FROM ({query}) AS subq"
total = await self.db.fetchval(count_query)
job = BulkIndexJob(
job_id=job_id,
source=source,
query=query,
total_documents=total or 0
)
await self.jobs.save(job)
return job
async def run_job(self, job: BulkIndexJob):
"""Execute a bulk indexing job."""
job.status = "running"
job.started_at = datetime.utcnow()
await self.jobs.save(job)
logger.info(f"Starting bulk index job {job.job_id}: {job.total_documents} documents")
try:
# Stream documents and index in batches
batch = []
async for document in self._stream_documents(job):
batch.append(document)
if len(batch) >= self.batch_size:
await self._index_batch_with_rate_limit(batch)
job.indexed_documents += len(batch)
job.last_processed_id = batch[-1].get("product_id")
await self.jobs.save(job)
batch = []
# Log progress
progress = (job.indexed_documents / job.total_documents) * 100
if job.indexed_documents % 10000 == 0:
logger.info(
f"Job {job.job_id}: {job.indexed_documents}/{job.total_documents} "
f"({progress:.1f}%)"
)
# Final batch
if batch:
await self._index_batch_with_rate_limit(batch)
job.indexed_documents += len(batch)
job.status = "completed"
job.completed_at = datetime.utcnow()
duration = (job.completed_at - job.started_at).total_seconds()
rate = job.indexed_documents / duration
logger.info(
f"Job {job.job_id} completed: {job.indexed_documents} documents "
f"in {duration:.1f}s ({rate:.0f} docs/sec)"
)
except Exception as e:
logger.error(f"Job {job.job_id} failed: {e}", exc_info=True)
job.status = "failed"
raise
finally:
await self.jobs.save(job)
async def resume_job(self, job: BulkIndexJob):
"""Resume a failed or interrupted job."""
if job.status not in ("failed", "running"):
raise ValueError(f"Cannot resume job with status {job.status}")
logger.info(
f"Resuming job {job.job_id} from {job.last_processed_id}, "
f"{job.indexed_documents}/{job.total_documents} done"
)
await self.run_job(job)
async def _stream_documents(
self,
job: BulkIndexJob
) -> AsyncIterator[dict]:
"""Stream documents from source."""
if job.source == "postgres":
async for doc in self._stream_from_postgres(job):
yield doc
elif job.source == "csv":
async for doc in self._stream_from_csv(job):
yield doc
else:
raise ValueError(f"Unknown source: {job.source}")
async def _stream_from_postgres(
self,
job: BulkIndexJob
) -> AsyncIterator[dict]:
"""Stream documents from PostgreSQL using cursor."""
query = job.query
        # Add resume condition if needed
        if job.last_processed_id:
            # Assumes the query is ordered by a numeric id column; a non-numeric
            # id would need quoting/parameterization here
            query = f"""
                SELECT * FROM ({query}) AS subq
                WHERE id > {job.last_processed_id}
                ORDER BY id
            """
# Use server-side cursor for memory efficiency
async with self.db.transaction():
async for row in self.db.cursor(query, prefetch=self.batch_size):
document = await self.transformer.transform(dict(row))
if document:
yield document
async def _index_batch_with_rate_limit(self, batch: list):
"""Index batch with rate limiting to protect cluster."""
# Acquire rate limit tokens
await self._acquire_tokens(len(batch))
# Index batch
async with self._rate_limiter:
operations = [
{
"_op_type": "index",
"_index": "products",
"_id": doc["product_id"],
"_source": doc
}
for doc in batch
]
response = await self.es.bulk(operations=operations)
if response.get("errors"):
error_count = sum(
1 for item in response["items"]
if "error" in item.get("index", {})
)
logger.warning(f"Batch had {error_count} errors")
async def _acquire_tokens(self, count: int):
"""Token bucket rate limiting."""
while True:
now = datetime.utcnow()
elapsed = (now - self._last_refill).total_seconds()
# Refill tokens
self._tokens = min(
self.max_rate,
self._tokens + elapsed * self.max_rate
)
self._last_refill = now
if self._tokens >= count:
self._tokens -= count
return
# Wait for more tokens
wait_time = (count - self._tokens) / self.max_rate
await asyncio.sleep(wait_time)
# =============================================================================
# Bulk Index API
# =============================================================================
async def reindex_all_products(bulk_indexer: BulkIndexer):
"""Full reindex of all products."""
job = await bulk_indexer.create_job(
source="postgres",
query="""
SELECT p.*,
array_agg(DISTINCT pv.color) as colors,
array_agg(DISTINCT pv.size) as sizes
FROM products p
LEFT JOIN product_variants pv ON p.id = pv.product_id
WHERE p.deleted_at IS NULL
GROUP BY p.id
ORDER BY p.id
"""
)
await bulk_indexer.run_job(job)
return job
async def index_vendor_import(
bulk_indexer: BulkIndexer,
vendor_id: str,
import_batch_id: str
):
"""Index products from a vendor import."""
    # NOTE: create_job takes a raw SQL string, so vendor_id and import_batch_id
    # are interpolated directly -- validate/escape them upstream
    job = await bulk_indexer.create_job(
        source="postgres",
        query=f"""
            SELECT * FROM products
            WHERE vendor_id = '{vendor_id}'
              AND import_batch_id = '{import_batch_id}'
            ORDER BY id
        """
    )
await bulk_indexer.run_job(job)
return job
Part IV: Zero-Downtime Reindexing
Chapter 5: The Reindexing Problem
5.1 Why Reindex?
SCENARIOS REQUIRING REINDEX
1. MAPPING CHANGES
├── Adding new fields
├── Changing field types
└── Modifying analyzers
2. ANALYSIS CHANGES
├── Adding new synonyms
├── Changing tokenizer
└── Adding new filters
3. SHARD RESTRUCTURING
├── Increasing shard count for growth
└── Rebalancing data
4. DATA FIXES
├── Fixing corrupted documents
└── Re-enriching with new logic
PROBLEM:
You can't modify mappings of existing fields.
You must create a new index and migrate data.
But search must keep working during migration!
5.2 Index Alias Pattern
INDEX ALIAS PATTERN
Instead of:
Application → "products" index
Use:
Application → "products" alias → "products_v1" index
Benefits:
├── Alias is a pointer, can be changed atomically
├── Can point to multiple indexes (for migration)
├── Application doesn't know actual index name
└── Zero-downtime switches
Migration flow:
┌────────────────────────────────────────────────────────────────────────┐
│ │
│ BEFORE: │
│ "products" alias → "products_v1" (current) │
│ │
│ DURING MIGRATION: │
│ "products" alias → "products_v1" (current, still serving reads) │
│ "products_write" alias → "products_v2" (new, receiving updates) │
│ │
│ AFTER SWITCH: │
│ "products" alias → "products_v2" (new) │
│ "products_v1" → (deleted after verification) │
│ │
└────────────────────────────────────────────────────────────────────────┘
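Adopting the alias pattern costs one extra step at index creation time. A minimal sketch of the day-one bootstrap (names follow the convention above; the mapping is whatever Day 1 defined):

# Day-one bootstrap for the alias pattern: create a versioned index and point
# the read alias at it. Applications only ever talk to the "products" alias.
async def bootstrap_products_index(es, mappings: dict, settings: dict = None):
    await es.indices.create(
        index="products_v1",
        body={"mappings": mappings, "settings": settings or {}},
    )
    await es.indices.put_alias(index="products_v1", name="products")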
5.3 Zero-Downtime Reindex Implementation
# pipeline/reindex.py
"""
Zero-downtime reindexing with index aliases.
Process:
1. Create new index with updated mapping
2. Set up dual-write (CDC writes to both)
3. Bulk copy existing data
4. Verify counts match
5. Atomic alias switch
6. Clean up old index
"""
import asyncio
import logging
from dataclasses import dataclass
from datetime import datetime
from typing import Optional
logger = logging.getLogger(__name__)
@dataclass
class ReindexPlan:
"""Plan for reindex operation."""
plan_id: str
old_index: str
new_index: str
alias: str
new_mapping: dict
new_settings: dict
status: str = "planned"
started_at: Optional[datetime] = None
completed_at: Optional[datetime] = None
# Progress tracking
documents_copied: int = 0
documents_total: int = 0
class ZeroDowntimeReindexer:
"""
Handles zero-downtime reindexing.
"""
def __init__(
self,
es_client,
indexing_pipeline,
bulk_indexer,
):
self.es = es_client
self.pipeline = indexing_pipeline
self.bulk = bulk_indexer
async def create_plan(
self,
alias: str,
new_mapping: dict,
new_settings: dict = None
) -> ReindexPlan:
"""Create a reindex plan."""
# Get current index behind alias
alias_info = await self.es.indices.get_alias(name=alias)
old_index = list(alias_info.keys())[0]
# Generate new index name
version = int(old_index.split("_v")[-1]) if "_v" in old_index else 1
new_index = f"{alias}_v{version + 1}"
# Get document count
count = await self.es.count(index=old_index)
return ReindexPlan(
plan_id=f"reindex_{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}",
old_index=old_index,
new_index=new_index,
alias=alias,
new_mapping=new_mapping,
new_settings=new_settings or {},
documents_total=count["count"]
)
async def execute(self, plan: ReindexPlan) -> ReindexPlan:
"""
Execute reindex plan with zero downtime.
"""
plan.status = "running"
plan.started_at = datetime.utcnow()
try:
# Step 1: Create new index
logger.info(f"Creating new index: {plan.new_index}")
await self._create_new_index(plan)
# Step 2: Set up dual write
logger.info("Setting up dual write")
await self._setup_dual_write(plan)
# Step 3: Copy existing data
logger.info(f"Copying {plan.documents_total} documents")
await self._copy_data(plan)
# Step 4: Verify counts
logger.info("Verifying document counts")
await self._verify_counts(plan)
# Step 5: Atomic alias switch
logger.info("Switching alias")
await self._switch_alias(plan)
# Step 6: Clean up
logger.info("Cleaning up old index")
await self._cleanup(plan)
plan.status = "completed"
plan.completed_at = datetime.utcnow()
duration = (plan.completed_at - plan.started_at).total_seconds()
logger.info(
f"Reindex completed in {duration:.1f}s: "
f"{plan.documents_copied} documents migrated"
)
except Exception as e:
plan.status = "failed"
logger.error(f"Reindex failed: {e}", exc_info=True)
# Attempt rollback
await self._rollback(plan)
raise
return plan
async def _create_new_index(self, plan: ReindexPlan):
"""Create new index with updated mapping."""
body = {
"mappings": plan.new_mapping,
"settings": plan.new_settings
}
await self.es.indices.create(index=plan.new_index, body=body)
async def _setup_dual_write(self, plan: ReindexPlan):
"""Configure pipeline to write to both indexes."""
# Add write alias pointing to new index
await self.es.indices.put_alias(
index=plan.new_index,
name=f"{plan.alias}_write"
)
# Configure indexing pipeline to use write alias
# This ensures new updates go to both old and new index
self.pipeline.set_write_indexes([plan.old_index, plan.new_index])
async def _copy_data(self, plan: ReindexPlan):
"""Copy data from old index to new index."""
# Use Elasticsearch's reindex API with scroll
body = {
"source": {
"index": plan.old_index,
"size": 1000 # Batch size
},
"dest": {
"index": plan.new_index
}
}
# Start reindex task
task = await self.es.reindex(
body=body,
wait_for_completion=False,
refresh=True
)
task_id = task["task"]
# Monitor progress
while True:
status = await self.es.tasks.get(task_id=task_id)
if status["completed"]:
plan.documents_copied = status["task"]["status"]["created"]
break
plan.documents_copied = status["task"]["status"].get("created", 0)
progress = (plan.documents_copied / plan.documents_total) * 100
logger.info(f"Reindex progress: {progress:.1f}%")
await asyncio.sleep(5)
async def _verify_counts(self, plan: ReindexPlan):
"""Verify new index has all documents."""
# Refresh to ensure all documents are searchable
await self.es.indices.refresh(index=plan.new_index)
old_count = (await self.es.count(index=plan.old_index))["count"]
new_count = (await self.es.count(index=plan.new_index))["count"]
# Allow small difference due to concurrent writes
diff_pct = abs(old_count - new_count) / old_count * 100
if diff_pct > 0.1: # More than 0.1% difference
raise ValueError(
f"Count mismatch: old={old_count}, new={new_count} "
f"({diff_pct:.2f}% difference)"
)
logger.info(f"Count verified: old={old_count}, new={new_count}")
async def _switch_alias(self, plan: ReindexPlan):
"""Atomic alias switch."""
# Atomic operation: remove from old, add to new
await self.es.indices.update_aliases(body={
"actions": [
{
"remove": {
"index": plan.old_index,
"alias": plan.alias
}
},
{
"add": {
"index": plan.new_index,
"alias": plan.alias
}
}
]
})
# Remove write alias
await self.es.indices.delete_alias(
index=plan.new_index,
name=f"{plan.alias}_write"
)
# Update pipeline to write only to new index
self.pipeline.set_write_indexes([plan.new_index])
async def _cleanup(self, plan: ReindexPlan):
"""Clean up old index after successful migration."""
# Wait a bit to ensure no in-flight requests
await asyncio.sleep(30)
# Delete old index
await self.es.indices.delete(index=plan.old_index)
logger.info(f"Deleted old index: {plan.old_index}")
async def _rollback(self, plan: ReindexPlan):
"""Rollback on failure."""
logger.warning("Rolling back reindex")
try:
# Remove write alias if it exists
try:
await self.es.indices.delete_alias(
index=plan.new_index,
name=f"{plan.alias}_write"
)
except:
pass
# Delete new index
if await self.es.indices.exists(index=plan.new_index):
await self.es.indices.delete(index=plan.new_index)
# Reset pipeline to only write to old index
self.pipeline.set_write_indexes([plan.old_index])
logger.info("Rollback completed")
except Exception as e:
logger.error(f"Rollback failed: {e}")
# =============================================================================
# Usage Example
# =============================================================================
async def add_new_field_to_products():
"""Example: Adding a new 'brand_id' field to products."""
# New mapping with additional field
new_mapping = {
"properties": {
# ... existing fields ...
"brand_id": {
"type": "keyword"
},
"brand_popularity": {
"type": "float"
}
}
}
reindexer = ZeroDowntimeReindexer(es, pipeline, bulk_indexer)
# Create plan
plan = await reindexer.create_plan(
alias="products",
new_mapping=new_mapping
)
print(f"Reindex plan: {plan.old_index} → {plan.new_index}")
print(f"Documents to migrate: {plan.documents_total}")
# Execute
await reindexer.execute(plan)
Part V: Monitoring and Troubleshooting
Chapter 6: Indexing Observability
6.1 Key Metrics
# monitoring/indexing_metrics.py
"""
Metrics for monitoring indexing pipeline health.
"""
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, List
@dataclass
class IndexingHealthMetrics:
"""Core metrics for indexing health."""
# Lag metrics
kafka_consumer_lag: int # Messages behind
indexing_lag_seconds: float # Time behind real-time
# Throughput metrics
events_per_second: float
documents_per_second: float
# Error metrics
error_rate: float
dlq_size: int
# Resource metrics
batch_queue_depth: int
es_bulk_queue_size: int
def is_healthy(self) -> bool:
"""Check if indexing is healthy."""
return (
self.indexing_lag_seconds < 60 and # Less than 1 minute behind
self.error_rate < 0.01 and # Less than 1% errors
self.dlq_size < 1000 # DLQ not growing
)
class IndexingMonitor:
"""
Monitors indexing pipeline health.
"""
def __init__(
self,
kafka_admin,
es_client,
pipeline,
alerter
):
self.kafka = kafka_admin
self.es = es_client
self.pipeline = pipeline
self.alerter = alerter
# Alert thresholds
self.lag_warning_seconds = 60
self.lag_critical_seconds = 300
self.error_rate_warning = 0.01
self.error_rate_critical = 0.05
async def collect_metrics(self) -> IndexingHealthMetrics:
"""Collect current metrics."""
# Get Kafka consumer lag
lag = await self._get_consumer_lag()
# Get pipeline metrics
pipeline_metrics = self.pipeline.metrics
# Calculate rates
events_per_second = self._calculate_rate(
pipeline_metrics.events_received
)
# Get ES metrics
es_stats = await self.es.nodes.stats(metric="thread_pool")
        node_id = list(es_stats["nodes"].keys())[0]
        bulk_queue = es_stats["nodes"][node_id]["thread_pool"]["bulk"]["queue"]
return IndexingHealthMetrics(
kafka_consumer_lag=lag,
indexing_lag_seconds=pipeline_metrics.lag_seconds,
events_per_second=events_per_second,
documents_per_second=self._calculate_rate(
pipeline_metrics.documents_indexed
),
error_rate=pipeline_metrics.errors / max(1, pipeline_metrics.events_received),
dlq_size=await self._get_dlq_size(),
batch_queue_depth=len(self.pipeline._batch),
es_bulk_queue_size=bulk_queue,
)
async def check_health(self):
"""Check health and alert if needed."""
metrics = await self.collect_metrics()
# Check lag
if metrics.indexing_lag_seconds > self.lag_critical_seconds:
await self.alerter.critical(
"Indexing lag critical",
f"Lag: {metrics.indexing_lag_seconds:.0f}s"
)
elif metrics.indexing_lag_seconds > self.lag_warning_seconds:
await self.alerter.warning(
"Indexing lag warning",
f"Lag: {metrics.indexing_lag_seconds:.0f}s"
)
# Check error rate
if metrics.error_rate > self.error_rate_critical:
await self.alerter.critical(
"Indexing error rate critical",
f"Error rate: {metrics.error_rate:.2%}"
)
elif metrics.error_rate > self.error_rate_warning:
await self.alerter.warning(
"Indexing error rate elevated",
f"Error rate: {metrics.error_rate:.2%}"
)
return metrics
async def _get_consumer_lag(self) -> int:
"""Get Kafka consumer lag."""
consumer_group = self.pipeline.config.consumer_group
# Get consumer group offsets
offsets = await self.kafka.list_consumer_group_offsets(consumer_group)
total_lag = 0
for tp, offset_meta in offsets.items():
# Get end offset for partition
end_offsets = await self.kafka.list_offsets({tp: -1})
end_offset = end_offsets[tp].offset
lag = end_offset - offset_meta.offset
total_lag += lag
return total_lag
async def _get_dlq_size(self) -> int:
"""Get dead letter queue size."""
dlq_topic = "dlq.products"
offsets = await self.kafka.list_offsets({
(dlq_topic, 0): -1 # Get end offset
})
return offsets[(dlq_topic, 0)].offset
# =============================================================================
# Dashboard Metrics
# =============================================================================
GRAFANA_DASHBOARD = """
INDEXING PIPELINE DASHBOARD
┌────────────────────────────────────────────────────────────────────────┐
│ INDEXING HEALTH │
│ │
│ LATENCY │
│ ├── Indexing lag: [████░░░░░░] 45s │
│ ├── Kafka consumer lag: [██░░░░░░░░] 1,234 messages │
│ └── ES bulk latency: [███░░░░░░░] 120ms p99 │
│ │
│ THROUGHPUT │
│ ├── CDC events/sec: [████████░░] 850/sec │
│ ├── Docs indexed/sec: [████████░░] 820/sec │
│ └── Bulk requests/sec: [██████░░░░] 12/sec │
│ │
│ ERRORS │
│ ├── Error rate: [█░░░░░░░░░] 0.2% │
│ ├── DLQ size: 47 messages │
│ └── ES rejections: 0/sec │
│ │
│ RESOURCES │
│ ├── Pipeline batch: [████░░░░░░] 234 pending │
│ ├── ES bulk queue: [██░░░░░░░░] 45 pending │
│ └── Memory usage: [██████░░░░] 2.1 GB │
│ │
└────────────────────────────────────────────────────────────────────────┘
"""
6.2 Troubleshooting Common Issues
# troubleshooting/runbooks.py
"""
Runbooks for common indexing issues.
"""
RUNBOOKS = {
"high_indexing_lag": """
RUNBOOK: High Indexing Lag
SYMPTOMS:
- Indexing lag > 60 seconds
- Search results stale
- Kafka consumer lag growing
DIAGNOSIS:
1. Check Elasticsearch cluster health:
GET /_cluster/health
2. Check for bulk rejections:
GET /_nodes/stats/thread_pool/bulk
3. Check indexing pipeline logs:
kubectl logs -l app=indexing-pipeline
4. Check Kafka consumer lag:
kafka-consumer-groups --describe --group product-indexer
COMMON CAUSES:
1. Elasticsearch overloaded:
- Check cluster CPU/memory
- Check bulk queue size
- Solution: Scale data nodes or reduce indexing rate
2. Slow document transformation:
- Check enrichment query latency
- Solution: Add caching, optimize queries
3. Network issues:
- Check ES connection errors
- Solution: Check network, increase timeouts
4. Kafka partition imbalance:
- Check partition assignment
- Solution: Rebalance consumers
RESOLUTION:
1. If ES overloaded: Reduce batch size, add rate limiting
2. If transformation slow: Enable caching, skip enrichment temporarily
3. If Kafka issues: Restart consumers, check broker health
""",
"indexing_errors": """
RUNBOOK: High Indexing Error Rate
SYMPTOMS:
- Error rate > 1%
- DLQ growing
- Documents missing from search
DIAGNOSIS:
1. Check error types in logs:
grep -i error indexing-pipeline.log | tail -100
2. Sample DLQ messages:
kafka-console-consumer --topic dlq.products --from-beginning --max-messages 10
3. Check ES mapping conflicts:
GET /products/_mapping
COMMON CAUSES:
1. Mapping conflicts:
- New field has wrong type
- Solution: Fix source data or update mapping
2. Document too large:
- Document exceeds http.max_content_length
- Solution: Truncate fields, increase limit
3. Invalid data:
- Null values in required fields
- Solution: Add validation in transformer
RESOLUTION:
1. Fix root cause
2. Replay DLQ messages:
kafka-console-consumer --topic dlq.products |
kafka-console-producer --topic ecommerce.public.products
""",
"stale_data": """
RUNBOOK: Stale Data in Search Results
SYMPTOMS:
- Recently updated products show old data
- Customer complaints about wrong prices
DIAGNOSIS:
1. Check indexing lag:
GET /metrics/indexing_lag
2. Verify document in ES:
GET /products/_doc/{product_id}
3. Check CDC events:
kafka-console-consumer --topic ecommerce.public.products
--partition 0 --offset latest --max-messages 10
4. Check PostgreSQL for recent changes:
SELECT * FROM products WHERE id = {id}
COMMON CAUSES:
1. Indexing lag:
- Pipeline behind
- Solution: See "high_indexing_lag" runbook
2. CDC not capturing changes:
- Replication slot behind
- Solution: Check Debezium connector status
3. Cache serving stale data:
- Search result cache not invalidated
- Solution: Reduce cache TTL, flush cache
RESOLUTION:
1. For an individual document: re-fetch it from PostgreSQL and index it
PUT /products/_doc/{product_id}
2. For many documents: Trigger bulk reindex
"""
}
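The DLQ replay step from the runbooks can also be scripted once the root cause is fixed. A sketch with aiokafka (an assumed client; the topic names match the Debezium config earlier):

# Replay dead-lettered CDC events back onto the source topic after fixing the
# root cause. aiokafka is an assumed client; topic names match the config above.
from aiokafka import AIOKafkaConsumer, AIOKafkaProducer

async def replay_dlq(brokers: str = "kafka:9092",
                     dlq_topic: str = "dlq.products",
                     target_topic: str = "ecommerce.public.products",
                     max_messages: int = 1000) -> int:
    consumer = AIOKafkaConsumer(
        dlq_topic,
        bootstrap_servers=brokers,
        group_id="dlq-replay",
        auto_offset_reset="earliest",
        enable_auto_commit=False,
    )
    producer = AIOKafkaProducer(bootstrap_servers=brokers)
    await consumer.start()
    await producer.start()
    replayed = 0
    try:
        async for msg in consumer:
            # Preserve the original key so partitioning (and ordering) is kept
            await producer.send_and_wait(target_topic, value=msg.value, key=msg.key)
            await consumer.commit()
            replayed += 1
            if replayed >= max_messages:
                break
    finally:
        await consumer.stop()
        await producer.stop()
    return replayed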
Part VI: Real-World Patterns
Chapter 7: Production Patterns
7.1 Multi-Table CDC
# pipeline/multi_table_cdc.py
"""
Handling CDC from multiple related tables.
Challenge:
- products, product_variants, product_images are separate tables
- Change to any table should update the search document
- Need to denormalize into single document
"""
class MultiTableIndexer:
"""
Handles CDC events from multiple tables.
When any related table changes, re-indexes the parent product.
"""
def __init__(self, es_client, db_pool, transformer):
self.es = es_client
self.db = db_pool
self.transformer = transformer
# Map table to parent relationship
self.table_relationships = {
"products": {"type": "parent", "id_field": "id"},
"product_variants": {"type": "child", "parent_field": "product_id"},
"product_images": {"type": "child", "parent_field": "product_id"},
"product_tags": {"type": "child", "parent_field": "product_id"},
}
async def process_event(self, event: CDCEvent):
"""Process CDC event and determine what to reindex."""
table = event.table
relationship = self.table_relationships.get(table)
if not relationship:
return # Unknown table
# Determine product ID to reindex
if relationship["type"] == "parent":
product_id = event.document_id
else:
# Get parent ID from child record
state = event.after or event.before
product_id = str(state.get(relationship["parent_field"]))
if not product_id:
return
# Fetch and reindex the complete product
await self._reindex_product(product_id)
async def _reindex_product(self, product_id: str):
"""Fetch full product data and reindex."""
# Fetch complete product with all related data
row = await self.db.fetchrow("""
SELECT p.*,
COALESCE(json_agg(DISTINCT pv.*) FILTER (WHERE pv.id IS NOT NULL), '[]') as variants,
COALESCE(json_agg(DISTINCT pi.*) FILTER (WHERE pi.id IS NOT NULL), '[]') as images,
COALESCE(array_agg(DISTINCT t.name) FILTER (WHERE t.id IS NOT NULL), '{}') as tags
FROM products p
LEFT JOIN product_variants pv ON p.id = pv.product_id
LEFT JOIN product_images pi ON p.id = pi.product_id
LEFT JOIN product_tags pt ON p.id = pt.product_id
LEFT JOIN tags t ON pt.tag_id = t.id
WHERE p.id = $1
GROUP BY p.id
""", int(product_id))
if not row:
# Product deleted - remove from index
await self.es.delete(index="products", id=product_id, ignore=[404])
return
# Transform and index
document = await self.transformer.transform(dict(row))
await self.es.index(
index="products",
id=product_id,
document=document
)
7.2 Handling Deletes Correctly
# pipeline/delete_handler.py
"""
Proper handling of deleted documents.
Challenges:
- Soft deletes vs hard deletes
- Cascading deletes
- Tombstone events
"""
class DeleteHandler:
"""
Handles document deletion in search index.
"""
def __init__(self, es_client, config):
self.es = es_client
# Soft delete: Keep in index but mark as deleted
# Hard delete: Remove from index
self.soft_delete_enabled = config.get("soft_delete", False)
async def handle_delete(self, event: CDCEvent):
"""Process delete event."""
product_id = event.document_id
if self.soft_delete_enabled:
await self._soft_delete(product_id)
else:
await self._hard_delete(product_id)
async def _hard_delete(self, product_id: str):
"""Remove document from index."""
await self.es.delete(
index="products",
id=product_id,
ignore=[404] # Ignore if already deleted
)
async def _soft_delete(self, product_id: str):
"""Mark document as deleted but keep in index."""
# Update document to mark as deleted
await self.es.update(
index="products",
id=product_id,
body={
"doc": {
"is_deleted": True,
"deleted_at": datetime.utcnow().isoformat(),
"in_stock": False, # Also mark out of stock
}
},
ignore=[404]
)
async def purge_deleted(self, older_than_days: int = 30):
"""Permanently remove soft-deleted documents."""
await self.es.delete_by_query(
index="products",
body={
"query": {
"bool": {
"must": [
{"term": {"is_deleted": True}},
{
"range": {
"deleted_at": {
"lt": f"now-{older_than_days}d"
}
}
}
]
}
}
}
)
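If soft deletes are enabled, every query must exclude the tombstoned documents. One option (an addition here, not part of the pipeline above) is a filtered alias so readers never see them:

# Optional companion to soft deletes: a filtered alias that hides tombstones,
# so search code can query "products_live" without repeating the filter.
async def create_live_alias(es, index: str = "products_v1"):
    await es.indices.put_alias(
        index=index,
        name="products_live",
        body={"filter": {"bool": {"must_not": [{"term": {"is_deleted": True}}]}}},
    )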
Summary
What We Learned Today
DAY 2 SUMMARY: INDEXING PIPELINE
THE PROBLEM
├── Dual-write is dangerous (inconsistency, failures)
├── Search index can get out of sync
├── Need reliable, ordered updates
└── Must handle bulk imports too
CHANGE DATA CAPTURE
├── Read from database WAL (transaction log)
├── Debezium connector for PostgreSQL
├── Publish changes to Kafka
├── Maintains order, handles failures
└── Decouples app from search
INDEXING PIPELINE
├── Consume CDC events from Kafka
├── Transform to search documents
├── Batch for efficiency
├── Handle errors with DLQ
└── Monitor lag and health
BULK INDEXING
├── Rate limiting to protect cluster
├── Progress tracking
├── Resumability
└── Separate from real-time pipeline
ZERO-DOWNTIME REINDEX
├── Index alias pattern
├── Create new index
├── Dual write during migration
├── Atomic alias switch
└── Clean up old index
MONITORING
├── Indexing lag (seconds behind)
├── Consumer lag (messages behind)
├── Error rate
├── DLQ size
└── ES bulk queue
Key Takeaways
INDEXING PIPELINE KEY TAKEAWAYS
1. NEVER DUAL-WRITE
Use CDC instead — it's safer and more reliable
2. KAFKA PROVIDES DURABILITY
Events persisted, can replay, maintains order
3. BATCH FOR EFFICIENCY
Don't index one at a time — use bulk API
4. MONITOR LAG
Know how far behind you are at all times
5. PLAN FOR REINDEX
Use aliases from day one — you WILL need to reindex
DEFAULT PATTERN:
PostgreSQL → Debezium → Kafka → Indexing Pipeline → Elasticsearch
Interview Tip
WHEN ASKED "HOW DO YOU KEEP SEARCH IN SYNC?"
"I would use Change Data Capture rather than dual-write.
Dual-write has consistency issues — if either write fails,
data gets out of sync, and ordering is hard to guarantee.
With CDC, we read from the database's transaction log,
publish changes to Kafka, and have a separate indexing
pipeline consume and update Elasticsearch.
Benefits:
- Single source of truth (PostgreSQL)
- Events persisted and ordered (Kafka)
- Decoupled and scalable
- Can replay from any point
For bulk imports, I'd use a separate rate-limited process
to avoid overwhelming the cluster. And I'd always use
index aliases so we can do zero-downtime reindexing."
This shows you understand production realities.
Tomorrow's Preview
Day 3: Query Processing & Relevance — "Finding the needle and ranking the haystack"
We'll cover:
- How BM25 scoring works
- Query vs Filter performance
- Boosting strategies for business rules
- Function score queries
- Handling "no results" gracefully
PREVIEW: THE RELEVANCE PROBLEM
User searches: "apple"
Results returned:
1. Apple iPhone 14 Pro (Score: 8.5)
2. Fresh Red Apples (Score: 8.3)
3. Apple Watch Series 8 (Score: 8.1)
4. Apple Pie Recipe Book (Score: 7.9)
Is this the right order?
How do we know?
How do we tune it?
End of Week 7, Day 2
Tomorrow: Day 3 — Query Processing & Relevance: Finding the needle and ranking the haystack