Week 7 — Day 4: Advanced Search Features
System Design Mastery Series — Building Blocks Week
Introduction
Yesterday we mastered query processing and relevance tuning. Users can now search and get well-ranked results. But modern search is more than just matching queries to documents.
THE MODERN SEARCH EXPERIENCE
User opens search box:
├── Sees "Trending searches" (before typing anything)
│
User types "n":
├── Autocomplete shows: ["nike", "new balance", "north face"]
├── Response time: < 50ms
│
User types "nike ru":
├── Autocomplete: ["nike running shoes", "nike running shorts"]
│
User presses Enter:
├── 2,847 results
├── Facets: Category, Brand, Price, Color, Size, Rating
├── "Did you mean: nike running" (if typo detected)
│
User filters by "Running Shoes" category:
├── 423 results (instant update)
├── Facets update to show remaining options
│
User changes language to Spanish:
├── Same products, Spanish descriptions
├── Search for "zapatillas" finds "running shoes"
Today's Theme: "Building the search experience, not just the search engine"
We'll cover:
- Autocomplete with edge n-grams
- Search suggestions and "Did you mean"
- Faceted search and aggregations
- Synonyms and query expansion
- Multi-language search
- Search personalization
Part I: Autocomplete
Chapter 1: The Autocomplete Challenge
1.1 Why Autocomplete Is Hard
AUTOCOMPLETE REQUIREMENTS
SPEED
├── User expects suggestions while typing
├── Target: < 50ms response time
├── Network latency eats into budget
└── Must handle every keystroke
RELEVANCE
├── "ni" should show "nike" before "nikon" (for sports site)
├── Popular terms ranked higher
├── Recent/trending terms surface
└── Personalized to user history
SCALE
├── 50 million products
├── Millions of unique terms
├── Thousands of concurrent users typing
└── Every keystroke = query
TOLERANCE
├── Handle typos: "nikee" → "nike"
├── Handle partial words
├── Handle word order: "shoes nike" → "nike shoes"
└── Handle accents: "café" = "cafe"
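The accent and case requirements above are usually solved in the analysis chain rather than at query time. A minimal sketch, using the built-in lowercase and asciifolding token filters (the analyzer name "folded" is illustrative and separate from the autocomplete analyzers defined later):

# Sketch only: folds accents so "café" and "cafe" produce the same tokens.
ACCENT_FOLDING_SETTINGS = {
    "analysis": {
        "analyzer": {
            "folded": {
                "type": "custom",
                "tokenizer": "standard",
                "filter": [
                    "lowercase",      # "Café" -> "café"
                    "asciifolding"    # "café" -> "cafe"
                ]
            }
        }
    }
}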
1.2 Edge N-Gram Strategy
EDGE N-GRAM APPROACH
Instead of searching for prefix matches at query time,
pre-compute all prefixes at index time.
Document: "nike"
Standard tokenization:
→ ["nike"]
Edge n-gram tokenization (min=1, max=10):
→ ["n", "ni", "nik", "nike"]
Now "ni" is an exact match to the token "ni"!
Index time: O(n) extra tokens per word
Query time: O(1) lookup (like exact match)
Trade-off: More storage for faster queries
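To make the index-time trade-off concrete, here is a small pure-Python sketch of what the edge n-gram filter emits per token (Elasticsearch does this internally; the function exists only for illustration):

def edge_ngrams(token: str, min_gram: int = 1, max_gram: int = 10) -> list:
    """Generate edge n-grams for one token, e.g. "nike" -> ["n", "ni", "nik", "nike"]."""
    return [token[:i] for i in range(min_gram, min(len(token), max_gram) + 1)]

print(edge_ngrams("nike"))           # ['n', 'ni', 'nik', 'nike']
print(edge_ngrams("running", 2, 4))  # ['ru', 'run', 'runn']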
1.3 Implementation
# autocomplete/edge_ngram.py
"""
Edge n-gram autocomplete implementation.
"""
# Elasticsearch mapping for autocomplete
AUTOCOMPLETE_MAPPING = {
"settings": {
"analysis": {
"filter": {
"edge_ngram_filter": {
"type": "edge_ngram",
"min_gram": 1,
"max_gram": 15
},
"autocomplete_filter": {
"type": "edge_ngram",
"min_gram": 2, # Start at 2 to reduce noise
"max_gram": 20
}
},
"analyzer": {
# Used at INDEX time - generates n-grams
"autocomplete_index": {
"type": "custom",
"tokenizer": "standard",
"filter": [
"lowercase",
"autocomplete_filter"
]
},
# Used at SEARCH time - no n-grams, just lowercase
"autocomplete_search": {
"type": "custom",
"tokenizer": "standard",
"filter": [
"lowercase"
]
}
}
}
},
"mappings": {
"properties": {
"name": {
"type": "text",
"analyzer": "standard",
"fields": {
"autocomplete": {
"type": "text",
"analyzer": "autocomplete_index",
"search_analyzer": "autocomplete_search"
}
}
},
"brand": {
"type": "text",
"fields": {
"autocomplete": {
"type": "text",
"analyzer": "autocomplete_index",
"search_analyzer": "autocomplete_search"
}
}
},
# For ranking suggestions
"popularity_score": {
"type": "float"
},
"search_count": {
"type": "long"
}
}
}
}
class AutocompleteService:
"""
Fast autocomplete using edge n-grams.
"""
def __init__(self, es_client, index: str = "products"):
self.es = es_client
self.index = index
async def suggest(
self,
prefix: str,
limit: int = 10,
category: str = None
) -> list:
"""
Get autocomplete suggestions for prefix.
Args:
prefix: What user has typed so far
limit: Max suggestions to return
category: Optional category filter
Returns:
List of suggestion objects with text and metadata
"""
if len(prefix) < 2:
return []
# Build query
query = self._build_autocomplete_query(prefix, category)
# Execute with size limit
response = await self.es.search(
index=self.index,
body=query,
size=limit,
_source=["name", "brand", "category", "popularity_score"]
)
# Process results
suggestions = []
seen = set() # Deduplicate
for hit in response["hits"]["hits"]:
source = hit["_source"]
# Create suggestion text
suggestion_text = source["name"]
# Deduplicate
if suggestion_text.lower() in seen:
continue
seen.add(suggestion_text.lower())
suggestions.append({
"text": suggestion_text,
"brand": source.get("brand"),
"category": source.get("category"),
"score": hit["_score"],
"product_id": hit["_id"]
})
return suggestions
def _build_autocomplete_query(
self,
prefix: str,
category: str = None
) -> dict:
"""Build autocomplete query."""
# Search on autocomplete fields
must = {
"multi_match": {
"query": prefix,
"fields": [
"name.autocomplete^3",
"brand.autocomplete^2"
],
"type": "bool_prefix" # Each term as prefix
}
}
# Optional category filter
filters = []
if category:
filters.append({"term": {"category": category}})
# Only in-stock products
filters.append({"term": {"in_stock": True}})
# Combine with popularity boost
query = {
"query": {
"function_score": {
"query": {
"bool": {
"must": [must],
"filter": filters
}
},
"functions": [
{
"field_value_factor": {
"field": "popularity_score",
"modifier": "log1p",
"missing": 1
}
}
],
"boost_mode": "multiply"
}
},
"collapse": {
# Deduplicate by name (one result per unique name)
"field": "name.keyword"
}
}
return query
async def suggest_queries(
self,
prefix: str,
limit: int = 5
) -> list:
"""
Suggest search queries (not products).
Uses a separate index of popular searches.
"""
response = await self.es.search(
index="search_queries",
body={
"query": {
"bool": {
"must": {
"match": {
"query.autocomplete": prefix
}
}
}
},
"sort": [
{"search_count": "desc"},
{"_score": "desc"}
]
},
size=limit
)
return [
{
"query": hit["_source"]["query"],
"count": hit["_source"]["search_count"]
}
for hit in response["hits"]["hits"]
]
# =============================================================================
# Search Query Index (for query suggestions)
# =============================================================================
SEARCH_QUERY_MAPPING = {
"settings": {
"analysis": {
"analyzer": {
"autocomplete_index": {
"type": "custom",
"tokenizer": "standard",
"filter": ["lowercase", "edge_ngram_filter"]
},
"autocomplete_search": {
"type": "custom",
"tokenizer": "standard",
"filter": ["lowercase"]
}
},
"filter": {
"edge_ngram_filter": {
"type": "edge_ngram",
"min_gram": 2,
"max_gram": 20
}
}
}
},
"mappings": {
"properties": {
"query": {
"type": "text",
"analyzer": "standard",
"fields": {
"autocomplete": {
"type": "text",
"analyzer": "autocomplete_index",
"search_analyzer": "autocomplete_search"
},
"keyword": {
"type": "keyword"
}
}
},
"search_count": {
"type": "long"
},
"result_count": {
"type": "long"
},
"click_through_rate": {
"type": "float"
},
"last_searched": {
"type": "date"
}
}
}
}
class SearchQueryTracker:
"""
Tracks and indexes popular search queries for suggestions.
"""
def __init__(self, es_client):
self.es = es_client
self.index = "search_queries"
async def record_search(
self,
query: str,
result_count: int,
clicked: bool
):
"""Record a search for analytics and suggestions."""
query_normalized = query.lower().strip()
doc_id = self._hash_query(query_normalized)
# Upsert: increment count or create new
await self.es.update(
index=self.index,
id=doc_id,
body={
"script": {
"source": """
ctx._source.search_count += 1;
ctx._source.last_searched = params.now;
if (params.clicked) {
ctx._source.click_count += 1;
}
ctx._source.click_through_rate =
ctx._source.click_count / ctx._source.search_count;
""",
"params": {
"now": "now",
"clicked": clicked
}
},
"upsert": {
"query": query_normalized,
"search_count": 1,
"click_count": 1 if clicked else 0,
"result_count": result_count,
"click_through_rate": 1.0 if clicked else 0.0,
"last_searched": "now"
}
},
retry_on_conflict=3
)
def _hash_query(self, query: str) -> str:
"""Create consistent ID for query."""
import hashlib
return hashlib.md5(query.encode()).hexdigest()
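A sketch of how the tracker might hook into a search endpoint so every query feeds the suggestions index (the handler and search_service names are assumptions, not part of the implementation above):

# Illustrative wiring only.
async def handle_search(request: dict, search_service, tracker: SearchQueryTracker) -> dict:
    query = request["q"]
    results = await search_service.search(query)
    # Record the search; click_through_rate is updated later from click events.
    await tracker.record_search(
        query=query,
        result_count=results["total"],
        clicked=False
    )
    return results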
1.4 Completion Suggester (Alternative)
# autocomplete/completion_suggester.py
"""
Elasticsearch Completion Suggester - an alternative to edge n-grams.
Pros:
- Extremely fast (in-memory FST data structure)
- Built-in fuzzy matching
- Supports contexts (categories, etc.)
Cons:
- Requires separate field type
- Less flexible ranking
- Higher memory usage
"""
COMPLETION_MAPPING = {
"mappings": {
"properties": {
"suggest": {
"type": "completion",
"analyzer": "simple",
"preserve_separators": True,
"preserve_position_increments": True,
"max_input_length": 50,
"contexts": [
{
"name": "category",
"type": "category"
}
]
},
"name": {"type": "text"},
"popularity": {"type": "long"}
}
}
}
class CompletionSuggester:
"""
Autocomplete using Elasticsearch's completion suggester.
"""
def __init__(self, es_client, index: str):
self.es = es_client
self.index = index
async def index_product(self, product: dict):
"""Index product with completion field."""
# Generate input variants for completion
inputs = self._generate_inputs(product)
doc = {
"suggest": {
"input": inputs,
"weight": product.get("popularity", 1),
"contexts": {
"category": product.get("category", "general")
}
},
"name": product["name"],
"popularity": product.get("popularity", 1)
}
await self.es.index(
index=self.index,
id=product["id"],
document=doc
)
def _generate_inputs(self, product: dict) -> list:
"""Generate completion inputs from product."""
inputs = []
name = product["name"]
brand = product.get("brand", "")
# Full name
inputs.append(name)
# Brand + name
if brand:
inputs.append(f"{brand} {name}")
# Individual words (for any-order matching)
words = name.split()
for word in words:
if len(word) > 2:
inputs.append(word)
return inputs
async def suggest(
self,
prefix: str,
limit: int = 10,
category: str = None,
fuzzy: bool = True
) -> list:
"""Get completion suggestions."""
suggest_query = {
"prefix": prefix,
"completion": {
"field": "suggest",
"size": limit,
"skip_duplicates": True
}
}
# Add fuzzy matching
if fuzzy:
suggest_query["completion"]["fuzzy"] = {
"fuzziness": "AUTO"
}
# Filter by category
if category:
suggest_query["completion"]["contexts"] = {
"category": category
}
response = await self.es.search(
index=self.index,
body={
"suggest": {
"product-suggest": suggest_query
}
}
)
suggestions = []
for option in response["suggest"]["product-suggest"][0]["options"]:
suggestions.append({
"text": option["text"],
"score": option["_score"],
"product_id": option["_id"]
})
return suggestions
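A usage sketch for the completion suggester, assuming an index created with COMPLETION_MAPPING (the index name and product values are illustrative):

async def demo_completion(es_client):
    suggester = CompletionSuggester(es_client, index="products_suggest")
    await suggester.index_product({
        "id": "p123",
        "name": "Air Zoom Pegasus",
        "brand": "Nike",
        "category": "running-shoes",
        "popularity": 4821
    })
    # Fuzzy matching tolerates the typo in "pegsus"
    return await suggester.suggest("pegsus", category="running-shoes", fuzzy=True)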
Part II: Spelling Correction
Chapter 2: "Did You Mean?"
2.1 Spelling Correction Strategies
SPELLING CORRECTION APPROACHES
1. TERM-LEVEL SUGGESTIONS
Input: "runnign shoes"
Suggest: "running shoes"
How: Each term checked individually
2. PHRASE-LEVEL SUGGESTIONS
Input: "running sheos"
Suggest: "running shoes"
How: Context-aware, considers word combinations
3. INDEX-BASED CORRECTION
Only suggest terms that exist in index
"runnign" → "running" (if "running" in index)
Won't suggest random dictionary words
4. PHONETIC MATCHING
"fone" → "phone" (sounds similar)
Uses algorithms like Soundex, Metaphone
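Term-level correction ultimately comes down to string distance between the typed word and words that actually exist in the index. A self-contained sketch of Levenshtein edit distance, the classic measure (the implementation below lets Elasticsearch handle this, configured with jaro_winkler instead):

def levenshtein(a: str, b: str) -> int:
    """Minimum number of single-character insertions, deletions, or
    substitutions needed to turn string a into string b."""
    if len(a) < len(b):
        a, b = b, a
    previous = list(range(len(b) + 1))
    for i, ca in enumerate(a, start=1):
        current = [i]
        for j, cb in enumerate(b, start=1):
            current.append(min(
                current[j - 1] + 1,            # insertion
                previous[j] + 1,               # deletion
                previous[j - 1] + (ca != cb)   # substitution
            ))
        previous = current
    return previous[-1]

print(levenshtein("runnign", "running"))  # 2 (a transposition costs two single-character edits)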
2.2 Implementation
# spelling/did_you_mean.py
"""
Spelling correction and "Did you mean" suggestions.
"""
from typing import List, Optional, Tuple
from dataclasses import dataclass
@dataclass
class SpellingCorrection:
"""A spelling correction suggestion."""
original: str
corrected: str
confidence: float
highlighted: str # With correction markers
class SpellingService:
"""
Provides spelling corrections for search queries.
"""
def __init__(self, es_client, index: str):
self.es = es_client
self.index = index
async def get_correction(
self,
query: str,
result_count: int
) -> Optional[SpellingCorrection]:
"""
Get spelling correction if query seems misspelled.
Heuristic: Suggest correction if:
1. Original query has few/no results
2. Corrected query would have significantly more results
"""
# Get phrase suggestion
suggestion = await self._get_phrase_suggestion(query)
if not suggestion or suggestion == query:
return None
# Check if suggestion has more results
suggestion_count = await self._count_results(suggestion)
# Only suggest if correction has significantly more results
if suggestion_count > result_count * 2 and suggestion_count > 10:
return SpellingCorrection(
original=query,
corrected=suggestion,
confidence=min(0.95, suggestion_count / (suggestion_count + result_count)),
highlighted=self._highlight_changes(query, suggestion)
)
return None
async def _get_phrase_suggestion(self, query: str) -> Optional[str]:
"""Get phrase-level spelling suggestion."""
response = await self.es.search(
index=self.index,
body={
"suggest": {
"text": query,
"phrase_suggestion": {
"phrase": {
"field": "name.trigram",
"size": 1,
"gram_size": 3,
"direct_generator": [
{
"field": "name.trigram",
"suggest_mode": "popular",
"min_word_length": 3
}
],
"collate": {
# Only suggest if results exist
"query": {
"source": {
"match": {
"name": "{{suggestion}}"
}
}
},
"prune": True
}
}
}
},
"size": 0
}
)
suggestions = response.get("suggest", {}).get("phrase_suggestion", [])
if suggestions and suggestions[0].get("options"):
return suggestions[0]["options"][0]["text"]
return None
async def _get_term_suggestions(self, query: str) -> List[Tuple[str, str]]:
"""Get term-level suggestions for each word."""
response = await self.es.search(
index=self.index,
body={
"suggest": {
"text": query,
"term_suggestion": {
"term": {
"field": "name",
"suggest_mode": "popular",
"sort": "frequency",
"string_distance": "jaro_winkler"
}
}
},
"size": 0
}
)
corrections = []
for suggestion in response.get("suggest", {}).get("term_suggestion", []):
original = suggestion["text"]
if suggestion.get("options"):
corrected = suggestion["options"][0]["text"]
if original != corrected:
corrections.append((original, corrected))
return corrections
async def _count_results(self, query: str) -> int:
"""Count results for a query."""
response = await self.es.count(
index=self.index,
body={
"query": {
"match": {
"name": query
}
}
}
)
return response["count"]
def _highlight_changes(self, original: str, corrected: str) -> str:
"""Highlight the changes in correction."""
original_words = original.lower().split()
corrected_words = corrected.lower().split()
highlighted = []
for i, word in enumerate(corrected_words):
if i >= len(original_words) or word != original_words[i]:
highlighted.append(f"<em>{word}</em>")
else:
highlighted.append(word)
return " ".join(highlighted)
# =============================================================================
# Trigram Field Mapping (for phrase suggester)
# =============================================================================
TRIGRAM_MAPPING = {
"settings": {
"analysis": {
"filter": {
"trigram_filter": {
"type": "ngram",
"min_gram": 3,
"max_gram": 3
}
},
"analyzer": {
"trigram_analyzer": {
"type": "custom",
"tokenizer": "standard",
"filter": [
"lowercase",
"trigram_filter"
]
}
}
}
},
"mappings": {
"properties": {
"name": {
"type": "text",
"fields": {
"trigram": {
"type": "text",
"analyzer": "trigram_analyzer"
}
}
}
}
}
}
Part III: Faceted Search
Chapter 3: Dynamic Filtering
3.1 Facets and Aggregations
FACETED SEARCH CONCEPT
User searches: "running shoes"
Results: 2,847 products
FACETS (side panel):
┌─────────────────────────────────┐
│ CATEGORY │
│ Running Shoes (1,234) │
│ Trail Running (456) │
│ Track & Field (123) │
│ │
│ BRAND │
│ Nike (892) │
│ Adidas (567) │
│ New Balance (234) │
│ Brooks (189) │
│ ASICS (145) │
│ │
│ PRICE │
│ Under $50 (234) │
│ $50 - $100 (678) │
│ $100 - $150 (1,234) │
│ Over $150 (701) │
│ │
│ COLOR │
│ Black (1,456) │
│ White (987) │
│ Red (345) │
│ │
│ RATING │
│ ★★★★★ & up (567) │
│ ★★★★☆ & up (1,234) │
│ ★★★☆☆ & up (2,100) │
└─────────────────────────────────┘
Key insight:
Facet counts UPDATE when user applies filters.
If user selects "Nike", other brand counts disappear,
but Nike sub-categories appear.
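Under the hood, each facet is an Elasticsearch aggregation. A minimal sketch of the request shape behind the BRAND facet above (field names match the implementation in 3.2; counts come back as buckets):

# Facet counts only, no hits ("size": 0).
BRAND_FACET_REQUEST = {
    "query": {"match": {"name": "running shoes"}},
    "size": 0,
    "aggs": {
        "brands": {
            "terms": {"field": "brand.keyword", "size": 10}
        }
    }
}

# Abridged response shape:
# {"aggregations": {"brands": {"buckets": [
#     {"key": "Nike", "doc_count": 892},
#     {"key": "Adidas", "doc_count": 567}, ...]}}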
3.2 Implementation
# facets/aggregations.py
"""
Faceted search with dynamic aggregations.
"""
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
@dataclass
class FacetConfig:
"""Configuration for a facet."""
name: str
field: str
type: str # terms, range, histogram
size: int = 20
ranges: List[dict] = None # For range facets
class FacetedSearchService:
"""
Provides faceted search with dynamic filter updates.
"""
# Standard facet configurations
FACETS = [
FacetConfig(
name="categories",
field="category",
type="terms",
size=20
),
FacetConfig(
name="brands",
field="brand.keyword",
type="terms",
size=30
),
FacetConfig(
name="price_ranges",
field="price",
type="range",
ranges=[
{"key": "under_50", "to": 50},
{"key": "50_100", "from": 50, "to": 100},
{"key": "100_150", "from": 100, "to": 150},
{"key": "150_200", "from": 150, "to": 200},
{"key": "over_200", "from": 200}
]
),
FacetConfig(
name="colors",
field="colors",
type="terms",
size=15
),
FacetConfig(
name="sizes",
field="sizes",
type="terms",
size=20
),
FacetConfig(
name="ratings",
field="rating",
type="range",
ranges=[
{"key": "4_up", "from": 4},
{"key": "3_up", "from": 3},
{"key": "2_up", "from": 2}
]
)
]
def __init__(self, es_client, index: str):
self.es = es_client
self.index = index
async def search_with_facets(
self,
query: str,
filters: Dict[str, Any],
page: int = 1,
page_size: int = 20
) -> dict:
"""
Search with facets that update based on filters.
"""
# Build base query
es_query = self._build_query(query, filters)
# Add aggregations for facets
es_query["aggs"] = self._build_aggregations(filters)
# Pagination
es_query["from"] = (page - 1) * page_size
es_query["size"] = page_size
# Execute
response = await self.es.search(
index=self.index,
body=es_query
)
# Process results
return {
"products": self._process_hits(response["hits"]["hits"]),
"total": response["hits"]["total"]["value"],
"facets": self._process_aggregations(response["aggregations"]),
"applied_filters": filters
}
def _build_query(
self,
query: str,
filters: Dict[str, Any]
) -> dict:
"""Build search query with filters."""
must = []
filter_clauses = []
# Text search
if query:
must.append({
"multi_match": {
"query": query,
"fields": ["name^3", "brand^2", "description"],
"type": "best_fields"
}
})
else:
must.append({"match_all": {}})
# Apply filters
filter_clauses.extend(self._build_filters(filters))
# Always in stock
filter_clauses.append({"term": {"in_stock": True}})
return {
"query": {
"bool": {
"must": must,
"filter": filter_clauses
}
}
}
def _build_filters(self, filters: Dict[str, Any]) -> list:
"""Build filter clauses from user selections."""
clauses = []
if filters.get("category"):
clauses.append({"term": {"category": filters["category"]}})
if filters.get("brands"):
brands = filters["brands"]
if isinstance(brands, list):
clauses.append({"terms": {"brand.keyword": brands}})
else:
clauses.append({"term": {"brand.keyword": brands}})
if filters.get("price_min") is not None or filters.get("price_max") is not None:
price_range = {}
if filters.get("price_min") is not None:
price_range["gte"] = filters["price_min"]
if filters.get("price_max") is not None:
price_range["lte"] = filters["price_max"]
clauses.append({"range": {"price": price_range}})
if filters.get("colors"):
colors = filters["colors"]
if isinstance(colors, list):
clauses.append({"terms": {"colors": colors}})
else:
clauses.append({"term": {"colors": colors}})
if filters.get("rating_min"):
clauses.append({
"range": {"rating": {"gte": filters["rating_min"]}}
})
return clauses
def _build_aggregations(self, filters: Dict[str, Any]) -> dict:
"""
Build aggregations for facets.
Key insight: Each facet should show counts as if
its own filter wasn't applied (for multi-select).
"""
aggs = {}
for facet in self.FACETS:
# Build aggregation
if facet.type == "terms":
agg = {
"terms": {
"field": facet.field,
"size": facet.size
}
}
elif facet.type == "range":
agg = {
"range": {
"field": facet.field,
"ranges": facet.ranges
}
}
else:
continue
# For multi-select facets, exclude own filter
# This allows selecting multiple brands, etc.
own_filter = self._get_filter_for_facet(facet.name, filters)
other_filters = self._get_filters_except(facet.name, filters)
if other_filters:
# Wrap in filter aggregation
aggs[facet.name] = {
"filter": {"bool": {"filter": other_filters}},
"aggs": {
facet.name: agg
}
}
else:
aggs[facet.name] = agg
return aggs
def _get_filter_for_facet(
self,
facet_name: str,
filters: Dict[str, Any]
) -> Optional[dict]:
"""Get the filter clause for a specific facet."""
filter_mapping = {
"categories": "category",
"brands": "brands",
"colors": "colors",
"ratings": "rating_min"
}
filter_key = filter_mapping.get(facet_name)
if filter_key and filters.get(filter_key):
return self._build_filters({filter_key: filters[filter_key]})[0]
return None
def _get_filters_except(
self,
facet_name: str,
filters: Dict[str, Any]
) -> list:
"""Get all filters except for a specific facet."""
filter_mapping = {
"categories": "category",
"brands": "brands",
"colors": "colors",
"ratings": "rating_min"
}
exclude_key = filter_mapping.get(facet_name)
filtered = {
k: v for k, v in filters.items()
if k != exclude_key
}
result = self._build_filters(filtered)
result.append({"term": {"in_stock": True}})
return result
def _process_aggregations(self, aggs: dict) -> Dict[str, List[dict]]:
"""Process aggregations into facet format."""
facets = {}
for facet in self.FACETS:
agg_data = aggs.get(facet.name, {})
# Handle nested aggregation (when filter wrapper used)
if facet.name in agg_data:
agg_data = agg_data[facet.name]
buckets = agg_data.get("buckets", [])
facets[facet.name] = [
{
"value": bucket["key"],
"count": bucket["doc_count"]
}
for bucket in buckets
if bucket["doc_count"] > 0
]
return facets
def _process_hits(self, hits: list) -> list:
"""Process search hits."""
return [
{
"product_id": hit["_id"],
"score": hit.get("_score"),
**hit["_source"]
}
for hit in hits
]
# =============================================================================
# Hierarchical Facets
# =============================================================================
class HierarchicalFacetService:
"""
Handles hierarchical facets like category trees.
Electronics
├── Phones
│ ├── Smartphones
│ └── Feature Phones
└── Computers
├── Laptops
└── Desktops
"""
def __init__(self, es_client, index: str):
self.es = es_client
self.index = index
async def get_category_facets(
self,
query: str,
selected_category: str = None
) -> dict:
"""
Get hierarchical category facets.
If no category selected: Show top-level categories
If category selected: Show subcategories
"""
# Determine which level to aggregate
if selected_category:
# Show children of selected category
field = "category_path"
prefix = selected_category
else:
# Show top-level categories
field = "category_level_1"
prefix = None
query_body = {
"query": {
"bool": {
"must": [
{"match": {"name": query}} if query else {"match_all": {}}
],
"filter": [
{"term": {"in_stock": True}}
]
}
},
"aggs": {
"categories": {
"terms": {
"field": field,
"size": 50
}
}
},
"size": 0
}
# Add category filter if selected
if selected_category:
query_body["query"]["bool"]["filter"].append({
"prefix": {"category_path": selected_category}
})
response = await self.es.search(
index=self.index,
body=query_body
)
buckets = response["aggregations"]["categories"]["buckets"]
return {
"parent": selected_category,
"categories": [
{"name": b["key"], "count": b["doc_count"]}
for b in buckets
]
}
Part IV: Synonyms and Query Expansion
Chapter 4: Understanding User Intent
4.1 Synonym Strategies
SYNONYM TYPES
1. EXPLICIT SYNONYMS
tv, television → television
couch, sofa, settee → sofa
2. ONE-WAY SYNONYMS
ipod => ipod, mp3 player
(searching "ipod" also finds "mp3 player", but not reverse)
3. MULTI-WORD SYNONYMS
usa, united states, united states of america
4. BRAND SYNONYMS
kleenex => kleenex, tissue
WHEN TO APPLY:
INDEX TIME:
├── Synonyms expanded when document indexed
├── Smaller index (one token covers all synonyms)
├── Requires reindex to change synonyms
└── Use for: Stable, well-known synonyms
QUERY TIME:
├── Synonyms expanded when searching
├── Larger queries (more terms)
├── Can change without reindex
└── Use for: Frequently changing synonyms
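A quick way to see query-time expansion in action is the _analyze API. A sketch, assuming the search_synonym_analyzer defined in the settings below is installed on the products index:

async def show_synonym_expansion(es_client):
    # Inspect how the query-time synonym_graph analyzer expands a phrase.
    result = await es_client.indices.analyze(
        index="products",
        body={
            "analyzer": "search_synonym_analyzer",
            "text": "tv stand"
        }
    )
    # With the "tv, television, telly" group, expect tokens for all three
    # at the first position, plus "stand" at the next position.
    return [t["token"] for t in result["tokens"]]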
4.2 Implementation
# synonyms/synonym_service.py
"""
Synonym management and query expansion.
"""
from typing import List, Dict, Set
class SynonymManager:
"""
Manages synonyms for search.
"""
# Core synonym groups
SYNONYM_GROUPS = [
# Products
["tv", "television", "telly"],
["couch", "sofa", "settee", "loveseat"],
["laptop", "notebook", "notebook computer"],
["phone", "mobile", "cell phone", "smartphone", "mobile phone"],
["fridge", "refrigerator"],
# Clothing
["pants", "trousers", "slacks"],
["sneakers", "trainers", "tennis shoes", "athletic shoes"],
["hoodie", "hooded sweatshirt"],
["tee", "t-shirt", "tshirt"],
# Colors
["grey", "gray"],
["colour", "color"],
# Geographic
["usa", "united states", "america"],
["uk", "united kingdom", "britain", "great britain"],
]
# One-way expansions (brand → generic)
ONE_WAY_SYNONYMS = {
"kleenex": ["tissue", "facial tissue"],
"xerox": ["copy", "photocopy"],
"jacuzzi": ["hot tub", "whirlpool"],
"ipad": ["tablet"],
"ipod": ["mp3 player"],
"velcro": ["hook and loop"],
}
def generate_synonym_file(self) -> str:
"""
Generate Elasticsearch synonym file format.
Format: term1, term2, term3 => normalized_term
Or: term1, term2, term3 (all equivalent)
"""
lines = []
# Bidirectional synonyms
for group in self.SYNONYM_GROUPS:
lines.append(", ".join(group))
# One-way synonyms
for term, expansions in self.ONE_WAY_SYNONYMS.items():
all_terms = [term] + expansions
lines.append(f"{term} => {', '.join(all_terms)}")
return "\n".join(lines)
def get_elasticsearch_settings(self) -> dict:
"""Get Elasticsearch settings with synonyms."""
return {
"analysis": {
"filter": {
"synonym_filter": {
"type": "synonym",
"synonyms": [
line for line in self.generate_synonym_file().split("\n")
if line.strip()
]
},
"synonym_graph_filter": {
"type": "synonym_graph",
"synonyms": [
line for line in self.generate_synonym_file().split("\n")
if line.strip()
]
}
},
"analyzer": {
# For index time (standard synonyms)
"synonym_analyzer": {
"tokenizer": "standard",
"filter": [
"lowercase",
"synonym_filter"
]
},
# For query time (synonym graph for multi-word)
"search_synonym_analyzer": {
"tokenizer": "standard",
"filter": [
"lowercase",
"synonym_graph_filter"
]
}
}
}
}
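A usage sketch showing how these settings plug into index creation, applying synonyms at query time only (per the trade-offs in 4.1); the index name and mapping are illustrative:

async def create_products_index(es_client):
    manager = SynonymManager()
    await es_client.indices.create(
        index="products",
        body={
            "settings": manager.get_elasticsearch_settings(),
            "mappings": {
                "properties": {
                    "name": {
                        "type": "text",
                        "analyzer": "standard",                        # index time: no expansion
                        "search_analyzer": "search_synonym_analyzer"   # query time: synonym_graph
                    }
                }
            }
        }
    )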
# =============================================================================
# Dynamic Synonym Updates
# =============================================================================
class DynamicSynonymService:
"""
Manages synonyms that can be updated without reindex.
Uses Elasticsearch's reload feature with file-based synonyms.
"""
def __init__(self, es_client, synonym_path: str):
self.es = es_client
self.synonym_path = synonym_path
async def add_synonym_group(self, terms: List[str]):
"""Add a new synonym group."""
# Read current file
with open(self.synonym_path, "r") as f:
current = f.read()
# Add new group
new_line = ", ".join(terms)
updated = current + "\n" + new_line
# Write back
with open(self.synonym_path, "w") as f:
f.write(updated)
# Reload analyzers
await self._reload_analyzers()
    async def _reload_analyzers(self):
        """Reload analyzers to pick up synonym changes.

        Note: an "updateable" synonym filter may only be referenced from
        search analyzers, and for those Elasticsearch provides a
        reload-search-analyzers API that avoids closing the index.
        Close/update/reopen, shown here, is the general-purpose fallback.
        """
        # Close index
await self.es.indices.close(index="products")
# Update settings
await self.es.indices.put_settings(
index="products",
body={
"analysis": {
"filter": {
"synonym_filter": {
"type": "synonym",
"synonyms_path": self.synonym_path,
"updateable": True
}
}
}
}
)
# Reopen index
await self.es.indices.open(index="products")
# =============================================================================
# Query Expansion
# =============================================================================
class QueryExpander:
"""
Expands queries with related terms.
Goes beyond synonyms to include:
- Related categories
- Common co-purchases
- Semantic similarity
"""
def __init__(self, es_client, index: str):
self.es = es_client
self.index = index
# Simple related terms (in production, use ML)
self.related_terms = {
"laptop": ["laptop bag", "laptop stand", "laptop charger"],
"camera": ["camera lens", "camera bag", "memory card"],
"running shoes": ["running socks", "running shorts", "fitness tracker"],
}
async def expand_query(self, query: str) -> dict:
"""
Expand query with related terms.
Returns expanded query and suggestions.
"""
# Get related terms
related = self._get_related_terms(query)
# Build expanded query
expanded_query = {
"bool": {
"should": [
{
"multi_match": {
"query": query,
"fields": ["name^3", "description"],
"boost": 2.0 # Original query boosted
}
}
]
}
}
# Add related terms with lower boost
for term in related[:3]: # Limit to top 3
expanded_query["bool"]["should"].append({
"multi_match": {
"query": term,
"fields": ["name^2", "description"],
"boost": 0.5
}
})
return {
"query": expanded_query,
"original": query,
"expanded_with": related[:3]
}
def _get_related_terms(self, query: str) -> List[str]:
"""Get related terms for query."""
query_lower = query.lower()
# Check direct match
if query_lower in self.related_terms:
return self.related_terms[query_lower]
# Check partial match
for key, terms in self.related_terms.items():
if key in query_lower or query_lower in key:
return terms
return []
Part V: Multi-Language Search
Chapter 5: Internationalization
5.1 Multi-Language Strategies
MULTI-LANGUAGE SEARCH STRATEGIES
STRATEGY 1: SEPARATE INDEXES
├── products_en
├── products_es
├── products_fr
├── products_de
└── Query routes to correct index based on user locale
STRATEGY 2: MULTI-FIELD PER LANGUAGE
{
"name": "Running Shoes",
"name_en": "Running Shoes",
"name_es": "Zapatillas para correr",
"name_fr": "Chaussures de course"
}
└── Query searches appropriate field(s)
STRATEGY 3: LANGUAGE DETECTION
├── Auto-detect query language
├── Search appropriate fields
└── Fallback to all languages
TRADE-OFFS:
Separate Indexes:
├── + Clean separation
├── + Language-specific analysis
├── - Index management overhead
└── - Cross-language search harder
Multi-Field:
├── + Single index
├── + Cross-language search easy
├── - Larger index
└── - Analysis configuration complex
5.2 Implementation
# multilang/multi_language_search.py
"""
Multi-language search support.
"""
from typing import List, Optional, Dict
from enum import Enum
class Language(Enum):
EN = "en"
ES = "es"
FR = "fr"
DE = "de"
IT = "it"
PT = "pt"
JA = "ja"
ZH = "zh"
class MultiLanguageSearch:
"""
Multi-language search with language-specific analysis.
"""
# Language-specific analyzers
LANGUAGE_ANALYZERS = {
Language.EN: "english",
Language.ES: "spanish",
Language.FR: "french",
Language.DE: "german",
Language.IT: "italian",
Language.PT: "portuguese",
Language.JA: "kuromoji", # Japanese
Language.ZH: "smartcn", # Chinese
}
def __init__(self, es_client, index: str):
self.es = es_client
self.index = index
def get_mapping(self) -> dict:
"""Get mapping with multi-language fields."""
# Base properties
properties = {
"product_id": {"type": "keyword"},
"price": {"type": "float"},
"category": {"type": "keyword"},
}
# Add language-specific fields for text content
text_fields = ["name", "description"]
for field in text_fields:
properties[field] = {
"type": "text",
"analyzer": "standard",
"fields": {}
}
# Add language-specific subfields
for lang in Language:
properties[field]["fields"][lang.value] = {
"type": "text",
"analyzer": self.LANGUAGE_ANALYZERS[lang]
}
return {"mappings": {"properties": properties}}
async def search(
self,
query: str,
language: Language,
fallback_languages: List[Language] = None
) -> dict:
"""
Search with language-aware analysis.
"""
# Primary language fields
primary_fields = [
f"name.{language.value}^3",
f"description.{language.value}"
]
# Fallback fields (lower boost)
fallback_fields = []
if fallback_languages:
for lang in fallback_languages:
fallback_fields.extend([
f"name.{lang.value}^1.5",
f"description.{lang.value}^0.5"
])
# Always include base fields as fallback
fallback_fields.extend([
"name^1",
"description^0.3"
])
# Build query
es_query = {
"query": {
"bool": {
"should": [
{
"multi_match": {
"query": query,
"fields": primary_fields,
"type": "best_fields",
"boost": 2.0
}
},
{
"multi_match": {
"query": query,
"fields": fallback_fields,
"type": "best_fields"
}
}
]
}
}
}
response = await self.es.search(
index=self.index,
body=es_query
)
return self._process_results(response, language)
def _process_results(self, response: dict, language: Language) -> dict:
"""Process results, selecting appropriate language content."""
products = []
for hit in response["hits"]["hits"]:
source = hit["_source"]
# Select language-specific content
product = {
"product_id": source.get("product_id"),
"name": self._get_localized(source, "name", language),
"description": self._get_localized(source, "description", language),
"price": source.get("price"),
"score": hit.get("_score")
}
products.append(product)
return {
"products": products,
"total": response["hits"]["total"]["value"],
"language": language.value
}
def _get_localized(
self,
source: dict,
field: str,
language: Language
) -> str:
"""Get localized content with fallback."""
# Try language-specific field
lang_field = f"{field}_{language.value}"
if lang_field in source and source[lang_field]:
return source[lang_field]
# Fall back to base field
return source.get(field, "")
# =============================================================================
# Language Detection
# =============================================================================
class LanguageDetector:
"""
Detects query language for automatic routing.
"""
# Simple word-based detection (use proper library in production)
LANGUAGE_INDICATORS = {
Language.ES: {"zapatos", "camisa", "rojo", "azul", "para", "de", "el", "la"},
Language.FR: {"chaussures", "chemise", "rouge", "bleu", "pour", "le", "la"},
Language.DE: {"schuhe", "hemd", "rot", "blau", "für", "der", "die", "das"},
}
def detect(self, query: str) -> Language:
"""Detect query language."""
words = set(query.lower().split())
# Check for language-specific words
for lang, indicators in self.LANGUAGE_INDICATORS.items():
if words & indicators: # Intersection
return lang
# Default to English
return Language.EN
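A sketch of wiring detection into the search entry point, preferring the detected query language and falling back to the user's locale (function and parameter names are illustrative):

async def localized_search(es_client, query: str, user_locale: str = "en") -> dict:
    search = MultiLanguageSearch(es_client, index="products")
    detected = LanguageDetector().detect(query)
    locale_lang = Language(user_locale.split("-")[0].lower())
    # Fall back to the user's own locale when it differs from the detected language
    fallback = [locale_lang] if locale_lang != detected else None
    return await search.search(query, language=detected, fallback_languages=fallback)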
Part VI: Search Personalization
Chapter 6: Tailoring Results to Users
6.1 Personalization Signals
PERSONALIZATION SIGNALS
USER BEHAVIOR:
├── Search history (what they've searched)
├── Click history (what they've clicked)
├── Purchase history (what they've bought)
├── Browse history (categories viewed)
└── Time patterns (when they shop)
USER PROFILE:
├── Demographics (age, gender if known)
├── Location (shipping address, local inventory)
├── Preferences (stated preferences)
└── Segments (high-value, new user, etc.)
CONTEXTUAL:
├── Device (mobile vs desktop)
├── Time of day
├── Season/events
└── Referral source
PERSONALIZATION APPROACHES:
1. RE-RANKING
├── Search normally
├── Re-rank based on user affinity
└── Fast, simple, explainable
2. QUERY MODIFICATION
├── Add user preferences to query
├── Boost user's preferred brands/categories
└── More aggressive personalization
3. RESULT BLENDING
├── Mix personalized with popular
├── Explore vs exploit balance
└── Prevents filter bubbles
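Approach 3 (result blending) is not shown in the implementation below, which focuses on re-ranking; a minimal sketch of one simple explore/exploit policy, interleaving globally popular items into a personalized list (the slot ratio is an assumption, not a recommendation):

def blend_results(personalized: list, popular: list, explore_every: int = 4, limit: int = 20) -> list:
    """Fill most slots from personalized results, but reserve every
    explore_every-th slot for a popular item not already used in this
    result set, keeping some discovery in the ranking."""
    blended, seen = [], set()
    personalized_iter, popular_iter = iter(personalized), iter(popular)

    def take(source):
        # Return the next not-yet-used item from an iterator, if any.
        for item in source:
            if item["product_id"] not in seen:
                seen.add(item["product_id"])
                return item
        return None

    while len(blended) < limit:
        explore_slot = (len(blended) + 1) % explore_every == 0
        primary, secondary = (popular_iter, personalized_iter) if explore_slot else (personalized_iter, popular_iter)
        item = take(primary) or take(secondary)
        if item is None:
            break
        blended.append(item)
    return blended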
6.2 Implementation
# personalization/search_personalization.py
"""
Search personalization based on user behavior.
"""
from dataclasses import dataclass, field
from typing import List, Dict, Optional, Set
from datetime import datetime, timedelta
from collections import Counter
@dataclass
class UserProfile:
"""User profile for personalization."""
user_id: str
# Behavior-based preferences
preferred_brands: List[str] = field(default_factory=list)
preferred_categories: List[str] = field(default_factory=list)
# Price sensitivity
avg_purchase_price: float = 0.0
price_range_min: float = 0.0
price_range_max: float = 1000.0
# Recent activity
recent_searches: List[str] = field(default_factory=list)
recent_views: List[str] = field(default_factory=list)
recent_purchases: List[str] = field(default_factory=list)
# Computed affinities (0-1 scores)
brand_affinities: Dict[str, float] = field(default_factory=dict)
category_affinities: Dict[str, float] = field(default_factory=dict)
class UserProfileBuilder:
"""
Builds user profiles from behavior data.
"""
def __init__(self, behavior_store):
self.behavior = behavior_store
async def build_profile(self, user_id: str) -> UserProfile:
"""Build user profile from behavior history."""
# Get recent behavior (last 30 days)
cutoff = datetime.utcnow() - timedelta(days=30)
searches = await self.behavior.get_searches(user_id, since=cutoff)
views = await self.behavior.get_views(user_id, since=cutoff)
purchases = await self.behavior.get_purchases(user_id, since=cutoff)
# Compute brand affinities
brand_counts = Counter()
for item in purchases + views: # Purchases weighted more
brand = item.get("brand")
if brand:
weight = 3 if item in purchases else 1
brand_counts[brand] += weight
total_brand = sum(brand_counts.values()) or 1
brand_affinities = {
brand: count / total_brand
for brand, count in brand_counts.most_common(10)
}
# Compute category affinities
category_counts = Counter()
for item in purchases + views:
category = item.get("category")
if category:
weight = 3 if item in purchases else 1
category_counts[category] += weight
total_category = sum(category_counts.values()) or 1
category_affinities = {
cat: count / total_category
for cat, count in category_counts.most_common(10)
}
# Compute price range
purchase_prices = [p.get("price", 0) for p in purchases if p.get("price")]
if purchase_prices:
avg_price = sum(purchase_prices) / len(purchase_prices)
price_min = min(purchase_prices) * 0.5
price_max = max(purchase_prices) * 1.5
else:
avg_price, price_min, price_max = 0, 0, 1000
return UserProfile(
user_id=user_id,
preferred_brands=list(brand_affinities.keys())[:5],
preferred_categories=list(category_affinities.keys())[:5],
avg_purchase_price=avg_price,
price_range_min=price_min,
price_range_max=price_max,
recent_searches=[s.get("query") for s in searches[:10]],
recent_views=[v.get("product_id") for v in views[:20]],
recent_purchases=[p.get("product_id") for p in purchases[:10]],
brand_affinities=brand_affinities,
category_affinities=category_affinities
)
class PersonalizedSearch:
"""
Personalized search based on user profile.
"""
def __init__(self, es_client, index: str, profile_service):
self.es = es_client
self.index = index
self.profiles = profile_service
async def search(
self,
query: str,
user_id: Optional[str],
filters: dict = None,
personalization_strength: float = 0.5
) -> dict:
"""
Execute personalized search.
Args:
query: Search query
user_id: User ID (None for anonymous)
filters: Explicit filters
personalization_strength: 0-1, how much to personalize
"""
# Get user profile
profile = None
if user_id:
profile = await self.profiles.get_profile(user_id)
# Build query
es_query = self._build_personalized_query(
query,
filters or {},
profile,
personalization_strength
)
# Execute
response = await self.es.search(
index=self.index,
body=es_query
)
return self._process_results(response, profile)
def _build_personalized_query(
self,
query: str,
filters: dict,
profile: Optional[UserProfile],
strength: float
) -> dict:
"""Build query with personalization."""
# Base query
must = {
"multi_match": {
"query": query,
"fields": ["name^3", "brand^2", "description"],
"type": "best_fields"
}
}
# Filters
filter_clauses = self._build_filters(filters)
filter_clauses.append({"term": {"in_stock": True}})
# Personalization boosts
functions = [
# Base popularity
{
"field_value_factor": {
"field": "popularity_score",
"modifier": "log1p",
"missing": 1
},
"weight": 1
}
]
if profile and strength > 0:
# Boost preferred brands
for brand, affinity in profile.brand_affinities.items():
functions.append({
"filter": {"term": {"brand.keyword": brand}},
"weight": 1 + (affinity * strength * 2)
})
# Boost preferred categories
for category, affinity in profile.category_affinities.items():
functions.append({
"filter": {"term": {"category": category}},
"weight": 1 + (affinity * strength * 1.5)
})
# Boost items in user's price range
if profile.avg_purchase_price > 0:
functions.append({
"filter": {
"range": {
"price": {
"gte": profile.price_range_min,
"lte": profile.price_range_max
}
}
},
"weight": 1 + (strength * 0.5)
})
# Slightly demote recently viewed (avoid repetition)
if profile.recent_views:
functions.append({
"filter": {
"ids": {"values": profile.recent_views[:10]}
},
"weight": 0.8 # Slight demotion
})
return {
"query": {
"function_score": {
"query": {
"bool": {
"must": [must],
"filter": filter_clauses
}
},
"functions": functions,
"score_mode": "sum",
"boost_mode": "multiply"
}
}
}
def _build_filters(self, filters: dict) -> list:
"""Build filter clauses."""
clauses = []
if filters.get("category"):
clauses.append({"term": {"category": filters["category"]}})
if filters.get("brand"):
clauses.append({"term": {"brand.keyword": filters["brand"]}})
if filters.get("price_min") or filters.get("price_max"):
price = {}
if filters.get("price_min"):
price["gte"] = filters["price_min"]
if filters.get("price_max"):
price["lte"] = filters["price_max"]
clauses.append({"range": {"price": price}})
return clauses
def _process_results(
self,
response: dict,
profile: Optional[UserProfile]
) -> dict:
"""Process results with personalization context."""
products = [
{
"product_id": hit["_id"],
"score": hit.get("_score"),
**hit["_source"]
}
for hit in response["hits"]["hits"]
]
result = {
"products": products,
"total": response["hits"]["total"]["value"],
"personalized": profile is not None
}
# Add personalization explanation (for debugging)
if profile:
result["personalization_context"] = {
"preferred_brands": profile.preferred_brands[:3],
"preferred_categories": profile.preferred_categories[:3],
"price_range": {
"min": profile.price_range_min,
"max": profile.price_range_max
}
}
return result
Summary
What We Learned Today
DAY 4 SUMMARY: ADVANCED SEARCH FEATURES
AUTOCOMPLETE
├── Edge n-grams: Pre-compute prefixes at index time
├── Completion suggester: In-memory FST for speed
├── Popularity ranking: Popular terms first
└── Target: < 50ms response time
SPELLING CORRECTION
├── Term-level: Individual word corrections
├── Phrase-level: Context-aware corrections
├── Collate: Only suggest if results exist
└── Show "Did you mean" when few results
FACETED SEARCH
├── Aggregations generate facet counts
├── Filter context for cacheability
├── Multi-select: Exclude own filter from counts
└── Hierarchical: Category trees
SYNONYMS
├── Bidirectional: tv, television (equivalent)
├── One-way: ipod => ipod, mp3 player
├── Index vs query time trade-offs
└── Dynamic updates with analyzer reload
MULTI-LANGUAGE
├── Language-specific analyzers
├── Field-per-language or index-per-language
├── Language detection for routing
└── Fallback to base language
PERSONALIZATION
├── User profile from behavior
├── Brand/category affinities
├── Price range preferences
├── Function score boosts
└── Balance personalized vs popular
Key Takeaways
ADVANCED FEATURES KEY TAKEAWAYS
1. AUTOCOMPLETE IS CRITICAL
Every keystroke = user expectation
Edge n-grams trade storage for speed
2. DON'T RETURN "NO RESULTS"
Spelling correction, fuzzy matching
Progressive relaxation
3. FACETS ENABLE DISCOVERY
Users don't always know what they want
Let them browse and filter
4. SYNONYMS BRIDGE VOCABULARY GAP
User says "couch", you indexed "sofa"
Must match both ways
5. PERSONALIZATION WITH CARE
Balance relevance vs discovery
Avoid filter bubbles
Let users override
DEFAULT APPROACH:
Start with edge n-grams autocomplete
Add phrase suggester for spelling
Build facets from aggregations
Add synonyms for known vocabulary
Personalize based on purchase history
Interview Tip
WHEN ASKED "HOW WOULD YOU BUILD AUTOCOMPLETE?"
"For autocomplete, I'd use edge n-grams to pre-compute all
prefixes at index time. 'nike' becomes ['n', 'ni', 'nik', 'nike'].
This means at query time, 'ni' is an exact token match, not
a prefix scan. Response times are consistently fast.
For ranking suggestions, I'd use a function_score combining:
- Text relevance (how well the prefix matches)
- Popularity (frequently searched/clicked items first)
- Personalization (user's brand preferences)
For very high traffic, I'd also consider:
- Redis cache for hot prefixes
- Completion suggester for in-memory FST
- Separate suggestions index for popular queries
The key metric is p99 latency under 50ms - users expect
instant feedback as they type."
This shows you understand both implementation and UX.
Tomorrow's Preview
Day 5: Search Operations & Scale — "Running search in production"
We'll cover:
- Cluster architecture and sizing
- Index lifecycle management
- Handling traffic spikes (Black Friday)
- Monitoring and alerting
- Disaster recovery
- Performance tuning
PREVIEW: BLACK FRIDAY SCENARIO
Normal traffic: 10K queries/sec
Black Friday: 50K queries/sec (5x spike)
Duration: 24 hours
Your cluster:
├── 6 data nodes
├── 50M products
├── 500GB index
Questions:
├── Can you handle 5x traffic?
├── What breaks first?
├── How do you prepare?
├── What's your rollback plan?
End of Week 7, Day 4
Tomorrow: Day 5 — Search Operations & Scale: Running search in production