
Week 7 Capstone: Designing a Legal Document Search System

A Real-World Problem Covering Everything You've Learned


The Interview Begins

You walk into the interview room at a legal technology company. The interviewer, a Staff Engineer, smiles and gestures to the whiteboard.

Interviewer: "Thanks for coming in. Today we're going to work through a system design problem that's core to our business. I'm interested in your thought process, so please think out loud. Feel free to ask questions — this is meant to be collaborative."

They write on the whiteboard:

╔══════════════════════════════════════════════════════════════════════════╗
║                                                                          ║
║        Design a Legal Document Search System                             ║
║                                                                          ║
║   Context:                                                               ║
║   You're building search infrastructure for a legal research platform    ║
║   used by law firms, corporate legal departments, and courts.            ║
║   Lawyers need to find relevant case law, contracts, and legal           ║
║   documents quickly and accurately.                                      ║
║                                                                          ║
║   Key Requirements:                                                      ║
║   1. Search across 50M+ legal documents (cases, statutes, contracts)     ║
║   2. Support complex boolean queries and proximity search                ║
║   3. Citation tracking and cross-referencing                             ║
║   4. Jurisdiction and date filtering                                     ║
║   5. Document upload and OCR processing                                  ║
║   6. Relevance tuned for legal terminology                               ║
║                                                                          ║
╚══════════════════════════════════════════════════════════════════════════╝

Interviewer: "Take a few minutes to think about this, then walk me through your approach. We have about 45 minutes."


Phase 1: Requirements Clarification (5 minutes)

Before diving in, you take a breath and start asking questions.

Your Questions

You: "Before I start designing, I'd like to clarify a few requirements. First, what types of legal documents are we indexing?"

Interviewer: "Good question. We have several document types:

  • Court opinions and case law (federal and state courts)
  • Statutes and regulations
  • Legal briefs and motions
  • Contracts (uploaded by clients)
  • Legal journals and secondary sources
  • Patent filings"

You: "What's the average document size? Legal documents can be quite lengthy."

Interviewer: "They vary significantly. Court opinions average 15-20 pages, but some are 200+ pages. Contracts are typically 20-50 pages. We also have multi-volume regulatory codes. Average document is about 10,000 words, but the 99th percentile is 100,000 words."

You: "For search, what are the primary use cases? Simple keyword search, or do lawyers need more advanced query capabilities?"

Interviewer: "Lawyers need sophisticated search:

  • Boolean operators (AND, OR, NOT)
  • Proximity search ('negligence' within 10 words of 'liability')
  • Phrase search with exact matching
  • Citation search (find all cases citing '410 U.S. 113')
  • Field-specific search (judge:Ginsburg, court:Supreme Court)
  • Date and jurisdiction filters"

You: "How fresh does the data need to be? When a new court opinion is published, how quickly should it be searchable?"

Interviewer: "For court opinions from major courts, within 1 hour of publication. For user-uploaded documents, within 5 minutes. We have partnerships with court electronic filing systems for real-time feeds."

You: "What about document security? I imagine some uploaded contracts are highly confidential."

Interviewer: "Absolutely critical. Public court documents are accessible to all subscribers. But client-uploaded documents must be strictly isolated — only that law firm's users can search them. We call these 'private workspaces'."

You: "Last question — what's our user base and expected query volume?"

Interviewer: "We have 50,000 law firm subscribers with about 200,000 individual lawyer accounts. During business hours, we see 5,000-10,000 searches per minute. Paralegals doing research can run hundreds of searches per hour."

You: "Perfect. Let me summarize the requirements."

Functional Requirements

1. DOCUMENT SEARCH
   - Full-text search across all document content
   - Boolean queries (AND, OR, NOT, parentheses)
   - Proximity search (terms within N words)
   - Phrase search with exact matching
   - Wildcard and fuzzy matching
   - Field-specific search (title, judge, court, parties)

2. FILTERING & FACETS
   - Jurisdiction (federal, state, specific courts)
   - Date range (decided date, filed date)
   - Document type (opinion, statute, contract, brief)
   - Practice area (criminal, corporate, IP, family)
   - Judge/Author
   - Citation count (highly cited cases)

3. CITATION NETWORK
   - Find all documents citing a specific case
   - Find all cases cited by a document
   - Citation depth (cases citing cases that cite X)
   - Negative citations (cases that overrule or distinguish)

4. PRIVATE WORKSPACES
   - Law firms upload their own documents
   - OCR processing for scanned documents
   - Strict tenant isolation
   - Combined search (public + private documents)

5. DOCUMENT PROCESSING
   - PDF/Word/Image ingestion
   - OCR for scanned documents
   - Automatic metadata extraction
   - Citation extraction and linking

6. USER FEATURES
   - Save searches and set alerts
   - Highlight and annotate documents
   - Search history and recent documents
   - Export and print formatting

Non-Functional Requirements

1. SCALE
   - 50M+ public documents
   - 50M+ private documents across tenants
   - 200,000 registered users
   - 10,000 searches/minute peak

2. LATENCY
   - Simple search: <500ms p99
   - Complex boolean: <2s p99
   - Autocomplete: <100ms p99
   - Document indexing: <5 minutes p99

3. AVAILABILITY
   - 99.9% uptime (8.7 hours downtime/year)
   - Graceful degradation during failures
   - Read replicas for search continuity

4. DATA FRESHNESS
   - Court opinions: <1 hour from publication
   - User uploads: <5 minutes
   - Citation links: <24 hours

5. SECURITY
   - Strict tenant isolation for private docs
   - Audit logging for all access
   - Encryption at rest and in transit
   - SOC 2 Type II compliance

6. ACCURACY
   - Zero false positives in tenant isolation
   - High precision for legal terminology
   - Proper handling of legal citations

Phase 2: Back of the Envelope Estimation (5 minutes)

You: "Let me work through the numbers to understand the scale."

Document Volume

DOCUMENT COUNTS

Public documents:
  Court opinions:           30M
  Statutes & regulations:   5M
  Legal journals:           10M
  Patent filings:           5M
  ─────────────────────────
  Total public:             50M documents

Private documents:
  Average per law firm:     1,000 documents
  Number of firms:          50,000
  ─────────────────────────
  Total private:            50M documents (varies widely)

Growth:
  New court opinions:       50,000/month
  New user uploads:         500,000/month

Storage Estimation

DOCUMENT SIZE

Average document:           10,000 words ≈ 60KB text
Large documents (p99):      100,000 words ≈ 600KB text

Metadata per document:      2KB
Citation data:              1KB (average 20 citations × 50 bytes)

Total per document:         ~65KB average

STORAGE CALCULATION

Raw text (100M docs):
  100M × 65KB = 6.5 TB

Elasticsearch index:
  With overhead (2x): 13 TB
  With replicas (2x): 26 TB

Original files (PDFs):
  100M × 500KB avg = 50 TB

Total storage:              ~80 TB
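
These figures can be sanity-checked in a few lines; a quick sketch in Python using the per-document sizes assumed above (decimal units, 1 TB = 10^9 KB):

# storage_estimate.py — arithmetic check for the figures above
docs_total = 100_000_000              # 50M public + 50M private
indexed_kb_per_doc = 65               # text + metadata + citation data
pdf_kb_per_doc = 500                  # average original file

raw_text_tb = docs_total * indexed_kb_per_doc / 1e9      # ≈ 6.5 TB
es_index_tb = raw_text_tb * 2                            # ≈ 13 TB with index overhead
es_replicated_tb = es_index_tb * 2                       # ≈ 26 TB with one replica
original_files_tb = docs_total * pdf_kb_per_doc / 1e9    # ≈ 50 TB

total_tb = es_replicated_tb + original_files_tb          # ≈ 76 TB, rounded to ~80 TB
print(raw_text_tb, es_replicated_tb, original_files_tb, total_tb)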

Search Traffic

SEARCH QUERIES

Active users:               50,000 concurrent (peak, ~25% of 200K accounts)
Searches per user/hour:     20 (intensive research)
Peak searches:              1M/hour ≈ 280/second
                            (≈17,000/minute — above the stated 10,000/minute peak, leaving headroom)

Query complexity:
  Simple keyword:           40%
  Boolean:                  35%
  Proximity:                15%
  Citation search:          10%

Autocomplete:
  5 keystrokes per search
  Peak: 280 × 5 = 1,400/second

Key Metrics Summary

┌──────────────────────────────────────────────────────────────────────────┐
│                    ESTIMATION SUMMARY                                    │
│                                                                          │
│  DOCUMENTS                                                               │
│  ├── Total documents:         100M (50M public + 50M private)            │
│  ├── New documents/month:     550K                                       │
│  └── Average size:            65KB indexed content                       │
│                                                                          │
│  STORAGE                                                                 │
│  ├── Elasticsearch:           26 TB (with replicas)                      │
│  ├── Original files:          50 TB                                      │
│  └── Total:                   ~80 TB                                     │
│                                                                          │
│  TRAFFIC                                                                 │
│  ├── Peak searches:           300/second                                 │
│  ├── Peak autocomplete:       1,500/second                               │
│  └── Indexing:                ~200 docs/minute                           │
│                                                                          │
│  INFRASTRUCTURE (estimated)                                              │
│  ├── ES data nodes:           12 × 64GB RAM, 2TB SSD                     │
│  ├── ES master nodes:         3 × 16GB RAM                               │
│  ├── ES coordinating:         4 × 32GB RAM                               │
│  └── Document processors:     6 × 16 CPU (OCR/extraction)                │
│                                                                          │
└──────────────────────────────────────────────────────────────────────────┘

Phase 3: High-Level Design (10 minutes)

You: "Now let me sketch out the high-level architecture."

System Architecture

┌─────────────────────────────────────────────────────────────────────────────┐
│                    LEGAL DOCUMENT SEARCH ARCHITECTURE                       │
│                                                                             │
│                                                                             │
│    ┌──────────────┐     ┌──────────────┐     ┌──────────────┐               │
│    │   Web App    │     │  Mobile App  │     │   API Users  │               │
│    └──────┬───────┘     └──────┬───────┘     └──────┬───────┘               │
│           │                    │                    │                       │
│           └────────────────────┼────────────────────┘                       │
│                                ▼                                            │
│                    ┌───────────────────────┐                                │
│                    │     API Gateway       │                                │
│                    │   (Auth, Rate Limit)  │                                │
│                    └───────────┬───────────┘                                │
│                                │                                            │
│         ┌──────────────────────┼──────────────────────┐                     │
│         │                      │                      │                     │
│         ▼                      ▼                      ▼                     │
│  ┌─────────────┐      ┌─────────────┐      ┌──────────────────┐             │
│  │   Search    │      │  Document   │      │    Citation      │             │
│  │   Service   │      │   Service   │      │    Service       │             │
│  └──────┬──────┘      └──────┬──────┘      └────────┬─────────┘             │
│         │                    │                      │                       │
│         │              ┌─────┴─────┐                │                       │
│         │              ▼           ▼                │                       │
│         │      ┌───────────┐ ┌──────────┐           │                       │
│         │      │    OCR    │ │ Metadata │           │                       │
│         │      │  Pipeline │ │ Extractor│           │                       │
│         │      └─────┬─────┘ └────┬─────┘           │                       │
│         │            │            │                 │                       │
│         │            └─────┬──────┘                 │                       │
│         │                  ▼                        │                       │
│         │         ┌────────────────┐                │                       │
│         │         │  Kafka Topics  │                │                       │
│         │         │  (CDC Events)  │                │                       │
│         │         └────────┬───────┘                │                       │
│         │                  │                        │                       │
│         │                  ▼                        │                       │
│         │         ┌────────────────┐                │                       │
│         │         │   Indexing     │                │                       │
│         │         │   Pipeline     │                │                       │
│         │         └────────┬───────┘                │                       │
│         │                  │                        │                       │
│         ▼                  ▼                        ▼                       │
│  ┌───────────────────────────────────────────────────────────────────┐      │
│  │                    ELASTICSEARCH CLUSTER                          │      │
│  │  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐                │      │
│  │  │ Public Docs │  │Private Docs │  │  Citations  │                │      │
│  │  │   Index     │  │   Index     │  │    Index    │                │      │
│  │  │  (50M docs) │  │ (per tenant)│  │  (graph)    │                │      │
│  │  └─────────────┘  └─────────────┘  └─────────────┘                │      │
│  └───────────────────────────────────────────────────────────────────┘      │
│                                                                             │
│  ┌───────────────┐  ┌───────────────┐  ┌───────────────┐                    │
│  │  PostgreSQL   │  │     Redis     │  │      S3       │                    │
│  │  (metadata)   │  │   (cache)     │  │ (file store)  │                    │
│  └───────────────┘  └───────────────┘  └───────────────┘                    │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘

Component Breakdown

You: "Let me walk through each component..."

1. Search Service

Purpose: Handles all search queries from users

Key responsibilities:

  • Parse complex boolean and proximity queries
  • Route to appropriate index (public, private, or both)
  • Apply tenant isolation filters
  • Aggregate results across indices
  • Handle highlighting and snippets

Technology choice: Python/FastAPI for flexibility in query parsing
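
As a rough sketch of what this entry point could look like (the route, parameter names, and dependency helpers below are illustrative, not part of the design above):

# search/api.py — minimal FastAPI sketch of the search endpoint (hypothetical names)
from typing import Optional

from fastapi import Depends, FastAPI, Query

app = FastAPI()


async def get_tenant_context():
    """Placeholder: in the real service the tenant context comes from JWT middleware."""
    raise NotImplementedError


async def get_search_service():
    """Placeholder: returns the tenant-isolated search service described later."""
    raise NotImplementedError


@app.get("/v1/search")
async def search(
    q: str = Query(..., description="Legal query, e.g. '(breach /5 contract) AND damages'"),
    jurisdiction: Optional[str] = None,
    include_private: bool = True,
    tenant=Depends(get_tenant_context),    # resolved from the JWT, never from the request body
    service=Depends(get_search_service),
):
    filters = {"jurisdiction": jurisdiction} if jurisdiction else None
    return await service.search(
        query_string=q,
        tenant=tenant,
        include_private=include_private,
        filters=filters,
    )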

2. Document Service

Purpose: Manages document ingestion and processing

Key responsibilities:

  • Accept document uploads (PDF, Word, images)
  • Trigger OCR for scanned documents
  • Extract metadata (dates, parties, judges)
  • Detect and link citations
  • Publish to Kafka for indexing

Technology choice: Python with Apache Tika for extraction
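
A compressed sketch of the upload path — the clients are injected, the S3/Kafka calls shown are assumptions based on the usual boto3 and confluent-kafka interfaces, and Tika/OCR details are hidden behind the extractor:

# documents/upload.py — sketch of the Document Service upload path (illustrative names)
import json
import uuid


class DocumentUploadHandler:
    def __init__(self, s3, kafka_producer, text_extractor):
        self.s3 = s3                   # e.g. a boto3 S3 client
        self.kafka = kafka_producer    # e.g. a confluent-kafka Producer
        self.extract = text_extractor  # Tika/OCR wrapper returning (text, metadata)

    async def upload(self, tenant_id: str, filename: str, data: bytes) -> str:
        doc_id = str(uuid.uuid4())

        # 1. Store the original file (assumed boto3-style call)
        self.s3.put_object(
            Bucket="legal-docs-originals",
            Key=f"{tenant_id}/{doc_id}/{filename}",
            Body=data,
        )

        # 2. Extract text and metadata (OCR runs inside the extractor for scanned files)
        text, metadata = self.extract(data)

        # 3. Publish for indexing; the Indexing Pipeline consumes this topic
        event = {
            "event": "document.created",
            "document_id": doc_id,
            "tenant_id": tenant_id,
            "is_public": False,
            "content": text,
            "metadata": metadata,
        }
        self.kafka.produce("document.created", value=json.dumps(event).encode())
        return doc_id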

3. Citation Service

Purpose: Manages the citation network

Key responsibilities:

  • Parse legal citations from text
  • Build citation graph (who cites whom)
  • Detect citation types (following, distinguishing, overruling)
  • Provide citation depth queries

Technology choice: Specialized citation parser + Neo4j or ES for graph

4. Indexing Pipeline

Purpose: Streams documents into Elasticsearch

Key responsibilities:

  • Consume from Kafka topics
  • Transform documents for search
  • Apply legal-specific text analysis
  • Handle bulk indexing efficiently
  • Maintain citation links
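
A condensed sketch of that consumer loop, assuming confluent-kafka and the elasticsearch-py bulk helper (topic, index, and endpoint names are placeholders consistent with the flow below):

# indexing/pipeline.py — Kafka-to-Elasticsearch indexer sketch
import json

from confluent_kafka import Consumer
from elasticsearch import Elasticsearch, helpers

es = Elasticsearch("http://localhost:9200")            # placeholder endpoint
consumer = Consumer({
    "bootstrap.servers": "kafka:9092",                 # placeholder
    "group.id": "indexing-pipeline",
    "auto.offset.reset": "earliest",
})
consumer.subscribe(["document.created"])


def target_index(event: dict) -> str:
    """Public docs share one index; private docs go to a per-tenant index."""
    if event.get("is_public"):
        return "public_legal_docs"
    return f"private_docs_{event['tenant_id']}"


def run(batch_size: int = 500):
    batch = []
    while True:
        msg = consumer.poll(1.0)
        if msg is None or msg.error():
            continue
        event = json.loads(msg.value())
        batch.append({
            "_index": target_index(event),
            "_id": event["document_id"],
            "_source": {
                "content": event["content"],
                "tenant_id": event.get("tenant_id"),
                "is_public": event.get("is_public", False),
                **event.get("metadata", {}),
            },
        })
        if len(batch) >= batch_size:
            helpers.bulk(es, batch)   # bulk indexing keeps per-document overhead low
            consumer.commit()
            batch.clear()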

Data Flow

You: "Let me trace through a document upload flow..."

DOCUMENT UPLOAD FLOW

Step 1: User uploads contract PDF
        Client ──▶ API Gateway ──▶ Document Service

Step 2: Store original file
        Document Service ──▶ S3 (original PDF stored)

Step 3: Process document
        Document Service ──▶ OCR Pipeline (if scanned)
                        ──▶ Metadata Extractor
                        ──▶ Citation Extractor

Step 4: Publish for indexing
        Document Service ──▶ Kafka (document.created topic)

Step 5: Index in Elasticsearch
        Indexing Pipeline ◀── Kafka
        Indexing Pipeline ──▶ Elasticsearch (private_docs_tenant_123)

Step 6: Update citation graph
        Citation Service ◀── Kafka
        Citation Service ──▶ Elasticsearch (citations index)

Time: Upload to searchable ≈ 2-3 minutes
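
Step 3's OCR stage can start as little more than rendering pages and running Tesseract over them; a naive sketch (assumes the pdf2image and pytesseract packages with Poppler and Tesseract installed — production OCR for legal documents needs layout handling and quality checks on top):

# ocr/pipeline.py — naive OCR sketch for scanned PDFs
from pdf2image import convert_from_bytes
import pytesseract


def ocr_pdf(pdf_bytes: bytes) -> str:
    """Render each page at 300 DPI and run Tesseract on the resulting images."""
    pages = convert_from_bytes(pdf_bytes, dpi=300)
    return "\n".join(pytesseract.image_to_string(page) for page in pages)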

Phase 4: Deep Dives (20 minutes)

Interviewer: "Great high-level design. Let's dive deeper into a few areas. Tell me more about how you'd handle the complex boolean and proximity queries lawyers need."


You: "Legal search requires query capabilities beyond standard full-text search. Let me explain how I'd build the query parser."

The Problem

LEGAL QUERY EXAMPLES

Simple boolean:
  "negligence AND liability"
  "contract OR agreement"
  "patent NOT software"

Complex boolean:
  "(breach AND contract) OR (negligence AND tort)"

Proximity search:
  "negligence /10 liability"  (within 10 words)
  "intellectual property /s patent"  (/s = same sentence)
  "employment /p discrimination"  (/p = same paragraph)

Field-specific:
  "judge:Ginsburg AND civil rights"
  "court:SCOTUS AND date:[2020 TO 2024]"

Citation search:
  "cites:410 U.S. 113"  (find all citing this case)
  "citedby:Roe v. Wade"  (find all cases cited by)

Phrase search:
  "reasonable person standard"
  "beyond a reasonable doubt"

The Solution

QUERY PARSING PIPELINE

┌──────────────────────────────────────────────────────────────────────────┐
│                    LEGAL QUERY PARSER                                    │
│                                                                          │
│  Input: "(breach /5 contract) AND damages NOT punitive"                  │
│                                                                          │
│  ┌─────────────┐    ┌─────────────┐    ┌─────────────┐                   │
│  │   Lexer     │───▶│   Parser    │───▶│ ES Query    │                   │
│  │ (tokenize)  │    │  (AST)      │    │ Builder     │                   │
│  └─────────────┘    └─────────────┘    └─────────────┘                   │
│                                                                          │
│  Tokens:                AST:              ES Query:                      │
│  [LPAREN, TERM,         BoolNode(         bool:                          │
│   PROXIMITY, TERM,       AND,               must:                        │
│   RPAREN, AND,           ProximityNode,     - span_near                  │
│   TERM, NOT, TERM]       TermNode,          - match                      │
│                          NotNode)           must_not:                    │
│                                             - match                      │
│                                                                          │
└──────────────────────────────────────────────────────────────────────────┘

Implementation

# search/legal_query_parser.py

"""
Legal query parser supporting boolean and proximity search.

Applies: Week 7, Day 3 - Query Processing
"""

from dataclasses import dataclass
from typing import List, Optional, Union
from enum import Enum, auto
import re


class TokenType(Enum):
    """Token types in legal queries."""
    TERM = auto()
    PHRASE = auto()
    AND = auto()
    OR = auto()
    NOT = auto()
    LPAREN = auto()
    RPAREN = auto()
    PROXIMITY = auto()      # /5, /10, /s, /p
    FIELD = auto()          # judge:, court:, date:
    CITATION = auto()       # cites:, citedby:
    RANGE = auto()          # [2020 TO 2024]


@dataclass
class Token:
    """A lexical token."""
    type: TokenType
    value: str
    position: int


class LegalQueryLexer:
    """
    Tokenizes legal search queries.
    """
    
    KEYWORDS = {
        'AND': TokenType.AND,
        'OR': TokenType.OR,
        'NOT': TokenType.NOT,
        'AND NOT': TokenType.NOT,
    }
    
    FIELD_PREFIXES = ['judge:', 'court:', 'date:', 'party:', 'title:']
    CITATION_PREFIXES = ['cites:', 'citedby:', 'citing:']
    
    PROXIMITY_PATTERN = re.compile(r'/(\d+|s|p)')
    
    def tokenize(self, query: str) -> List[Token]:
        """Tokenize a legal query string."""
        
        tokens = []
        position = 0
        
        while position < len(query):
            # Skip whitespace
            if query[position].isspace():
                position += 1
                continue
            
            # Parentheses
            if query[position] == '(':
                tokens.append(Token(TokenType.LPAREN, '(', position))
                position += 1
                continue
            
            if query[position] == ')':
                tokens.append(Token(TokenType.RPAREN, ')', position))
                position += 1
                continue
            
            # Phrase (quoted string)
            if query[position] == '"':
                end = query.find('"', position + 1)
                if end == -1:
                    end = len(query)
                phrase = query[position+1:end]
                tokens.append(Token(TokenType.PHRASE, phrase, position))
                position = end + 1
                continue
            
            # Proximity operator
            if query[position] == '/':
                match = self.PROXIMITY_PATTERN.match(query[position:])
                if match:
                    tokens.append(Token(TokenType.PROXIMITY, match.group(1), position))
                    position += len(match.group(0))
                    continue
            
            # Range [2020 TO 2024]
            if query[position] == '[':
                end = query.find(']', position)
                if end != -1:
                    range_expr = query[position:end+1]
                    tokens.append(Token(TokenType.RANGE, range_expr, position))
                    position = end + 1
                    continue
            
            # Word or keyword
            word_match = re.match(r'[\w\.:]+', query[position:])
            if word_match:
                word = word_match.group(0)
                
                # Check for field prefix
                for prefix in self.FIELD_PREFIXES:
                    if word.lower().startswith(prefix):
                        tokens.append(Token(TokenType.FIELD, word, position))
                        break
                else:
                    # Check for citation prefix
                    for prefix in self.CITATION_PREFIXES:
                        if word.lower().startswith(prefix):
                            tokens.append(Token(TokenType.CITATION, word, position))
                            break
                    else:
                        # Check for keyword
                        upper = word.upper()
                        if upper in self.KEYWORDS:
                            tokens.append(Token(self.KEYWORDS[upper], word, position))
                        else:
                            tokens.append(Token(TokenType.TERM, word, position))
                
                position += len(word)
                continue
            
            # Unknown character, skip
            position += 1
        
        return tokens


class LegalQueryBuilder:
    """
    Builds Elasticsearch queries from parsed legal queries.
    """
    
    def __init__(self):
        self.lexer = LegalQueryLexer()
    
    def build_query(
        self,
        query_string: str,
        tenant_id: Optional[str] = None,
        search_fields: List[str] = None
    ) -> dict:
        """
        Build Elasticsearch query from legal query string.
        
        Applies concepts:
        - Query vs Filter context (Day 3)
        - Multi-match for field boosting (Day 3)
        - Span queries for proximity (Day 4)
        """
        
        if not search_fields:
            search_fields = ["content", "title^2", "summary^1.5"]
        
        tokens = self.lexer.tokenize(query_string)
        
        # Parse tokens into ES query
        es_query = self._parse_expression(tokens, search_fields)
        
        # Wrap with filters
        final_query = {
            "query": {
                "bool": {
                    "must": [es_query],
                    "filter": []
                }
            }
        }
        
        # Add tenant isolation filter (critical for security!)
        if tenant_id:
            final_query["query"]["bool"]["filter"].append({
                "term": {"tenant_id": tenant_id}
            })
        
        return final_query
    
    def _parse_expression(
        self,
        tokens: List[Token],
        fields: List[str]
    ) -> dict:
        """Parse tokens into ES query structure."""
        
        if not tokens:
            return {"match_all": {}}
        
        # Simple case: single term
        if len(tokens) == 1:
            return self._token_to_query(tokens[0], fields)
        
        # Handle boolean operators
        must_clauses = []
        should_clauses = []
        must_not_clauses = []
        
        i = 0
        current_operator = TokenType.AND  # Default is AND
        
        while i < len(tokens):
            token = tokens[i]
            
            if token.type == TokenType.AND:
                current_operator = TokenType.AND
                i += 1
                continue
            
            if token.type == TokenType.OR:
                current_operator = TokenType.OR
                i += 1
                continue
            
            if token.type == TokenType.NOT:
                # Next token goes to must_not
                if i + 1 < len(tokens):
                    must_not_clauses.append(
                        self._token_to_query(tokens[i + 1], fields)
                    )
                    i += 2
                    continue
            
            if token.type == TokenType.LPAREN:
                # Find matching paren and recurse
                paren_depth = 1
                j = i + 1
                while j < len(tokens) and paren_depth > 0:
                    if tokens[j].type == TokenType.LPAREN:
                        paren_depth += 1
                    elif tokens[j].type == TokenType.RPAREN:
                        paren_depth -= 1
                    j += 1
                
                sub_tokens = tokens[i + 1:j - 1]
                sub_query = self._parse_expression(sub_tokens, fields)
                
                if current_operator == TokenType.OR:
                    should_clauses.append(sub_query)
                else:
                    must_clauses.append(sub_query)
                
                i = j
                continue
            
            if token.type == TokenType.PROXIMITY:
                # Proximity: previous term /N next term
                if must_clauses and i + 1 < len(tokens):
                    prev_query = must_clauses.pop()
                    next_query = self._token_to_query(tokens[i + 1], fields)
                    
                    proximity = self._parse_proximity(token.value)
                    span_query = self._build_span_near(
                        prev_query, next_query, proximity, fields[0]
                    )
                    must_clauses.append(span_query)
                    i += 2
                    continue
            
            # Regular term/phrase
            term_query = self._token_to_query(token, fields)
            
            if current_operator == TokenType.OR:
                should_clauses.append(term_query)
            else:
                must_clauses.append(term_query)
            
            i += 1
        
        # Build final bool query
        bool_query = {"bool": {}}
        
        if must_clauses:
            bool_query["bool"]["must"] = must_clauses
        if should_clauses:
            bool_query["bool"]["should"] = should_clauses
            bool_query["bool"]["minimum_should_match"] = 1
        if must_not_clauses:
            bool_query["bool"]["must_not"] = must_not_clauses
        
        return bool_query
    
    def _token_to_query(self, token: Token, fields: List[str]) -> dict:
        """Convert a single token to ES query."""
        
        if token.type == TokenType.TERM:
            return {
                "multi_match": {
                    "query": token.value,
                    "fields": fields,
                    "type": "best_fields"
                }
            }
        
        if token.type == TokenType.PHRASE:
            return {
                "multi_match": {
                    "query": token.value,
                    "fields": fields,
                    "type": "phrase"
                }
            }
        
        if token.type == TokenType.FIELD:
            # Parse field:value
            field, value = token.value.split(':', 1)
            return {
                "match": {field: value}
            }
        
        if token.type == TokenType.CITATION:
            # Handle citation search
            prefix, citation = token.value.split(':', 1)
            if prefix.lower() == 'cites':
                return {
                    "term": {"citations.cited_id": citation}
                }
            elif prefix.lower() == 'citedby':
                return {
                    "term": {"citations.citing_id": citation}
                }
        
        if token.type == TokenType.RANGE:
            # Parse [2020 TO 2024]
            match = re.match(r'\[(\S+)\s+TO\s+(\S+)\]', token.value)
            if match:
                return {
                    "range": {
                        "date": {
                            "gte": match.group(1),
                            "lte": match.group(2)
                        }
                    }
                }
        
        return {"match_all": {}}
    
    def _parse_proximity(self, value: str) -> int:
        """Parse proximity value to integer."""
        if value == 's':
            return 15  # Same sentence ≈ 15 words
        if value == 'p':
            return 50  # Same paragraph ≈ 50 words
        return int(value)
    
    def _build_span_near(
        self,
        query1: dict,
        query2: dict,
        slop: int,
        field: str
    ) -> dict:
        """Build span_near query for proximity search."""
        
        # Extract terms from queries
        term1 = self._extract_term(query1)
        term2 = self._extract_term(query2)
        
        return {
            "span_near": {
                "clauses": [
                    {"span_term": {field: term1}},
                    {"span_term": {field: term2}}
                ],
                "slop": slop,
                "in_order": False
            }
        }
    
    def _extract_term(self, query: dict) -> str:
        """Extract term from a query structure."""
        if "multi_match" in query:
            return query["multi_match"]["query"]
        if "match" in query:
            return list(query["match"].values())[0]
        return ""


# =============================================================================
# Usage Example
# =============================================================================

async def search_legal_documents(
    query_string: str,
    tenant_id: str,
    filters: dict = None
) -> dict:
    """
    Execute a legal search query.
    
    Example queries:
    - "(negligence /10 liability) AND damages"
    - "breach AND contract NOT employment"
    - "cites:410 U.S. 113"
    - "judge:Ginsburg AND civil rights"
    """
    
    builder = LegalQueryBuilder()
    
    es_query = builder.build_query(
        query_string,
        tenant_id=tenant_id,
        search_fields=["content", "title^3", "summary^2", "headnotes^2"]
    )
    
    # Add additional filters
    if filters:
        if filters.get("jurisdiction"):
            es_query["query"]["bool"]["filter"].append({
                "term": {"jurisdiction": filters["jurisdiction"]}
            })
        
        if filters.get("date_from") or filters.get("date_to"):
            date_range = {}
            if filters.get("date_from"):
                date_range["gte"] = filters["date_from"]
            if filters.get("date_to"):
                date_range["lte"] = filters["date_to"]
            es_query["query"]["bool"]["filter"].append({
                "range": {"decision_date": date_range}
            })
        
        if filters.get("document_type"):
            es_query["query"]["bool"]["filter"].append({
                "term": {"doc_type": filters["document_type"]}
            })
    
    # Add highlighting for snippets
    es_query["highlight"] = {
        "fields": {
            "content": {
                "fragment_size": 200,
                "number_of_fragments": 3
            },
            "headnotes": {}
        },
        "pre_tags": ["<mark>"],
        "post_tags": ["</mark>"]
    }
    
    return es_query
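
To execute the generated body, the Search Service hands it to the Elasticsearch client; a brief usage sketch reusing search_legal_documents above (endpoint and index names are placeholders):

# search/execute.py — running a parsed legal query (sketch)
from elasticsearch import AsyncElasticsearch

from search.legal_query_parser import search_legal_documents  # defined above

es = AsyncElasticsearch("http://localhost:9200")               # placeholder endpoint


async def run_search(tenant_id: str) -> dict:
    body = await search_legal_documents(
        query_string="(negligence /10 liability) AND damages",
        tenant_id=tenant_id,
        filters={"jurisdiction": "federal", "date_from": "2015-01-01"},
    )
    return await es.search(
        index=f"public_legal_docs,private_docs_{tenant_id}",
        body=body,
    )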

Edge Cases

Interviewer: "What happens if a user's query has unbalanced parentheses or invalid syntax?"

You: "Good question. We handle this gracefully:"

QUERY ERROR HANDLING

1. UNBALANCED PARENTHESES
   Input: "(breach AND contract"
   
   Handling:
   - Detect during parsing
   - Auto-close open parens at end
   - OR: Fall back to simple search
   - Show warning to user: "Query may not be parsed as intended"

2. INVALID PROXIMITY
   Input: "negligence /abc liability"
   
   Handling:
   - Ignore invalid proximity operator
   - Treat as "negligence AND abc AND liability"
   - Log for analytics (common user errors)

3. EMPTY QUOTES
   Input: 'contract AND ""'
   
   Handling:
   - Ignore empty phrase
   - Process rest of query normally

4. DEEPLY NESTED
   Input: "((((term))))"
   
   Handling:
   - Set max nesting depth (10)
   - Beyond that, flatten to simple AND
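
A small pre-parse guard covering the first and last rules above (auto-closing parentheses and capping nesting depth); this is a sketch rather than the production validator:

# search/query_guard.py — pre-parse sanitization sketch
MAX_NESTING_DEPTH = 10


def sanitize_query(query: str) -> tuple:
    """Balance parentheses and cap nesting depth; returns (query, warnings)."""
    warnings = []

    # Rule 1: auto-close (or auto-open) unbalanced parentheses
    unbalanced = query.count("(") - query.count(")")
    if unbalanced > 0:
        query += ")" * unbalanced
        warnings.append("Query may not be parsed as intended (unclosed parentheses).")
    elif unbalanced < 0:
        query = "(" * (-unbalanced) + query
        warnings.append("Query may not be parsed as intended (extra closing parentheses).")

    # Rule 4: beyond the maximum depth, strip parentheses so terms flatten to AND
    depth = max_depth = 0
    for ch in query:
        if ch == "(":
            depth += 1
            max_depth = max(max_depth, depth)
        elif ch == ")":
            depth = max(depth - 1, 0)
    if max_depth > MAX_NESTING_DEPTH:
        query = query.replace("(", " ").replace(")", " ")
        warnings.append("Query too deeply nested; grouping was ignored.")

    return query, warnings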

Deep Dive 2: Multi-Tenant Isolation (Week 7, Day 1 & Day 5 — Security)

Interviewer: "You mentioned tenant isolation for private documents. How do you ensure a law firm can never see another firm's documents?"

You: "This is absolutely critical for a legal platform. A data leak between law firms would be catastrophic. Let me explain our defense-in-depth approach."

The Problem

TENANT ISOLATION REQUIREMENTS

Law Firm A uploads confidential merger contract.
Law Firm B must NEVER be able to:
  - Search and find this document
  - Access it by ID if they guess
  - See it in autocomplete suggestions
  - See it in "related documents"
  - See aggregate stats that leak info

Even if:
  - There's a bug in our code
  - An attacker manipulates API calls
  - A developer makes a mistake
  - The query somehow bypasses filters

This is a ZERO TOLERANCE requirement.

The Solution

You: "We implement multiple layers of isolation:"

MULTI-LAYER TENANT ISOLATION

┌────────────────────────────────────────────────────────────────────────────┐
│                    DEFENSE IN DEPTH                                        │
│                                                                            │
│  LAYER 1: SEPARATE INDICES                                                 │
│  ┌─────────────────┐  ┌─────────────────┐  ┌─────────────────┐             │
│  │ private_docs_   │  │ private_docs_   │  │ private_docs_   │             │
│  │ firm_abc123     │  │ firm_def456     │  │ firm_ghi789     │             │
│  └─────────────────┘  └─────────────────┘  └─────────────────┘             │
│  Each tenant has own index - no cross-index queries possible               │
│                                                                            │
│  LAYER 2: QUERY FILTER INJECTION                                           │
│  Every query automatically includes:                                       │
│  {"filter": {"term": {"tenant_id": "firm_abc123"}}}                        │
│  Applied at service layer, cannot be bypassed by API                       │
│                                                                            │
│  LAYER 3: FIELD-LEVEL SECURITY                                             │
│  Elasticsearch document-level security:                                    │
│  User can only see docs where tenant_id matches their JWT                  │
│                                                                            │
│  LAYER 4: API GATEWAY VALIDATION                                           │
│  - Extract tenant_id from JWT token                                        │
│  - Inject into request context                                             │
│  - Cannot be overridden by request body                                    │
│                                                                            │
│  LAYER 5: AUDIT LOGGING                                                    │
│  Every search logged with:                                                 │
│  - User ID, Tenant ID, Query, Results returned                             │
│  - Anomaly detection for cross-tenant access attempts                      │
│                                                                            │
└────────────────────────────────────────────────────────────────────────────┘
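
Layer 3 can be enforced inside Elasticsearch itself with document-level security; a sketch of the role definition, assuming X-Pack security is enabled and the JWT realm maps tenant_id into user metadata (applied via PUT _security/role/legal_tenant_user):

# security/es_roles.py — document-level security role sketch
LEGAL_TENANT_USER_ROLE = {
    "indices": [
        {
            "names": ["private_docs_*"],
            "privileges": ["read"],
            # DLS role template: a user only matches documents whose tenant_id
            # equals the tenant_id stored in their own user metadata.
            "query": {
                "template": {
                    "source": '{"term": {"tenant_id": "{{_user.metadata.tenant_id}}"}}'
                }
            },
        },
        {
            "names": ["public_legal_docs"],
            "privileges": ["read"],
        },
    ]
}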

Implementation

# security/tenant_isolation.py

"""
Multi-tenant isolation for legal document search.

Applies: Week 7, Day 1 - Partitioning (tenant per index)
         Week 7, Day 5 - Security and Operations
"""

from dataclasses import dataclass
from typing import Optional, List
from functools import wraps
from datetime import datetime, timezone
import logging

from search.legal_query_parser import LegalQueryBuilder

logger = logging.getLogger(__name__)


class AuthenticationError(Exception):
    """Raised when a request carries no valid bearer token."""


class SecurityError(Exception):
    """Raised when a request attempts to bypass tenant isolation."""


@dataclass
class TenantContext:
    """Immutable tenant context from JWT."""
    tenant_id: str
    user_id: str
    roles: List[str]
    
    @property
    def private_index(self) -> str:
        """Get tenant-specific private index name."""
        return f"private_docs_{self.tenant_id}"
    
    @property
    def can_search_public(self) -> bool:
        """Check if user can search public documents."""
        return "search:public" in self.roles
    
    @property
    def can_search_private(self) -> bool:
        """Check if user can search private documents."""
        return "search:private" in self.roles


class TenantIsolationMiddleware:
    """
    Middleware to enforce tenant isolation.
    
    This runs BEFORE any search service code.
    """
    
    def __init__(self, jwt_validator):
        self.jwt_validator = jwt_validator
    
    async def __call__(self, request):
        """Extract and validate tenant context."""
        
        # Get token from header
        auth_header = request.headers.get("Authorization", "")
        if not auth_header.startswith("Bearer "):
            raise AuthenticationError("Missing token")
        
        token = auth_header[7:]
        
        # Validate and extract claims
        claims = await self.jwt_validator.validate(token)
        
        # Create immutable context
        context = TenantContext(
            tenant_id=claims["tenant_id"],
            user_id=claims["user_id"],
            roles=claims.get("roles", [])
        )
        
        # Attach to request (cannot be modified)
        request.state.tenant = context
        
        # Log for audit
        logger.info(
            "Request authenticated",
            extra={
                "tenant_id": context.tenant_id,
                "user_id": context.user_id,
                "path": request.url.path
            }
        )
        
        return context


def require_tenant_isolation(func):
    """
    Decorator ensuring tenant isolation in search functions.
    """
    
    @wraps(func)
    async def wrapper(*args, **kwargs):
        # Get tenant context (must exist)
        tenant: TenantContext = kwargs.get("tenant")
        if not tenant:
            raise SecurityError("Tenant context required")
        
        # Verify tenant_id is not overridable in query
        query = kwargs.get("query", {})
        if _contains_tenant_override(query):
            logger.warning(
                "Blocked tenant override attempt",
                extra={
                    "tenant_id": tenant.tenant_id,
                    "attempted_query": query
                }
            )
            raise SecurityError("Tenant override not allowed")
        
        return await func(*args, **kwargs)
    
    return wrapper


def _contains_tenant_override(query: dict) -> bool:
    """Check if query tries to override tenant filter."""
    
    query_str = str(query).lower()
    
    # Block any attempt to query tenant_id field directly
    if "tenant_id" in query_str:
        return True
    
    # Block attempts to query other tenant indices
    if "private_docs_" in query_str:
        return True
    
    return False


class IsolatedSearchService:
    """
    Search service with built-in tenant isolation.
    """
    
    def __init__(self, es_client):
        self.es = es_client
        self.public_index = "public_legal_docs"
    
    @require_tenant_isolation
    async def search(
        self,
        query_string: str,
        tenant: TenantContext,
        include_public: bool = True,
        include_private: bool = True,
        filters: dict = None
    ) -> dict:
        """
        Execute search with automatic tenant isolation.
        
        The tenant context comes from JWT, not from the request body.
        It cannot be overridden by the caller.
        """
        
        indices = []
        
        # Public documents (if permitted and requested)
        if include_public and tenant.can_search_public:
            indices.append(self.public_index)
        
        # Private documents (tenant-specific index)
        if include_private and tenant.can_search_private:
            indices.append(tenant.private_index)
        
        if not indices:
            return {"hits": [], "total": 0}
        
        # Build query with MANDATORY tenant filter
        es_query = self._build_isolated_query(
            query_string,
            tenant,
            filters
        )
        
        # Execute search
        response = await self.es.search(
            index=",".join(indices),
            body=es_query
        )
        
        # Audit log
        await self._audit_log(
            tenant=tenant,
            query=query_string,
            indices=indices,
            result_count=response["hits"]["total"]["value"]
        )
        
        return self._process_response(response)
    
    def _build_isolated_query(
        self,
        query_string: str,
        tenant: TenantContext,
        filters: dict
    ) -> dict:
        """Build query with mandatory tenant isolation."""
        
        # Parse the user's query
        parsed_query = LegalQueryBuilder().build_query(
            query_string,
            tenant_id=None,  # We add filter separately
            search_fields=["content", "title^3", "summary^2"]
        )
        
        # CRITICAL: Add tenant filter that CANNOT be bypassed
        # This filter is added by the service, not from user input
        tenant_filter = {
            "bool": {
                "should": [
                    # Public documents (no tenant restriction)
                    {"term": {"is_public": True}},
                    # Private documents (must match tenant)
                    {
                        "bool": {
                            "must": [
                                {"term": {"is_public": False}},
                                {"term": {"tenant_id": tenant.tenant_id}}
                            ]
                        }
                    }
                ],
                "minimum_should_match": 1
            }
        }
        
        # Inject filter
        parsed_query["query"]["bool"]["filter"].append(tenant_filter)
        
        # Add any additional filters
        if filters:
            for key, value in filters.items():
                if key in ["jurisdiction", "doc_type", "court"]:
                    parsed_query["query"]["bool"]["filter"].append({
                        "term": {key: value}
                    })
        
        return parsed_query
    
    async def _audit_log(
        self,
        tenant: TenantContext,
        query: str,
        indices: List[str],
        result_count: int
    ):
        """Log search for audit trail."""
        
        await self.es.index(
            index="search_audit_log",
            document={
                "timestamp": "now",
                "tenant_id": tenant.tenant_id,
                "user_id": tenant.user_id,
                "query": query,
                "indices_searched": indices,
                "result_count": result_count,
                "client_ip": "extracted_from_request"
            }
        )
    
    def _process_response(self, response: dict) -> dict:
        """Process ES response, ensuring no cross-tenant leakage."""
        
        hits = []
        for hit in response["hits"]["hits"]:
            # Double-check: never return tenant_id in response
            source = hit["_source"]
            if "tenant_id" in source:
                del source["tenant_id"]
            
            hits.append({
                "id": hit["_id"],
                "score": hit["_score"],
                "source": source,
                "highlights": hit.get("highlight", {})
            })
        
        return {
            "hits": hits,
            "total": response["hits"]["total"]["value"]
        }

Verification Tests

You: "We also have automated tests that verify isolation:"

# tests/test_tenant_isolation.py

"""
Critical security tests for tenant isolation.

These tests MUST pass before any deployment.
"""

import pytest


class TestTenantIsolation:
    """Tests that verify tenant isolation cannot be bypassed."""
    
    async def test_cannot_search_other_tenant_documents(
        self,
        search_service,
        tenant_a,
        tenant_b
    ):
        """Tenant A cannot find Tenant B's documents."""
        
        # Tenant B uploads a document
        doc = await upload_document(
            tenant=tenant_b,
            content="This is a secret merger agreement"
        )
        
        # Tenant A searches for it
        results = await search_service.search(
            query_string="secret merger agreement",
            tenant=tenant_a,
            include_private=True
        )
        
        # Must NOT find it
        assert doc["id"] not in [h["id"] for h in results["hits"]]
    
    async def test_cannot_access_by_id_guess(
        self,
        document_service,
        tenant_a,
        tenant_b
    ):
        """Tenant A cannot access Tenant B's document by ID."""
        
        # Tenant B's document
        doc = await upload_document(tenant=tenant_b, content="Secret")
        
        # Tenant A tries to access by ID
        with pytest.raises(NotFoundError):
            await document_service.get_document(
                document_id=doc["id"],
                tenant=tenant_a
            )
    
    async def test_cannot_override_tenant_filter(
        self,
        search_service,
        tenant_a
    ):
        """Query cannot override tenant filter."""
        
        # Try to inject tenant_id in query
        with pytest.raises(SecurityError):
            await search_service.search(
                query_string="contract",
                tenant=tenant_a,
                filters={"tenant_id": "other_tenant"}  # Should be blocked
            )
    
    async def test_cannot_query_other_tenant_index(
        self,
        search_service,
        tenant_a
    ):
        """Cannot query another tenant's index directly."""
        
        # Try to search other tenant's index
        with pytest.raises(SecurityError):
            await search_service.search_raw(
                index="private_docs_other_tenant",
                query={"match_all": {}},
                tenant=tenant_a
            )
    
    async def test_public_documents_visible_to_all(
        self,
        search_service,
        tenant_a,
        tenant_b
    ):
        """Public documents are searchable by all tenants."""
        
        # Search public case law
        results_a = await search_service.search(
            query_string="Roe v. Wade",
            tenant=tenant_a,
            include_public=True
        )
        
        results_b = await search_service.search(
            query_string="Roe v. Wade",
            tenant=tenant_b,
            include_public=True
        )
        
        # Both should find the same public documents
        assert results_a["total"] > 0
        assert results_a["total"] == results_b["total"]

Deep Dive 3: Citation Network Search (Week 7, Day 2 & Day 4 — Indexing & Advanced Features)

Interviewer: "Tell me about the citation search. How do you build and query the citation network?"

You: "Citation analysis is crucial for legal research. Lawyers need to find all cases that cite a given case, understand if it's still 'good law', and explore citation chains."

The Problem

CITATION SEARCH REQUIREMENTS

Given a case like "Brown v. Board of Education":

1. CITING CASES
   Find all cases that cite this case
   Expected: Thousands of results

2. CITED BY
   Find all cases this case cites
   Expected: Dozens of results

3. CITATION DEPTH
   Find cases citing cases that cite this case
   "2-hop" citation network

4. CITATION TREATMENT
   How was the case cited?
   - Followed (positive)
   - Distinguished (neutral)
   - Overruled (negative)
   - Mentioned (neutral)

5. CITATION VALIDITY
   Is this case still "good law"?
   Has it been overruled or limited?
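
Treatment detection (point 4 above) can start as a signal-phrase classifier over the sentence surrounding each citation; a deliberately naive sketch — real citators train models on annotated treatment data:

# citations/treatment.py — naive signal-phrase classifier (sketch only)
import re

TREATMENT_SIGNALS = {
    "overruled": [r"\boverrul\w*", r"\babrogat\w*", r"no longer good law"],
    "distinguished": [r"\bdistinguish\w*", r"\binapposite\b"],
    "questioned": [r"\bquestion\w*", r"cast(s)? doubt"],
    "followed": [r"\bfollow\w*", r"\brel(y|ies|ied) on\b", r"\baccord\b"],
}


def classify_treatment(context_sentence: str) -> str:
    """Return the first treatment whose signal phrases appear; default to 'mentioned'."""
    text = context_sentence.lower()
    for treatment, patterns in TREATMENT_SIGNALS.items():
        if any(re.search(pattern, text) for pattern in patterns):
            return treatment
    return "mentioned"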

The Solution

CITATION DATA MODEL

┌────────────────────────────────────────────────────────────────────────────┐
│                    CITATION INDEX STRUCTURE                                │
│                                                                            │
│  Document: Brown v. Board of Education                                     │
│  ├── ID: "347_us_483"                                                      │
│  ├── citations_outbound: [                                                 │
│  │     {"id": "163_us_537", "treatment": "overruled"},                     │
│  │     {"id": "305_us_337", "treatment": "followed"},                      │
│  │     ...                                                                 │
│  │   ]                                                                     │
│  ├── citation_count_inbound: 15234                                         │
│  └── treatment_summary: {                                                  │
│        "followed": 12000,                                                  │
│        "distinguished": 2500,                                              │
│        "mentioned": 700,                                                   │
│        "overruled": 0                                                      │
│      }                                                                     │
│                                                                            │
│  Separate Citation Edges Index:                                            │
│  ├── {citing_id, cited_id, treatment, context_snippet, date}               │
│  └── Enables efficient graph queries                                       │
│                                                                            │
└────────────────────────────────────────────────────────────────────────────┘
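
A sketch of the mapping behind the edges index (field names follow the structure above; the index would be created once with es.indices.create):

# citations/mappings.py — citation_edges index mapping sketch
CITATION_EDGES_MAPPING = {
    "mappings": {
        "properties": {
            "citing_id":         {"type": "keyword"},
            "cited_id":          {"type": "keyword"},
            "treatment":         {"type": "keyword"},   # followed / distinguished / overruled / ...
            "context_snippet":   {"type": "text"},
            "citation_date":     {"type": "date"},
            "citing_court_rank": {"type": "integer"},   # used to rank "key citing cases"
        }
    }
}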

Implementation

# citations/citation_service.py

"""
Citation network search and analysis.

Applies: Week 7, Day 2 - Indexing Pipeline (citation extraction)
         Week 7, Day 4 - Advanced Features (graph-like queries)
"""

from dataclasses import dataclass
from typing import List, Optional, Dict
from enum import Enum


class CitationTreatment(Enum):
    """How a case was cited."""
    FOLLOWED = "followed"
    DISTINGUISHED = "distinguished"
    OVERRULED = "overruled"
    QUESTIONED = "questioned"
    MENTIONED = "mentioned"


@dataclass
class Citation:
    """A citation between two legal documents."""
    citing_id: str
    citing_name: str
    cited_id: str
    cited_name: str
    treatment: CitationTreatment
    context: str  # Surrounding text
    date: str


@dataclass
class CitationAnalysis:
    """Analysis of a document's citation profile."""
    document_id: str
    document_name: str
    total_citations: int
    treatment_breakdown: Dict[str, int]
    is_good_law: bool
    overruled_by: Optional[str]
    key_citing_cases: List[dict]


class CitationService:
    """
    Manages citation network queries.
    """
    
    def __init__(self, es_client):
        self.es = es_client
        self.docs_index = "legal_documents"
        self.citations_index = "citation_edges"
    
    async def find_citing_cases(
        self,
        document_id: str,
        treatment: Optional[CitationTreatment] = None,
        limit: int = 100
    ) -> List[dict]:
        """
        Find all cases that cite a given document.
        
        This is the "cited by" query - who cites this case?
        """
        
        query = {
            "query": {
                "bool": {
                    "must": [
                        {"term": {"cited_id": document_id}}
                    ]
                }
            },
            "sort": [
                {"citation_date": "desc"}
            ],
            "size": limit
        }
        
        if treatment:
            query["query"]["bool"]["must"].append({
                "term": {"treatment": treatment.value}
            })
        
        response = await self.es.search(
            index=self.citations_index,
            body=query
        )
        
        # Enrich with document details
        citing_ids = [hit["_source"]["citing_id"] for hit in response["hits"]["hits"]]
        
        docs = await self._get_documents(citing_ids)
        
        results = []
        for hit in response["hits"]["hits"]:
            citation = hit["_source"]
            doc = docs.get(citation["citing_id"], {})
            
            results.append({
                "document_id": citation["citing_id"],
                "name": doc.get("name", citation["citing_id"]),
                "court": doc.get("court"),
                "date": doc.get("decision_date"),
                "treatment": citation["treatment"],
                "context": citation.get("context_snippet", "")
            })
        
        return results
    
    async def find_cited_cases(
        self,
        document_id: str,
        limit: int = 100
    ) -> List[dict]:
        """
        Find all cases cited by a given document.
        
        This is the "cites" query - what does this case cite?
        """
        
        query = {
            "query": {
                "term": {"citing_id": document_id}
            },
            "size": limit
        }
        
        response = await self.es.search(
            index=self.citations_index,
            body=query
        )
        
        # Enrich with document details
        cited_ids = [hit["_source"]["cited_id"] for hit in response["hits"]["hits"]]
        
        docs = await self._get_documents(cited_ids)
        
        results = []
        for hit in response["hits"]["hits"]:
            citation = hit["_source"]
            doc = docs.get(citation["cited_id"], {})
            
            results.append({
                "document_id": citation["cited_id"],
                "name": doc.get("name", citation["cited_id"]),
                "court": doc.get("court"),
                "date": doc.get("decision_date"),
                "treatment": citation["treatment"]
            })
        
        return results
    
    async def analyze_citation_profile(
        self,
        document_id: str
    ) -> CitationAnalysis:
        """
        Analyze the citation profile of a document.
        
        Determines if case is "good law" and how it's been treated.
        """
        
        # Get the document
        doc = await self._get_document(document_id)
        
        # Get citation treatment aggregation
        agg_query = {
            "query": {
                "term": {"cited_id": document_id}
            },
            "size": 0,
            # Request the exact hit count; the default caps totals at 10,000,
            # which would understate total_citations for heavily cited cases
            "track_total_hits": True,
            "aggs": {
                "treatments": {
                    "terms": {"field": "treatment"}
                },
                "recent_overruled": {
                    "filter": {"term": {"treatment": "overruled"}},
                    "aggs": {
                        "cases": {
                            "top_hits": {
                                "size": 1,
                                "sort": [{"citation_date": "desc"}]
                            }
                        }
                    }
                },
                "key_citations": {
                    "top_hits": {
                        "size": 10,
                        "sort": [{"citing_court_rank": "desc"}]
                    }
                }
            }
        }
        
        response = await self.es.search(
            index=self.citations_index,
            body=agg_query
        )
        
        aggs = response["aggregations"]
        
        # Build treatment breakdown
        treatment_breakdown = {
            bucket["key"]: bucket["doc_count"]
            for bucket in aggs["treatments"]["buckets"]
        }
        
        # Determine if good law
        overruled_hits = aggs["recent_overruled"]["cases"]["hits"]["hits"]
        is_overruled = len(overruled_hits) > 0
        overruled_by = None
        
        if is_overruled:
            overruled_by = overruled_hits[0]["_source"]["citing_id"]
        
        # Key citing cases
        key_cases = [
            {
                "id": hit["_source"]["citing_id"],
                "treatment": hit["_source"]["treatment"]
            }
            for hit in aggs["key_citations"]["hits"]["hits"]
        ]
        
        return CitationAnalysis(
            document_id=document_id,
            document_name=doc.get("name", ""),
            total_citations=response["hits"]["total"]["value"],
            treatment_breakdown=treatment_breakdown,
            is_good_law=not is_overruled,
            overruled_by=overruled_by,
            key_citing_cases=key_cases
        )
    
    async def find_citation_chain(
        self,
        document_id: str,
        depth: int = 2,
        limit_per_level: int = 10
    ) -> dict:
        """
        Find citation chain up to N levels deep.
        
        Example: Cases citing cases that cite the target case.
        """
        
        chain = {
            "root": document_id,
            "levels": []
        }
        
        current_ids = [document_id]
        
        for level in range(depth):
            # Find all cases citing the current level
            query = {
                "query": {
                    "terms": {"cited_id": current_ids}
                },
                "aggs": {
                    "citing_cases": {
                        "terms": {
                            "field": "citing_id",
                            "size": limit_per_level
                        }
                    }
                },
                "size": 0
            }
            
            response = await self.es.search(
                index=self.citations_index,
                body=query
            )
            
            citing_ids = [
                bucket["key"]
                for bucket in response["aggregations"]["citing_cases"]["buckets"]
            ]
            
            if not citing_ids:
                break
            
            chain["levels"].append({
                "depth": level + 1,
                "count": len(citing_ids),
                "sample_ids": citing_ids[:5]
            })
            
            current_ids = citing_ids
        
        return chain
    
    async def _get_document(self, document_id: str) -> dict:
        """Get a single document by ID."""
        try:
            response = await self.es.get(
                index=self.docs_index,
                id=document_id
            )
            return response["_source"]
        except Exception:
            # A missing or unreachable document returns empty metadata
            # rather than failing the whole citation query
            return {}
    
    async def _get_documents(self, document_ids: List[str]) -> Dict[str, dict]:
        """Get multiple documents by ID."""
        if not document_ids:
            return {}
        
        response = await self.es.mget(
            index=self.docs_index,
            body={"ids": document_ids}
        )
        
        return {
            doc["_id"]: doc["_source"]
            for doc in response["docs"]
            if doc.get("found")
        }
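
A quick usage sketch of the service. It assumes the async Elasticsearch Python client and that the CitationTreatment enum defined alongside the service has an OVERRULED member; the document ID is a placeholder:

# Example: is 410 U.S. 113 still good law, and who overruled it? (illustrative sketch)

import asyncio
from elasticsearch import AsyncElasticsearch

async def main():
    es = AsyncElasticsearch("http://localhost:9200")
    citations = CitationService(es)

    # Cases that cite the target with negative treatment
    overruling = await citations.find_citing_cases(
        document_id="case_410_us_113",   # placeholder ID
        treatment=CitationTreatment.OVERRULED,
        limit=10
    )

    # Full citation profile: treatment breakdown and good-law flag
    profile = await citations.analyze_citation_profile("case_410_us_113")
    print(profile.is_good_law, profile.treatment_breakdown)

    await es.close()

asyncio.run(main())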


# =============================================================================
# Citation Extraction Pipeline
# =============================================================================

class CitationExtractor:
    """
    Extracts citations from legal document text.
    
    Applies: Week 7, Day 2 - Indexing Pipeline
    """
    
    # Patterns for different citation formats
    PATTERNS = {
        # US Supreme Court: 410 U.S. 113
        "us_reports": r"(\d+)\s+U\.S\.\s+(\d+)",
        
        # Federal Reporter: 123 F.2d 456
        "federal_reporter": r"(\d+)\s+F\.(2d|3d)?\s+(\d+)",
        
        # State reporters: 123 Cal.App.4th 456
        "state_reporter": r"(\d+)\s+([A-Z][a-z]+\.?\s*(?:App\.?)?\s*(?:\d+[a-z]+)?)\s+(\d+)",
        
        # Case names: Brown v. Board of Education
        "case_name": r"([A-Z][a-z]+)\s+v\.\s+([A-Z][a-z]+(?:\s+[A-Z][a-z]+)*)",
    }
    
    def extract_citations(self, text: str) -> List[dict]:
        """Extract all citations from document text."""
        
        import re
        
        citations = []
        
        for pattern_name, pattern in self.PATTERNS.items():
            for match in re.finditer(pattern, text):
                # Get surrounding context
                start = max(0, match.start() - 100)
                end = min(len(text), match.end() + 100)
                context = text[start:end]
                
                # Detect treatment from context
                treatment = self._detect_treatment(context)
                
                citations.append({
                    "raw_citation": match.group(0),
                    "pattern_type": pattern_name,
                    "context": context,
                    "treatment": treatment,
                    "position": match.start()
                })
        
        return citations
    
    def _detect_treatment(self, context: str) -> str:
        """Detect how a citation is being used."""
        
        context_lower = context.lower()
        
        # Negative treatments
        if any(word in context_lower for word in 
               ["overruled", "overrule", "rejected", "abrogated"]):
            return "overruled"
        
        if any(word in context_lower for word in 
               ["questioned", "doubted", "criticized"]):
            return "questioned"
        
        # Neutral/distinguishing
        if any(word in context_lower for word in 
               ["distinguished", "distinguishing", "unlike"]):
            return "distinguished"
        
        # Positive
        if any(word in context_lower for word in 
               ["followed", "following", "accord", "see also", "affirmed"]):
            return "followed"
        
        # Default
        return "mentioned"

Deep Dive 4: Document Processing Pipeline (Week 7, Day 2 — CDC and Indexing)

Interviewer: "How do you handle document ingestion, especially for scanned documents that need OCR?"

You: "This is a critical part of the pipeline. Law firms often upload scanned contracts, old case files, and handwritten notes. We need robust processing."

The Pipeline

DOCUMENT PROCESSING PIPELINE

┌────────────────────────────────────────────────────────────────────────────┐
│                    DOCUMENT INGESTION FLOW                                 │
│                                                                            │
│  ┌───────────┐     ┌───────────┐     ┌───────────┐     ┌───────────┐       │
│  │  Upload   │────▶│  Triage   │────▶│  Process  │────▶│   Index   │       │
│  │  (API)    │     │  (Queue)  │     │ (Workers) │     │   (ES)    │       │
│  └───────────┘     └───────────┘     └───────────┘     └───────────┘       │
│                                                                            │
│  TRIAGE DECISIONS:                                                         │
│  ├── PDF (text-based) ──▶ Extract text directly                            │
│  ├── PDF (scanned) ──▶ OCR pipeline                                        │
│  ├── Word/DOCX ──▶ Apache Tika extraction                                  │
│  ├── Image (JPG/PNG) ──▶ OCR pipeline                                      │
│  └── Unknown ──▶ Tika with fallback                                        │
│                                                                            │
│  OCR PIPELINE:                                                             │
│  ┌───────────┐     ┌───────────┐     ┌───────────┐     ┌───────────┐       │
│  │  Image    │────▶│  Enhance  │────▶│  OCR      │────▶│  Post-    │       │
│  │  Extract  │     │  Quality  │     │(Tesseract)│     │  Process  │       │
│  └───────────┘     └───────────┘     └───────────┘     └───────────┘       │
│                     deskew,          multi-lang       spell check,         │
│                     denoise          confidence       legal terms          │
│                                                                            │
└────────────────────────────────────────────────────────────────────────────┘

Implementation

# ingestion/document_processor.py

"""
Document processing pipeline for legal documents.

Applies: Week 7, Day 2 - Indexing Pipeline
"""

from dataclasses import dataclass
from typing import Optional, List
from enum import Enum
import asyncio
import logging

logger = logging.getLogger(__name__)


class DocumentType(Enum):
    PDF_TEXT = "pdf_text"
    PDF_SCANNED = "pdf_scanned"
    WORD = "word"
    IMAGE = "image"
    UNKNOWN = "unknown"


@dataclass
class ProcessedDocument:
    """Result of document processing."""
    document_id: str
    tenant_id: str
    original_filename: str
    content: str
    metadata: dict
    citations: List[dict]
    page_count: int
    word_count: int
    ocr_confidence: Optional[float]
    processing_time_ms: int


class DocumentProcessor:
    """
    Processes uploaded documents for indexing.
    """
    
    def __init__(
        self,
        s3_client,
        tika_client,
        ocr_service,
        citation_extractor,
        kafka_producer
    ):
        self.s3 = s3_client
        self.tika = tika_client
        self.ocr = ocr_service
        self.citations = citation_extractor
        self.kafka = kafka_producer
    
    async def process_document(
        self,
        document_id: str,
        tenant_id: str,
        s3_key: str,
        filename: str
    ) -> ProcessedDocument:
        """
        Process a document through the full pipeline.
        """
        
        import time
        start_time = time.time()
        
        # Download from S3
        file_bytes = await self.s3.download(s3_key)
        
        # Determine document type
        doc_type = self._detect_document_type(filename, file_bytes)
        
        logger.info(
            f"Processing document {document_id}",
            extra={"type": doc_type.value, "size": len(file_bytes)}
        )
        
        # Extract content based on type
        if doc_type == DocumentType.PDF_TEXT:
            content, metadata = await self._extract_pdf_text(file_bytes)
            ocr_confidence = None
            
        elif doc_type == DocumentType.PDF_SCANNED:
            content, metadata, ocr_confidence = await self._process_scanned_pdf(
                file_bytes
            )
            
        elif doc_type == DocumentType.WORD:
            content, metadata = await self._extract_word(file_bytes)
            ocr_confidence = None
            
        elif doc_type == DocumentType.IMAGE:
            content, ocr_confidence = await self._process_image(file_bytes)
            metadata = {}
            
        else:
            # Fallback to Tika
            content, metadata = await self._extract_with_tika(file_bytes)
            ocr_confidence = None
        
        # Extract citations
        citations = self.citations.extract_citations(content)
        
        # Build processed document
        processing_time = int((time.time() - start_time) * 1000)
        
        processed = ProcessedDocument(
            document_id=document_id,
            tenant_id=tenant_id,
            original_filename=filename,
            content=content,
            metadata=metadata,
            citations=citations,
            page_count=metadata.get("page_count", 1),
            word_count=len(content.split()),
            ocr_confidence=ocr_confidence,
            processing_time_ms=processing_time
        )
        
        # Publish to Kafka for indexing
        await self._publish_for_indexing(processed)
        
        return processed
    
    def _detect_document_type(
        self,
        filename: str,
        file_bytes: bytes
    ) -> DocumentType:
        """Detect the type of document for processing."""
        
        ext = filename.lower().split(".")[-1]
        
        if ext in ("doc", "docx"):
            return DocumentType.WORD
        
        if ext in ("jpg", "jpeg", "png", "tiff", "bmp"):
            return DocumentType.IMAGE
        
        if ext == "pdf":
            # Check if PDF has extractable text
            if self._pdf_has_text(file_bytes):
                return DocumentType.PDF_TEXT
            else:
                return DocumentType.PDF_SCANNED
        
        return DocumentType.UNKNOWN
    
    def _pdf_has_text(self, file_bytes: bytes) -> bool:
        """Check if PDF contains extractable text or is scanned."""
        
        import fitz  # PyMuPDF
        
        doc = fitz.open(stream=file_bytes, filetype="pdf")
        
        # Check first few pages
        for page_num in range(min(3, len(doc))):
            page = doc[page_num]
            text = page.get_text()
            if len(text.strip()) > 100:
                return True
        
        return False
    
    async def _extract_pdf_text(self, file_bytes: bytes) -> tuple:
        """Extract text from text-based PDF."""
        
        import fitz
        
        doc = fitz.open(stream=file_bytes, filetype="pdf")
        
        content_parts = []
        for page in doc:
            content_parts.append(page.get_text())
        
        content = "\n\n".join(content_parts)
        
        metadata = {
            "page_count": len(doc),
            "title": doc.metadata.get("title", ""),
            "author": doc.metadata.get("author", ""),
            "creation_date": doc.metadata.get("creationDate", "")
        }
        
        return content, metadata
    
    async def _process_scanned_pdf(self, file_bytes: bytes) -> tuple:
        """Process scanned PDF through OCR."""
        
        import fitz
        
        doc = fitz.open(stream=file_bytes, filetype="pdf")
        
        content_parts = []
        confidences = []
        
        for page_num, page in enumerate(doc):
            # Extract page as image
            pix = page.get_pixmap(dpi=300)
            image_bytes = pix.tobytes("png")
            
            # OCR the image
            text, confidence = await self.ocr.process_image(image_bytes)
            
            content_parts.append(text)
            confidences.append(confidence)
        
        content = "\n\n".join(content_parts)
        avg_confidence = sum(confidences) / len(confidences) if confidences else 0
        
        metadata = {"page_count": len(doc)}
        
        return content, metadata, avg_confidence
    
    async def _process_image(self, file_bytes: bytes) -> tuple:
        """Process single image through OCR."""
        
        text, confidence = await self.ocr.process_image(file_bytes)
        return text, confidence
    
    async def _extract_word(self, file_bytes: bytes) -> tuple:
        """Extract content from Word document."""
        
        result = await self.tika.extract(file_bytes, "application/msword")
        
        return result["content"], result.get("metadata", {})
    
    async def _extract_with_tika(self, file_bytes: bytes) -> tuple:
        """Fallback extraction with Apache Tika."""
        
        result = await self.tika.extract(file_bytes)
        return result.get("content", ""), result.get("metadata", {})
    
    async def _publish_for_indexing(self, document: ProcessedDocument):
        """Publish processed document to Kafka for indexing."""
        
        event = {
            "event_type": "document.processed",
            "document_id": document.document_id,
            "tenant_id": document.tenant_id,
            "content": document.content,
            "metadata": document.metadata,
            "citations": document.citations,
            "stats": {
                "page_count": document.page_count,
                "word_count": document.word_count,
                "ocr_confidence": document.ocr_confidence,
                "processing_time_ms": document.processing_time_ms
            }
        }
        
        await self.kafka.produce(
            topic="documents.processed",
            key=document.document_id,
            value=event
        )
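
The ocr_service injected above is only an interface: process_image(image_bytes) returning (text, confidence). A minimal sketch of one way to back it with Tesseract via pytesseract; the library choice, the thread offloading, and the 0-1 confidence scaling are assumptions rather than part of the design:

# ingestion/ocr_service.py  (illustrative sketch, not the definitive implementation)

import asyncio
import io

import pytesseract
from PIL import Image


class TesseractOCRService:
    """OCR backend returning (text, average word confidence in 0-1)."""

    async def process_image(self, image_bytes: bytes) -> tuple:
        # Tesseract is blocking and CPU-bound; keep it off the event loop
        return await asyncio.to_thread(self._ocr, image_bytes)

    def _ocr(self, image_bytes: bytes) -> tuple:
        image = Image.open(io.BytesIO(image_bytes))

        # Word-level results with per-word confidence scores
        data = pytesseract.image_to_data(
            image, output_type=pytesseract.Output.DICT
        )

        words, confidences = [], []
        for word, conf in zip(data["text"], data["conf"]):
            if word.strip() and float(conf) >= 0:  # conf == -1 marks non-word boxes
                words.append(word)
                confidences.append(float(conf))

        text = " ".join(words)
        avg_conf = (sum(confidences) / len(confidences) / 100) if confidences else 0.0
        return text, avg_conf

The image-enhancement step from the diagram (deskew, denoise) would run before image_to_data, and legal-term spell correction on the returned text.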

Phase 5: Scaling and Edge Cases (5 minutes)

Interviewer: "How would this system scale to 10x the document volume?"

Scaling Strategy

You: "Let me walk through the scaling vectors..."

SCALING FROM 100M TO 1B DOCUMENTS

CURRENT STATE (100M docs)
├── 12 data nodes (64GB RAM, 2TB SSD each)
├── 26 TB total storage with replicas
├── 300 QPS peak

10X SCALE (1B docs)
├── Storage: 260 TB with replicas
├── Data nodes: 60+ (or larger instances)
├── QPS: 3000 (if usage scales linearly)

SCALING APPROACH:

1. INDEX SHARDING STRATEGY
   Current: 24 primary shards
   Scaled: Split by document type + date
   ├── cases_federal_2020-2024 (5 shards)
   ├── cases_federal_2015-2019 (5 shards)
   ├── cases_state_california (3 shards)
   ├── statutes_federal (2 shards)
   └── private_docs_* (per tenant, 1-3 shards each)

2. TIERED STORAGE
   Hot (last 2 years): Fast SSD, 1 replica
   Warm (2-5 years): Standard SSD, 1 replica
   Cold (5+ years): HDD, 1 replica, fewer nodes
   
   Use Index Lifecycle Management (ILM) to move automatically.

3. QUERY ROUTING
   Route queries to relevant time-based indices
   User searching 2023 cases doesn't hit 2010 indices
   
4. READ REPLICAS FOR GEOGRAPHIC DISTRIBUTION
   Primary cluster: US East
   Read replica: US West
   Read replica: EU (for international firms)
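
For the tiered-storage item in the outline above, the hot → warm → cold movement can be codified as an ILM policy attached to the time-based indices through their index templates. A sketch of roughly what that policy could look like, registered via the Python client; the policy name, the custom "storage" node attribute, and the 8.x-style call are assumptions:

# ops/ilm_tiering.py  (sketch; ages mirror the hot/warm/cold tiers above)

from elasticsearch import Elasticsearch

es = Elasticsearch("http://localhost:9200")

legal_docs_tiering = {
    "phases": {
        "hot": {
            # Roughly the last 2 years: fast SSD nodes, highest recovery priority
            "min_age": "0ms",
            "actions": {"set_priority": {"priority": 100}}
        },
        "warm": {
            # 2-5 years old: standard SSD nodes
            "min_age": "730d",
            "actions": {
                "allocate": {"require": {"storage": "warm"}},
                "set_priority": {"priority": 50}
            }
        },
        "cold": {
            # 5+ years old: HDD-backed nodes
            "min_age": "1825d",
            "actions": {
                "allocate": {"require": {"storage": "cold"}},
                "set_priority": {"priority": 0}
            }
        }
    }
}

es.ilm.put_lifecycle(name="legal_docs_tiering", policy=legal_docs_tiering)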

Edge Cases

Interviewer: "What edge cases should we handle?"

You: "Several important ones for legal search:"

EDGE CASES

1. VERY LARGE DOCUMENTS
   Problem: 500-page Supreme Court opinions
   Solution: 
   ├── Split into sections for indexing
   ├── Store section metadata (page ranges)
   ├── Aggregate results by parent document
   └── Lazy-load full content

2. SPECIAL CHARACTERS IN LEGAL TEXT
   Problem: § (section), ¶ (paragraph), legal symbols
   Solution:
   ├── Custom character filter in analyzer
   ├── Map § → "section"
   ├── Preserve symbols in stored content
   └── Normalize for search

3. CITATION VARIATIONS
   Problem: Same case cited many ways
   "Brown v. Board", "Brown v. Bd. of Ed.", "347 U.S. 483"
   Solution:
   ├── Citation normalization during extraction
   ├── Canonical ID for each case
   ├── Synonym expansion for case names
   └── All variations map to same document

4. TENANT WITH MILLIONS OF DOCUMENTS
   Problem: One large law firm skews resources
   Solution:
   ├── Per-tenant resource limits
   ├── Multiple shards for large tenants
   ├── Query timeout enforcement
   └── Fair scheduling across tenants

5. SIMULTANEOUS UPDATES TO SAME DOCUMENT
   Problem: Multiple users editing annotations
   Solution:
   ├── Annotations stored separately from content
   ├── User-specific annotation layer
   ├── Optimistic locking for shared edits
   └── Content itself is immutable
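
As an illustration of edge case 2, the symbol normalization can be done with a mapping character filter inside a custom analyzer. Because analyzers only affect indexed terms, the stored _source keeps the original symbols, which covers the "preserve symbols in stored content" point. Index and analyzer names below are placeholders:

# ops/legal_analyzer.py  (sketch of the § / ¶ normalization)

legal_analyzer_settings = {
    "analysis": {
        "char_filter": {
            "legal_symbols": {
                "type": "mapping",
                "mappings": [
                    "§ => section ",
                    "¶ => paragraph "
                ]
            }
        },
        "analyzer": {
            "legal_text": {
                "type": "custom",
                "char_filter": ["legal_symbols"],
                "tokenizer": "standard",
                "filter": ["lowercase"]
            }
        }
    }
}

# Applied at index creation, e.g.:
# es.indices.create(
#     index="legal_documents",
#     settings=legal_analyzer_settings,
#     mappings={"properties": {"content": {"type": "text", "analyzer": "legal_text"}}}
# )

With this in place, a query for "section 1983" matches documents containing "§1983" and vice versa, since the same analyzer runs at both index and search time.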

Phase 6: Monitoring and Operations (5 minutes)

Interviewer: "How would you monitor this system in production?"

Key Metrics

You: "I'd track metrics at multiple levels..."

┌────────────────────────────────────────────────────────────────────────────┐
│                    LEGAL SEARCH MONITORING DASHBOARD                       │
│                                                                            │
│  SEARCH PERFORMANCE                                                        │
│  ┌─────────────────────────────────────────────────────────────────────┐   │
│  │ Query Latency p99           │ QPS                 │ Error Rate      │   │
│  │ ┌─────────────────────┐     │ ┌─────────────────┐ │ ┌─────────────┐ │   │
│  │ │       320ms         │     │ │      180        │ │ │    0.02%    │ │   │
│  │ │  Target: <500ms ✓   │     │ │  ▄▅▆▇▆▅▄▅▆▇     │ │ │  ▁▁▁▁▁▁▁▁▁  │ │   │
│  │ └─────────────────────┘     │ └─────────────────┘ │ └─────────────┘ │   │
│  └─────────────────────────────────────────────────────────────────────┘   │
│                                                                            │
│  DOCUMENT PROCESSING                                                       │
│  ┌─────────────────────────────────────────────────────────────────────┐   │
│  │ Processing Lag    │ Queue Depth      │ OCR Success Rate             │   │
│  │ ┌───────────────┐ │ ┌──────────────┐ │ ┌──────────────────────────┐ │   │
│  │ │     45s       │ │ │     23       │ │ │          98.5%           │ │   │
│  │ │ Target: <5m ✓ │ │ │ ▂▃▄▃▂▂▃▄▃▂   │ │ │ ████████████████████░░   │ │   │
│  │ └───────────────┘ │ └──────────────┘ │ └──────────────────────────┘ │   │
│  └─────────────────────────────────────────────────────────────────────┘   │
│                                                                            │
│  CLUSTER HEALTH                                                            │
│  ┌─────────────────────────────────────────────────────────────────────┐   │
│  │ Status: GREEN │ Nodes: 12/12 │ Shards: 100% │ Disk: 62%             │   │
│  └─────────────────────────────────────────────────────────────────────┘   │
│                                                                            │
│  TENANT ISOLATION VERIFICATION                                             │
│  ┌─────────────────────────────────────────────────────────────────────┐   │
│  │ Cross-tenant query attempts: 0 (last 24h) ✓                         │   │
│  │ Isolation test (automated): PASSED 2 minutes ago                    │   │
│  │ Audit log anomalies: 0 detected                                     │   │
│  └─────────────────────────────────────────────────────────────────────┘   │
│                                                                            │
│  SEARCH QUALITY                                                            │
│  ┌─────────────────────────────────────────────────────────────────────┐   │
│  │ Zero Result Rate: 3.2% │ Avg Results/Query: 45 │ Citation Hit: 89%  │   │
│  └─────────────────────────────────────────────────────────────────────┘   │
│                                                                            │
└────────────────────────────────────────────────────────────────────────────┘

Critical Alerts

ALERTING CONFIGURATION

CRITICAL (PagerDuty):
├── Cluster status RED for >1 minute
├── Search latency p99 >2s for >5 minutes
├── Any cross-tenant data access attempt
├── Document processing queue >10,000
├── Error rate >5%

WARNING (Slack):
├── Cluster status YELLOW for >10 minutes
├── Search latency p99 >1s for >15 minutes
├── OCR success rate <95%
├── Disk usage >80%
├── Zero result rate >10%

INFO (Dashboard only):
├── New tenant onboarded
├── Large document processed (>100 pages)
├── Unusual query patterns

Interview Conclusion

Interviewer: "Excellent work. You've covered a lot of ground — from complex query parsing to security to operations. A few final questions: What would you prioritize building first?"

You: "I'd prioritize in this order:

  1. Tenant isolation — Security is non-negotiable for a legal platform
  2. Basic search with boolean operators — Core functionality lawyers expect
  3. Document ingestion with OCR — Getting content into the system
  4. Citation extraction and linking — Differentiating feature for legal research
  5. Advanced features (proximity search, analytics) — Once the core is solid

The key insight is that legal search has unique requirements: complex queries, strict security, and domain-specific features like citations. A generic search solution wouldn't work here."

Interviewer: "Great. Any questions for me?"

You: "I'd love to hear how you currently handle the citation network — do you use a graph database, or is it all in Elasticsearch? And what's your experience with OCR accuracy on older legal documents?"


Concepts Applied Summary

Week 7 Concepts Used

Day     Concept                   Application in This Design
Day 1   Inverted Index            Full-text search for legal documents
Day 1   Text Analysis             Legal-specific analyzers (§ symbols, citations)
Day 1   Document Modeling         Separate indices for public/private/citations
Day 2   CDC Pipeline              Document processing → Kafka → Elasticsearch
Day 2   Bulk Indexing             Initial load of 50M court opinions
Day 2   Zero-Downtime Reindex     Schema updates without service interruption
Day 3   Query vs Filter           Boolean logic in query, tenant in filter
Day 3   BM25 Tuning               Legal terminology relevance
Day 3   Function Scores           Boost by citation count, recency
Day 4   Autocomplete              Case-name suggestions, frequent searches
Day 4   Synonyms                  Legal term synonyms ("contract"/"agreement")
Day 4   Multi-language            N/A (English legal docs)
Day 5   Cluster Architecture      Master/data/coordinating node separation
Day 5   Capacity Planning         Storage and compute for 100M docs
Day 5   Monitoring                Search quality, processing lag
Day 5   Disaster Recovery         Snapshots, cross-region replication

Code Patterns Demonstrated

1. LEGAL QUERY PARSER
   - Custom lexer/parser for boolean + proximity
   - Converts legal syntax to ES query DSL
   
2. MULTI-TENANT ISOLATION
   - JWT-based tenant context
   - Mandatory filter injection
   - Per-tenant indices
   - Audit logging
   
3. CITATION NETWORK
   - Citation extraction from text
   - Treatment detection
   - Graph queries (citing, cited-by)
   
4. DOCUMENT PROCESSING
   - Type detection and routing
   - OCR pipeline for scanned docs
   - Kafka for async processing

Self-Assessment Checklist

After studying this capstone, you should be able to:

  • Design a search system for domain-specific content (legal, medical, etc.)
  • Implement complex query parsing beyond simple keyword search
  • Build multi-tenant search with strict isolation guarantees
  • Create a document processing pipeline with OCR support
  • Model citation/reference networks in a search index
  • Size an Elasticsearch cluster for specific requirements
  • Design monitoring for search quality and security
  • Handle edge cases like large documents and special characters
  • Explain trade-offs between different indexing strategies
  • Articulate security considerations for sensitive data

This capstone integrates all concepts from Week 7 of the System Design Mastery Series. The legal domain demonstrates how search systems must be customized for specific use cases while maintaining security, performance, and reliability.


Next Week Preview: Week 8 — Analytics Pipeline

We'll design a data pipeline from event ingestion to queryable analytics, covering streaming vs batch processing, data modeling for OLAP, and handling late-arriving data.