Himanshu Kukreja

Week 7 — Day 1: Search Fundamentals & Architecture

System Design Mastery Series — Building Blocks Week


Introduction

Today we answer a fundamental question: "Why is SQL LIKE so slow, and what do search engines do differently?"

The answer lies in a simple but powerful data structure: the inverted index. Understanding this structure is the key to understanding all of search.

Today's Theme: "How search engines think"

We'll cover:

  • Why databases are bad at text search
  • Inverted indexes from first principles
  • Text analysis: turning words into searchable tokens
  • Document modeling for search
  • Elasticsearch architecture
  • Designing our product search system

Part I: The Problem with Database Search

Chapter 1: Why SQL LIKE Fails

1.1 The Query That Kills Your Database

-- User searches for "red running shoes"
SELECT * FROM products 
WHERE name LIKE '%red%' 
  AND name LIKE '%running%'
  AND name LIKE '%shoes%';

Let's trace what happens:

QUERY EXECUTION

Table: products (50 million rows)

Step 1: Full Table Scan
├── Database must examine EVERY row
├── No index can help with leading wildcard (%)
├── 50M string comparisons
└── Time: 30+ seconds

Step 2: String Matching
├── For each row, check if 'red' appears anywhere
├── Then check 'running'
├── Then check 'shoes'
├── Multiple passes through same data
└── CPU intensive

Result:
├── 30+ seconds query time
├── Database CPU at 100%
├── Connection pool exhausted
├── Other queries blocked
└── Users leave

1.2 Why Indexes Don't Help

B-TREE INDEX LIMITATION

Index on products.name:

┌────────────────────────────────────────────────────────────┐
│                    B-Tree Index                            │
│                                                            │
│  "Apple iPhone 14"                                         │
│  "Blue Running Shoes"                                      │
│  "Green Tea Kettle"                                        │
│  "Nike Air Max"                                            │
│  "Red Nike Running Shoes"                                  │
│  "Running Shorts Blue"                                     │
│                                                            │
│  Index is sorted alphabetically by FULL string             │
│                                                            │
│  Can efficiently find:                                     │
│  ✓ WHERE name = 'Nike Air Max'        (exact match)        │
│  ✓ WHERE name LIKE 'Nike%'            (prefix match)       │
│                                                            │
│  Cannot efficiently find:                                  │
│  ✗ WHERE name LIKE '%Running%'        (substring)          │
│  ✗ WHERE name LIKE '%Shoes%'          (substring)          │
│                                                            │
│  Why? B-tree is sorted by string start, not contents       │
│                                                            │
└────────────────────────────────────────────────────────────┘
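
To see the limitation concretely, here is a small sketch (not part of the original article's code) that mimics a B-tree with a sorted Python list: a prefix query can binary-search straight to its candidates, while a substring query has no choice but to touch every entry.

# sketch: a sorted structure helps prefix matches, not substring matches

import bisect
from typing import List

names = sorted([
    "Apple iPhone 14", "Blue Running Shoes", "Green Tea Kettle",
    "Nike Air Max", "Red Nike Running Shoes", "Running Shorts Blue",
])

def prefix_search(prefix: str) -> List[str]:
    """Binary search to the first candidate, then walk forward: O(log n + k)."""
    start = bisect.bisect_left(names, prefix)
    results = []
    for name in names[start:]:
        if not name.startswith(prefix):
            break
        results.append(name)
    return results

def substring_search(needle: str) -> List[str]:
    """Sort order is useless here: every row must be checked, O(n)."""
    return [name for name in names if needle in name]

print(prefix_search("Nike"))        # ['Nike Air Max']
print(substring_search("Running"))  # has to scan all 6 (or 50 million) rows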

1.3 Additional Problems

BEYOND PERFORMANCE: FUNCTIONALITY GAPS

1. NO RELEVANCE RANKING
   Query: "nike shoes"
   SQL returns results in arbitrary order
   User wants: Most relevant first

2. NO TYPO TOLERANCE
   Query: "nikee shoes" (typo)
   SQL returns: 0 results
   User expected: Nike products

3. NO WORD BOUNDARIES
   Query: "red"
   SQL LIKE '%red%' matches:
   ├── "Red Shoes"         ✓ (correct)
   ├── "Bored Games"       ✗ (incorrect - "bored" contains "red")
   └── "Hundred Items"     ✗ (incorrect)

4. NO STEMMING
   Query: "running"
   SQL won't match: "run", "runs", "runner"
   All should match!

5. NO SYNONYMS
   Query: "couch"
   SQL won't match: "sofa"
   Both mean the same thing!

Chapter 2: The Inverted Index

2.1 The Key Insight

Instead of asking "which documents contain this word?" for every query, we pre-compute and store "for each word, which documents contain it?"

FORWARD INDEX (Traditional Database)

Document → Words

Doc1: "Red Nike Running Shoes"    → [red, nike, running, shoes]
Doc2: "Blue Adidas Running Shoes" → [blue, adidas, running, shoes]
Doc3: "Nike Basketball Shoes"     → [nike, basketball, shoes]

To find "running": Must scan ALL documents


INVERTED INDEX (Search Engine)

Word → Documents

red        → [Doc1]
nike       → [Doc1, Doc3]
running    → [Doc1, Doc2]
shoes      → [Doc1, Doc2, Doc3]
blue       → [Doc2]
adidas     → [Doc2]
basketball → [Doc3]

To find "running": Instant lookup → [Doc1, Doc2]

2.2 Building an Inverted Index

# core/inverted_index.py

from collections import defaultdict
from dataclasses import dataclass, field
from typing import Dict, List, Set, Optional
import re


@dataclass
class Posting:
    """A single occurrence of a term in a document."""
    doc_id: str
    term_frequency: int  # How many times term appears
    positions: List[int] = field(default_factory=list)  # Word positions


@dataclass
class PostingList:
    """All occurrences of a term across documents."""
    term: str
    document_frequency: int = 0  # How many docs contain term
    postings: List[Posting] = field(default_factory=list)


class InvertedIndex:
    """
    A simple inverted index implementation.
    
    This is the core data structure behind Elasticsearch,
    Lucene, and all modern search engines.
    """
    
    def __init__(self):
        # term -> PostingList
        self.index: Dict[str, PostingList] = {}
        
        # doc_id -> document metadata
        self.documents: Dict[str, dict] = {}
        
        # Statistics for scoring
        self.total_docs = 0
        self.avg_doc_length = 0.0
        self._total_terms = 0
    
    def index_document(self, doc_id: str, content: str, metadata: dict = None):
        """
        Index a document.
        
        Steps:
        1. Tokenize content into terms
        2. For each term, add to posting list
        3. Store document metadata
        """
        
        # Tokenize (we'll improve this later)
        terms = self._tokenize(content)
        
        # Count term frequencies
        term_counts = defaultdict(int)
        term_positions = defaultdict(list)
        
        for position, term in enumerate(terms):
            term_counts[term] += 1
            term_positions[term].append(position)
        
        # Update index
        for term, count in term_counts.items():
            if term not in self.index:
                self.index[term] = PostingList(term=term)
            
            posting_list = self.index[term]
            posting_list.document_frequency += 1
            posting_list.postings.append(Posting(
                doc_id=doc_id,
                term_frequency=count,
                positions=term_positions[term]
            ))
        
        # Store document
        self.documents[doc_id] = {
            "content": content,
            "length": len(terms),
            "metadata": metadata or {}
        }
        
        # Update statistics
        self.total_docs += 1
        self._total_terms += len(terms)
        self.avg_doc_length = self._total_terms / self.total_docs
    
    def search(self, query: str) -> List[str]:
        """
        Simple search: find documents containing ALL query terms.
        
        Returns matching doc_ids (we'll add real relevance scoring later).
        """
        
        terms = self._tokenize(query)
        
        if not terms:
            return []
        
        # Get posting lists for all terms
        posting_lists = []
        for term in terms:
            if term not in self.index:
                return []  # Term not found, no results
            posting_lists.append(self.index[term])
        
        # Find intersection (documents containing ALL terms)
        result_docs = self._intersect_posting_lists(posting_lists)
        
        return result_docs
    
    def _intersect_posting_lists(
        self,
        posting_lists: List[PostingList]
    ) -> List[str]:
        """
        Find documents that appear in ALL posting lists.
        
        Optimization: Start with smallest list, intersect with others.
        """
        
        if not posting_lists:
            return []
        
        # Sort by document frequency (smallest first)
        sorted_lists = sorted(
            posting_lists,
            key=lambda pl: pl.document_frequency
        )
        
        # Start with smallest list
        result = {p.doc_id for p in sorted_lists[0].postings}
        
        # Intersect with remaining lists
        for posting_list in sorted_lists[1:]:
            list_docs = {p.doc_id for p in posting_list.postings}
            result = result.intersection(list_docs)
            
            if not result:
                return []  # Early termination
        
        return sorted(result)  # sort for a deterministic order until real scoring is added
    
    def _tokenize(self, text: str) -> List[str]:
        """
        Simple tokenization.
        
        Real search engines use much more sophisticated analysis.
        """
        
        # Lowercase
        text = text.lower()
        
        # Split on non-alphanumeric
        tokens = re.findall(r'\w+', text)
        
        return tokens


# =============================================================================
# Example Usage
# =============================================================================

def demonstrate_inverted_index():
    """Show how inverted index works."""
    
    index = InvertedIndex()
    
    # Index some products
    products = [
        ("p1", "Red Nike Running Shoes", {"price": 120, "brand": "Nike"}),
        ("p2", "Blue Adidas Running Shoes", {"price": 110, "brand": "Adidas"}),
        ("p3", "Nike Basketball Shoes White", {"price": 150, "brand": "Nike"}),
        ("p4", "Red Running Shorts", {"price": 45, "brand": "Nike"}),
        ("p5", "Professional Running Watch", {"price": 299, "brand": "Garmin"}),
    ]
    
    for doc_id, content, metadata in products:
        index.index_document(doc_id, content, metadata)
    
    # Show the index structure
    print("=== INVERTED INDEX ===")
    for term, posting_list in sorted(index.index.items()):
        docs = [p.doc_id for p in posting_list.postings]
        print(f"  {term:15} → {docs}")
    
    # Search examples
    print("\n=== SEARCHES ===")
    
    queries = ["running shoes", "nike", "red running", "basketball"]
    for query in queries:
        results = index.search(query)
        print(f"  '{query}' → {results}")


# Output:
# === INVERTED INDEX ===
#   adidas          → ['p2']
#   basketball      → ['p3']
#   blue            → ['p2']
#   nike            → ['p1', 'p3']
#   professional    → ['p5']
#   red             → ['p1', 'p4']
#   running         → ['p1', 'p2', 'p4', 'p5']
#   shoes           → ['p1', 'p2', 'p3']
#   shorts          → ['p4']
#   watch           → ['p5']
#   white           → ['p3']
#
# === SEARCHES ===
#   'running shoes' → ['p1', 'p2']
#   'nike' → ['p1', 'p3']
#   'red running' → ['p1', 'p4']
#   'basketball' → ['p3']

2.3 Why This Is Fast

QUERY: "running shoes"

Step 1: Tokenize query
├── "running shoes" → ["running", "shoes"]
└── Time: microseconds

Step 2: Look up "running" in index
├── Direct hash lookup
├── Returns: [Doc1, Doc2, Doc4, Doc5]
└── Time: microseconds

Step 3: Look up "shoes" in index
├── Direct hash lookup
├── Returns: [Doc1, Doc2, Doc3]
└── Time: microseconds

Step 4: Intersect results
├── [Doc1, Doc2, Doc4, Doc5] ∩ [Doc1, Doc2, Doc3]
├── Result: [Doc1, Doc2]
└── Time: microseconds

Total time: < 1 millisecond

Compare to SQL LIKE:
├── 50 million string comparisons
├── Multiple passes
└── Time: 30+ seconds
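
To put rough numbers on this, here is an illustrative micro-benchmark using the InvertedIndex class from section 2.2 (the corpus is synthetic and the timings are machine-dependent):

# sketch: naive scan vs. inverted index lookup on 100k synthetic docs

import time

index = InvertedIndex()
docs = []
for i in range(100_000):
    # Only ~1% of products mention "running shoes"
    text = "red nike running shoes" if i % 100 == 0 else f"blue widget model {i}"
    docs.append((f"doc{i}", text))
    index.index_document(f"doc{i}", text)

# Naive scan: touch every document's text
start = time.perf_counter()
scan_hits = [d for d, text in docs if "running" in text and "shoes" in text]
print(f"scan:  {len(scan_hits)} hits in {(time.perf_counter() - start) * 1000:.2f} ms")

# Inverted index: two hash lookups + one small intersection
start = time.perf_counter()
index_hits = index.search("running shoes")
print(f"index: {len(index_hits)} hits in {(time.perf_counter() - start) * 1000:.2f} ms")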

Part II: Text Analysis

Chapter 3: From Text to Tokens

The inverted index is only as good as the tokens we create. The text analysis pipeline transforms raw text into searchable tokens.

3.1 The Analysis Pipeline

TEXT ANALYSIS PIPELINE

Input: "The Quick Brown Fox Jumps Over the Lazy Dog"

Step 1: CHARACTER FILTERS
├── Remove HTML: <b>text</b> → text
├── Convert special chars: café → cafe
└── Output: "The Quick Brown Fox Jumps Over the Lazy Dog"

Step 2: TOKENIZER
├── Split on whitespace/punctuation
└── Output: ["The", "Quick", "Brown", "Fox", "Jumps", "Over", "the", "Lazy", "Dog"]

Step 3: TOKEN FILTERS
├── Lowercase: THE → the
├── Stop words removal: the, over → removed
├── Stemming: jumps → jump
└── Output: ["quick", "brown", "fox", "jump", "lazi", "dog"]

Final tokens indexed: ["quick", "brown", "fox", "jump", "lazi", "dog"]

3.2 Text Analyzer Implementation

# core/analyzer.py

from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import List, Optional
import re


class CharacterFilter(ABC):
    """Transforms characters before tokenization."""
    
    @abstractmethod
    def filter(self, text: str) -> str:
        pass


class HTMLStripFilter(CharacterFilter):
    """Remove HTML tags."""
    
    def filter(self, text: str) -> str:
        return re.sub(r'<[^>]+>', '', text)


class MappingCharFilter(CharacterFilter):
    """Map characters to replacements."""
    
    def __init__(self, mappings: dict):
        self.mappings = mappings
    
    def filter(self, text: str) -> str:
        for old, new in self.mappings.items():
            text = text.replace(old, new)
        return text


class Tokenizer(ABC):
    """Splits text into tokens."""
    
    @abstractmethod
    def tokenize(self, text: str) -> List[str]:
        pass


class StandardTokenizer(Tokenizer):
    """
    Standard tokenization on word boundaries.
    
    Handles:
    - Whitespace
    - Punctuation
    - Numbers
    """
    
    def tokenize(self, text: str) -> List[str]:
        # Split on non-alphanumeric, keeping numbers
        tokens = re.findall(r'\b\w+\b', text)
        return tokens


class WhitespaceTokenizer(Tokenizer):
    """Simple whitespace tokenization."""
    
    def tokenize(self, text: str) -> List[str]:
        return text.split()


class TokenFilter(ABC):
    """Transforms individual tokens."""
    
    @abstractmethod
    def filter(self, tokens: List[str]) -> List[str]:
        pass


class LowercaseFilter(TokenFilter):
    """Convert to lowercase."""
    
    def filter(self, tokens: List[str]) -> List[str]:
        return [t.lower() for t in tokens]


class StopWordsFilter(TokenFilter):
    """Remove common stop words."""
    
    DEFAULT_STOP_WORDS = {
        'a', 'an', 'the', 'and', 'or', 'but', 'in', 'on', 'at',
        'to', 'for', 'of', 'with', 'by', 'from', 'is', 'are',
        'was', 'were', 'be', 'been', 'being', 'have', 'has',
        'had', 'do', 'does', 'did', 'will', 'would', 'could',
        'should', 'may', 'might', 'must', 'shall', 'can',
        'this', 'that', 'these', 'those', 'it', 'its',
    }
    
    def __init__(self, stop_words: set = None):
        self.stop_words = stop_words or self.DEFAULT_STOP_WORDS
    
    def filter(self, tokens: List[str]) -> List[str]:
        return [t for t in tokens if t.lower() not in self.stop_words]


class PorterStemmer(TokenFilter):
    """
    Apply Porter Stemming algorithm.
    
    Reduces words to their root form:
    - running → run
    - shoes → shoe
    - quickly → quick
    """
    
    def filter(self, tokens: List[str]) -> List[str]:
        return [self._stem(t) for t in tokens]
    
    def _stem(self, word: str) -> str:
        """Simplified Porter Stemmer rules (not the full algorithm)."""
        
        # Common suffix removal (simplified)
        suffixes = [
            ('ational', 'ate'),
            ('tional', 'tion'),
            ('enci', 'ence'),
            ('anci', 'ance'),
            ('izer', 'ize'),
            ('isation', 'ize'),
            ('ization', 'ize'),
            ('ation', 'ate'),
            ('ator', 'ate'),
            ('alism', 'al'),
            ('iveness', 'ive'),
            ('fulness', 'ful'),
            ('ousness', 'ous'),
            ('aliti', 'al'),
            ('iviti', 'ive'),
            ('biliti', 'ble'),
            ('ling', 'l'),
            ('ing', ''),
            ('ly', ''),
            ('ies', 'y'),
            ('ed', ''),
            ('s', ''),
        ]
        
        for suffix, replacement in suffixes:
            if word.endswith(suffix) and len(word) > len(suffix) + 2:
                stemmed = word[:-len(suffix)] + replacement
                # Collapse a doubled consonant left behind by suffix removal,
                # e.g. "running" -> "runn" -> "run" (keep l/s/z doubles)
                if (len(stemmed) > 2
                        and stemmed[-1] == stemmed[-2]
                        and stemmed[-1] not in 'aeioulsz'):
                    stemmed = stemmed[:-1]
                return stemmed
        
        return word


class SynonymFilter(TokenFilter):
    """
    Expand tokens with synonyms.
    
    "couch" → ["couch", "sofa"]
    """
    
    def __init__(self, synonyms: dict):
        self.synonyms = synonyms
    
    def filter(self, tokens: List[str]) -> List[str]:
        result = []
        for token in tokens:
            result.append(token)
            if token in self.synonyms:
                result.extend(self.synonyms[token])
        return result


@dataclass
class Analyzer:
    """
    Complete text analysis pipeline.
    
    Combines character filters, tokenizer, and token filters.
    """
    
    char_filters: List[CharacterFilter]
    tokenizer: Tokenizer
    token_filters: List[TokenFilter]
    
    def analyze(self, text: str) -> List[str]:
        """Run text through the analysis pipeline."""
        
        # Apply character filters
        for char_filter in self.char_filters:
            text = char_filter.filter(text)
        
        # Tokenize
        tokens = self.tokenizer.tokenize(text)
        
        # Apply token filters
        for token_filter in self.token_filters:
            tokens = token_filter.filter(tokens)
        
        return tokens


# =============================================================================
# Pre-configured Analyzers
# =============================================================================

def create_standard_analyzer() -> Analyzer:
    """Standard analyzer for most use cases."""
    return Analyzer(
        char_filters=[
            HTMLStripFilter(),
        ],
        tokenizer=StandardTokenizer(),
        token_filters=[
            LowercaseFilter(),
            StopWordsFilter(),
        ]
    )


def create_product_analyzer() -> Analyzer:
    """Analyzer optimized for product search."""
    return Analyzer(
        char_filters=[
            HTMLStripFilter(),
            MappingCharFilter({
                '&': ' and ',
                '+': ' plus ',
            }),
        ],
        tokenizer=StandardTokenizer(),
        token_filters=[
            LowercaseFilter(),
            StopWordsFilter(),
            PorterStemmer(),
            SynonymFilter({
                'tv': ['television'],
                'phone': ['mobile', 'smartphone'],
                'laptop': ['notebook'],
                'couch': ['sofa'],
                'sneakers': ['shoes', 'trainers'],
            }),
        ]
    )


# =============================================================================
# Example Usage
# =============================================================================

def demonstrate_analysis():
    """Show how text analysis works."""
    
    analyzer = create_product_analyzer()
    
    texts = [
        "Red Nike Running Shoes",
        "The <b>Quick</b> Brown Fox",
        "Professional TV & Entertainment Center",
        "Comfortable Leather Couch",
    ]
    
    print("=== TEXT ANALYSIS ===")
    for text in texts:
        tokens = analyzer.analyze(text)
        print(f"  '{text}'")
        print(f"    → {tokens}")
        print()


# Output:
# === TEXT ANALYSIS ===
#   'Red Nike Running Shoes'
#     → ['red', 'nike', 'run', 'shoe']
#
#   'The <b>Quick</b> Brown Fox'
#     → ['quick', 'brown', 'fox']
#
#   'Professional TV & Entertainment Center'
#     → ['professional', 'tv', 'television', 'entertainment', 'center']
#
#   'Comfortable Leather Couch'
#     → ['comfortable', 'leather', 'couch', 'sofa']

3.3 Why Analysis Matters

SAME QUERY, DIFFERENT RESULTS

Query: "running shoes"

WITHOUT PROPER ANALYSIS
├── Tokens: ["running", "shoes"]
├── Index contains: ["Running", "SHOES", "shoe", "runner"]
├── Match: None (case mismatch, no stemming)
└── User sees: "No results found"

WITH PROPER ANALYSIS
├── Query tokens: ["run", "shoe"] (stemmed)
├── Index contains: ["run", "shoe"] (everything stemmed at index time)
├── Match: All running/shoes products
└── User sees: Relevant results

KEY INSIGHT:
Same analyzer MUST be used at:
1. Index time (when storing documents)
2. Query time (when searching)

Otherwise tokens won't match!
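
A quick way to see this in code, using the analyzers defined in core/analyzer.py above (illustrative only):

# sketch: mismatched index-time and query-time analyzers produce zero matches

index_analyzer = create_standard_analyzer()   # no stemming
query_analyzer = create_product_analyzer()    # stems "running" -> "run"

indexed = set(index_analyzer.analyze("Red Nike Running Shoes"))
queried = set(query_analyzer.analyze("running shoes"))

print(indexed)             # {'red', 'nike', 'running', 'shoes'}
print(queried)             # {'run', 'shoe'}
print(indexed & queried)   # set() -> "No results found"

# Same analyzer on both sides and the overlap comes back:
indexed = set(query_analyzer.analyze("Red Nike Running Shoes"))
print(indexed & queried)   # {'run', 'shoe'}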

Part III: Document Modeling

Chapter 4: Modeling Documents for Search

4.1 The Product Document Schema

# schemas/product_document.py

"""
Product document schema for Elasticsearch.

Design principles:
1. Denormalize for search performance
2. Use appropriate field types
3. Configure analysis per field
4. Store what you need to display
"""

PRODUCT_MAPPING = {
    "mappings": {
        "properties": {
            # Identifiers
            "product_id": {
                "type": "keyword"  # Exact match only, no analysis
            },
            "sku": {
                "type": "keyword"
            },
            
            # Searchable text fields
            "name": {
                "type": "text",  # Full-text search
                "analyzer": "product_analyzer",
                "fields": {
                    "keyword": {
                        "type": "keyword"  # For sorting/aggregations
                    },
                    "autocomplete": {
                        "type": "text",
                        "analyzer": "autocomplete_analyzer",
                        "search_analyzer": "standard"
                    }
                }
            },
            "description": {
                "type": "text",
                "analyzer": "product_analyzer"
            },
            "brand": {
                "type": "text",
                "analyzer": "standard",
                "fields": {
                    "keyword": {
                        "type": "keyword"  # For faceting
                    }
                }
            },
            
            # Categorical fields (exact match, faceting)
            "category": {
                "type": "keyword"
            },
            "subcategory": {
                "type": "keyword"
            },
            "tags": {
                "type": "keyword"  # Array of tags
            },
            "color": {
                "type": "keyword"
            },
            "size": {
                "type": "keyword"
            },
            
            # Numeric fields
            "price": {
                "type": "float"
            },
            "sale_price": {
                "type": "float"
            },
            "discount_percent": {
                "type": "integer"
            },
            "rating": {
                "type": "float"
            },
            "review_count": {
                "type": "integer"
            },
            "stock_quantity": {
                "type": "integer"
            },
            
            # Boolean fields
            "in_stock": {
                "type": "boolean"
            },
            "is_featured": {
                "type": "boolean"
            },
            "is_new": {
                "type": "boolean"
            },
            
            # Date fields
            "created_at": {
                "type": "date"
            },
            "updated_at": {
                "type": "date"
            },
            
            # Geo field (for location-based search)
            "warehouse_location": {
                "type": "geo_point"
            },
            
            # Nested for complex structures
            "attributes": {
                "type": "nested",
                "properties": {
                    "name": {"type": "keyword"},
                    "value": {"type": "keyword"}
                }
            },
            
            # For boosting/scoring
            "popularity_score": {
                "type": "float"
            },
            "sales_rank": {
                "type": "integer"
            }
        }
    },
    
    "settings": {
        "index": {
            "number_of_shards": 5,
            "number_of_replicas": 1
        },
        "analysis": {
            "analyzer": {
                "product_analyzer": {
                    "type": "custom",
                    "tokenizer": "standard",
                    "filter": [
                        "lowercase",
                        "product_synonyms",
                        "porter_stem"
                    ]
                },
                "autocomplete_analyzer": {
                    "type": "custom",
                    "tokenizer": "standard",
                    "filter": [
                        "lowercase",
                        "edge_ngram_filter"
                    ]
                }
            },
            "filter": {
                "product_synonyms": {
                    "type": "synonym",
                    "synonyms": [
                        "tv, television",
                        "phone, mobile, smartphone",
                        "laptop, notebook",
                        "couch, sofa",
                        "sneakers, trainers, shoes"
                    ]
                },
                "edge_ngram_filter": {
                    "type": "edge_ngram",
                    "min_gram": 2,
                    "max_gram": 15
                }
            }
        }
    }
}
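
To use this mapping, you create the index and start indexing documents. A minimal sketch with the official elasticsearch Python client (the exact client signature varies by version; this follows the 8.x style, and the sample document values are made up):

# sketch: create the products index and index one document

from elasticsearch import Elasticsearch

from schemas.product_document import PRODUCT_MAPPING

es = Elasticsearch("http://localhost:9200")

# Create the index with the mapping and analysis settings above
es.indices.create(
    index="products",
    mappings=PRODUCT_MAPPING["mappings"],
    settings=PRODUCT_MAPPING["settings"],
)

# Index a sample product
es.index(
    index="products",
    id="p1",
    document={
        "product_id": "p1",
        "name": "Red Nike Running Shoes",
        "brand": "Nike",
        "category": "shoes",
        "price": 120.0,
        "rating": 4.5,
        "in_stock": True,
        "created_at": "2024-01-15T10:00:00Z",
    },
)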

4.2 Field Type Selection Guide

ELASTICSEARCH FIELD TYPES

KEYWORD (exact match)
├── Use for: IDs, categories, tags, status
├── Not analyzed (stored as-is)
├── Supports: exact match, aggregations, sorting
├── Example: product_id, brand.keyword, color
└── DON'T use for: full-text search

TEXT (full-text search)
├── Use for: names, descriptions, content
├── Analyzed (tokenized, normalized)
├── Supports: full-text queries, relevance
├── Example: name, description
└── DON'T use for: exact match, sorting

NUMERIC (integer, float, long)
├── Use for: prices, quantities, scores
├── Supports: range queries, sorting, aggregations
├── Example: price, rating, stock_quantity
└── DON'T use for: IDs (use keyword)

DATE
├── Use for: timestamps
├── Supports: range queries, date math
├── Example: created_at, updated_at
└── Format: ISO 8601 recommended

BOOLEAN
├── Use for: flags
├── Example: in_stock, is_featured
└── Efficient for filtering

GEO_POINT
├── Use for: location data
├── Supports: distance queries, geo aggregations
├── Example: warehouse_location
└── Format: {lat, lon} or "lat,lon"

NESTED
├── Use for: arrays of objects needing independent queries
├── Example: product variants, attributes
├── More expensive (each nested doc is separate Lucene doc)
└── Use sparingly

4.3 Multi-Field Pattern

MULTI-FIELD PATTERN

One source field, multiple index representations:

"brand": {
    "type": "text",           // Full-text search
    "fields": {
        "keyword": {
            "type": "keyword" // Exact match, aggregations
        },
        "autocomplete": {
            "type": "text",
            "analyzer": "autocomplete_analyzer"
        }
    }
}

Usage:
├── Search: query on "brand" (analyzed text)
├── Facet: aggregate on "brand.keyword" (exact)
├── Sort: sort by "brand.keyword" (exact)
└── Autocomplete: query on "brand.autocomplete"

This is a VERY common pattern in production.
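
As a sketch, a single request can use all three representations at once (field names follow the mapping in 4.1):

# sketch: search the analyzed field, facet and sort on the keyword sub-field

multi_field_query = {
    "query": {"match": {"brand": "nike"}},      # analyzed, full-text match
    "aggs": {
        "brand_facet": {"terms": {"field": "brand.keyword", "size": 10}}
    },
    "sort": [{"brand.keyword": "asc"}],         # exact-value sorting
}
# response = es.search(index="products", body=multi_field_query)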

Part IV: Elasticsearch Architecture

Chapter 5: How Elasticsearch Works

5.1 Core Concepts

ELASTICSEARCH ARCHITECTURE

┌───────────────────────────────────────────────────────────────────────┐
│                         ELASTICSEARCH CLUSTER                         │
│                                                                       │
│  ┌────────────────────────────────────────────────────────────────┐   │
│  │                          INDEX                                 │   │
│  │                    (like a database table)                     │   │
│  │                                                                │   │
│  │   ┌─────────┐  ┌─────────┐  ┌─────────┐  ┌─────────┐           │   │
│  │   │ Shard 0 │  │ Shard 1 │  │ Shard 2 │  │ Shard 3 │           │   │
│  │   │(Primary)│  │(Primary)│  │(Primary)│  │(Primary)│           │   │
│  │   └────┬────┘  └────┬────┘  └────┬────┘  └────┬────┘           │   │
│  │        │            │            │            │                │   │
│  │   ┌────▼────┐  ┌────▼────┐  ┌────▼────┐  ┌────▼────┐           │   │
│  │   │ Shard 0 │  │ Shard 1 │  │ Shard 2 │  │ Shard 3 │           │   │
│  │   │(Replica)│  │(Replica)│  │(Replica)│  │(Replica)│           │   │
│  │   └─────────┘  └─────────┘  └─────────┘  └─────────┘           │   │
│  │                                                                │   │
│  └────────────────────────────────────────────────────────────────┘   │
│                                                                       │
│  NODES                                                                │
│  ┌─────────┐  ┌─────────┐  ┌─────────┐  ┌─────────┐  ┌─────────┐      │
│  │  Node 1 │  │  Node 2 │  │  Node 3 │  │  Node 4 │  │  Node 5 │      │
│  │ (Data)  │  │ (Data)  │  │ (Data)  │  │(Master) │  │(Coordin)│      │
│  └─────────┘  └─────────┘  └─────────┘  └─────────┘  └─────────┘      │
│                                                                       │
└───────────────────────────────────────────────────────────────────────┘

TERMINOLOGY:
├── Cluster: Group of nodes working together
├── Node: Single Elasticsearch server
├── Index: Collection of documents (like a table)
├── Shard: Horizontal partition of an index
├── Replica: Copy of a shard for redundancy
└── Document: Single record (like a row)

5.2 Node Roles

NODE ROLES

MASTER NODE
├── Cluster management (creating/deleting indexes)
├── Shard allocation decisions
├── Cluster state management
├── Should be lightweight (no data)
└── Recommend: 3 dedicated master nodes

DATA NODE
├── Stores shards
├── Executes searches and indexing
├── Memory and disk intensive
└── Scale horizontally for more capacity

COORDINATING NODE
├── Routes requests to appropriate shards
├── Gathers and merges results
├── Useful for heavy aggregations
└── Also called "client node"

INGEST NODE
├── Pre-processes documents before indexing
├── Runs ingest pipelines
└── Useful for data transformation

5.3 Query Execution Flow

SEARCH QUERY FLOW

Client sends: GET /products/_search?q=running+shoes

┌────────────────────────────────────────────────────────────────────────┐
│                                                                        │
│  1. COORDINATING NODE receives request                                 │
│     └── Parses query, determines which shards to query                 │
│                                                                        │
│  2. SCATTER phase                                                      │
│     ├── Query sent to ALL relevant shards (primary or replica)         │
│     ├── Shard 0: Search local inverted index                           │
│     ├── Shard 1: Search local inverted index                           │
│     ├── Shard 2: Search local inverted index                           │
│     └── Shard 3: Search local inverted index                           │
│                                                                        │
│  3. Each shard returns TOP N results                                   │
│     ├── Shard 0: [doc1: 0.95, doc5: 0.87, doc9: 0.82]                  │
│     ├── Shard 1: [doc2: 0.91, doc7: 0.84]                              │
│     ├── Shard 2: [doc3: 0.89, doc8: 0.81]                              │
│     └── Shard 3: [doc4: 0.88, doc6: 0.85]                              │
│                                                                        │
│  4. GATHER phase                                                       │
│     ├── Coordinating node collects all results                         │
│     ├── Merges and sorts by score                                      │
│     └── Returns top N to client                                        │
│                                                                        │
│  Final: [doc1: 0.95, doc2: 0.91, doc3: 0.89, doc4: 0.88, ...]          │
│                                                                        │
└────────────────────────────────────────────────────────────────────────┘
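
The gather phase is essentially a k-way merge of per-shard top-N lists. A toy sketch, with the scores taken from the diagram above:

# sketch: merging per-shard top-N hits into a global top-N

import heapq

shard_results = [
    [("doc1", 0.95), ("doc5", 0.87), ("doc9", 0.82)],   # shard 0
    [("doc2", 0.91), ("doc7", 0.84)],                    # shard 1
    [("doc3", 0.89), ("doc8", 0.81)],                    # shard 2
    [("doc4", 0.88), ("doc6", 0.85)],                    # shard 3
]

def gather(shard_results, n=5):
    # Each shard's list is already sorted by score (descending), so a
    # k-way merge yields the global order without re-sorting everything.
    merged = heapq.merge(*shard_results, key=lambda hit: -hit[1])
    return list(merged)[:n]

print(gather(shard_results))
# [('doc1', 0.95), ('doc2', 0.91), ('doc3', 0.89), ('doc4', 0.88), ('doc5', 0.87)]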

5.4 Shard Sizing Guidelines

SHARD SIZING

Rule of thumb:
├── 20-40 GB per shard (sweet spot)
├── Max ~50 GB per shard
├── Min 1 GB per shard (avoid too small)
└── Keep total shard count proportional to heap size

Example for 500 GB index:
├── 500 GB / 30 GB per shard = ~17 shards
├── Round to nice number: 20 shards
├── With 1 replica: 40 total shard copies
└── Distributed across data nodes

TOO FEW SHARDS:
├── Individual shards too large
├── Can't scale horizontally
├── Recovery takes too long
└── Unbalanced load

TOO MANY SHARDS:
├── Overhead per shard (memory, file handles)
├── More coordination needed
├── Query performance degrades
└── "Oversharding" is common mistake

PRIMARY SHARD COUNT IS FIXED AT CREATION
├── Plan for growth (changing it later requires a reindex or split)
├── Use index aliases for flexibility
└── Consider time-based indices for logs

Part V: System Design

Chapter 6: Product Search Architecture

6.1 High-Level Architecture

# architecture/search_system.py

"""
Product Search System Architecture

Components:
1. Search API - handles user queries
2. Indexing Pipeline - keeps search fresh
3. Elasticsearch Cluster - stores and searches
4. Query Service - builds and optimizes queries
5. Suggestion Service - autocomplete
"""

from dataclasses import dataclass
from typing import List, Optional, Dict, Any


@dataclass
class SearchRequest:
    """User search request."""
    query: str
    filters: Dict[str, Any] = None  # category, brand, price_range
    sort: str = "relevance"  # relevance, price_asc, price_desc, rating
    page: int = 1
    page_size: int = 20


@dataclass
class SearchResult:
    """Search response."""
    products: List[dict]
    total: int
    facets: Dict[str, List[dict]]  # category counts, brand counts, etc.
    suggestions: List[str]  # "did you mean" suggestions
    took_ms: int


class ProductSearchService:
    """
    Main search service coordinating all components.
    """
    
    def __init__(
        self,
        es_client,
        query_builder,
        suggestion_service,
        cache
    ):
        self.es = es_client
        self.query_builder = query_builder
        self.suggestions = suggestion_service
        self.cache = cache
        
        self.index_name = "products"
    
    async def search(self, request: SearchRequest) -> SearchResult:
        """
        Execute product search.
        
        Flow:
        1. Check cache for identical query
        2. Build Elasticsearch query
        3. Execute search
        4. Process and return results
        """
        
        # Check cache
        cache_key = self._build_cache_key(request)
        cached = await self.cache.get(cache_key)
        if cached:
            return cached
        
        # Build query
        es_query = self.query_builder.build(request)
        
        # Execute search
        response = await self.es.search(
            index=self.index_name,
            body=es_query
        )
        
        # Process results
        result = self._process_response(response, request)
        
        # Cache results (short TTL for freshness)
        await self.cache.set(cache_key, result, ttl=60)
        
        return result
    
    def _process_response(
        self,
        response: dict,
        request: SearchRequest
    ) -> SearchResult:
        """Process Elasticsearch response."""
        
        hits = response["hits"]
        
        # Extract products
        products = [
            {
                "product_id": hit["_id"],
                "score": hit["_score"],
                **hit["_source"]
            }
            for hit in hits["hits"]
        ]
        
        # Extract facets
        facets = {}
        if "aggregations" in response:
            for agg_name, agg_data in response["aggregations"].items():
                facets[agg_name] = [
                    {"value": bucket["key"], "count": bucket["doc_count"]}
                    for bucket in agg_data.get("buckets", [])
                ]
        
        return SearchResult(
            products=products,
            total=hits["total"]["value"],
            facets=facets,
            suggestions=[],
            took_ms=response["took"]
        )
    
    def _build_cache_key(self, request: SearchRequest) -> str:
        """Build cache key from request."""
        import hashlib
        import json
        
        key_data = {
            "q": request.query,
            "f": request.filters,
            "s": request.sort,
            "p": request.page,
            "ps": request.page_size
        }
        
        key_str = json.dumps(key_data, sort_keys=True)
        return f"search:{hashlib.md5(key_str.encode()).hexdigest()}"


class QueryBuilder:
    """
    Builds Elasticsearch queries from search requests.
    """
    
    def build(self, request: SearchRequest) -> dict:
        """Build ES query DSL."""
        
        query = {
            "query": self._build_query(request),
            "from": (request.page - 1) * request.page_size,
            "size": request.page_size,
            "aggs": self._build_aggregations(),
            "highlight": self._build_highlight(),
        }
        
        # Add sorting
        if request.sort != "relevance":
            query["sort"] = self._build_sort(request.sort)
        
        return query
    
    def _build_query(self, request: SearchRequest) -> dict:
        """Build the query clause."""
        
        must = []
        filter_clauses = []
        
        # Full-text search on query
        if request.query:
            must.append({
                "multi_match": {
                    "query": request.query,
                    "fields": [
                        "name^3",        # Boost name matches
                        "brand^2",       # Boost brand matches
                        "description",
                        "category",
                        "tags"
                    ],
                    "type": "best_fields",
                    "fuzziness": "AUTO"  # Typo tolerance
                }
            })
        
        # Apply filters
        if request.filters:
            for field, value in request.filters.items():
                if field == "price_range":
                    filter_clauses.append({
                        "range": {
                            "price": {
                                "gte": value.get("min", 0),
                                "lte": value.get("max", 999999)
                            }
                        }
                    })
                elif field == "in_stock":
                    filter_clauses.append({
                        "term": {"in_stock": value}
                    })
                else:
                    # Keyword filter (category, brand, etc.)
                    # Text fields need their .keyword sub-field for exact
                    # matching; pure keyword fields are used as-is
                    es_field = f"{field}.keyword" if field in ("brand", "name") else field
                    if isinstance(value, list):
                        filter_clauses.append({
                            "terms": {es_field: value}
                        })
                    else:
                        filter_clauses.append({
                            "term": {es_field: value}
                        })
        
        # Filter to in-stock by default unless the caller set it explicitly
        if not request.filters or "in_stock" not in request.filters:
            filter_clauses.append({"term": {"in_stock": True}})
        
        return {
            "bool": {
                "must": must if must else [{"match_all": {}}],
                "filter": filter_clauses
            }
        }
    
    def _build_aggregations(self) -> dict:
        """Build facet aggregations."""
        
        return {
            "categories": {
                "terms": {
                    "field": "category",
                    "size": 20
                }
            },
            "brands": {
                "terms": {
                    "field": "brand.keyword",
                    "size": 20
                }
            },
            "price_ranges": {
                "range": {
                    "field": "price",
                    "ranges": [
                        {"key": "under_25", "to": 25},
                        {"key": "25_to_50", "from": 25, "to": 50},
                        {"key": "50_to_100", "from": 50, "to": 100},
                        {"key": "100_to_200", "from": 100, "to": 200},
                        {"key": "over_200", "from": 200}
                    ]
                }
            },
            "ratings": {
                "terms": {
                    "field": "rating",
                    "size": 5
                }
            }
        }
    
    def _build_highlight(self) -> dict:
        """Build highlight configuration."""
        
        return {
            "fields": {
                "name": {},
                "description": {"fragment_size": 150}
            },
            "pre_tags": ["<em>"],
            "post_tags": ["</em>"]
        }
    
    def _build_sort(self, sort: str) -> list:
        """Build sort clause."""
        
        sort_map = {
            "price_asc": [{"price": "asc"}],
            "price_desc": [{"price": "desc"}],
            "rating": [{"rating": "desc"}, {"review_count": "desc"}],
            "newest": [{"created_at": "desc"}],
            "popular": [{"sales_rank": "asc"}],
        }
        
        return sort_map.get(sort, [{"_score": "desc"}])

6.2 API Layer

# api/search_api.py

from fastapi import FastAPI, Query, HTTPException
from typing import Optional, List
import logging

from architecture.search_system import SearchRequest

logger = logging.getLogger(__name__)

app = FastAPI()

# search_service and suggestion_service are assumed to be constructed and
# wired in at application startup (dependency injection omitted for brevity)


@app.get("/search")
async def search(
    q: str = Query(..., min_length=1, max_length=200),
    category: Optional[str] = None,
    brand: Optional[List[str]] = Query(None),
    min_price: Optional[float] = None,
    max_price: Optional[float] = None,
    in_stock: Optional[bool] = True,
    sort: str = Query("relevance", regex="^(relevance|price_asc|price_desc|rating|newest)$"),
    page: int = Query(1, ge=1, le=100),
    page_size: int = Query(20, ge=1, le=100),
):
    """
    Search products.
    
    Examples:
    - /search?q=running+shoes
    - /search?q=nike&category=shoes&min_price=50&max_price=150
    - /search?q=laptop&brand=Apple&brand=Dell&sort=price_asc
    """
    
    # Build filters
    filters = {}
    if category:
        filters["category"] = category
    if brand:
        filters["brand"] = brand
    if min_price is not None or max_price is not None:
        filters["price_range"] = {
            "min": min_price or 0,
            "max": max_price or 999999
        }
    if in_stock is not None:
        filters["in_stock"] = in_stock
    
    # Create request
    request = SearchRequest(
        query=q,
        filters=filters,
        sort=sort,
        page=page,
        page_size=page_size
    )
    
    try:
        result = await search_service.search(request)
        
        return {
            "query": q,
            "total": result.total,
            "page": page,
            "page_size": page_size,
            "took_ms": result.took_ms,
            "products": result.products,
            "facets": result.facets,
        }
        
    except Exception as e:
        logger.error(f"Search failed: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail="Search temporarily unavailable")


@app.get("/autocomplete")
async def autocomplete(
    q: str = Query(..., min_length=1, max_length=100),
    limit: int = Query(10, ge=1, le=20),
):
    """
    Autocomplete suggestions as user types.
    
    Example: /autocomplete?q=run → ["running shoes", "running watch", ...]
    """
    
    suggestions = await suggestion_service.get_suggestions(q, limit)
    
    return {
        "query": q,
        "suggestions": suggestions
    }
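
The suggestion_service referenced above is assumed to be provided elsewhere (Day 2 territory). A minimal sketch of one possible implementation, querying the name.autocomplete multi-field from the mapping in section 4.1 (class and parameter names here are illustrative):

# sketch: prefix suggestions backed by the edge-ngram analyzed name field

class SimpleSuggestionService:
    def __init__(self, es_client, index_name: str = "products"):
        self.es = es_client
        self.index_name = index_name

    async def get_suggestions(self, prefix: str, limit: int = 10) -> List[str]:
        response = await self.es.search(
            index=self.index_name,
            body={
                "query": {"match": {"name.autocomplete": prefix}},
                "size": limit,
                "_source": ["name"],
            },
        )
        return [hit["_source"]["name"] for hit in response["hits"]["hits"]]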

Part VI: Estimation & Sizing

Chapter 7: Cluster Sizing

7.1 Capacity Planning

# planning/capacity.py

"""
Capacity planning for product search cluster.
"""

def estimate_cluster_size(
    total_documents: int,
    avg_doc_size_kb: float,
    queries_per_second: int,
    indexing_per_second: int,
    retention_days: int = 0,  # 0 for no time-based rotation
    replica_count: int = 1,
) -> dict:
    """
    Estimate Elasticsearch cluster requirements.
    """
    
    # Storage calculation
    raw_data_gb = (total_documents * avg_doc_size_kb) / (1024 * 1024)
    
    # ES overhead (index structures, etc.) ~20-30%
    es_overhead = 1.3
    index_size_gb = raw_data_gb * es_overhead
    
    # With replicas
    total_storage_gb = index_size_gb * (1 + replica_count)
    
    # Shard calculation
    target_shard_size_gb = 30  # Sweet spot
    primary_shards = max(1, int(index_size_gb / target_shard_size_gb) + 1)
    total_shards = primary_shards * (1 + replica_count)
    
    # Node calculation
    # Rule: ~2 shards per GB of heap, max 20 shards per node
    shards_per_node = 15  # Conservative
    data_nodes_for_shards = max(3, (total_shards + shards_per_node - 1) // shards_per_node)
    
    # Memory: want the index to mostly fit in the OS page cache.
    # ES heap gets ~50% of RAM (capped at ~31 GB); the rest is left to the OS cache.
    storage_per_node_gb = total_storage_gb / data_nodes_for_shards
    ram_per_node_gb = max(64, storage_per_node_gb / 2)  # Aim for ~50% cache ratio
    
    # CPU: depends on query complexity
    # Rule of thumb used here: ~1 core per 50 QPS for simple queries
    total_cores_needed = max(16, queries_per_second // 50)
    cores_per_node = max(8, total_cores_needed // data_nodes_for_shards)
    
    return {
        "storage": {
            "raw_data_gb": round(raw_data_gb, 1),
            "index_size_gb": round(index_size_gb, 1),
            "total_with_replicas_gb": round(total_storage_gb, 1),
        },
        "shards": {
            "primary_shards": primary_shards,
            "replica_shards": primary_shards * replica_count,
            "total_shards": total_shards,
        },
        "nodes": {
            "data_nodes": data_nodes_for_shards,
            "master_nodes": 3,  # Always 3 for HA
            "coordinating_nodes": max(2, queries_per_second // 5000),
        },
        "per_data_node": {
            "ram_gb": int(ram_per_node_gb),
            "heap_gb": min(31, int(ram_per_node_gb / 2)),
            "storage_gb": int(storage_per_node_gb * 1.5),  # 50% headroom
            "cpu_cores": cores_per_node,
        },
        "total_resources": {
            "total_ram_gb": int(ram_per_node_gb * data_nodes_for_shards),
            "total_storage_gb": int(storage_per_node_gb * data_nodes_for_shards * 1.5),
            "total_cores": cores_per_node * data_nodes_for_shards,
        }
    }


# Example: Our product search system
estimate = estimate_cluster_size(
    total_documents=50_000_000,
    avg_doc_size_kb=10,
    queries_per_second=10_000,
    indexing_per_second=100,
    replica_count=1,
)

print("""
CLUSTER SIZING ESTIMATE

Storage:
├── Raw data: {storage[raw_data_gb]} GB
├── Index size: {storage[index_size_gb]} GB
└── With replicas: {storage[total_with_replicas_gb]} GB

Shards:
├── Primary: {shards[primary_shards]}
├── Replicas: {shards[replica_shards]}
└── Total: {shards[total_shards]}

Nodes:
├── Data nodes: {nodes[data_nodes]}
├── Master nodes: {nodes[master_nodes]}
└── Coordinating nodes: {nodes[coordinating_nodes]}

Per Data Node:
├── RAM: {per_data_node[ram_gb]} GB
├── Heap: {per_data_node[heap_gb]} GB
├── Storage: {per_data_node[storage_gb]} GB
└── CPU: {per_data_node[cpu_cores]} cores

Total Resources:
├── RAM: {total_resources[total_ram_gb]} GB
├── Storage: {total_resources[total_storage_gb]} GB
└── CPU: {total_resources[total_cores]} cores
""".format(**estimate))

7.2 Final Architecture

PRODUCT SEARCH CLUSTER

┌────────────────────────────────────────────────────────────────────────┐
│                    PRODUCTION ARCHITECTURE                             │
│                                                                        │
│  LOAD BALANCER                                                         │
│  └── Routes to Search API instances                                    │
│                                                                        │
│  SEARCH API (6 instances, auto-scaling)                                │
│  ├── Handles /search and /autocomplete                                 │
│  ├── Query building and caching                                        │
│  └── 2 cores, 4GB RAM each                                             │
│                                                                        │
│  ELASTICSEARCH CLUSTER                                                 │
│  ├── Master nodes: 3 (m5.large)                                        │
│  │   └── Dedicated for cluster management                              │
│  ├── Data nodes: 6 (r5.2xlarge)                                        │
│  │   ├── 64 GB RAM (31 GB heap + 31 GB OS cache)                       │
│  │   ├── 500 GB SSD each                                               │
│  │   └── Holds ~7 shards each                                          │
│  └── Coordinating nodes: 2 (c5.xlarge)                                 │
│      └── Query routing and aggregation                                 │
│                                                                        │
│  INDEX CONFIGURATION                                                   │
│  ├── Primary shards: 20                                                │
│  ├── Replica shards: 20 (1 replica)                                    │
│  └── Total shards: 40                                                  │
│                                                                        │
│  SUPPORTING SERVICES                                                   │
│  ├── Redis: Query result cache (32 GB cluster)                         │
│  ├── Kafka: CDC events from PostgreSQL                                 │
│  └── Indexing Pipeline: 3 workers                                      │
│                                                                        │
└────────────────────────────────────────────────────────────────────────┘

Summary

What We Learned Today

DAY 1 SUMMARY: SEARCH FUNDAMENTALS

PROBLEM
├── SQL LIKE is O(n) - scans every row
├── No relevance ranking
├── No typo tolerance
├── No stemming or synonyms
└── Database dies under load

INVERTED INDEX
├── Pre-compute: word → documents
├── Query: instant lookup + intersection
├── O(1) lookup vs O(n) scan
└── Foundation of all search engines

TEXT ANALYSIS
├── Character filters → Tokenizer → Token filters
├── Lowercase, stemming, synonyms
├── MUST use same analyzer at index and query time
└── Determines what matches what

DOCUMENT MODELING
├── Choose field types carefully
├── keyword: exact match, aggregations
├── text: full-text search
├── Multi-field pattern: one source, multiple representations
└── Denormalize for search performance

ELASTICSEARCH ARCHITECTURE
├── Cluster → Nodes → Indexes → Shards
├── Shards are Lucene indexes
├── Primary + Replicas for HA
├── Scatter-gather query execution
└── Size shards 20-40 GB

CLUSTER SIZING
├── 50M docs × 10KB = 500 GB
├── 20 primary shards + 20 replicas
├── 6 data nodes × 64 GB RAM
├── 3 master nodes (dedicated)
└── 2 coordinating nodes

Key Takeaways

SEARCH FUNDAMENTALS KEY TAKEAWAYS

1. INVERTED INDEX IS THE KEY
   Everything else builds on this data structure

2. ANALYSIS DETERMINES MATCHING
   "running" matches "run" only if analyzer stems both

3. FIELD TYPE MATTERS
   keyword ≠ text — different purposes

4. PLAN SHARDS CAREFULLY
   Can't change after creation, plan for growth

5. ELASTICSEARCH IS NOT A DATABASE
   It's a search engine — different trade-offs

DEFAULT CHOICE: When in doubt, use Elasticsearch
with proper analysis and multi-field patterns

Interview Tip

WHEN ASKED "DESIGN A SEARCH SYSTEM"

Start with the inverted index:

"The core of any search system is the inverted index.
Instead of scanning documents for words, we pre-compute
which documents contain each word. This turns O(n) scans
into O(1) lookups.

For a 50M product catalog, I'd use Elasticsearch which
builds on Lucene's inverted index. Key design decisions:

1. Document schema - what fields to index and how
2. Text analysis - tokenization, stemming, synonyms
3. Shard strategy - 20 shards for 500GB
4. Indexing pipeline - how to keep search fresh

Want me to dive into any of these?"

This shows you understand the fundamentals.

Tomorrow's Preview

Day 2 — Indexing Pipeline: "Keeping search fresh"

We'll solve:

  • How to sync PostgreSQL → Elasticsearch
  • Change Data Capture (CDC) with Debezium
  • Handling bulk imports without overwhelming the cluster
  • Zero-downtime reindexing
  • What happens when indexing falls behind

PREVIEW: THE INDEXING PROBLEM

Product updated in PostgreSQL at 10:00:00
User searches at 10:00:01
Search returns OLD product data

How do we minimize this lag?
How do we handle millions of updates?
How do we reindex without downtime?

End of Week 7, Day 1

Tomorrow: Day 2 — Indexing Pipeline: Keeping search fresh