Himanshu Kukreja

Week 8 — Day 5: Query Layer and Optimization

System Design Mastery Series — Analytics Pipeline Week


Preface

We've built an impressive analytics pipeline this week. Events flow through Kafka, get processed by Flink and Spark, land in a beautifully designed star schema, and late data is handled gracefully.

There's just one problem:

THE QUERY PERFORMANCE CRISIS

Marketing team runs a dashboard query:
  "Show revenue by product category, by region, by day
   for the last 90 days"

What happens:
├── Query scans 2.7 billion rows
├── Shuffles 50GB of data across cluster
├── Takes 47 seconds to return
└── Dashboard times out

Users complain:
├── "The dashboard is unusable"
├── "I can't get answers to simple questions"
├── "We're paying $50K/month and it's still slow"
└── "Can we just use Excel?"

The pipeline is working perfectly.
The query layer is broken.

Today, we'll learn to make queries over billions of rows return in milliseconds — without breaking the bank.


Part I: Foundations

Chapter 1: OLAP Database Landscape

1.1 Categories of OLAP Systems

OLAP DATABASE CATEGORIES

1. CLOUD DATA WAREHOUSES
   ├── Snowflake
   ├── Google BigQuery
   ├── Amazon Redshift
   ├── Azure Synapse
   └── Databricks SQL
   
   Characteristics:
   ├── Managed, serverless (mostly)
   ├── Separation of storage and compute
   ├── Pay per query or compute time
   └── Best for: Ad-hoc analytics, SQL users


2. REAL-TIME OLAP DATABASES
   ├── Apache Druid
   ├── ClickHouse
   ├── Apache Pinot
   ├── StarRocks
   └── Apache Doris
   
   Characteristics:
   ├── Sub-second queries on fresh data
   ├── Columnar storage with indexing
   ├── Designed for high concurrency
   └── Best for: Real-time dashboards, user-facing analytics


3. DATA LAKE QUERY ENGINES
   ├── Trino (Presto)
   ├── Apache Spark SQL
   ├── Dremio
   └── Starburst
   
   Characteristics:
   ├── Query data in place (S3, HDFS)
   ├── Schema-on-read flexibility
   ├── Federated queries across sources
   └── Best for: Data exploration, cross-system queries


4. EMBEDDED ANALYTICS
   ├── DuckDB
   ├── SQLite (with extensions)
   └── Apache DataFusion
   
   Characteristics:
   ├── Runs in-process
   ├── No server to manage
   ├── Great for local analytics
   └── Best for: Edge computing, embedded BI

1.2 Choosing the Right Database

DECISION MATRIX

                    │ Latency │ Concurrency │ Freshness │ Cost    │ Complexity
────────────────────┼─────────┼─────────────┼───────────┼─────────┼───────────
Snowflake           │ Seconds │ Medium      │ Near-RT   │ High    │ Low
BigQuery            │ Seconds │ Medium      │ Streaming │ Medium  │ Low
ClickHouse          │ Millis  │ High        │ Real-time │ Medium  │ Medium
Druid               │ Millis  │ Very High   │ Real-time │ Medium  │ High
Trino               │ Seconds │ Medium      │ On-demand │ Low     │ Medium
DuckDB              │ Millis  │ Single      │ Batch     │ Free    │ Very Low


USE CASE MAPPING:

"Executive dashboard, 10 users, daily refresh"
  → Snowflake or BigQuery (simple, managed)

"Customer-facing analytics, 10K users, real-time"
  → ClickHouse or Druid (low latency, high concurrency)

"Ad-hoc exploration across data lake"
  → Trino (flexible, federated)

"Embedded analytics in application"
  → DuckDB (in-process, no infra)

"Real-time metrics for operations"
  → Druid or Pinot (sub-second, streaming ingestion)

1.3 Architecture Patterns

OLAP ARCHITECTURE PATTERNS

PATTERN 1: SINGLE WAREHOUSE
────────────────────────────
┌─────────────┐    ┌─────────────┐
│   Sources   │───▶│  Snowflake  │───▶ Dashboards
└─────────────┘    └─────────────┘

Simple, works for most companies.
Limitation: Cost scales with query volume.


PATTERN 2: LAKE + WAREHOUSE
────────────────────────────
┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│   Sources   │───▶│  Data Lake  │───▶│  Warehouse  │───▶ Dashboards
└─────────────┘    │   (S3)      │    │ (Snowflake) │
                   └─────────────┘    └─────────────┘
                          │
                          ▼
                   ┌─────────────┐
                   │   Trino     │───▶ Ad-hoc queries
                   └─────────────┘

Lake for cheap storage, warehouse for performance.


PATTERN 3: LAMBDA QUERY LAYER
─────────────────────────────
┌─────────────┐    ┌─────────────┐
│ Real-time   │───▶│ ClickHouse  │───▶ Real-time dashboards
│ (Kafka)     │    └─────────────┘
└─────────────┘
       │
       ▼
┌─────────────┐    ┌─────────────┐
│ Batch       │───▶│  BigQuery   │───▶ Historical analysis
│ (S3)        │    └─────────────┘
└─────────────┘

Different engines for different latency needs.


PATTERN 4: SEMANTIC LAYER
─────────────────────────
┌─────────────┐
│   Sources   │
└──────┬──────┘
       │
       ▼
┌─────────────┐
│ Semantic    │  (dbt, Cube.js, AtScale)
│   Layer     │  Metrics definitions, caching
└──────┬──────┘
       │
   ┌───┴───┐
   ▼       ▼
┌─────┐ ┌─────┐
│ DW  │ │OLAP │
└─────┘ └─────┘

Consistent metrics across query engines.

Chapter 2: Query Optimization Fundamentals

2.1 Why Queries Are Slow

ANATOMY OF A SLOW QUERY

Query:
  SELECT category, region, SUM(revenue)
  FROM fact_sales f
  JOIN dim_product p ON f.product_key = p.product_key
  JOIN dim_store s ON f.store_key = s.store_key
  WHERE f.date_key BETWEEN 20240101 AND 20240331
  GROUP BY category, region

What makes it slow:

1. DATA VOLUME
   ├── 2.7B rows in fact_sales
   ├── Even with date filter: 675M rows for Q1
   └── Raw scan: 50GB+ of data

2. JOINS
   ├── fact_sales → dim_product (1M products)
   ├── fact_sales → dim_store (10K stores)
   └── Each join requires shuffle/broadcast

3. AGGREGATION
   ├── GROUP BY creates many groups
   ├── Partial aggregates on each node
   └── Final merge on coordinator

4. NO OPTIMIZATION
   ├── No partitioning → full scan
   ├── No pre-aggregation → compute from scratch
   ├── No indexes → linear scan
   └── No caching → repeat work every query

2.2 Optimization Techniques Overview

QUERY OPTIMIZATION TECHNIQUES

REDUCE DATA SCANNED
├── Partitioning (skip irrelevant files)
├── Clustering/Sorting (data locality)
├── Column pruning (read only needed columns)
├── Predicate pushdown (filter early)
└── Bloom filters (skip blocks without matches)

REDUCE COMPUTATION
├── Pre-aggregation (rollup tables)
├── Materialized views (precomputed results)
├── Approximate algorithms (HyperLogLog, sketches)
└── Sampling (analyze subset)

REDUCE LATENCY
├── Caching (query results, metadata)
├── Concurrency (parallel execution)
├── Vectorization (SIMD operations)
└── Pipelining (stream results)

REDUCE COST
├── Compression (smaller storage/transfer)
├── Tiered storage (hot/warm/cold)
├── Query scheduling (off-peak)
└── Result caching (don't recompute)
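
To make the scan-reduction group above concrete, here's a minimal sketch using pyarrow against a partitioned Parquet dataset (the "fact_sales" path and column names are assumptions, not part of this week's pipeline):

# query_optimization/scan_reduction_sketch.py

import pyarrow.parquet as pq

# Column pruning: only the three listed columns are read from disk.
# Predicate pushdown: row groups whose min/max statistics cannot satisfy
# the date filter are skipped without being decoded.
table = pq.read_table(
    "fact_sales",
    columns=["date_key", "category", "revenue"],
    filters=[("date_key", ">=", 20240101), ("date_key", "<=", 20240331)],
)
print(table.num_rows)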

Chapter 3: Partitioning and Clustering

3.1 Partitioning Strategies

# query_optimization/partitioning.py

"""
Partitioning strategies for OLAP tables.

Partitioning divides data into manageable chunks that can be
queried independently. Good partitioning = orders of magnitude
faster queries.
"""

from dataclasses import dataclass
from typing import List, Optional
from datetime import date, timedelta


@dataclass
class PartitionConfig:
    """Configuration for table partitioning."""
    column: str
    type: str  # "range", "hash", "list"
    granularity: Optional[str] = None  # For time: "day", "month", "year"


PARTITIONING_STRATEGIES = """
TIME-BASED PARTITIONING (Most Common)
─────────────────────────────────────

Daily partitions:
  fact_sales/
  ├── date=2024-01-01/
  ├── date=2024-01-02/
  ├── date=2024-01-03/
  └── ...

Query: WHERE date BETWEEN '2024-01-01' AND '2024-01-31'
Effect: Read 31 partitions, skip 334 others

Choosing granularity:
├── Daily: Best for recent data queries
├── Monthly: Best for historical analysis
├── Yearly: Best for archival data
└── Hourly: Best for real-time systems (if needed)

Rule of thumb: 100MB - 1GB per partition


HASH PARTITIONING
─────────────────

Distribute by hash of column (e.g., user_id):
  fact_user_events/
  ├── user_hash=0/
  ├── user_hash=1/
  ├── ...
  └── user_hash=255/

Query: WHERE user_id = 'user_123'
Effect: Read 1 partition (where hash(user_123) lands)

Use when:
├── Point lookups are common
├── Need even distribution
└── No natural time dimension


COMPOSITE PARTITIONING
──────────────────────

Multiple partition levels:
  fact_sales/
  ├── year=2024/
  │   ├── month=01/
  │   │   ├── region=US/
  │   │   ├── region=EU/
  │   │   └── region=APAC/
  │   └── month=02/
  └── year=2023/

Query: WHERE year = 2024 AND month = 1 AND region = 'US'
Effect: Read single leaf partition

Warning: Can lead to too many small partitions!
"""


class PartitionPruner:
    """
    Determines which partitions to read for a query.
    """
    
    def __init__(self, partition_config: PartitionConfig):
        self.config = partition_config
    
    def get_partitions_for_query(
        self,
        filters: dict,
        all_partitions: List[str]
    ) -> List[str]:
        """
        Return only partitions needed for query filters.
        """
        
        if self.config.column not in filters:
            # No filter on partition column - read all
            return all_partitions
        
        filter_value = filters[self.config.column]
        
        if self.config.type == "range":
            return self._prune_range(filter_value, all_partitions)
        elif self.config.type == "list":
            return self._prune_list(filter_value, all_partitions)
        elif self.config.type == "hash":
            return self._prune_hash(filter_value, all_partitions)
        
        return all_partitions
    
    def _prune_range(self, filter_value, partitions: List[str]) -> List[str]:
        """Prune partitions for range filter."""
        
        # filter_value is tuple (start, end)
        start, end = filter_value
        
        matching = []
        for p in partitions:
            partition_value = self._extract_partition_value(p)
            if start <= partition_value <= end:
                matching.append(p)
        
        return matching
    
    def _prune_list(self, filter_value, partitions: List[str]) -> List[str]:
        """Prune partitions for IN list filter."""
        
        # filter_value is list of values
        matching = []
        for p in partitions:
            partition_value = self._extract_partition_value(p)
            if partition_value in filter_value:
                matching.append(p)
        
        return matching
    
    def _prune_hash(self, filter_value, partitions: List[str]) -> List[str]:
        """Prune partitions for hash-partitioned table."""
        
        # Calculate which partition the value hashes to.
        # NOTE: Python's built-in hash() is only illustrative here; a real system
        # would use a stable hash (e.g. murmur3) matching how the data was written.
        hash_bucket = hash(filter_value) % len(partitions)
        return [partitions[hash_bucket]]
    
    def _extract_partition_value(self, partition_path: str):
        """Extract partition value from path."""
        # e.g., "date=2024-01-15" -> "2024-01-15"
        return partition_path.split("=")[1]


def estimate_partition_savings(
    total_partitions: int,
    partitions_scanned: int,
    avg_partition_size_gb: float
) -> dict:
    """Estimate savings from partition pruning."""
    
    full_scan_gb = total_partitions * avg_partition_size_gb
    actual_scan_gb = partitions_scanned * avg_partition_size_gb
    
    return {
        "full_scan_gb": full_scan_gb,
        "actual_scan_gb": actual_scan_gb,
        "savings_gb": full_scan_gb - actual_scan_gb,
        "savings_pct": (1 - partitions_scanned / total_partitions) * 100,
        "partitions_skipped": total_partitions - partitions_scanned
    }
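

# A quick usage sketch of the helpers above (the partition list, the date range,
# and the 0.5 GB average size are made-up illustrative values):
if __name__ == "__main__":
    config = PartitionConfig(column="date", type="range")
    pruner = PartitionPruner(config)

    all_partitions = (
        [f"date=2024-01-{d:02d}" for d in range(1, 32)]
        + [f"date=2024-02-{d:02d}" for d in range(1, 30)]
    )

    needed = pruner.get_partitions_for_query(
        filters={"date": ("2024-01-10", "2024-01-20")},
        all_partitions=all_partitions,
    )
    print(len(needed), "of", len(all_partitions), "partitions scanned")

    print(estimate_partition_savings(
        total_partitions=len(all_partitions),
        partitions_scanned=len(needed),
        avg_partition_size_gb=0.5,
    ))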

3.2 Clustering and Sorting

# query_optimization/clustering.py

"""
Clustering (sorting) within partitions for better query performance.

Partitioning decides WHICH files to read.
Clustering decides HOW data is organized WITHIN files.
"""

CLUSTERING_EXPLAINED = """
CLUSTERING (Z-ORDERING) VISUALIZATION

Without clustering:
  File 1: [A, C, B, A, D, C, B, A, C, D]
  File 2: [B, A, D, C, A, B, C, D, A, B]
  
  Query: WHERE category = 'A'
  → Must read ALL files (A is scattered everywhere)


With clustering on category:
  File 1: [A, A, A, A, A, B, B, B, B, B]
  File 2: [C, C, C, C, D, D, D, D, D, D]
  
  Query: WHERE category = 'A'
  → Read only File 1 (A is grouped together)


Z-ORDERING (Multi-column clustering):
  
  Without Z-ordering:
    Sorted by date only
    Query on date + product: Good on date, scattered on product
  
  With Z-ordering on (date, product):
    Data interleaved by both columns using Z-curve
    Query on date + product: Good locality for both
    
    
  Visualization of Z-curve:
  
    product
       ↑
     4 │ 5  6  9  10
     3 │ 4  7  8  11
     2 │ 1  2  13 14
     1 │ 0  3  12 15
       └──────────────→ date
         1  2  3  4
    
    Z-order visits: 0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15
    
    Query: date IN (1,2) AND product IN (1,2)
    → Values 0,1,2,3 are close together!
"""


class ClusteringConfig:
    """Configure clustering for a table."""
    
    def __init__(
        self,
        columns: List[str],
        method: str = "sort",  # "sort", "zorder", "hilbert"
        auto_cluster: bool = True
    ):
        self.columns = columns
        self.method = method
        self.auto_cluster = auto_cluster


# BigQuery clustering example
BIGQUERY_CLUSTERING = """
-- Create clustered table in BigQuery
CREATE TABLE project.dataset.fact_sales
PARTITION BY DATE(order_timestamp)
CLUSTER BY product_category, customer_region
AS SELECT * FROM source_table;

-- Query benefits from both partition AND cluster pruning
SELECT product_category, SUM(revenue)
FROM project.dataset.fact_sales
WHERE DATE(order_timestamp) = '2024-01-15'  -- Partition pruning
  AND customer_region = 'US-WEST'            -- Cluster pruning
GROUP BY product_category;
"""


# Delta Lake Z-ordering example
DELTA_ZORDER = """
-- Optimize Delta table with Z-ordering
OPTIMIZE fact_sales
ZORDER BY (product_id, customer_id);

-- This helps queries that filter on either or both columns
SELECT * FROM fact_sales
WHERE product_id = 'P123'
  AND customer_id = 'C456';
"""


# ClickHouse ordering
CLICKHOUSE_ORDER = """
-- ClickHouse uses ORDER BY for clustering
CREATE TABLE fact_sales (
    date Date,
    category String,
    region String,
    revenue Float64
)
ENGINE = MergeTree()
PARTITION BY toYYYYMM(date)
ORDER BY (category, region, date);  -- Data sorted by these columns

-- Queries on category, region are very fast
SELECT category, SUM(revenue)
FROM fact_sales
WHERE category = 'Electronics'
GROUP BY category;
"""

Chapter 4: Pre-Aggregation and Materialized Views

4.1 When to Pre-Aggregate

PRE-AGGREGATION DECISION FRAMEWORK

                                           ┌─────────────────────┐
                                           │ Is the query pattern │
                                           │    predictable?      │
                                           └──────────┬──────────┘
                                                      │
                                    ┌─────────────────┴─────────────────┐
                                    │                                   │
                                   YES                                  NO
                                    │                                   │
                                    ▼                                   ▼
                        ┌───────────────────┐             ┌───────────────────┐
                        │ Is it queried     │             │ Use columnar      │
                        │ frequently?       │             │ storage + indexes │
                        └─────────┬─────────┘             └───────────────────┘
                                  │
                    ┌─────────────┴─────────────┐
                    │                           │
                  YES (>10/day)              NO (<10/day)
                    │                           │
                    ▼                           ▼
        ┌───────────────────┐       ┌───────────────────┐
        │ PRE-AGGREGATE     │       │ On-demand compute │
        │ (Rollup table or  │       │ is probably fine  │
        │ materialized view)│       │                   │
        └───────────────────┘       └───────────────────┘


EXAMPLES:

Daily revenue by category (executive dashboard)
├── Queried: 100+ times/day by 20 executives
├── Pattern: Always last 30/90/365 days
├── Decision: PRE-AGGREGATE (daily rollup table)
└── Result: 50ms instead of 45 seconds

Revenue by product by hour by region (ad-hoc exploration)
├── Queried: 2-3 times/month by analyst
├── Pattern: Different dimensions each time
├── Decision: DON'T pre-aggregate
└── Result: 45 seconds is acceptable for rare ad-hoc
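
The tree above boils down to two checks. A minimal sketch (the >10 queries/day threshold comes from the diagram; everything else depends on your workload):

def should_preaggregate(predictable_pattern: bool, queries_per_day: int) -> str:
    """Mirror of the decision tree above; the threshold is illustrative."""
    if not predictable_pattern:
        return "rely on columnar storage + indexes"
    if queries_per_day > 10:
        return "pre-aggregate (rollup table or materialized view)"
    return "compute on demand"

print(should_preaggregate(True, 100))   # executive dashboard -> pre-aggregate
print(should_preaggregate(False, 3))    # ad-hoc exploration  -> columnar storage + indexes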

4.2 Rollup Tables

# query_optimization/rollups.py

"""
Rollup table implementation for pre-aggregation.

Rollups trade storage for query speed.
"""

from dataclasses import dataclass
from typing import List, Dict, Any, Optional
from datetime import date


@dataclass
class RollupDefinition:
    """Defines a rollup table."""
    name: str
    source_table: str
    dimensions: List[str]  # GROUP BY columns
    measures: Dict[str, str]  # column -> aggregation
    refresh_schedule: str
    retention_days: int = 365


# Common rollup patterns
ROLLUP_PATTERNS = """
ROLLUP HIERARCHY PATTERN
────────────────────────

Create multiple rollups at different granularities:

Level 0: Raw facts (grain: transaction)
  fact_sales: 10B rows, 2TB

Level 1: Hourly rollup
  agg_hourly_sales: 8.7M rows (365 days × 24 hours × 1000 combos)
  Columns: hour, category, region, SUM(revenue), COUNT(orders)

Level 2: Daily rollup
  agg_daily_sales: 365K rows (365 days × 1000 combos)
  Columns: date, category, region, SUM(revenue), COUNT(orders)

Level 3: Monthly rollup
  agg_monthly_sales: 12K rows (12 months × 1000 combos)
  Columns: month, category, region, SUM(revenue), COUNT(orders)


QUERY ROUTING:
├── "Last 24 hours by hour" → Level 1 (hourly)
├── "Last 90 days by day" → Level 2 (daily)
├── "Last 2 years by month" → Level 3 (monthly)
└── "Drill into specific transactions" → Level 0 (raw)


CUBE ROLLUP PATTERN
───────────────────

Pre-compute all dimension combinations:

SELECT
    COALESCE(category, 'ALL') as category,
    COALESCE(region, 'ALL') as region,
    COALESCE(channel, 'ALL') as channel,
    SUM(revenue) as revenue,
    COUNT(*) as orders
FROM fact_sales
WHERE date = '2024-01-15'
GROUP BY CUBE(category, region, channel);

This creates rows for:
├── (Electronics, US, Web) - specific
├── (Electronics, US, ALL) - channel total
├── (Electronics, ALL, ALL) - category total
├── (ALL, ALL, ALL) - grand total
└── ... every combination of specific values and 'ALL' (the 2^3 = 8 grouping sets)

Query "revenue by category" → filter WHERE region='ALL' AND channel='ALL'
Instant response from pre-computed row!
"""


class RollupManager:
    """
    Manages rollup table creation and refresh.
    """
    
    def __init__(self, db_connection):
        self.db = db_connection
        self.rollups: Dict[str, RollupDefinition] = {}
    
    def create_rollup(self, rollup: RollupDefinition):
        """Create a new rollup table."""
        
        # Build CREATE TABLE statement
        dimension_cols = ", ".join(rollup.dimensions)
        measure_cols = ", ".join([
            f"{agg}({col}) as {col}_{agg.lower()}"
            for col, agg in rollup.measures.items()
        ])
        
        sql = f"""
        CREATE TABLE {rollup.name} AS
        SELECT
            {dimension_cols},
            {measure_cols},
            CURRENT_TIMESTAMP as refreshed_at
        FROM {rollup.source_table}
        GROUP BY {dimension_cols}
        """
        
        self.db.execute(sql)
        self.rollups[rollup.name] = rollup
    
    def refresh_rollup(
        self,
        rollup_name: str,
        incremental: bool = True,
        target_date: date = None
    ):
        """
        Refresh a rollup table.
        
        Incremental: Only refresh specific date partition
        Full: Rebuild entire rollup
        """
        
        rollup = self.rollups[rollup_name]
        
        if incremental and target_date:
            # Delete and re-insert for specific date
            self.db.execute(f"""
                DELETE FROM {rollup.name}
                WHERE date = '{target_date}'
            """)
            
            dimension_cols = ", ".join(rollup.dimensions)
            measure_cols = ", ".join([
                f"{agg}({col}) as {col}_{agg.lower()}"
                for col, agg in rollup.measures.items()
            ])
            
            self.db.execute(f"""
                INSERT INTO {rollup.name}
                SELECT
                    {dimension_cols},
                    {measure_cols},
                    CURRENT_TIMESTAMP as refreshed_at
                FROM {rollup.source_table}
                WHERE date = '{target_date}'
                GROUP BY {dimension_cols}
            """)
        else:
            # Full refresh - recreate
            self.db.execute(f"DROP TABLE IF EXISTS {rollup.name}")
            self.create_rollup(rollup)
    
    def get_best_rollup_for_query(
        self,
        dimensions_needed: List[str],
        time_granularity: str
    ) -> Optional[str]:
        """
        Find the most efficient rollup for a query.
        
        Returns rollup name or None if should use base table.
        """
        
        candidates = []
        
        for name, rollup in self.rollups.items():
            # Check if rollup has all needed dimensions
            if all(d in rollup.dimensions for d in dimensions_needed):
                # Check if granularity matches
                if self._matches_granularity(rollup, time_granularity):
                    candidates.append((name, len(rollup.dimensions)))
        
        if not candidates:
            return None
        
        # Return rollup with fewest dimensions (most aggregated)
        candidates.sort(key=lambda x: x[1])
        return candidates[0][0]
    
    def _matches_granularity(self, rollup: RollupDefinition, granularity: str) -> bool:
        """Check if rollup supports the requested time granularity."""
        
        granularity_hierarchy = ["hour", "day", "week", "month", "quarter", "year"]
        
        rollup_granularity = "day"  # Default
        for dim in rollup.dimensions:
            if dim in granularity_hierarchy:
                rollup_granularity = dim
                break
        
        # Rollup must be at same or finer granularity
        return granularity_hierarchy.index(rollup_granularity) <= \
               granularity_hierarchy.index(granularity)
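

A quick sketch of driving the manager above (the stub connection just prints the SQL it receives; table and column names are illustrative):

class _StubConnection:
    """Stand-in for a real database client, used only for this sketch."""
    def execute(self, sql, *args, **kwargs):
        print(sql.strip().splitlines()[0], "...")


manager = RollupManager(_StubConnection())
manager.create_rollup(RollupDefinition(
    name="agg_daily_sales",
    source_table="fact_sales",
    dimensions=["date", "category", "region"],
    measures={"revenue": "SUM", "order_id": "COUNT"},
    refresh_schedule="hourly",
))

# The query service later asks for the most aggregated rollup that still answers a query
print(manager.get_best_rollup_for_query(["date", "category"], "day"))
# -> agg_daily_sales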

4.3 Materialized Views

# query_optimization/materialized_views.py

"""
Materialized view strategies for different databases.
"""

MATERIALIZED_VIEW_PATTERNS = """
MATERIALIZED VIEWS VS ROLLUP TABLES

Materialized Views:
├── Auto-refresh (database manages)
├── Query optimizer can use automatically
├── May have limitations on supported queries
└── Less control over refresh timing

Rollup Tables:
├── Manual refresh (you control)
├── Must explicitly query the rollup
├── Full SQL flexibility
└── More control, more responsibility


SNOWFLAKE MATERIALIZED VIEWS
────────────────────────────

CREATE MATERIALIZED VIEW mv_daily_revenue AS
SELECT
    DATE_TRUNC('day', order_timestamp) as date,
    product_category,
    SUM(revenue) as total_revenue,
    COUNT(*) as order_count
FROM fact_sales
GROUP BY 1, 2;

-- Snowflake automatically:
-- 1. Keeps view in sync with base table
-- 2. Uses view when query matches pattern
-- 3. Charges for storage and maintenance


BIGQUERY MATERIALIZED VIEWS
───────────────────────────

CREATE MATERIALIZED VIEW project.dataset.mv_daily_revenue
OPTIONS (
    enable_refresh = true,
    refresh_interval_minutes = 60
)
AS
SELECT
    DATE(order_timestamp) as date,
    product_category,
    SUM(revenue) as total_revenue,
    COUNT(*) as order_count
FROM project.dataset.fact_sales
GROUP BY 1, 2;

-- BigQuery automatically:
-- 1. Incrementally refreshes
-- 2. Rewrites queries to use MV
-- 3. Combines MV + base table for fresh data


CLICKHOUSE MATERIALIZED VIEWS
─────────────────────────────

-- ClickHouse MVs are triggers that populate a target table
CREATE MATERIALIZED VIEW mv_daily_revenue
ENGINE = SummingMergeTree()
ORDER BY (date, category)
AS
SELECT
    toDate(order_timestamp) as date,
    category,
    sum(revenue) as total_revenue,
    count() as order_count
FROM fact_sales
GROUP BY date, category;

-- ClickHouse:
-- 1. Populates MV on INSERT to source
-- 2. Uses SummingMergeTree to merge aggregates
-- 3. No automatic query rewrite - query MV directly
"""


class MaterializedViewManager:
    """
    Manages materialized views with freshness tracking.
    """
    
    def __init__(self, db_connection):
        self.db = db_connection
        self.views: Dict[str, Dict] = {}
    
    def create_view(
        self,
        name: str,
        query: str,
        refresh_interval_minutes: int = 60
    ):
        """Create a materialized view."""
        
        # Create the view
        self.db.execute(f"""
            CREATE MATERIALIZED VIEW {name} AS
            {query}
        """)
        
        self.views[name] = {
            "query": query,
            "refresh_interval": refresh_interval_minutes,
            "last_refresh": None,
            "row_count": 0
        }
    
    def get_freshness(self, name: str) -> Dict:
        """Get view freshness information."""
        
        if name not in self.views:
            return {"error": "View not found"}
        
        view = self.views[name]
        
        # Query for actual freshness
        result = self.db.execute(f"""
            SELECT 
                COUNT(*) as row_count,
                MAX(refreshed_at) as last_refresh
            FROM {name}
        """)
        
        return {
            "name": name,
            "last_refresh": result["last_refresh"],
            "row_count": result["row_count"],
            "refresh_interval_minutes": view["refresh_interval"],
            "is_stale": self._is_stale(result["last_refresh"], view["refresh_interval"])
        }
    
    def _is_stale(self, last_refresh, interval_minutes: int) -> bool:
        """Check if view is stale."""
        if not last_refresh:
            return True
        
        from datetime import datetime, timedelta
        threshold = datetime.utcnow() - timedelta(minutes=interval_minutes)
        return last_refresh < threshold

Chapter 5: Caching Strategies

5.1 Multi-Layer Caching

CACHING LAYERS FOR ANALYTICS

┌────────────────────────────────────────────────────────────────────────┐
│                         CACHING ARCHITECTURE                           │
│                                                                        │
│  Layer 1: CDN / Edge Cache                                             │
│  ├── Cache: Static dashboard HTML/JS                                   │
│  ├── TTL: Hours to days                                                │
│  └── Hit rate: 95%+ for static assets                                  │
│                                                                        │
│  Layer 2: API Response Cache (Redis)                                   │
│  ├── Cache: Query results as JSON                                      │
│  ├── Key: Hash of query + parameters                                   │
│  ├── TTL: Minutes to hours                                             │
│  └── Hit rate: 60-80% for popular queries                              │
│                                                                        │
│  Layer 3: Query Result Cache (Database)                                │
│  ├── Cache: Query results in DB memory                                 │
│  ├── Managed by: Database query cache                                  │
│  ├── TTL: Until invalidation                                           │
│  └── Hit rate: Varies by database                                      │
│                                                                        │
│  Layer 4: Block/Page Cache (Storage)                                   │
│  ├── Cache: Data blocks in memory                                      │
│  ├── Managed by: OS or database                                        │
│  └── Hit rate: High for repeated scans                                 │
│                                                                        │
└────────────────────────────────────────────────────────────────────────┘


CACHE INVALIDATION STRATEGIES

1. TIME-BASED (TTL)
   ├── Simple: Expire after X minutes
   ├── Good for: Dashboards that can be slightly stale
   └── Risk: Serving outdated data

2. EVENT-BASED
   ├── Invalidate when source data changes
   ├── Good for: Data that changes infrequently
   └── Risk: Complex to implement correctly

3. VERSION-BASED
   ├── Include data version in cache key
   ├── Good for: Point-in-time queries
   └── Risk: Cache bloat with many versions

4. HYBRID
   ├── TTL + event-based invalidation
   ├── Best of both worlds
   └── Most common in production
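
A minimal sketch of the version-based approach (the function name and version string are illustrative): the cache key embeds the current data version, so publishing a new version makes stale entries unreachable without explicit deletes.

import hashlib
import json

def versioned_cache_key(query: str, params: dict, data_version: str) -> str:
    """Cache key that changes whenever the underlying data version changes."""
    payload = json.dumps(
        {"q": " ".join(query.lower().split()), "p": params, "v": data_version},
        sort_keys=True,
    )
    return "query:" + hashlib.sha256(payload.encode()).hexdigest()[:16]

# Bump the version after each successful pipeline run (e.g. the load timestamp)
key = versioned_cache_key("SELECT SUM(revenue) FROM fact_sales", {}, "load_2024_01_15")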

5.2 Query Result Caching

# query_optimization/caching.py

"""
Query result caching implementation.
"""

from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Dict, Any, Optional
import hashlib
import json
import redis


@dataclass
class CacheConfig:
    """Cache configuration."""
    default_ttl_seconds: int = 300  # 5 minutes
    max_result_size_mb: int = 10
    redis_url: str = "redis://localhost:6379"


class QueryCache:
    """
    Caches query results for fast repeated access.
    """
    
    def __init__(self, config: CacheConfig):
        self.config = config
        self.redis = redis.from_url(config.redis_url)
        
        # Metrics
        self.hits = 0
        self.misses = 0
    
    def get(self, query: str, params: Dict = None) -> Optional[Dict]:
        """
        Get cached result for query.
        
        Returns None if not cached or expired.
        """
        
        cache_key = self._make_key(query, params)
        
        cached = self.redis.get(cache_key)
        
        if cached:
            self.hits += 1
            result = json.loads(cached)
            result["_cache"] = {
                "hit": True,
                "cached_at": result.get("_cached_at"),
                "key": cache_key[:20] + "..."
            }
            return result
        
        self.misses += 1
        return None
    
    def set(
        self,
        query: str,
        params: Dict,
        result: Dict,
        ttl_seconds: int = None
    ):
        """
        Cache a query result.
        """
        
        cache_key = self._make_key(query, params)
        ttl = ttl_seconds or self.config.default_ttl_seconds
        
        # Add metadata
        result["_cached_at"] = datetime.utcnow().isoformat()
        
        # Check size
        result_json = json.dumps(result)
        size_mb = len(result_json) / (1024 * 1024)
        
        if size_mb > self.config.max_result_size_mb:
            # Too large to cache
            return False
        
        self.redis.setex(cache_key, ttl, result_json)
        return True
    
    def invalidate(self, pattern: str = None, query: str = None):
        """
        Invalidate cached results.
        
        pattern: Redis key pattern (e.g., "query:<query_hash>:*")
        query: Specific query to invalidate (all parameter variants)
        """
        
        if query:
            # All parameter variants of a query share the query-hash prefix
            normalized = " ".join(query.lower().split())
            query_hash = hashlib.sha256(normalized.encode()).hexdigest()[:16]
            keys = self.redis.keys(f"query:{query_hash}:*")
            if keys:
                self.redis.delete(*keys)
        
        elif pattern:
            keys = self.redis.keys(pattern)
            if keys:
                self.redis.delete(*keys)
    
    def _make_key(self, query: str, params: Dict = None) -> str:
        """Generate cache key from query and params."""
        
        # Normalize query
        normalized = " ".join(query.lower().split())
        
        # Hash query and params separately so every parameter variant of a query
        # shares a common prefix (which is what invalidate() relies on)
        query_hash = hashlib.sha256(normalized.encode()).hexdigest()[:16]
        params_hash = hashlib.sha256(
            json.dumps(params or {}, sort_keys=True).encode()
        ).hexdigest()[:8]
        
        return f"query:{query_hash}:{params_hash}"
    
    def get_stats(self) -> Dict:
        """Get cache statistics."""
        
        total = self.hits + self.misses
        
        return {
            "hits": self.hits,
            "misses": self.misses,
            "hit_rate": self.hits / total if total > 0 else 0,
            "total_requests": total
        }


class SmartQueryCache(QueryCache):
    """
    Smart caching with query-specific TTLs and warming.
    """
    
    def __init__(self, config: CacheConfig):
        super().__init__(config)
        
        # Query-specific TTL rules
        self.ttl_rules = {
            "realtime": 60,       # 1 minute for real-time queries
            "hourly": 300,        # 5 minutes for hourly data
            "daily": 3600,        # 1 hour for daily data
            "historical": 86400,  # 24 hours for historical
        }
    
    def get_ttl_for_query(self, query: str, params: Dict) -> int:
        """Determine appropriate TTL based on query characteristics."""
        
        query_lower = query.lower()
        
        # Real-time queries (today's data)
        if "today" in query_lower or "current" in query_lower:
            return self.ttl_rules["realtime"]
        
        # Check date range in params
        if params:
            if "date" in params:
                date_param = params["date"]
                # If querying old data, cache longer
                if isinstance(date_param, str) and date_param < "2024-01-01":
                    return self.ttl_rules["historical"]
        
        # Default to daily TTL
        return self.ttl_rules["daily"]
    
    async def warm_cache(self, queries: list):
        """
        Pre-warm cache with common queries.
        
        Run during off-peak hours.
        """
        
        for query_def in queries:
            query = query_def["query"]
            params = query_def.get("params", {})
            
            # Execute query and cache result
            result = await self._execute_query(query, params)
            
            ttl = self.get_ttl_for_query(query, params)
            self.set(query, params, result, ttl)
    
    async def _execute_query(self, query: str, params: Dict) -> Dict:
        """Execute query against database."""
        # Implementation depends on database
        pass
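

Usage sketch for the cache above (assumes a Redis instance reachable at the default URL; the query and the fake result are made up):

cache = QueryCache(CacheConfig())

query = "SELECT category, SUM(revenue) FROM fact_sales GROUP BY category"
params = {"date": "2024-01-15"}

result = cache.get(query, params)
if result is None:
    result = {"data": [{"category": "Electronics", "revenue": 120000.0}]}  # pretend DB call
    cache.set(query, params, result, ttl_seconds=300)

print(cache.get_stats())  # on a cold cache: {'hits': 0, 'misses': 1, ...}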

Chapter 6: Multi-Tenant Analytics

6.1 Tenant Isolation Strategies

MULTI-TENANT ANALYTICS PATTERNS

PATTERN 1: ROW-LEVEL SECURITY
─────────────────────────────

Single table with tenant_id column:

  fact_sales:
    tenant_id | date | product | revenue
    ──────────┼──────┼─────────┼────────
    tenant_a  | ...  | ...     | ...
    tenant_b  | ...  | ...     | ...

Query filter:
  SELECT * FROM fact_sales WHERE tenant_id = :current_tenant

Pros:
├── Simple to implement
├── Easy to add new tenants
└── Shared resources = lower cost

Cons:
├── Risk of data leakage (forgot WHERE clause; see the RLS sketch below)
├── Noisy neighbor (one tenant's queries affect others)
└── Scaling limits


PATTERN 2: SCHEMA PER TENANT
────────────────────────────

Separate schema for each tenant:

  tenant_a.fact_sales
  tenant_b.fact_sales
  tenant_c.fact_sales

Query:
  SELECT * FROM {tenant_schema}.fact_sales

Pros:
├── Strong isolation
├── Easy per-tenant customization
└── Simple backup/restore per tenant

Cons:
├── Schema proliferation
├── Harder to query across tenants
└── More maintenance


PATTERN 3: DATABASE PER TENANT
──────────────────────────────

Completely separate databases:

  analytics_tenant_a (ClickHouse cluster 1)
  analytics_tenant_b (ClickHouse cluster 2)

Pros:
├── Complete isolation
├── Independent scaling
└── Compliance-friendly (data residency)

Cons:
├── Highest cost
├── Most operational complexity
└── No cross-tenant analytics
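
Pattern 1's biggest risk (a forgotten WHERE clause) is usually mitigated with database-level row-level security rather than application-side filtering. A sketch in PostgreSQL syntax (many warehouses offer an equivalent; check your engine's documentation):

POSTGRES_RLS = """
-- Enforce tenant isolation in the database itself
ALTER TABLE fact_sales ENABLE ROW LEVEL SECURITY;

CREATE POLICY tenant_isolation ON fact_sales
    USING (tenant_id = current_setting('app.current_tenant'));

-- Each session declares its tenant before querying:
SET app.current_tenant = 'tenant_a';
SELECT SUM(revenue) FROM fact_sales;  -- only tenant_a rows are visible
"""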

6.2 Resource Management

# query_optimization/multi_tenant.py

"""
Multi-tenant query management with fair resource allocation.
"""

from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Dict, Optional
import asyncio

# QueryCache lives in query_optimization/caching.py (import path assumes that layout)
from query_optimization.caching import QueryCache


@dataclass
class TenantQuota:
    """Resource quota for a tenant."""
    max_concurrent_queries: int = 10
    max_scanned_bytes_per_day: int = 100 * 1024**3  # 100GB
    max_query_timeout_seconds: int = 300
    priority: int = 1  # Higher = more priority


class TenantResourceManager:
    """
    Manages query resources across tenants fairly.
    """
    
    def __init__(self):
        self.quotas: Dict[str, TenantQuota] = {}
        self.usage: Dict[str, Dict] = {}
        self.active_queries: Dict[str, int] = {}
    
    def register_tenant(self, tenant_id: str, quota: TenantQuota):
        """Register a tenant with their quota."""
        self.quotas[tenant_id] = quota
        self.usage[tenant_id] = {
            "bytes_scanned_today": 0,
            "queries_today": 0,
            "last_reset": datetime.utcnow().date()
        }
        self.active_queries[tenant_id] = 0
    
    async def acquire_query_slot(self, tenant_id: str) -> bool:
        """
        Try to acquire a query slot for tenant.
        
        Returns False if quota exceeded.
        """
        
        self._maybe_reset_daily_usage(tenant_id)
        
        quota = self.quotas.get(tenant_id)
        if not quota:
            return False
        
        # Check concurrent query limit
        if self.active_queries[tenant_id] >= quota.max_concurrent_queries:
            return False
        
        # Check daily byte limit
        if self.usage[tenant_id]["bytes_scanned_today"] >= quota.max_scanned_bytes_per_day:
            return False
        
        self.active_queries[tenant_id] += 1
        return True
    
    def release_query_slot(self, tenant_id: str, bytes_scanned: int):
        """Release query slot and record usage."""
        
        self.active_queries[tenant_id] -= 1
        self.usage[tenant_id]["bytes_scanned_today"] += bytes_scanned
        self.usage[tenant_id]["queries_today"] += 1
    
    def get_query_timeout(self, tenant_id: str) -> int:
        """Get query timeout for tenant."""
        quota = self.quotas.get(tenant_id)
        return quota.max_query_timeout_seconds if quota else 60
    
    def _maybe_reset_daily_usage(self, tenant_id: str):
        """Reset usage counters if new day."""
        today = datetime.utcnow().date()
        if self.usage[tenant_id]["last_reset"] < today:
            self.usage[tenant_id] = {
                "bytes_scanned_today": 0,
                "queries_today": 0,
                "last_reset": today
            }
    
    def get_usage_report(self, tenant_id: str) -> Dict:
        """Get usage report for tenant."""
        
        self._maybe_reset_daily_usage(tenant_id)
        
        quota = self.quotas.get(tenant_id)
        usage = self.usage.get(tenant_id, {})
        
        return {
            "tenant_id": tenant_id,
            "today": {
                "bytes_scanned": usage.get("bytes_scanned_today", 0),
                "bytes_limit": quota.max_scanned_bytes_per_day if quota else 0,
                "bytes_remaining": max(0, 
                    (quota.max_scanned_bytes_per_day if quota else 0) - 
                    usage.get("bytes_scanned_today", 0)
                ),
                "queries_executed": usage.get("queries_today", 0)
            },
            "active_queries": self.active_queries.get(tenant_id, 0),
            "max_concurrent": quota.max_concurrent_queries if quota else 0
        }


class QueryRouter:
    """
    Routes queries to appropriate resources based on tenant and query type.
    """
    
    def __init__(
        self,
        resource_manager: TenantResourceManager,
        query_cache: QueryCache
    ):
        self.resources = resource_manager
        self.cache = query_cache
        
        # Query executors by type
        self.executors = {}
    
    async def execute_query(
        self,
        tenant_id: str,
        query: str,
        params: Dict = None
    ) -> Dict:
        """
        Execute query with tenant isolation and resource management.
        """
        
        # Check cache first
        cached = self.cache.get(f"{tenant_id}:{query}", params)
        if cached:
            return cached
        
        # Acquire resources
        if not await self.resources.acquire_query_slot(tenant_id):
            return {
                "error": "Resource quota exceeded",
                "retry_after_seconds": 60
            }
        
        try:
            # Get timeout for tenant
            timeout = self.resources.get_query_timeout(tenant_id)
            
            # Execute with timeout
            result = await asyncio.wait_for(
                self._execute(tenant_id, query, params),
                timeout=timeout
            )
            
            # Cache result
            self.cache.set(f"{tenant_id}:{query}", params, result)
            
            # Release resources
            bytes_scanned = result.get("_metadata", {}).get("bytes_scanned", 0)
            self.resources.release_query_slot(tenant_id, bytes_scanned)
            
            return result
            
        except asyncio.TimeoutError:
            self.resources.release_query_slot(tenant_id, 0)
            return {"error": f"Query timeout after {timeout}s"}
        
        except Exception as e:
            self.resources.release_query_slot(tenant_id, 0)
            raise
    
    async def _execute(self, tenant_id: str, query: str, params: Dict) -> Dict:
        """Execute query with tenant context."""
        
        # Add tenant filter to query
        safe_query = self._add_tenant_filter(query, tenant_id)
        
        # Route to appropriate executor
        executor = self._get_executor(query)
        
        return await executor.execute(safe_query, params)
    
    def _add_tenant_filter(self, query: str, tenant_id: str) -> str:
        """
        Add tenant filter to query for row-level security.
        
        In production, use database-level RLS instead.
        """
        
        # Simple approach - inject WHERE clause
        # WARNING: This is simplified - use parameterized queries!
        if "WHERE" in query.upper():
            return query.replace("WHERE", f"WHERE tenant_id = '{tenant_id}' AND")
        else:
            # Find insertion point
            return query + f" WHERE tenant_id = '{tenant_id}'"
    
    def _get_executor(self, query: str):
        """Get appropriate executor based on query type."""
        
        query_lower = query.lower()
        
        if "sum(" in query_lower or "count(" in query_lower:
            return self.executors.get("olap", self.executors["default"])
        
        return self.executors["default"]
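

A usage sketch for the resource manager defined above (quota numbers and byte counts are made up):

import asyncio

async def demo():
    manager = TenantResourceManager()
    manager.register_tenant("tenant_a", TenantQuota(max_concurrent_queries=2))

    if await manager.acquire_query_slot("tenant_a"):
        try:
            await asyncio.sleep(0)  # stand-in for actually running the query
        finally:
            manager.release_query_slot("tenant_a", bytes_scanned=250 * 1024**2)

    print(manager.get_usage_report("tenant_a"))

asyncio.run(demo())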

Part II: Implementation

Chapter 7: Production Query Service

7.1 Complete Query Service

# query_optimization/query_service.py

"""
Production query service with all optimizations.
"""

from dataclasses import dataclass
from datetime import datetime
from typing import Dict, Any, List, Optional
import logging

# Building blocks from earlier chapters (import paths assume the package layout used above)
from query_optimization.caching import QueryCache
from query_optimization.multi_tenant import TenantResourceManager
from query_optimization.rollups import RollupManager

logger = logging.getLogger(__name__)


@dataclass
class QueryServiceConfig:
    """Query service configuration."""
    cache_enabled: bool = True
    cache_ttl_seconds: int = 300
    max_result_rows: int = 10000
    default_timeout_seconds: int = 30
    enable_query_rewriting: bool = True


class QueryService:
    """
    Production query service with caching, optimization, and monitoring.
    """
    
    def __init__(
        self,
        config: QueryServiceConfig,
        cache: QueryCache,
        rollup_manager: RollupManager,
        resource_manager: TenantResourceManager,
        db_connection
    ):
        self.config = config
        self.cache = cache
        self.rollups = rollup_manager
        self.resources = resource_manager
        self.db = db_connection
        
        # Metrics
        self.query_count = 0
        self.cache_hits = 0
        self.rollup_hits = 0
        self.avg_latency_ms = 0
    
    async def execute(
        self,
        tenant_id: str,
        query: str,
        params: Dict = None,
        options: Dict = None
    ) -> Dict:
        """
        Execute query with full optimization pipeline.
        
        Pipeline:
        1. Check cache
        2. Acquire resources
        3. Rewrite query (use rollups)
        4. Execute
        5. Cache result
        6. Return with metadata
        """
        
        start_time = datetime.utcnow()
        self.query_count += 1
        
        options = options or {}
        result_metadata = {
            "query_id": f"q_{self.query_count}_{start_time.timestamp()}",
            "tenant_id": tenant_id,
            "cache_hit": False,
            "rollup_used": None,
            "bytes_scanned": 0
        }
        
        try:
            # Step 1: Check cache
            if self.config.cache_enabled and not options.get("skip_cache"):
                cache_key = f"{tenant_id}:{query}"
                cached = self.cache.get(cache_key, params)
                
                if cached:
                    self.cache_hits += 1
                    result_metadata["cache_hit"] = True
                    result_metadata["latency_ms"] = self._elapsed_ms(start_time)
                    return self._format_response(cached, result_metadata)
            
            # Step 2: Acquire resources
            if not await self.resources.acquire_query_slot(tenant_id):
                return self._quota_exceeded_response(tenant_id)
            
            try:
                # Step 3: Query rewriting
                optimized_query = query
                if self.config.enable_query_rewriting:
                    optimized_query, rollup_used = self._try_rewrite_to_rollup(
                        query, params
                    )
                    if rollup_used:
                        self.rollup_hits += 1
                        result_metadata["rollup_used"] = rollup_used
                
                # Step 4: Add tenant filter
                safe_query = self._add_tenant_filter(optimized_query, tenant_id)
                
                # Step 5: Execute with timeout
                timeout = options.get("timeout", self.config.default_timeout_seconds)
                result = await self._execute_query(safe_query, params, timeout)
                
                result_metadata["bytes_scanned"] = result.get("_bytes_scanned", 0)
                result_metadata["rows_returned"] = len(result.get("data", []))
                
                # Step 6: Cache result
                if self.config.cache_enabled:
                    ttl = self._get_cache_ttl(query, params)
                    self.cache.set(f"{tenant_id}:{query}", params, result, ttl)
                
                # Release resources
                self.resources.release_query_slot(
                    tenant_id,
                    result_metadata["bytes_scanned"]
                )
                
                result_metadata["latency_ms"] = self._elapsed_ms(start_time)
                self._update_avg_latency(result_metadata["latency_ms"])
                
                return self._format_response(result, result_metadata)
                
            except Exception as e:
                self.resources.release_query_slot(tenant_id, 0)
                raise
        
        except Exception as e:
            logger.error(f"Query execution failed: {e}", exc_info=True)
            result_metadata["latency_ms"] = self._elapsed_ms(start_time)
            result_metadata["error"] = str(e)
            return self._error_response(e, result_metadata)
    
    def _try_rewrite_to_rollup(
        self,
        query: str,
        params: Dict
    ) -> tuple:
        """
        Try to rewrite query to use a rollup table.
        
        Returns (rewritten_query, rollup_name) or (original_query, None)
        """
        
        # Parse query to understand dimensions and granularity
        dimensions = self._extract_dimensions(query)
        granularity = self._extract_time_granularity(query, params)
        
        # Find best rollup
        rollup_name = self.rollups.get_best_rollup_for_query(
            dimensions, granularity
        )
        
        if rollup_name:
            # Rewrite query to use rollup
            rewritten = self._rewrite_query_for_rollup(query, rollup_name)
            return rewritten, rollup_name
        
        return query, None
    
    def _extract_dimensions(self, query: str) -> List[str]:
        """Extract GROUP BY dimensions from query."""
        
        # Simplified parsing - use proper SQL parser in production
        query_upper = query.upper()
        
        if "GROUP BY" not in query_upper:
            return []
        
        group_by_idx = query_upper.index("GROUP BY")
        group_by_clause = query[group_by_idx + 8:].split("ORDER BY")[0].split("LIMIT")[0]
        
        dimensions = [d.strip() for d in group_by_clause.split(",")]
        return dimensions
    
    def _extract_time_granularity(self, query: str, params: Dict) -> str:
        """Determine time granularity of query."""
        
        query_lower = query.lower()
        
        if "hour" in query_lower or "hourly" in query_lower:
            return "hour"
        elif "day" in query_lower or "daily" in query_lower:
            return "day"
        elif "week" in query_lower or "weekly" in query_lower:
            return "week"
        elif "month" in query_lower or "monthly" in query_lower:
            return "month"
        
        return "day"  # Default
    
    def _rewrite_query_for_rollup(self, query: str, rollup_name: str) -> str:
        """Rewrite query to use rollup table."""
        
        # Replace fact table with rollup
        # Simplified - use proper SQL rewriting in production
        rewritten = query.replace("fact_sales", rollup_name)
        rewritten = rewritten.replace("fact_orders", rollup_name)
        
        return rewritten
    
    def _add_tenant_filter(self, query: str, tenant_id: str) -> str:
        """Add tenant isolation filter."""
        
        # In production, use database-level RLS
        # This is a simplified approach
        
        if "WHERE" in query.upper():
            # Insert after WHERE
            where_idx = query.upper().index("WHERE")
            return (
                query[:where_idx + 5] +
                f" tenant_id = '{tenant_id}' AND" +
                query[where_idx + 5:]
            )
        else:
            # Add WHERE clause
            # Find safe insertion point
            for keyword in ["GROUP BY", "ORDER BY", "LIMIT", ";"]:
                if keyword in query.upper():
                    idx = query.upper().index(keyword)
                    return (
                        query[:idx] +
                        f" WHERE tenant_id = '{tenant_id}' " +
                        query[idx:]
                    )
            
            return query + f" WHERE tenant_id = '{tenant_id}'"
    
    async def _execute_query(
        self,
        query: str,
        params: Dict,
        timeout: int
    ) -> Dict:
        """Execute query against database."""
        
        # Add LIMIT if not present
        if "LIMIT" not in query.upper():
            query = query + f" LIMIT {self.config.max_result_rows}"
        
        # Execute with parameterized query
        result = await self.db.execute(query, params, timeout=timeout)
        
        return result
    
    def _get_cache_ttl(self, query: str, params: Dict) -> int:
        """Determine cache TTL based on query characteristics."""
        
        query_lower = query.lower()
        
        # Real-time queries: short TTL
        if "today" in query_lower or "last_hour" in query_lower:
            return 60
        
        # Historical queries: long TTL
        if params and params.get("end_date", "").startswith("2023"):
            return 3600
        
        return self.config.cache_ttl_seconds
    
    def _elapsed_ms(self, start: datetime) -> int:
        """Calculate elapsed time in milliseconds."""
        return int((datetime.utcnow() - start).total_seconds() * 1000)
    
    def _update_avg_latency(self, latency_ms: int):
        """Update rolling average latency."""
        # Simple exponential moving average
        alpha = 0.1
        self.avg_latency_ms = alpha * latency_ms + (1 - alpha) * self.avg_latency_ms
    
    def _format_response(self, result: Dict, metadata: Dict) -> Dict:
        """Format successful response."""
        return {
            "data": result.get("data", []),
            "metadata": metadata,
            "status": "success"
        }
    
    def _error_response(self, error: Exception, metadata: Dict) -> Dict:
        """Format error response."""
        return {
            "data": [],
            "metadata": metadata,
            "status": "error",
            "error": {
                "type": type(error).__name__,
                "message": str(error)
            }
        }
    
    def _quota_exceeded_response(self, tenant_id: str) -> Dict:
        """Format quota exceeded response."""
        usage = self.resources.get_usage_report(tenant_id)
        
        return {
            "data": [],
            "metadata": {"tenant_id": tenant_id},
            "status": "error",
            "error": {
                "type": "QuotaExceeded",
                "message": "Query quota exceeded",
                "usage": usage,
                "retry_after_seconds": 60
            }
        }
    
    def get_stats(self) -> Dict:
        """Get service statistics."""
        return {
            "total_queries": self.query_count,
            "cache_hit_rate": self.cache_hits / self.query_count if self.query_count > 0 else 0,
            "rollup_hit_rate": self.rollup_hits / self.query_count if self.query_count > 0 else 0,
            "avg_latency_ms": self.avg_latency_ms,
            "cache_stats": self.cache.get_stats()
        }
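

How the pieces fit together, as a wiring sketch only (db_connection stands for whatever async client your warehouse exposes, and CacheConfig / TenantQuota come from the earlier modules; nothing here is executed):

async def build_and_query(db_connection):
    service = QueryService(
        config=QueryServiceConfig(),
        cache=QueryCache(CacheConfig()),
        rollup_manager=RollupManager(db_connection),
        resource_manager=TenantResourceManager(),
        db_connection=db_connection,
    )
    service.resources.register_tenant("tenant_a", TenantQuota())

    return await service.execute(
        tenant_id="tenant_a",
        query="SELECT category, SUM(revenue) FROM fact_sales GROUP BY category",
        params={"start_date": "2024-01-01", "end_date": "2024-03-31"},
    )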

Part III: Real-World Application

Chapter 8: Case Studies

8.1 Amplitude's Query Engine

AMPLITUDE'S ANALYTICS QUERY ARCHITECTURE

Challenge:
├── 50K+ customers
├── Trillions of events
├── Sub-second query responses
├── Complex behavioral analytics (funnels, retention)

Architecture:

┌────────────────────────────────────────────────────────────────────────┐
│                     AMPLITUDE NOVA ENGINE                              │
│                                                                        │
│  Ingestion:                                                            │
│  ├── Real-time events → Kafka → Druid (hot data)                       │
│  └── Historical → S3 → Trino (cold data)                               │
│                                                                        │
│  Query Optimization:                                                   │
│  ├── Pre-computed metrics per customer                                 │
│  ├── Materialized views for common charts                              │
│  ├── Multi-level caching (edge, API, database)                         │
│  └── Query routing: hot → Druid, cold → Trino                          │
│                                                                        │
│  Multi-tenancy:                                                        │
│  ├── Logical isolation (tenant_id column)                              │
│  ├── Per-tenant query quotas                                           │
│  ├── Priority queues (paid tiers get priority)                         │
│  └── Resource pools per customer size                                  │
│                                                                        │
│  Key Innovation: BEHAVIORAL PRE-COMPUTATION                            │
│  ├── Pre-compute common metrics (DAU, WAU, MAU)                        │
│  ├── Pre-build funnel snapshots nightly                                │
│  ├── Retention matrices updated hourly                                 │
│  └── Custom dashboards compile to optimized queries                    │
│                                                                        │
└────────────────────────────────────────────────────────────────────────┘

Results:
├── p50 latency: 200ms
├── p99 latency: 2s
├── Queries/day: 100M+
└── Cost efficiency: 10x improvement over naive approach

8.2 Mixpanel's JQL Engine

MIXPANEL'S JQL (JavaScript Query Language)

Challenge:
├── Flexible query language for analysts
├── Interactive exploration (fast iteration)
└── Massive scale (billions of events)

Solution:

┌────────────────────────────────────────────────────────────────────────┐
│                        MIXPANEL ARIADNE                                │
│                                                                        │
│  Query Language:                                                       │
│  ├── JQL: JavaScript-like syntax for flexibility                       │
│  ├── Compiles to optimized execution plan                              │
│  └── Sandboxed execution for safety                                    │
│                                                                        │
│  Storage:                                                              │
│  ├── Custom columnar format (not Parquet)                              │
│  ├── Per-project data isolation                                        │
│  ├── Time-partitioned with custom indexing                             │
│  └── Compression optimized for event data                              │
│                                                                        │
│  Query Execution:                                                      │
│  ├── Query planner rewrites to optimal form                            │
│  ├── Predicate pushdown to storage layer                               │
│  ├── Parallel execution across partitions                              │
│  └── Streaming results (don't wait for full result)                    │
│                                                                        │
│  Optimization Tricks:                                                  │
│  ├── Sampling for exploration (fast approximate)                       │
│  ├── Full scan for reports (accurate)                                  │
│  ├── Progressive result loading                                        │
│  └── Query similarity detection for caching                            │
│                                                                        │
└────────────────────────────────────────────────────────────────────────┘
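
The "query similarity detection for caching" trick is simple to sketch: canonicalize the query text so trivially different versions of the same query share one cache key. A minimal illustration, assuming SQL-style line comments and tenant-scoped keys:

import hashlib
import re

def normalize_query(query_text: str) -> str:
    """Canonicalize a query so trivially different texts map to one cache key."""
    q = re.sub(r"--[^\n]*", "", query_text)     # drop SQL-style line comments
    q = re.sub(r"\s+", " ", q).strip().lower()  # collapse whitespace, lowercase
    return q

def cache_key(tenant_id: str, query_text: str) -> str:
    """Tenant-scoped key so cached results never leak across tenants."""
    digest = hashlib.sha256(
        f"{tenant_id}:{normalize_query(query_text)}".encode()).hexdigest()
    return f"qcache:{digest}"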

Chapter 9: Common Mistakes

9.1 Query Optimization Mistakes

QUERY OPTIMIZATION ANTI-PATTERNS

❌ MISTAKE 1: Over-Caching

Wrong:
  # Cache everything for 24 hours
  cache.set(query, result, ttl=86400)

Problem:
  Stale data shown to users
  Cache invalidation nightmares
  Memory bloat

Right:
  # TTL based on data freshness requirements
  if is_realtime_query(query):
      cache.set(query, result, ttl=60)
  elif is_historical_query(query):
      cache.set(query, result, ttl=3600)


❌ MISTAKE 2: Wrong Rollup Granularity

Wrong:
  # One rollup to rule them all
  CREATE TABLE agg_everything AS
  SELECT date, category, region, store, product, customer_segment...
  GROUP BY ALL

Problem:
  Rollup ends up nearly as big as the fact table
  No query benefit
  Maintenance overhead

Right:
  # Purpose-specific rollups
  agg_daily_category     -- Executive dashboard
  agg_hourly_region      -- Operations
  agg_daily_product_top100  -- Product team


❌ MISTAKE 3: No Query Timeouts

Wrong:
  result = db.execute(user_query)  # Wait forever

Problem:
  Runaway queries consume resources
  Other users blocked
  Cost explosion

Right:
  result = db.execute(
      user_query,
      timeout_seconds=30,
      max_bytes_scanned=10_000_000_000
  )


❌ MISTAKE 4: Ignoring Query Patterns

Wrong:
  # Optimize based on intuition
  "Let's add an index on product_id"

Problem:
  Index might not be used
  Wasted storage and maintenance

Right:
  # Analyze actual query patterns
  SELECT query_text, count(*), avg(duration_ms)
  FROM query_log
  GROUP BY query_text
  ORDER BY count(*) DESC
  LIMIT 20
  -- Then optimize top queries

9.2 Cost Management Mistakes

COST MANAGEMENT ANTI-PATTERNS

❌ MISTAKE 1: No Cost Visibility

Wrong:
  # No idea what queries cost
  Users run expensive queries freely

Problem:
  $50K/month surprise bills
  No accountability
  Can't optimize

Right:
  # Track cost per query, per tenant
  Show estimated cost before execution
  Alert on expensive queries
  Dashboard for cost attribution
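
Showing estimated cost before execution is straightforward on BigQuery, which supports dry runs. A minimal sketch using the google-cloud-bigquery client (the $/TB rate below is illustrative; check current on-demand pricing):

from google.cloud import bigquery

PRICE_PER_TB = 6.25  # illustrative on-demand rate, not a quoted price

def estimate_query_cost(client: bigquery.Client, sql: str) -> float:
    """Dry-run the query to get bytes scanned without actually executing it."""
    job_config = bigquery.QueryJobConfig(dry_run=True, use_query_cache=False)
    job = client.query(sql, job_config=job_config)  # returns immediately, scans nothing
    return (job.total_bytes_processed / 1e12) * PRICE_PER_TB

# Usage: warn the user before anything expensive runs
# cost = estimate_query_cost(bigquery.Client(), sql)
# if cost > 10:
#     print(f"Estimated cost ${cost:.2f} -- confirm before running")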


❌ MISTAKE 2: Scanning Full Tables

Wrong:
  SELECT * FROM fact_sales WHERE product_id = 'P123'
  -- No partition filter!

Problem:
  Scans 10TB instead of 10GB
  100x more expensive

Right:
  SELECT * FROM fact_sales 
  WHERE date >= '2024-01-01'  -- Partition pruning
    AND product_id = 'P123'


❌ MISTAKE 3: SELECT *

Wrong:
  SELECT * FROM fact_sales  -- 50 columns

Problem:
  Reads unnecessary columns
  Network overhead
  Memory waste

Right:
  SELECT date, revenue, quantity  -- Only needed columns
  FROM fact_sales

Part IV: Interview Preparation

Chapter 10: Interview Tips

10.1 When to Discuss Query Optimization

QUERY OPTIMIZATION INTERVIEW TRIGGERS

"The dashboard is slow"
  → Caching, pre-aggregation, partitioning

"We're paying too much for BigQuery/Snowflake"
  → Cost optimization, rollups, tiered storage

"Design a multi-tenant analytics platform"
  → Tenant isolation, resource management, fair queuing

"How would you handle 10K concurrent dashboard users?"
  → Caching layers, pre-computation, database choice

"Users complain queries take too long"
  → Query analysis, indexing, materialized views

10.2 Key Phrases

TALKING ABOUT QUERY OPTIMIZATION

"For dashboard queries, I'd implement a multi-layer caching
strategy: CDN for static assets, Redis for query results,
and database-level caching. The key is matching TTL to
data freshness requirements."

"Pre-aggregation is crucial for analytics at scale. I'd
analyze query patterns to identify the top 20 queries
that account for 80% of load, then create rollup tables
for those specific patterns."

"For multi-tenant analytics, I'd use row-level security
with tenant_id filtering, combined with per-tenant
resource quotas. This prevents noisy neighbor issues
while keeping costs manageable."

"The choice of OLAP database depends on the access pattern.
For real-time dashboards with high concurrency, ClickHouse
or Druid. For ad-hoc exploration, BigQuery or Snowflake.
Often you need both."

Chapter 11: Practice Problems

Problem 1: Dashboard Performance

Setup: Your executive dashboard takes 45 seconds to load. It shows:

  • Revenue by region by day (last 90 days)
  • Top 10 products by revenue
  • Conversion funnel by channel

Data: 5B events, 10TB total.

Questions:

  1. How would you get load time under 1 second?
  2. How do you keep data fresh while maintaining speed?
  3. What's your caching invalidation strategy?

Solution sketch (one approach; see the code sketch after this list):

  • Pre-aggregate all three views as rollup tables
  • Refresh rollups every hour (or on schedule)
  • Cache rendered dashboard for 5 minutes
  • Use streaming for "today" portion
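
A minimal sketch of the "rollup for history, streaming for today" serving pattern, assuming hypothetical rollup_store and realtime_store clients and an assumed agg_daily_region rollup table:

from datetime import date, timedelta

def revenue_by_region(rollup_store, realtime_store, days: int = 90) -> dict:
    """Serve 90 days of revenue: rollup for history, streaming store for today."""
    today = date.today()
    start = today - timedelta(days=days)

    # Cheap: scans the small daily rollup, not the multi-billion-row fact table.
    historical = rollup_store.query(
        "SELECT day, region, SUM(revenue) AS revenue "
        "FROM agg_daily_region "
        "WHERE day >= %s AND day < %s "
        "GROUP BY day, region",
        (start, today),
    )

    # Fresh: today's partial numbers from the real-time store.
    live = realtime_store.query(
        "SELECT region, SUM(revenue) AS revenue FROM events_today GROUP BY region"
    )
    return {"historical": historical, "today": live}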

Problem 2: Multi-Tenant Cost Control

Setup: You have 500 tenants sharing a single BigQuery project. Three tenants are running expensive queries that cost $10K/month each, while most tenants cost $100/month.

Questions:

  1. How do you identify the expensive queries?
  2. How do you implement fair usage without blocking legitimate use?
  3. How do you communicate limits to tenants?

Solution sketch (one approach; see the code sketch after this list):

  • Query logging with cost attribution per tenant
  • Tiered quotas based on plan (bytes/day, concurrency)
  • Pre-query cost estimation with user warning
  • Usage dashboard visible to tenants
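
A minimal sketch of cost attribution from a query log, assuming each log entry carries tenant_id and bytes_scanned, and using an illustrative on-demand rate:

from collections import defaultdict

PRICE_PER_TB = 6.25  # illustrative; use your warehouse's actual rate

def cost_by_tenant(query_log):
    """Attribute scan cost to tenants from query-log entries."""
    totals = defaultdict(float)
    for entry in query_log:  # each entry assumed to have tenant_id, bytes_scanned
        totals[entry["tenant_id"]] += (entry["bytes_scanned"] / 1e12) * PRICE_PER_TB
    return dict(sorted(totals.items(), key=lambda kv: kv[1], reverse=True))

def over_quota(tenant_id, totals, monthly_budget_usd):
    """True if a tenant has exceeded its plan's monthly spend."""
    return totals.get(tenant_id, 0.0) > monthly_budget_usd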

Chapter 12: Sample Interview Dialogue

Interviewer: "Our analytics queries are too slow and too expensive. How would you fix this?"

You: "I'd approach this systematically. First, I need to understand the current state. What's the query latency distribution? What's driving costs? Which queries are most frequent?"

Interviewer: "p50 is 5 seconds, p99 is 60 seconds. Costs are $80K/month on BigQuery. The top 10 queries account for 70% of both time and cost."

You: "That's actually good news — concentrated pain means concentrated opportunity. For those top 10 queries, I'd do three things:

First, pre-aggregation. If these are dashboard queries hitting the same patterns repeatedly, I'd create materialized views or rollup tables. A query that scans 1TB becomes a query that scans 1GB.

Second, caching. For queries that don't need up-to-the-second freshness, cache results in Redis. A dashboard that 50 people load every morning doesn't need 50 BigQuery scans — one scan, 49 cache hits.

Third, partitioning and clustering. Make sure tables are partitioned by date and clustered by common filter columns. This is table stakes for BigQuery cost control.

For the long tail of queries, I'd implement query governance: byte scan limits, timeout limits, and cost estimation before execution. Users should know 'this query will cost $50 and take 2 minutes' before they run it."

Interviewer: "What about the freshness vs speed trade-off?"

You: "Great question. I'd segment by use case:

Real-time dashboards (operations): Streaming ingestion to a real-time OLAP like Druid or ClickHouse. Sub-second queries, always fresh. More expensive but worth it for ops.

Business dashboards (executives): Pre-computed views refreshed hourly or daily. They're okay with 'as of this morning' data. Cheap and fast.

Ad-hoc exploration (analysts): Query the raw data when needed. They expect slower queries and accept the cost for flexibility.

The key is matching the architecture to the use case, not trying to make one system do everything optimally."
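
The "one scan, 49 cache hits" pattern from the dialogue is a few lines with redis-py; run_warehouse_query here is an assumed stand-in for whatever executes the real query:

import hashlib
import json

import redis  # redis-py client, assumed available

r = redis.Redis()

def cached_query(sql, run_warehouse_query, ttl_seconds=300):
    """First caller pays for the warehouse scan; everyone else hits the cache."""
    key = "dash:" + hashlib.sha256(sql.encode()).hexdigest()
    hit = r.get(key)
    if hit is not None:
        return json.loads(hit)                     # cache hit: no scan
    result = run_warehouse_query(sql)              # cache miss: one real scan
    r.setex(key, ttl_seconds, json.dumps(result))  # cache for subsequent loads
    return result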


Summary

DAY 5 SUMMARY: QUERY LAYER AND OPTIMIZATION

OLAP DATABASE CHOICE
├── Cloud warehouses: Snowflake, BigQuery (simple, managed)
├── Real-time OLAP: ClickHouse, Druid (fast, concurrent)
├── Lake query engines: Trino (flexible, federated)
└── Match database to access pattern

OPTIMIZATION TECHNIQUES
├── Partitioning: Skip irrelevant files
├── Clustering: Group related data
├── Pre-aggregation: Compute once, query fast
├── Caching: Don't recompute unchanged data
└── Query rewriting: Use rollups automatically

PRE-AGGREGATION
├── Analyze query patterns first
├── Create purpose-specific rollups
├── Hierarchy: hourly → daily → monthly
├── Cube pattern for drill-down
└── Refresh strategy matters

CACHING STRATEGY
├── Multi-layer: CDN → API → Database
├── TTL based on freshness needs
├── Invalidation: time or event-based
├── Warm cache during off-peak
└── Monitor hit rates

MULTI-TENANCY
├── Row-level security: Simple, shared
├── Schema per tenant: Isolated, more work
├── Database per tenant: Complete isolation, highest cost
├── Resource quotas: Fair usage, prevent noisy neighbor
└── Cost attribution: Know who's spending what

Key Takeaways

QUERY OPTIMIZATION KEY TAKEAWAYS

1. KNOW YOUR QUERY PATTERNS
   Can't optimize what you don't measure

2. PRE-COMPUTE WHAT'S PREDICTABLE
   Rollups for dashboards, raw for ad-hoc

3. CACHE AGGRESSIVELY, INVALIDATE CAREFULLY
   Right TTL is critical

4. PARTITION EVERYTHING
   Date partitioning is table stakes

5. MATCH DATABASE TO USE CASE
   No one database does it all

GOLDEN RULES:
├── Analyze before optimizing
├── 20% of queries cause 80% of cost
├── Pre-aggregate predictable patterns
├── Cache with appropriate TTL
└── Monitor, iterate, improve

Week 8 Complete!

WEEK 8 SUMMARY: ANALYTICS PIPELINE

Day 1: EVENT INGESTION
├── Event schema design
├── Kafka as backbone
├── Validation and enrichment
└── Exactly-once delivery

Day 2: STREAMING VS BATCH
├── Lambda vs Kappa architecture
├── Flink for streaming
├── Spark for batch
└── Watermarks and state

Day 3: DATA MODELING
├── Star schema
├── Fact and dimension tables
├── SCD Type 2
└── Columnar storage

Day 4: LATE DATA
├── Event time vs processing time
├── Watermarks deep dive
├── Versioned results
└── Reprocessing patterns

Day 5: QUERY LAYER
├── OLAP databases
├── Pre-aggregation
├── Caching strategies
└── Multi-tenant management

You now understand the complete analytics pipeline
from event capture to fast dashboard queries!

What's Next

The Week 8 Capstone will integrate all these concepts into designing a complete analytics system for a video streaming platform.

CAPSTONE PREVIEW: STREAMING ANALYTICS PLATFORM

Design Netflix-style analytics:
├── Track video plays, pauses, completions
├── Real-time "trending now" dashboard
├── Historical viewing patterns
├── Content performance for creators
├── A/B test analysis
└── Handle 10M concurrent viewers

This will test everything from Week 8:
├── Event ingestion at scale
├── Streaming + batch processing
├── Star schema for viewing data
├── Late data from mobile apps
└── Fast queries for dashboards

End of Week 8, Day 5

Next: Week 8 Capstone — Real-Time Analytics for Streaming Platform