Week 7 Capstone: Designing a Legal Document Search System
A Real-World Problem Covering Everything You've Learned
The Interview Begins
You walk into the interview room at a legal technology company. The interviewer, a Staff Engineer, smiles and gestures to the whiteboard.
Interviewer: "Thanks for coming in. Today we're going to work through a system design problem that's core to our business. I'm interested in your thought process, so please think out loud. Feel free to ask questions — this is meant to be collaborative."
They write on the whiteboard:
╔══════════════════════════════════════════════════════════════════════════╗
║ ║
║ Design a Legal Document Search System ║
║ ║
║ Context: ║
║ You're building search infrastructure for a legal research platform ║
║ used by law firms, corporate legal departments, and courts. ║
║ Lawyers need to find relevant case law, contracts, and legal ║
║ documents quickly and accurately. ║
║ ║
║ Key Requirements: ║
║ 1. Search across 50M+ legal documents (cases, statutes, contracts) ║
║ 2. Support complex boolean queries and proximity search ║
║ 3. Citation tracking and cross-referencing ║
║ 4. Jurisdiction and date filtering ║
║ 5. Document upload and OCR processing ║
║ 6. Relevance tuned for legal terminology ║
║ ║
╚══════════════════════════════════════════════════════════════════════════╝
Interviewer: "Take a few minutes to think about this, then walk me through your approach. We have about 45 minutes."
Phase 1: Requirements Clarification (5 minutes)
Before diving in, you take a breath and start asking questions.
Your Questions
You: "Before I start designing, I'd like to clarify a few requirements. First, what types of legal documents are we indexing?"
Interviewer: "Good question. We have several document types:
- Court opinions and case law (federal and state courts)
- Statutes and regulations
- Legal briefs and motions
- Contracts (uploaded by clients)
- Legal journals and secondary sources
- Patent filings"
You: "What's the average document size? Legal documents can be quite lengthy."
Interviewer: "They vary significantly. Court opinions average 15-20 pages, but some are 200+ pages. Contracts are typically 20-50 pages. We also have multi-volume regulatory codes. Average document is about 10,000 words, but the 99th percentile is 100,000 words."
You: "For search, what are the primary use cases? Simple keyword search, or do lawyers need more advanced query capabilities?"
Interviewer: "Lawyers need sophisticated search:
- Boolean operators (AND, OR, NOT)
- Proximity search ('negligence' within 10 words of 'liability')
- Phrase search with exact matching
- Citation search (find all cases citing '410 U.S. 113')
- Field-specific search (judge:Ginsburg, court:Supreme Court)
- Date and jurisdiction filters"
You: "How fresh does the data need to be? When a new court opinion is published, how quickly should it be searchable?"
Interviewer: "For court opinions from major courts, within 1 hour of publication. For user-uploaded documents, within 5 minutes. We have partnerships with court electronic filing systems for real-time feeds."
You: "What about document security? I imagine some uploaded contracts are highly confidential."
Interviewer: "Absolutely critical. Public court documents are accessible to all subscribers. But client-uploaded documents must be strictly isolated — only that law firm's users can search them. We call these 'private workspaces'."
You: "Last question — what's our user base and expected query volume?"
Interviewer: "We have 50,000 law firm subscribers with about 200,000 individual lawyer accounts. During business hours, we see 5,000-10,000 searches per minute. Paralegals doing research can run hundreds of searches per hour."
You: "Perfect. Let me summarize the requirements."
Functional Requirements
1. DOCUMENT SEARCH
- Full-text search across all document content
- Boolean queries (AND, OR, NOT, parentheses)
- Proximity search (terms within N words)
- Phrase search with exact matching
- Wildcard and fuzzy matching
- Field-specific search (title, judge, court, parties)
2. FILTERING & FACETS
- Jurisdiction (federal, state, specific courts)
- Date range (decided date, filed date)
- Document type (opinion, statute, contract, brief)
- Practice area (criminal, corporate, IP, family)
- Judge/Author
- Citation count (highly cited cases)
3. CITATION NETWORK
- Find all documents citing a specific case
- Find all cases cited by a document
- Citation depth (cases citing cases that cite X)
- Negative citations (cases that overrule or distinguish)
4. PRIVATE WORKSPACES
- Law firms upload their own documents
- OCR processing for scanned documents
- Strict tenant isolation
- Combined search (public + private documents)
5. DOCUMENT PROCESSING
- PDF/Word/Image ingestion
- OCR for scanned documents
- Automatic metadata extraction
- Citation extraction and linking
6. USER FEATURES
- Save searches and set alerts
- Highlight and annotate documents
- Search history and recent documents
- Export and print formatting
Non-Functional Requirements
1. SCALE
- 50M+ public documents
- 50M+ private documents across tenants
- 200,000 registered users
- 10,000 searches/minute peak
2. LATENCY
- Simple search: <500ms p99
- Complex boolean: <2s p99
- Autocomplete: <100ms p99
- Document indexing: <5 minutes p99
3. AVAILABILITY
- 99.9% uptime (8.7 hours downtime/year)
- Graceful degradation during failures
- Read replicas for search continuity
4. DATA FRESHNESS
- Court opinions: <1 hour from publication
- User uploads: <5 minutes
- Citation links: <24 hours
5. SECURITY
- Strict tenant isolation for private docs
- Audit logging for all access
- Encryption at rest and in transit
- SOC 2 Type II compliance
6. ACCURACY
- Zero false positives in tenant isolation
- High precision for legal terminology
- Proper handling of legal citations
Phase 2: Back-of-the-Envelope Estimation (5 minutes)
You: "Let me work through the numbers to understand the scale."
Document Volume
DOCUMENT COUNTS
Public documents:
Court opinions: 30M
Statutes & regulations: 5M
Legal journals: 10M
Patent filings: 5M
─────────────────────────
Total public: 50M documents
Private documents:
Average per law firm: 10,000 documents
Firms with private workspaces: 5,000 (of the 50,000 subscriber firms)
─────────────────────────
Total private: 50M documents (varies widely)
Growth:
New court opinions: 50,000/month
New user uploads: 500,000/month
Storage Estimation
DOCUMENT SIZE
Average document: 10,000 words ≈ 60KB text
Large documents (p99): 100,000 words ≈ 600KB text
Metadata per document: 2KB
Citation data: 1KB (average 20 citations × 50 bytes)
Total per document: ~65KB average
STORAGE CALCULATION
Raw text (100M docs):
100M × 65KB = 6.5 TB
Elasticsearch index:
With overhead (2x): 13 TB
With replicas (2x): 26 TB
Original files (PDFs):
100M × 500KB avg = 50 TB
Total storage: ~80 TB
Search Traffic
SEARCH QUERIES
Active users: 50,000 concurrent (peak)
Searches per user/hour: 20 (intensive research)
Peak searches: 1M/hour ≈ 280/second (deliberately above the stated 10,000/minute peak, to leave headroom)
Query complexity:
Simple keyword: 40%
Boolean: 35%
Proximity: 15%
Citation search: 10%
Autocomplete:
5 keystrokes per search
Peak: 280 × 5 = 1,400/second
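A quick script makes it easy to sanity-check these numbers; the values below are taken directly from the estimates above.

# estimation_check.py — back-of-the-envelope sanity check (illustrative only)
DOCS = 100_000_000                     # 50M public + 50M private
AVG_INDEXED_KB = 65                    # text + metadata + citations
AVG_ORIGINAL_KB = 500                  # stored PDF/Word originals

raw_tb = DOCS * AVG_INDEXED_KB / 1e9                  # KB -> TB (decimal)
index_with_replicas_tb = raw_tb * 2 * 2               # 2x index overhead, 1 replica
originals_tb = DOCS * AVG_ORIGINAL_KB / 1e9

concurrent_users = 50_000
searches_per_user_per_hour = 20
peak_search_qps = concurrent_users * searches_per_user_per_hour / 3600
autocomplete_qps = peak_search_qps * 5                # ~5 keystrokes per search

print(f"Index w/ replicas: {index_with_replicas_tb:.0f} TB")   # ~26 TB
print(f"Originals: {originals_tb:.0f} TB")                      # ~50 TB
print(f"Peak search QPS: {peak_search_qps:.0f}")                # ~280
print(f"Peak autocomplete QPS: {autocomplete_qps:.0f}")         # ~1,400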
Key Metrics Summary
┌──────────────────────────────────────────────────────────────────────────┐
│ ESTIMATION SUMMARY │
│ │
│ DOCUMENTS │
│ ├── Total documents: 100M (50M public + 50M private) │
│ ├── New documents/month: 550K │
│ └── Average size: 65KB indexed content │
│ │
│ STORAGE │
│ ├── Elasticsearch: 26 TB (with replicas) │
│ ├── Original files: 50 TB │
│ └── Total: ~80 TB │
│ │
│ TRAFFIC │
│ ├── Peak searches: 300/second │
│ ├── Peak autocomplete: 1,500/second │
│ └── Indexing: ~200 docs/minute │
│ │
│ INFRASTRUCTURE (estimated) │
│ ├── ES data nodes: 12 × 64GB RAM, 2TB SSD │
│ ├── ES master nodes: 3 × 16GB RAM │
│ ├── ES coordinating: 4 × 32GB RAM │
│ └── Document processors: 6 × 16 CPU (OCR/extraction) │
│ │
└──────────────────────────────────────────────────────────────────────────┘
Phase 3: High-Level Design (10 minutes)
You: "Now let me sketch out the high-level architecture."
System Architecture
┌─────────────────────────────────────────────────────────────────────────────┐
│ LEGAL DOCUMENT SEARCH ARCHITECTURE │
│ │
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Web App │ │ Mobile App │ │ API Users │ │
│ └──────┬───────┘ └──────┬───────┘ └──────┬───────┘ │
│ │ │ │ │
│ └────────────────────┼────────────────────┘ │
│ ▼ │
│ ┌───────────────────────┐ │
│ │ API Gateway │ │
│ │ (Auth, Rate Limit) │ │
│ └───────────┬───────────┘ │
│ │ │
│ ┌──────────────────────┼──────────────────────┐ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌─────────────┐ ┌─────────────┐ ┌──────────────────┐ │
│ │ Search │ │ Document │ │ Citation │ │
│ │ Service │ │ Service │ │ Service │ │
│ └──────┬──────┘ └──────┬──────┘ └────────┬─────────┘ │
│ │ │ │ │
│ │ ┌─────┴─────┐ │ │
│ │ ▼ ▼ │ │
│ │ ┌───────────┐ ┌──────────┐ │ │
│ │ │ OCR │ │ Metadata │ │ │
│ │ │ Pipeline │ │ Extractor│ │ │
│ │ └─────┬─────┘ └────┬─────┘ │ │
│ │ │ │ │ │
│ │ └─────┬──────┘ │ │
│ │ ▼ │ │
│ │ ┌────────────────┐ │ │
│ │ │ Kafka Topics │ │ │
│ │ │ (CDC Events) │ │ │
│ │ └────────┬───────┘ │ │
│ │ │ │ │
│ │ ▼ │ │
│ │ ┌────────────────┐ │ │
│ │ │ Indexing │ │ │
│ │ │ Pipeline │ │ │
│ │ └────────┬───────┘ │ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌───────────────────────────────────────────────────────────────────┐ │
│ │ ELASTICSEARCH CLUSTER │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │
│ │ │ Public Docs │ │Private Docs │ │ Citations │ │ │
│ │ │ Index │ │ Index │ │ Index │ │ │
│ │ │ (50M docs) │ │ (per tenant)│ │ (graph) │ │ │
│ │ └─────────────┘ └─────────────┘ └─────────────┘ │ │
│ └───────────────────────────────────────────────────────────────────┘ │
│ │
│ ┌───────────────┐ ┌───────────────┐ ┌───────────────┐ │
│ │ PostgreSQL │ │ Redis │ │ S3 │ │
│ │ (metadata) │ │ (cache) │ │ (file store) │ │
│ └───────────────┘ └───────────────┘ └───────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
Component Breakdown
You: "Let me walk through each component..."
1. Search Service
Purpose: Handles all search queries from users
Key responsibilities:
- Parse complex boolean and proximity queries
- Route to appropriate index (public, private, or both)
- Apply tenant isolation filters
- Aggregate results across indices
- Handle highlighting and snippets
Technology choice: Python/FastAPI for flexibility in query parsing
2. Document Service
Purpose: Manages document ingestion and processing
Key responsibilities:
- Accept document uploads (PDF, Word, images)
- Trigger OCR for scanned documents
- Extract metadata (dates, parties, judges)
- Detect and link citations
- Publish to Kafka for indexing
Technology choice: Python with Apache Tika for extraction
3. Citation Service
Purpose: Manages the citation network
Key responsibilities:
- Parse legal citations from text
- Build citation graph (who cites whom)
- Detect citation types (following, distinguishing, overruling)
- Provide citation depth queries
Technology choice: Specialized citation parser + Neo4j or ES for graph
4. Indexing Pipeline
Purpose: Streams documents into Elasticsearch
Key responsibilities:
- Consume from Kafka topics
- Transform documents for search
- Apply legal-specific text analysis
- Handle bulk indexing efficiently
- Maintain citation links
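The indexing pipeline is not revisited in a later deep dive, so here is a minimal sketch of its core loop. It assumes aiokafka and the async Elasticsearch bulk helper, consumes the `documents.processed` topic published by the Document Service, and uses an illustrative `index_name_for` routing helper plus an assumed `is_public` flag on the event.

# indexing/indexing_worker.py
# Minimal sketch of the indexing pipeline (not production-hardened).
import json

from aiokafka import AIOKafkaConsumer
from elasticsearch import AsyncElasticsearch
from elasticsearch.helpers import async_bulk


def index_name_for(event: dict) -> str:
    """Route public docs to the shared index, private docs to the tenant index."""
    if event.get("is_public"):
        return "public_legal_docs"
    return f"private_docs_{event['tenant_id']}"


async def run_indexing_worker(bootstrap_servers: str, es: AsyncElasticsearch):
    consumer = AIOKafkaConsumer(
        "documents.processed",
        bootstrap_servers=bootstrap_servers,
        group_id="search-indexer",
        value_deserializer=lambda v: json.loads(v.decode("utf-8")),
    )
    await consumer.start()
    buffer = []
    try:
        async for message in consumer:
            event = message.value
            buffer.append({
                "_op_type": "index",
                "_index": index_name_for(event),
                "_id": event["document_id"],
                "_source": {
                    "tenant_id": event.get("tenant_id"),
                    "is_public": event.get("is_public", False),
                    "content": event["content"],
                    "citations": event.get("citations", []),
                    **event.get("metadata", {}),
                },
            })
            if len(buffer) >= 500:        # flush in batches for efficient bulk requests
                await async_bulk(es, buffer)
                buffer.clear()
    finally:
        if buffer:
            await async_bulk(es, buffer)  # flush the tail on shutdown
        await consumer.stop()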
Data Flow
You: "Let me trace through a document upload flow..."
DOCUMENT UPLOAD FLOW
Step 1: User uploads contract PDF
Client ──▶ API Gateway ──▶ Document Service
Step 2: Store original file
Document Service ──▶ S3 (original PDF stored)
Step 3: Process document
Document Service ──▶ OCR Pipeline (if scanned)
──▶ Metadata Extractor
──▶ Citation Extractor
Step 4: Publish for indexing
Document Service ──▶ Kafka (document.created topic)
Step 5: Index in Elasticsearch
Indexing Pipeline ◀── Kafka
Indexing Pipeline ──▶ Elasticsearch (private_docs_tenant_123)
Step 6: Update citation graph
Citation Service ◀── Kafka
Citation Service ──▶ Elasticsearch (citations index)
Time: Upload to searchable ≈ 2-3 minutes
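Steps 1, 2, and 4 on the API side might look like the following FastAPI sketch. The endpoint path and the `s3`/`kafka` clients hanging off `app.state` are illustrative assumptions; the `document.created` topic and the tenant context come from the design above.

# api/upload.py
import uuid

from fastapi import APIRouter, File, Request, UploadFile

router = APIRouter()


@router.post("/documents")
async def upload_document(request: Request, file: UploadFile = File(...)):
    tenant = request.state.tenant        # set by the auth middleware (see Deep Dive 2)
    document_id = str(uuid.uuid4())
    s3_key = f"{tenant.tenant_id}/{document_id}/{file.filename}"

    # Step 2: store the original file before any processing.
    await request.app.state.s3.upload(s3_key, await file.read())

    # Step 4: hand off to async processing via Kafka.
    await request.app.state.kafka.produce(
        topic="document.created",
        key=document_id,
        value={
            "document_id": document_id,
            "tenant_id": tenant.tenant_id,
            "s3_key": s3_key,
            "filename": file.filename,
        },
    )
    return {"document_id": document_id, "status": "processing"}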
Phase 4: Deep Dives (20 minutes)
Interviewer: "Great high-level design. Let's dive deeper into a few areas. Tell me more about how you'd handle the complex boolean and proximity queries lawyers need."
Deep Dive 1: Legal Query Parser (Week 7, Day 3 — Query Processing)
You: "Legal search requires query capabilities beyond standard full-text search. Let me explain how I'd build the query parser."
The Problem
LEGAL QUERY EXAMPLES
Simple boolean:
"negligence AND liability"
"contract OR agreement"
"patent NOT software"
Complex boolean:
"(breach AND contract) OR (negligence AND tort)"
Proximity search:
"negligence /10 liability" (within 10 words)
"intellectual property /s patent" (/s = same sentence)
"employment /p discrimination" (/p = same paragraph)
Field-specific:
"judge:Ginsburg AND civil rights"
"court:SCOTUS AND date:[2020 TO 2024]"
Citation search:
"cites:410 U.S. 113" (find all citing this case)
"citedby:Roe v. Wade" (find all cases cited by)
Phrase search:
"reasonable person standard"
"beyond a reasonable doubt"
The Solution
QUERY PARSING PIPELINE
┌──────────────────────────────────────────────────────────────────────────┐
│ LEGAL QUERY PARSER │
│ │
│ Input: "(breach /5 contract) AND damages NOT punitive" │
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Lexer │───▶│ Parser │───▶│ ES Query │ │
│ │ (tokenize) │ │ (AST) │ │ Builder │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │
│ Tokens: AST: ES Query: │
│ [LPAREN, TERM, BoolNode( bool: │
│ PROXIMITY, TERM, AND, must: │
│ RPAREN, AND, ProximityNode, - span_near │
│ TERM, NOT, TERM] TermNode, - match │
│ NotNode) must_not: │
│ - match │
│ │
└──────────────────────────────────────────────────────────────────────────┘
Implementation
# search/legal_query_parser.py
"""
Legal query parser supporting boolean and proximity search.
Applies: Week 7, Day 3 - Query Processing
"""
from dataclasses import dataclass
from typing import List, Optional, Union
from enum import Enum, auto
import re
class TokenType(Enum):
"""Token types in legal queries."""
TERM = auto()
PHRASE = auto()
AND = auto()
OR = auto()
NOT = auto()
LPAREN = auto()
RPAREN = auto()
PROXIMITY = auto() # /5, /10, /s, /p
FIELD = auto() # judge:, court:, date:
CITATION = auto() # cites:, citedby:
RANGE = auto() # [2020 TO 2024]
@dataclass
class Token:
"""A lexical token."""
type: TokenType
value: str
position: int
class LegalQueryLexer:
"""
Tokenizes legal search queries.
"""
KEYWORDS = {
'AND': TokenType.AND,
'OR': TokenType.OR,
'NOT': TokenType.NOT,
}
FIELD_PREFIXES = ['judge:', 'court:', 'date:', 'party:', 'title:']
CITATION_PREFIXES = ['cites:', 'citedby:', 'citing:']
PROXIMITY_PATTERN = re.compile(r'/(\d+|s|p)')
def tokenize(self, query: str) -> List[Token]:
"""Tokenize a legal query string."""
tokens = []
position = 0
while position < len(query):
# Skip whitespace
if query[position].isspace():
position += 1
continue
# Parentheses
if query[position] == '(':
tokens.append(Token(TokenType.LPAREN, '(', position))
position += 1
continue
if query[position] == ')':
tokens.append(Token(TokenType.RPAREN, ')', position))
position += 1
continue
# Phrase (quoted string)
if query[position] == '"':
end = query.find('"', position + 1)
if end == -1:
end = len(query)
phrase = query[position+1:end]
tokens.append(Token(TokenType.PHRASE, phrase, position))
position = end + 1
continue
# Proximity operator
if query[position] == '/':
match = self.PROXIMITY_PATTERN.match(query[position:])
if match:
tokens.append(Token(TokenType.PROXIMITY, match.group(1), position))
position += len(match.group(0))
continue
# Range [2020 TO 2024]
if query[position] == '[':
end = query.find(']', position)
if end != -1:
range_expr = query[position:end+1]
tokens.append(Token(TokenType.RANGE, range_expr, position))
position = end + 1
continue
# Word or keyword
word_match = re.match(r'[\w\.:]+', query[position:])
if word_match:
word = word_match.group(0)
# Check for field prefix
for prefix in self.FIELD_PREFIXES:
if word.lower().startswith(prefix):
tokens.append(Token(TokenType.FIELD, word, position))
break
else:
# Check for citation prefix
for prefix in self.CITATION_PREFIXES:
if word.lower().startswith(prefix):
tokens.append(Token(TokenType.CITATION, word, position))
break
else:
# Check for keyword
upper = word.upper()
if upper in self.KEYWORDS:
tokens.append(Token(self.KEYWORDS[upper], word, position))
else:
tokens.append(Token(TokenType.TERM, word, position))
position += len(word)
continue
# Unknown character, skip
position += 1
return tokens
class LegalQueryBuilder:
"""
Builds Elasticsearch queries from parsed legal queries.
"""
def __init__(self):
self.lexer = LegalQueryLexer()
def build_query(
self,
query_string: str,
tenant_id: Optional[str] = None,
search_fields: List[str] = None
) -> dict:
"""
Build Elasticsearch query from legal query string.
Applies concepts:
- Query vs Filter context (Day 3)
- Multi-match for field boosting (Day 3)
- Span queries for proximity (Day 4)
"""
if not search_fields:
search_fields = ["content", "title^2", "summary^1.5"]
tokens = self.lexer.tokenize(query_string)
# Parse tokens into ES query
es_query = self._parse_expression(tokens, search_fields)
# Wrap with filters
final_query = {
"query": {
"bool": {
"must": [es_query],
"filter": []
}
}
}
# Add tenant isolation filter (critical for security!)
if tenant_id:
final_query["query"]["bool"]["filter"].append({
"term": {"tenant_id": tenant_id}
})
return final_query
def _parse_expression(
self,
tokens: List[Token],
fields: List[str]
) -> dict:
"""Parse tokens into ES query structure."""
if not tokens:
return {"match_all": {}}
# Simple case: single term
if len(tokens) == 1:
return self._token_to_query(tokens[0], fields)
# Handle boolean operators
must_clauses = []
should_clauses = []
must_not_clauses = []
i = 0
current_operator = TokenType.AND # Default is AND
while i < len(tokens):
token = tokens[i]
if token.type == TokenType.AND:
current_operator = TokenType.AND
i += 1
continue
if token.type == TokenType.OR:
current_operator = TokenType.OR
i += 1
continue
if token.type == TokenType.NOT:
# Next token goes to must_not
if i + 1 < len(tokens):
must_not_clauses.append(
self._token_to_query(tokens[i + 1], fields)
)
i += 2
continue
if token.type == TokenType.LPAREN:
# Find matching paren and recurse
paren_depth = 1
j = i + 1
while j < len(tokens) and paren_depth > 0:
if tokens[j].type == TokenType.LPAREN:
paren_depth += 1
elif tokens[j].type == TokenType.RPAREN:
paren_depth -= 1
j += 1
sub_tokens = tokens[i + 1:j - 1]
sub_query = self._parse_expression(sub_tokens, fields)
if current_operator == TokenType.OR:
should_clauses.append(sub_query)
else:
must_clauses.append(sub_query)
i = j
continue
if token.type == TokenType.PROXIMITY:
# Proximity: previous term /N next term
if must_clauses and i + 1 < len(tokens):
prev_query = must_clauses.pop()
next_query = self._token_to_query(tokens[i + 1], fields)
proximity = self._parse_proximity(token.value)
span_query = self._build_span_near(
prev_query, next_query, proximity, fields[0]
)
must_clauses.append(span_query)
i += 2
continue
# Regular term/phrase
term_query = self._token_to_query(token, fields)
if current_operator == TokenType.OR:
should_clauses.append(term_query)
else:
must_clauses.append(term_query)
i += 1
# Build final bool query
bool_query = {"bool": {}}
if must_clauses:
bool_query["bool"]["must"] = must_clauses
if should_clauses:
bool_query["bool"]["should"] = should_clauses
bool_query["bool"]["minimum_should_match"] = 1
if must_not_clauses:
bool_query["bool"]["must_not"] = must_not_clauses
return bool_query
def _token_to_query(self, token: Token, fields: List[str]) -> dict:
"""Convert a single token to ES query."""
if token.type == TokenType.TERM:
return {
"multi_match": {
"query": token.value,
"fields": fields,
"type": "best_fields"
}
}
if token.type == TokenType.PHRASE:
return {
"multi_match": {
"query": token.value,
"fields": fields,
"type": "phrase"
}
}
if token.type == TokenType.FIELD:
# Parse field:value
field, value = token.value.split(':', 1)
return {
"match": {field: value}
}
if token.type == TokenType.CITATION:
# Handle citation search
prefix, citation = token.value.split(':', 1)
if prefix.lower() == 'cites':
return {
"term": {"citations.cited_id": citation}
}
elif prefix.lower() == 'citedby':
return {
"term": {"citations.citing_id": citation}
}
if token.type == TokenType.RANGE:
# Parse [2020 TO 2024]
match = re.match(r'\[(\S+)\s+TO\s+(\S+)\]', token.value)
if match:
return {
"range": {
"date": {
"gte": match.group(1),
"lte": match.group(2)
}
}
}
return {"match_all": {}}
def _parse_proximity(self, value: str) -> int:
"""Parse proximity value to integer."""
if value == 's':
return 15 # Same sentence ≈ 15 words
if value == 'p':
return 50 # Same paragraph ≈ 50 words
return int(value)
def _build_span_near(
self,
query1: dict,
query2: dict,
slop: int,
field: str
) -> dict:
"""Build span_near query for proximity search."""
# Extract terms from queries
term1 = self._extract_term(query1)
term2 = self._extract_term(query2)
return {
"span_near": {
"clauses": [
{"span_term": {field: term1}},
{"span_term": {field: term2}}
],
"slop": slop,
"in_order": False
}
}
def _extract_term(self, query: dict) -> str:
"""Extract term from a query structure."""
if "multi_match" in query:
return query["multi_match"]["query"]
if "match" in query:
return list(query["match"].values())[0]
return ""
# =============================================================================
# Usage Example
# =============================================================================
async def search_legal_documents(
query_string: str,
tenant_id: str,
filters: dict = None
) -> dict:
"""
    Build the Elasticsearch query body for a legal search.
Example queries:
- "(negligence /10 liability) AND damages"
- "breach AND contract NOT employment"
- "cites:410 U.S. 113"
- "judge:Ginsburg AND civil rights"
"""
builder = LegalQueryBuilder()
es_query = builder.build_query(
query_string,
tenant_id=tenant_id,
search_fields=["content", "title^3", "summary^2", "headnotes^2"]
)
# Add additional filters
if filters:
if filters.get("jurisdiction"):
es_query["query"]["bool"]["filter"].append({
"term": {"jurisdiction": filters["jurisdiction"]}
})
if filters.get("date_from") or filters.get("date_to"):
date_range = {}
if filters.get("date_from"):
date_range["gte"] = filters["date_from"]
if filters.get("date_to"):
date_range["lte"] = filters["date_to"]
es_query["query"]["bool"]["filter"].append({
"range": {"decision_date": date_range}
})
if filters.get("document_type"):
es_query["query"]["bool"]["filter"].append({
"term": {"doc_type": filters["document_type"]}
})
# Add highlighting for snippets
es_query["highlight"] = {
"fields": {
"content": {
"fragment_size": 200,
"number_of_fragments": 3
},
"headnotes": {}
},
"pre_tags": ["<mark>"],
"post_tags": ["</mark>"]
}
return es_query
Edge Cases
Interviewer: "What happens if a user's query has unbalanced parentheses or invalid syntax?"
You: "Good question. We handle this gracefully:"
QUERY ERROR HANDLING
1. UNBALANCED PARENTHESES
Input: "(breach AND contract"
Handling:
- Detect during parsing
- Auto-close open parens at end
- OR: Fall back to simple search
- Show warning to user: "Query may not be parsed as intended"
2. INVALID PROXIMITY
Input: "negligence /abc liability"
Handling:
- Ignore invalid proximity operator
- Treat as "negligence AND abc AND liability"
- Log for analytics (common user errors)
3. EMPTY QUOTES
Input: 'contract AND ""'
Handling:
- Ignore empty phrase
- Process rest of query normally
4. DEEPLY NESTED
Input: "((((term))))"
Handling:
- Set max nesting depth (10)
- Beyond that, flatten to simple AND
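A minimal sketch of the repair step for case 1, run on the raw query string before the lexer (the function name is illustrative):

def repair_parentheses(query: str) -> str:
    """Drop unmatched ')' and auto-close any '(' still open at the end."""
    out = []
    depth = 0
    for ch in query:
        if ch == '(':
            depth += 1
            out.append(ch)
        elif ch == ')':
            if depth == 0:
                continue                  # unmatched close paren: drop it
            depth -= 1
            out.append(ch)
        else:
            out.append(ch)
    out.append(')' * depth)               # auto-close open parens at the end
    return ''.join(out)


assert repair_parentheses("(breach AND contract") == "(breach AND contract)"
assert repair_parentheses("breach) AND (contract") == "breach AND (contract)"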
Deep Dive 2: Multi-Tenant Isolation (Week 7, Day 1 & Day 5 — Security)
Interviewer: "You mentioned tenant isolation for private documents. How do you ensure a law firm can never see another firm's documents?"
You: "This is absolutely critical for a legal platform. A data leak between law firms would be catastrophic. Let me explain our defense-in-depth approach."
The Problem
TENANT ISOLATION REQUIREMENTS
Law Firm A uploads confidential merger contract.
Law Firm B must NEVER be able to:
- Search and find this document
- Access it by ID if they guess
- See it in autocomplete suggestions
- See it in "related documents"
- See aggregate stats that leak info
Even if:
- There's a bug in our code
- An attacker manipulates API calls
- A developer makes a mistake
- The query somehow bypasses filters
This is a ZERO TOLERANCE requirement.
The Solution
You: "We implement multiple layers of isolation:"
MULTI-LAYER TENANT ISOLATION
┌────────────────────────────────────────────────────────────────────────────┐
│ DEFENSE IN DEPTH │
│ │
│ LAYER 1: SEPARATE INDICES │
│ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │
│ │ private_docs_ │ │ private_docs_ │ │ private_docs_ │ │
│ │ firm_abc123 │ │ firm_def456 │ │ firm_ghi789 │ │
│ └─────────────────┘ └─────────────────┘ └─────────────────┘ │
│ Each tenant has own index - no cross-index queries possible │
│ │
│ LAYER 2: QUERY FILTER INJECTION │
│ Every query automatically includes: │
│ {"filter": {"term": {"tenant_id": "firm_abc123"}}} │
│ Applied at service layer, cannot be bypassed by API │
│ │
│ LAYER 3: FIELD-LEVEL SECURITY │
│ Elasticsearch document-level security: │
│ User can only see docs where tenant_id matches their JWT │
│ │
│ LAYER 4: API GATEWAY VALIDATION │
│ - Extract tenant_id from JWT token │
│ - Inject into request context │
│ - Cannot be overridden by request body │
│ │
│ LAYER 5: AUDIT LOGGING │
│ Every search logged with: │
│ - User ID, Tenant ID, Query, Results returned │
│ - Anomaly detection for cross-tenant access attempts │
│ │
└────────────────────────────────────────────────────────────────────────────┘
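Layer 3 can be expressed as an Elasticsearch document-level security role. A sketch follows; the role name is illustrative, and it assumes each user's security metadata carries their tenant_id.

# Sent as: PUT _security/role/tenant_reader
tenant_reader_role = {
    "indices": [
        {
            "names": ["public_legal_docs"],
            "privileges": ["read"],
        },
        {
            "names": ["private_docs_*"],
            "privileges": ["read"],
            # Document-level security: even if a query reaches the wrong
            # index, only documents whose tenant_id matches the
            # authenticated user's metadata are visible.
            "query": {
                "template": {
                    "source": {
                        "term": {"tenant_id": "{{_user.metadata.tenant_id}}"}
                    }
                }
            },
        },
    ]
}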
Implementation
# security/tenant_isolation.py
"""
Multi-tenant isolation for legal document search.
Applies: Week 7, Day 1 - Partitioning (tenant per index)
Week 7, Day 5 - Security and Operations
"""
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional, List
from functools import wraps
import logging

from search.legal_query_parser import LegalQueryBuilder

logger = logging.getLogger(__name__)


class AuthenticationError(Exception):
    """Raised when a request carries no valid credentials."""


class SecurityError(Exception):
    """Raised when a request attempts to break tenant isolation."""
@dataclass
class TenantContext:
"""Immutable tenant context from JWT."""
tenant_id: str
user_id: str
roles: List[str]
@property
def private_index(self) -> str:
"""Get tenant-specific private index name."""
return f"private_docs_{self.tenant_id}"
@property
def can_search_public(self) -> bool:
"""Check if user can search public documents."""
return "search:public" in self.roles
@property
def can_search_private(self) -> bool:
"""Check if user can search private documents."""
return "search:private" in self.roles
class TenantIsolationMiddleware:
"""
Middleware to enforce tenant isolation.
This runs BEFORE any search service code.
"""
def __init__(self, jwt_validator):
self.jwt_validator = jwt_validator
async def __call__(self, request):
"""Extract and validate tenant context."""
# Get token from header
auth_header = request.headers.get("Authorization", "")
if not auth_header.startswith("Bearer "):
raise AuthenticationError("Missing token")
token = auth_header[7:]
# Validate and extract claims
claims = await self.jwt_validator.validate(token)
# Create immutable context
context = TenantContext(
tenant_id=claims["tenant_id"],
user_id=claims["user_id"],
roles=claims.get("roles", [])
)
# Attach to request (cannot be modified)
request.state.tenant = context
# Log for audit
logger.info(
"Request authenticated",
extra={
"tenant_id": context.tenant_id,
"user_id": context.user_id,
"path": request.url.path
}
)
return context
def require_tenant_isolation(func):
"""
Decorator ensuring tenant isolation in search functions.
"""
@wraps(func)
async def wrapper(*args, **kwargs):
# Get tenant context (must exist)
tenant: TenantContext = kwargs.get("tenant")
if not tenant:
raise SecurityError("Tenant context required")
# Verify tenant_id is not overridable in query
query = kwargs.get("query", {})
if _contains_tenant_override(query):
logger.warning(
"Blocked tenant override attempt",
extra={
"tenant_id": tenant.tenant_id,
"attempted_query": query
}
)
raise SecurityError("Tenant override not allowed")
return await func(*args, **kwargs)
return wrapper
def _contains_tenant_override(query: dict) -> bool:
"""Check if query tries to override tenant filter."""
query_str = str(query).lower()
# Block any attempt to query tenant_id field directly
if "tenant_id" in query_str:
return True
# Block attempts to query other tenant indices
if "private_docs_" in query_str:
return True
return False
class IsolatedSearchService:
"""
Search service with built-in tenant isolation.
"""
def __init__(self, es_client):
self.es = es_client
self.public_index = "public_legal_docs"
@require_tenant_isolation
async def search(
self,
query_string: str,
tenant: TenantContext,
include_public: bool = True,
include_private: bool = True,
filters: dict = None
) -> dict:
"""
Execute search with automatic tenant isolation.
The tenant context comes from JWT, not from the request body.
It cannot be overridden by the caller.
"""
indices = []
# Public documents (if permitted and requested)
if include_public and tenant.can_search_public:
indices.append(self.public_index)
# Private documents (tenant-specific index)
if include_private and tenant.can_search_private:
indices.append(tenant.private_index)
if not indices:
return {"hits": [], "total": 0}
# Build query with MANDATORY tenant filter
es_query = self._build_isolated_query(
query_string,
tenant,
filters
)
# Execute search
response = await self.es.search(
index=",".join(indices),
body=es_query
)
# Audit log
await self._audit_log(
tenant=tenant,
query=query_string,
indices=indices,
result_count=response["hits"]["total"]["value"]
)
return self._process_response(response)
def _build_isolated_query(
self,
query_string: str,
tenant: TenantContext,
filters: dict
) -> dict:
"""Build query with mandatory tenant isolation."""
# Parse the user's query
parsed_query = LegalQueryBuilder().build_query(
query_string,
tenant_id=None, # We add filter separately
search_fields=["content", "title^3", "summary^2"]
)
# CRITICAL: Add tenant filter that CANNOT be bypassed
# This filter is added by the service, not from user input
tenant_filter = {
"bool": {
"should": [
# Public documents (no tenant restriction)
{"term": {"is_public": True}},
# Private documents (must match tenant)
{
"bool": {
"must": [
{"term": {"is_public": False}},
{"term": {"tenant_id": tenant.tenant_id}}
]
}
}
],
"minimum_should_match": 1
}
}
# Inject filter
parsed_query["query"]["bool"]["filter"].append(tenant_filter)
# Add any additional filters
if filters:
for key, value in filters.items():
if key in ["jurisdiction", "doc_type", "court"]:
parsed_query["query"]["bool"]["filter"].append({
"term": {key: value}
})
return parsed_query
async def _audit_log(
self,
tenant: TenantContext,
query: str,
indices: List[str],
result_count: int
):
"""Log search for audit trail."""
await self.es.index(
index="search_audit_log",
document={
"timestamp": "now",
"tenant_id": tenant.tenant_id,
"user_id": tenant.user_id,
"query": query,
"indices_searched": indices,
"result_count": result_count,
"client_ip": "extracted_from_request"
}
)
def _process_response(self, response: dict) -> dict:
"""Process ES response, ensuring no cross-tenant leakage."""
hits = []
for hit in response["hits"]["hits"]:
# Double-check: never return tenant_id in response
source = hit["_source"]
if "tenant_id" in source:
del source["tenant_id"]
hits.append({
"id": hit["_id"],
"score": hit["_score"],
"source": source,
"highlights": hit.get("highlight", {})
})
return {
"hits": hits,
"total": response["hits"]["total"]["value"]
}
Verification Tests
You: "We also have automated tests that verify isolation:"
# tests/test_tenant_isolation.py
"""
Critical security tests for tenant isolation.
These tests MUST pass before any deployment.
"""
import pytest
class TestTenantIsolation:
"""Tests that verify tenant isolation cannot be bypassed."""
async def test_cannot_search_other_tenant_documents(
self,
search_service,
tenant_a,
tenant_b
):
"""Tenant A cannot find Tenant B's documents."""
# Tenant B uploads a document
doc = await upload_document(
tenant=tenant_b,
content="This is a secret merger agreement"
)
# Tenant A searches for it
results = await search_service.search(
query_string="secret merger agreement",
tenant=tenant_a,
include_private=True
)
# Must NOT find it
assert doc["id"] not in [h["id"] for h in results["hits"]]
async def test_cannot_access_by_id_guess(
self,
document_service,
tenant_a,
tenant_b
):
"""Tenant A cannot access Tenant B's document by ID."""
# Tenant B's document
doc = await upload_document(tenant=tenant_b, content="Secret")
# Tenant A tries to access by ID
with pytest.raises(NotFoundError):
await document_service.get_document(
document_id=doc["id"],
tenant=tenant_a
)
async def test_cannot_override_tenant_filter(
self,
search_service,
tenant_a
):
"""Query cannot override tenant filter."""
# Try to inject tenant_id in query
with pytest.raises(SecurityError):
await search_service.search(
query_string="contract",
tenant=tenant_a,
filters={"tenant_id": "other_tenant"} # Should be blocked
)
async def test_cannot_query_other_tenant_index(
self,
search_service,
tenant_a
):
"""Cannot query another tenant's index directly."""
# Try to search other tenant's index
with pytest.raises(SecurityError):
await search_service.search_raw(
index="private_docs_other_tenant",
query={"match_all": {}},
tenant=tenant_a
)
async def test_public_documents_visible_to_all(
self,
search_service,
tenant_a,
tenant_b
):
"""Public documents are searchable by all tenants."""
# Search public case law
results_a = await search_service.search(
query_string="Roe v. Wade",
tenant=tenant_a,
include_public=True
)
results_b = await search_service.search(
query_string="Roe v. Wade",
tenant=tenant_b,
include_public=True
)
# Both should find the same public documents
assert results_a["total"] > 0
assert results_a["total"] == results_b["total"]
Deep Dive 3: Citation Network Search (Week 7, Day 2 & Day 4 — Indexing & Advanced Features)
Interviewer: "Tell me about the citation search. How do you build and query the citation network?"
You: "Citation analysis is crucial for legal research. Lawyers need to find all cases that cite a given case, understand if it's still 'good law', and explore citation chains."
The Problem
CITATION SEARCH REQUIREMENTS
Given a case like "Brown v. Board of Education":
1. CITING CASES
Find all cases that cite this case
Expected: Thousands of results
2. CITED BY
Find all cases this case cites
Expected: Dozens of results
3. CITATION DEPTH
Find cases citing cases that cite this case
"2-hop" citation network
4. CITATION TREATMENT
How was the case cited?
- Followed (positive)
- Distinguished (neutral)
- Overruled (negative)
- Mentioned (neutral)
5. CITATION VALIDITY
Is this case still "good law"?
Has it been overruled or limited?
The Solution
CITATION DATA MODEL
┌────────────────────────────────────────────────────────────────────────────┐
│ CITATION INDEX STRUCTURE │
│ │
│ Document: Brown v. Board of Education │
│ ├── ID: "347_us_483" │
│ ├── citations_outbound: [ │
│ │ {"id": "163_us_537", "treatment": "overruled"}, │
│ │ {"id": "305_us_337", "treatment": "followed"}, │
│ │ ... │
│ │ ] │
│ ├── citation_count_inbound: 15234 │
│ └── treatment_summary: { │
│ "followed": 12000, │
│ "distinguished": 2500, │
│ "mentioned": 700, │
│ "overruled": 0 │
│ } │
│ │
│ Separate Citation Edges Index: │
│ ├── {citing_id, cited_id, treatment, context_snippet, date} │
│ └── Enables efficient graph queries │
│ │
└────────────────────────────────────────────────────────────────────────────┘
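The edges index implied by this model might be mapped as below. Shard counts are illustrative; `citing_court_rank` is an assumed denormalized field used later to surface key citing cases.

CITATION_EDGES_MAPPING = {
    "settings": {"number_of_shards": 3, "number_of_replicas": 1},
    "mappings": {
        "properties": {
            "citing_id":         {"type": "keyword"},
            "cited_id":          {"type": "keyword"},
            "treatment":         {"type": "keyword"},   # followed / distinguished / overruled / ...
            "context_snippet":   {"type": "text"},
            "citation_date":     {"type": "date"},
            "citing_court_rank": {"type": "integer"},   # e.g. SCOTUS = 100, district court = 10
        }
    },
}

# await es.indices.create(index="citation_edges", body=CITATION_EDGES_MAPPING)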
Implementation
# citations/citation_service.py
"""
Citation network search and analysis.
Applies: Week 7, Day 2 - Indexing Pipeline (citation extraction)
Week 7, Day 4 - Advanced Features (graph-like queries)
"""
from dataclasses import dataclass
from typing import List, Optional, Dict
from enum import Enum
class CitationTreatment(Enum):
"""How a case was cited."""
FOLLOWED = "followed"
DISTINGUISHED = "distinguished"
OVERRULED = "overruled"
QUESTIONED = "questioned"
MENTIONED = "mentioned"
@dataclass
class Citation:
"""A citation between two legal documents."""
citing_id: str
citing_name: str
cited_id: str
cited_name: str
treatment: CitationTreatment
context: str # Surrounding text
date: str
@dataclass
class CitationAnalysis:
"""Analysis of a document's citation profile."""
document_id: str
document_name: str
total_citations: int
treatment_breakdown: Dict[str, int]
is_good_law: bool
overruled_by: Optional[str]
key_citing_cases: List[dict]
class CitationService:
"""
Manages citation network queries.
"""
def __init__(self, es_client):
self.es = es_client
self.docs_index = "legal_documents"
self.citations_index = "citation_edges"
async def find_citing_cases(
self,
document_id: str,
treatment: Optional[CitationTreatment] = None,
limit: int = 100
) -> List[dict]:
"""
Find all cases that cite a given document.
This is the "cited by" query - who cites this case?
"""
query = {
"query": {
"bool": {
"must": [
{"term": {"cited_id": document_id}}
]
}
},
"sort": [
{"citation_date": "desc"}
],
"size": limit
}
if treatment:
query["query"]["bool"]["must"].append({
"term": {"treatment": treatment.value}
})
response = await self.es.search(
index=self.citations_index,
body=query
)
# Enrich with document details
citing_ids = [hit["_source"]["citing_id"] for hit in response["hits"]["hits"]]
docs = await self._get_documents(citing_ids)
results = []
for hit in response["hits"]["hits"]:
citation = hit["_source"]
doc = docs.get(citation["citing_id"], {})
results.append({
"document_id": citation["citing_id"],
"name": doc.get("name", citation["citing_id"]),
"court": doc.get("court"),
"date": doc.get("decision_date"),
"treatment": citation["treatment"],
"context": citation.get("context_snippet", "")
})
return results
async def find_cited_cases(
self,
document_id: str,
limit: int = 100
) -> List[dict]:
"""
Find all cases cited by a given document.
This is the "cites" query - what does this case cite?
"""
query = {
"query": {
"term": {"citing_id": document_id}
},
"size": limit
}
response = await self.es.search(
index=self.citations_index,
body=query
)
# Enrich with document details
cited_ids = [hit["_source"]["cited_id"] for hit in response["hits"]["hits"]]
docs = await self._get_documents(cited_ids)
results = []
for hit in response["hits"]["hits"]:
citation = hit["_source"]
doc = docs.get(citation["cited_id"], {})
results.append({
"document_id": citation["cited_id"],
"name": doc.get("name", citation["cited_id"]),
"court": doc.get("court"),
"date": doc.get("decision_date"),
"treatment": citation["treatment"]
})
return results
async def analyze_citation_profile(
self,
document_id: str
) -> CitationAnalysis:
"""
Analyze the citation profile of a document.
Determines if case is "good law" and how it's been treated.
"""
# Get the document
doc = await self._get_document(document_id)
# Get citation treatment aggregation
agg_query = {
"query": {
"term": {"cited_id": document_id}
},
"size": 0,
"aggs": {
"treatments": {
"terms": {"field": "treatment"}
},
"recent_overruled": {
"filter": {"term": {"treatment": "overruled"}},
"aggs": {
"cases": {
"top_hits": {
"size": 1,
"sort": [{"citation_date": "desc"}]
}
}
}
},
"key_citations": {
"top_hits": {
"size": 10,
"sort": [{"citing_court_rank": "desc"}]
}
}
}
}
response = await self.es.search(
index=self.citations_index,
body=agg_query
)
aggs = response["aggregations"]
# Build treatment breakdown
treatment_breakdown = {
bucket["key"]: bucket["doc_count"]
for bucket in aggs["treatments"]["buckets"]
}
# Determine if good law
overruled_hits = aggs["recent_overruled"]["cases"]["hits"]["hits"]
is_overruled = len(overruled_hits) > 0
overruled_by = None
if is_overruled:
overruled_by = overruled_hits[0]["_source"]["citing_id"]
# Key citing cases
key_cases = [
{
"id": hit["_source"]["citing_id"],
"treatment": hit["_source"]["treatment"]
}
for hit in aggs["key_citations"]["hits"]["hits"]
]
return CitationAnalysis(
document_id=document_id,
document_name=doc.get("name", ""),
total_citations=response["hits"]["total"]["value"],
treatment_breakdown=treatment_breakdown,
is_good_law=not is_overruled,
overruled_by=overruled_by,
key_citing_cases=key_cases
)
async def find_citation_chain(
self,
document_id: str,
depth: int = 2,
limit_per_level: int = 10
) -> dict:
"""
Find citation chain up to N levels deep.
Example: Cases citing cases that cite the target case.
"""
chain = {
"root": document_id,
"levels": []
}
current_ids = [document_id]
for level in range(depth):
# Find all cases citing the current level
query = {
"query": {
"terms": {"cited_id": current_ids}
},
"aggs": {
"citing_cases": {
"terms": {
"field": "citing_id",
"size": limit_per_level
}
}
},
"size": 0
}
response = await self.es.search(
index=self.citations_index,
body=query
)
citing_ids = [
bucket["key"]
for bucket in response["aggregations"]["citing_cases"]["buckets"]
]
if not citing_ids:
break
chain["levels"].append({
"depth": level + 1,
"count": len(citing_ids),
"sample_ids": citing_ids[:5]
})
current_ids = citing_ids
return chain
async def _get_document(self, document_id: str) -> dict:
"""Get a single document by ID."""
try:
response = await self.es.get(
index=self.docs_index,
id=document_id
)
return response["_source"]
        except Exception:  # document missing or transient ES error
return {}
async def _get_documents(self, document_ids: List[str]) -> Dict[str, dict]:
"""Get multiple documents by ID."""
if not document_ids:
return {}
response = await self.es.mget(
index=self.docs_index,
body={"ids": document_ids}
)
return {
doc["_id"]: doc["_source"]
for doc in response["docs"]
if doc.get("found")
}
# =============================================================================
# Citation Extraction Pipeline
# =============================================================================
class CitationExtractor:
"""
Extracts citations from legal document text.
Applies: Week 7, Day 2 - Indexing Pipeline
"""
# Patterns for different citation formats
PATTERNS = {
# US Supreme Court: 410 U.S. 113
"us_reports": r"(\d+)\s+U\.S\.\s+(\d+)",
# Federal Reporter: 123 F.2d 456
"federal_reporter": r"(\d+)\s+F\.(2d|3d)?\s+(\d+)",
# State reporters: 123 Cal.App.4th 456
"state_reporter": r"(\d+)\s+([A-Z][a-z]+\.?\s*(?:App\.?)?\s*(?:\d+[a-z]+)?)\s+(\d+)",
        # Case names: Brown v. Board of Education (allows lowercase connectors)
        "case_name": r"([A-Z][a-z]+)\s+v\.\s+([A-Z][a-z]+(?:\s+(?:[A-Z][a-z]+|of|the|and))*)",
}
def extract_citations(self, text: str) -> List[dict]:
"""Extract all citations from document text."""
import re
citations = []
for pattern_name, pattern in self.PATTERNS.items():
for match in re.finditer(pattern, text):
# Get surrounding context
start = max(0, match.start() - 100)
end = min(len(text), match.end() + 100)
context = text[start:end]
# Detect treatment from context
treatment = self._detect_treatment(context)
citations.append({
"raw_citation": match.group(0),
"pattern_type": pattern_name,
"context": context,
"treatment": treatment,
"position": match.start()
})
return citations
def _detect_treatment(self, context: str) -> str:
"""Detect how a citation is being used."""
context_lower = context.lower()
# Negative treatments
if any(word in context_lower for word in
["overruled", "overrule", "rejected", "abrogated"]):
return "overruled"
if any(word in context_lower for word in
["questioned", "doubted", "criticized"]):
return "questioned"
# Neutral/distinguishing
if any(word in context_lower for word in
["distinguished", "distinguishing", "unlike"]):
return "distinguished"
# Positive
if any(word in context_lower for word in
["followed", "following", "accord", "see also", "affirmed"]):
return "followed"
# Default
return "mentioned"
Deep Dive 4: Document Processing Pipeline (Week 7, Day 2 — CDC and Indexing)
Interviewer: "How do you handle document ingestion, especially for scanned documents that need OCR?"
You: "This is a critical part of the pipeline. Law firms often upload scanned contracts, old case files, and handwritten notes. We need robust processing."
The Pipeline
DOCUMENT PROCESSING PIPELINE
┌────────────────────────────────────────────────────────────────────────────┐
│ DOCUMENT INGESTION FLOW │
│ │
│ ┌───────────┐ ┌───────────┐ ┌───────────┐ ┌───────────┐ │
│ │ Upload │────▶│ Triage │────▶│ Process │────▶│ Index │ │
│ │ (API) │ │ (Queue) │ │ (Workers) │ │ (ES) │ │
│ └───────────┘ └───────────┘ └───────────┘ └───────────┘ │
│ │
│ TRIAGE DECISIONS: │
│ ├── PDF (text-based) ──▶ Extract text directly │
│ ├── PDF (scanned) ──▶ OCR pipeline │
│ ├── Word/DOCX ──▶ Apache Tika extraction │
│ ├── Image (JPG/PNG) ──▶ OCR pipeline │
│ └── Unknown ──▶ Tika with fallback │
│ │
│ OCR PIPELINE: │
│ ┌───────────┐ ┌───────────┐ ┌───────────┐ ┌───────────┐ │
│ │ Image │────▶│ Enhance │────▶│ OCR │────▶│ Post- │ │
│ │ Extract │ │ Quality │ │ (Tessera) │ │ Process │ │
│ └───────────┘ └───────────┘ └───────────┘ └───────────┘ │
│ deskew, multi-lang spell check, │
│ denoise confidence legal terms │
│ │
└────────────────────────────────────────────────────────────────────────────┘
Implementation
# ingestion/document_processor.py
"""
Document processing pipeline for legal documents.
Applies: Week 7, Day 2 - Indexing Pipeline
"""
from dataclasses import dataclass
from typing import Optional, List
from enum import Enum
import asyncio
import logging
logger = logging.getLogger(__name__)
class DocumentType(Enum):
PDF_TEXT = "pdf_text"
PDF_SCANNED = "pdf_scanned"
WORD = "word"
IMAGE = "image"
UNKNOWN = "unknown"
@dataclass
class ProcessedDocument:
"""Result of document processing."""
document_id: str
tenant_id: str
original_filename: str
content: str
metadata: dict
citations: List[dict]
page_count: int
word_count: int
ocr_confidence: Optional[float]
processing_time_ms: int
class DocumentProcessor:
"""
Processes uploaded documents for indexing.
"""
def __init__(
self,
s3_client,
tika_client,
ocr_service,
citation_extractor,
kafka_producer
):
self.s3 = s3_client
self.tika = tika_client
self.ocr = ocr_service
self.citations = citation_extractor
self.kafka = kafka_producer
async def process_document(
self,
document_id: str,
tenant_id: str,
s3_key: str,
filename: str
) -> ProcessedDocument:
"""
Process a document through the full pipeline.
"""
import time
start_time = time.time()
# Download from S3
file_bytes = await self.s3.download(s3_key)
# Determine document type
doc_type = self._detect_document_type(filename, file_bytes)
logger.info(
f"Processing document {document_id}",
extra={"type": doc_type.value, "size": len(file_bytes)}
)
# Extract content based on type
if doc_type == DocumentType.PDF_TEXT:
content, metadata = await self._extract_pdf_text(file_bytes)
ocr_confidence = None
elif doc_type == DocumentType.PDF_SCANNED:
content, metadata, ocr_confidence = await self._process_scanned_pdf(
file_bytes
)
elif doc_type == DocumentType.WORD:
content, metadata = await self._extract_word(file_bytes)
ocr_confidence = None
elif doc_type == DocumentType.IMAGE:
content, ocr_confidence = await self._process_image(file_bytes)
metadata = {}
else:
# Fallback to Tika
content, metadata = await self._extract_with_tika(file_bytes)
ocr_confidence = None
# Extract citations
citations = self.citations.extract_citations(content)
# Build processed document
processing_time = int((time.time() - start_time) * 1000)
processed = ProcessedDocument(
document_id=document_id,
tenant_id=tenant_id,
original_filename=filename,
content=content,
metadata=metadata,
citations=citations,
page_count=metadata.get("page_count", 1),
word_count=len(content.split()),
ocr_confidence=ocr_confidence,
processing_time_ms=processing_time
)
# Publish to Kafka for indexing
await self._publish_for_indexing(processed)
return processed
def _detect_document_type(
self,
filename: str,
file_bytes: bytes
) -> DocumentType:
"""Detect the type of document for processing."""
ext = filename.lower().split(".")[-1]
if ext in ("doc", "docx"):
return DocumentType.WORD
if ext in ("jpg", "jpeg", "png", "tiff", "bmp"):
return DocumentType.IMAGE
if ext == "pdf":
# Check if PDF has extractable text
if self._pdf_has_text(file_bytes):
return DocumentType.PDF_TEXT
else:
return DocumentType.PDF_SCANNED
return DocumentType.UNKNOWN
def _pdf_has_text(self, file_bytes: bytes) -> bool:
"""Check if PDF contains extractable text or is scanned."""
import fitz # PyMuPDF
doc = fitz.open(stream=file_bytes, filetype="pdf")
# Check first few pages
for page_num in range(min(3, len(doc))):
page = doc[page_num]
text = page.get_text()
if len(text.strip()) > 100:
return True
return False
async def _extract_pdf_text(self, file_bytes: bytes) -> tuple:
"""Extract text from text-based PDF."""
import fitz
doc = fitz.open(stream=file_bytes, filetype="pdf")
content_parts = []
for page in doc:
content_parts.append(page.get_text())
content = "\n\n".join(content_parts)
metadata = {
"page_count": len(doc),
"title": doc.metadata.get("title", ""),
"author": doc.metadata.get("author", ""),
"creation_date": doc.metadata.get("creationDate", "")
}
return content, metadata
async def _process_scanned_pdf(self, file_bytes: bytes) -> tuple:
"""Process scanned PDF through OCR."""
import fitz
doc = fitz.open(stream=file_bytes, filetype="pdf")
content_parts = []
confidences = []
for page_num, page in enumerate(doc):
# Extract page as image
pix = page.get_pixmap(dpi=300)
image_bytes = pix.tobytes("png")
# OCR the image
text, confidence = await self.ocr.process_image(image_bytes)
content_parts.append(text)
confidences.append(confidence)
content = "\n\n".join(content_parts)
avg_confidence = sum(confidences) / len(confidences) if confidences else 0
metadata = {"page_count": len(doc)}
return content, metadata, avg_confidence
async def _process_image(self, file_bytes: bytes) -> tuple:
"""Process single image through OCR."""
text, confidence = await self.ocr.process_image(file_bytes)
return text, confidence
async def _extract_word(self, file_bytes: bytes) -> tuple:
"""Extract content from Word document."""
result = await self.tika.extract(file_bytes, "application/msword")
return result["content"], result.get("metadata", {})
async def _extract_with_tika(self, file_bytes: bytes) -> tuple:
"""Fallback extraction with Apache Tika."""
result = await self.tika.extract(file_bytes)
return result.get("content", ""), result.get("metadata", {})
async def _publish_for_indexing(self, document: ProcessedDocument):
"""Publish processed document to Kafka for indexing."""
event = {
"event_type": "document.processed",
"document_id": document.document_id,
"tenant_id": document.tenant_id,
"content": document.content,
"metadata": document.metadata,
"citations": document.citations,
"stats": {
"page_count": document.page_count,
"word_count": document.word_count,
"ocr_confidence": document.ocr_confidence,
"processing_time_ms": document.processing_time_ms
}
}
await self.kafka.produce(
topic="documents.processed",
key=document.document_id,
value=event
)
Phase 5: Scaling and Edge Cases (5 minutes)
Interviewer: "How would this system scale to 10x the document volume?"
Scaling Strategy
You: "Let me walk through the scaling vectors..."
SCALING FROM 100M TO 1B DOCUMENTS
CURRENT STATE (100M docs)
├── 12 data nodes (64GB RAM, 2TB SSD each)
├── 26 TB total storage with replicas
├── 300 QPS peak
10X SCALE (1B docs)
├── Storage: 260 TB with replicas
├── Data nodes: 60+ (or larger instances)
├── QPS: 3000 (if usage scales linearly)
SCALING APPROACH:
1. INDEX SHARDING STRATEGY
Current: 24 primary shards
Scaled: Split by document type + date
├── cases_federal_2020-2024 (5 shards)
├── cases_federal_2015-2019 (5 shards)
├── cases_state_california (3 shards)
├── statutes_federal (2 shards)
└── private_docs_* (per tenant, 1-3 shards each)
2. TIERED STORAGE
Hot (last 2 years): Fast SSD, 1 replica
Warm (2-5 years): Standard SSD, 1 replica
Cold (5+ years): HDD, 1 replica, fewer nodes
Use Index Lifecycle Management (ILM) to move indices between tiers automatically (example policy sketched after this list).
3. QUERY ROUTING
Route queries to relevant time-based indices
User searching 2023 cases doesn't hit 2010 indices
4. READ REPLICAS FOR GEOGRAPHIC DISTRIBUTION
Primary cluster: US East
Read replica: US West
Read replica: EU (for international firms)
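For the tiered-storage point (item 2 above), an illustrative ILM policy; the ages, policy name, and `data` node attribute are assumptions.

# Sent as: PUT _ilm/policy/legal_docs_tiering
legal_docs_tiering = {
    "policy": {
        "phases": {
            "hot": {
                "actions": {"set_priority": {"priority": 100}}
            },
            "warm": {
                "min_age": "730d",                  # ~2 years after index creation
                "actions": {
                    "allocate": {"require": {"data": "warm"}},
                    "forcemerge": {"max_num_segments": 1},
                },
            },
            "cold": {
                "min_age": "1825d",                 # ~5 years
                "actions": {
                    "allocate": {"require": {"data": "cold"}},
                },
            },
        }
    }
}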
Edge Cases
Interviewer: "What edge cases should we handle?"
You: "Several important ones for legal search:"
EDGE CASES
1. VERY LARGE DOCUMENTS
Problem: 500-page Supreme Court opinions
Solution:
├── Split into sections for indexing
├── Store section metadata (page ranges)
├── Aggregate results by parent document
└── Lazy-load full content
2. SPECIAL CHARACTERS IN LEGAL TEXT
Problem: § (section), ¶ (paragraph), legal symbols
Solution:
├── Custom character filter in analyzer
├── Map § → "section"
├── Preserve symbols in stored content
└── Normalize for search
3. CITATION VARIATIONS
Problem: Same case cited many ways
"Brown v. Board", "Brown v. Bd. of Ed.", "347 U.S. 483"
Solution:
├── Citation normalization during extraction
├── Canonical ID for each case
├── Synonym expansion for case names
└── All variations map to same document
4. TENANT WITH MILLIONS OF DOCUMENTS
Problem: One large law firm skews resources
Solution:
├── Per-tenant resource limits
├── Multiple shards for large tenants
├── Query timeout enforcement
└── Fair scheduling across tenants
5. SIMULTANEOUS UPDATES TO SAME DOCUMENT
Problem: Multiple users editing annotations
Solution:
├── Annotations stored separately from content
├── User-specific annotation layer
├── Optimistic locking for shared edits
└── Content itself is immutable
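For edge case 2, a sketch of the analyzer settings: a `mapping` character filter rewrites legal symbols at analysis time while the stored `_source` keeps the original characters. The analyzer name is illustrative.

LEGAL_TEXT_ANALYSIS = {
    "settings": {
        "analysis": {
            "char_filter": {
                "legal_symbols": {
                    "type": "mapping",
                    "mappings": [
                        "§ => section",
                        "¶ => paragraph",
                    ],
                }
            },
            "analyzer": {
                "legal_text": {
                    "type": "custom",
                    "char_filter": ["legal_symbols"],
                    "tokenizer": "standard",
                    "filter": ["lowercase"],
                }
            },
        }
    }
}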
Phase 6: Monitoring and Operations (5 minutes)
Interviewer: "How would you monitor this system in production?"
Key Metrics
You: "I'd track metrics at multiple levels..."
┌────────────────────────────────────────────────────────────────────────────┐
│ LEGAL SEARCH MONITORING DASHBOARD │
│ │
│ SEARCH PERFORMANCE │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ Query Latency p99 │ QPS │ Error Rate │ │
│ │ ┌─────────────────────┐ │ ┌─────────────────┐ │ ┌─────────────┐ │ │
│ │ │ 320ms │ │ │ 180 │ │ │ 0.02% │ │ │
│ │ │ Target: <500ms ✓ │ │ │ ▄▅▆▇▆▅▄▅▆▇ │ │ │ ▁▁▁▁▁▁▁▁▁ │ │ │
│ │ └─────────────────────┘ │ └─────────────────┘ │ └─────────────┘ │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │
│ DOCUMENT PROCESSING │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ Processing Lag │ Queue Depth │ OCR Success Rate │ │
│ │ ┌───────────────┐ │ ┌──────────────┐ │ ┌──────────────────────────┐ │ │
│ │ │ 45s │ │ │ 23 │ │ │ 98.5% │ │ │
│ │ │ Target: <5m ✓ │ │ │ ▂▃▄▃▂▂▃▄▃▂ │ │ │ ████████████████████░░ │ │ │
│ │ └───────────────┘ │ └──────────────┘ │ └──────────────────────────┘ │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │
│ CLUSTER HEALTH │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ Status: GREEN │ Nodes: 12/12 │ Shards: 100% │ Disk: 62% │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │
│ TENANT ISOLATION VERIFICATION │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ Cross-tenant query attempts: 0 (last 24h) ✓ │ │
│ │ Isolation test (automated): PASSED 2 minutes ago │ │
│ │ Audit log anomalies: 0 detected │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │
│ SEARCH QUALITY │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ Zero Result Rate: 3.2% │ Avg Results/Query: 45 │ Citation Hit: 89% │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │
└────────────────────────────────────────────────────────────────────────────┘
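On the application side, these dashboard panels can be fed by a handful of counters and histograms. A sketch using prometheus_client; the metric names are illustrative.

# monitoring/metrics.py
from prometheus_client import Counter, Histogram

SEARCH_LATENCY = Histogram(
    "search_latency_seconds",
    "End-to-end search latency",
    ["query_type"],                       # simple / boolean / proximity / citation
    buckets=(0.05, 0.1, 0.25, 0.5, 1.0, 2.0, 5.0),
)
SEARCH_ERRORS = Counter("search_errors_total", "Failed searches", ["reason"])
ZERO_RESULT_SEARCHES = Counter("search_zero_results_total", "Searches returning no hits")
CROSS_TENANT_ATTEMPTS = Counter(
    "tenant_isolation_violations_total",
    "Blocked attempts to bypass tenant isolation",    # alert if this ever increments
)


def record_search(query_type: str, latency_seconds: float, hit_count: int) -> None:
    SEARCH_LATENCY.labels(query_type=query_type).observe(latency_seconds)
    if hit_count == 0:
        ZERO_RESULT_SEARCHES.inc()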
Critical Alerts
ALERTING CONFIGURATION
CRITICAL (PagerDuty):
├── Cluster status RED for >1 minute
├── Search latency p99 >2s for >5 minutes
├── Any cross-tenant data access attempt
├── Document processing queue >10,000
├── Error rate >5%
WARNING (Slack):
├── Cluster status YELLOW for >10 minutes
├── Search latency p99 >1s for >15 minutes
├── OCR success rate <95%
├── Disk usage >80%
├── Zero result rate >10%
INFO (Dashboard only):
├── New tenant onboarded
├── Large document processed (>100 pages)
├── Unusual query patterns
Interview Conclusion
Interviewer: "Excellent work. You've covered a lot of ground — from complex query parsing to security to operations. A few final questions: What would you prioritize building first?"
You: "I'd prioritize in this order:
- Tenant isolation — Security is non-negotiable for a legal platform
- Basic search with boolean operators — Core functionality lawyers expect
- Document ingestion with OCR — Getting content into the system
- Citation extraction and linking — Differentiating feature for legal research
- Advanced features (proximity search, analytics) — Once the core is solid
The key insight is that legal search has unique requirements: complex queries, strict security, and domain-specific features like citations. A generic search solution wouldn't work here."
Interviewer: "Great. Any questions for me?"
You: "I'd love to hear how you currently handle the citation network — do you use a graph database, or is it all in Elasticsearch? And what's your experience with OCR accuracy on older legal documents?"
Concepts Applied Summary
Week 7 Concepts Used
| Day | Concept | Application in This Design |
|---|---|---|
| Day 1 | Inverted Index | Full-text search for legal documents |
| Day 1 | Text Analysis | Legal-specific analyzers (§ symbols, citations) |
| Day 1 | Document Modeling | Separate indices for public/private/citations |
| Day 2 | CDC Pipeline | Document processing → Kafka → Elasticsearch |
| Day 2 | Bulk Indexing | Initial load of 50M public documents |
| Day 2 | Zero-Downtime Reindex | Schema updates without service interruption |
| Day 3 | Query vs Filter | Boolean logic in query, tenant in filter |
| Day 3 | BM25 Tuning | Legal terminology relevance |
| Day 3 | Function Scores | Boost by citation count, recency |
| Day 4 | Autocomplete | Case name and citation suggestions, recent searches |
| Day 4 | Synonyms | Legal term synonyms ("contract"/"agreement") |
| Day 4 | Multi-language | N/A (English legal docs) |
| Day 5 | Cluster Architecture | Master/data/coordinating node separation |
| Day 5 | Capacity Planning | Storage and compute for 100M docs |
| Day 5 | Monitoring | Search quality, processing lag |
| Day 5 | Disaster Recovery | Snapshots, cross-region replication |
Code Patterns Demonstrated
1. LEGAL QUERY PARSER
- Custom lexer/parser for boolean + proximity
- Converts legal syntax to ES query DSL
2. MULTI-TENANT ISOLATION
- JWT-based tenant context
- Mandatory filter injection
- Per-tenant indices
- Audit logging
3. CITATION NETWORK
- Citation extraction from text
- Treatment detection
- Graph queries (citing, cited-by)
4. DOCUMENT PROCESSING
- Type detection and routing
- OCR pipeline for scanned docs
- Kafka for async processing
Self-Assessment Checklist
After studying this capstone, you should be able to:
- Design a search system for domain-specific content (legal, medical, etc.)
- Implement complex query parsing beyond simple keyword search
- Build multi-tenant search with strict isolation guarantees
- Create a document processing pipeline with OCR support
- Model citation/reference networks in a search index
- Size an Elasticsearch cluster for specific requirements
- Design monitoring for search quality and security
- Handle edge cases like large documents and special characters
- Explain trade-offs between different indexing strategies
- Articulate security considerations for sensitive data
This capstone integrates all concepts from Week 7 of the System Design Mastery Series. The legal domain demonstrates how search systems must be customized for specific use cases while maintaining security, performance, and reliability.
Next Week Preview: Week 8 — Analytics Pipeline
We'll design a data pipeline from event ingestion to queryable analytics, covering streaming vs batch processing, data modeling for OLAP, and handling late-arriving data.