Week 8 — Day 5: Query Layer and Optimization
System Design Mastery Series — Analytics Pipeline Week
Preface
We've built an impressive analytics pipeline this week. Events flow through Kafka, get processed by Flink and Spark, land in a beautifully designed star schema, and late data is handled gracefully.
There's just one problem:
THE QUERY PERFORMANCE CRISIS
Marketing team runs a dashboard query:
"Show revenue by product category, by region, by day
for the last 90 days"
What happens:
├── Query scans 2.7 billion rows
├── Shuffles 50GB of data across cluster
├── Takes 47 seconds to return
└── Dashboard times out
Users complain:
├── "The dashboard is unusable"
├── "I can't get answers to simple questions"
├── "We're paying $50K/month and it's still slow"
└── "Can we just use Excel?"
The pipeline is working perfectly.
The query layer is broken.
Today, we'll learn to make queries over billions of rows return in milliseconds — without breaking the bank.
Part I: Foundations
Chapter 1: OLAP Database Landscape
1.1 Categories of OLAP Systems
OLAP DATABASE CATEGORIES
1. CLOUD DATA WAREHOUSES
├── Snowflake
├── Google BigQuery
├── Amazon Redshift
├── Azure Synapse
└── Databricks SQL
Characteristics:
├── Managed, serverless (mostly)
├── Separation of storage and compute
├── Pay per query or compute time
└── Best for: Ad-hoc analytics, SQL users
2. REAL-TIME OLAP DATABASES
├── Apache Druid
├── ClickHouse
├── Apache Pinot
├── StarRocks
└── Apache Doris
Characteristics:
├── Sub-second queries on fresh data
├── Columnar storage with indexing
├── Designed for high concurrency
└── Best for: Real-time dashboards, user-facing analytics
3. DATA LAKE QUERY ENGINES
├── Trino (Presto)
├── Apache Spark SQL
├── Dremio
└── Starburst
Characteristics:
├── Query data in place (S3, HDFS)
├── Schema-on-read flexibility
├── Federated queries across sources
└── Best for: Data exploration, cross-system queries
4. EMBEDDED ANALYTICS
├── DuckDB
├── SQLite (with extensions)
└── Apache DataFusion
Characteristics:
├── Runs in-process
├── No server to manage
├── Great for local analytics
└── Best for: Edge computing, embedded BI
1.2 Choosing the Right Database
DECISION MATRIX
Database   │ Latency │ Concurrency │ Freshness │ Cost   │ Complexity
───────────┼─────────┼─────────────┼───────────┼────────┼───────────
Snowflake  │ Seconds │ Medium      │ Near-RT   │ High   │ Low
BigQuery   │ Seconds │ Medium      │ Streaming │ Medium │ Low
ClickHouse │ Millis  │ High        │ Real-time │ Medium │ Medium
Druid      │ Millis  │ Very High   │ Real-time │ Medium │ High
Trino      │ Seconds │ Medium      │ On-demand │ Low    │ Medium
DuckDB     │ Millis  │ Single      │ Batch     │ Free   │ Very Low
USE CASE MAPPING:
"Executive dashboard, 10 users, daily refresh"
→ Snowflake or BigQuery (simple, managed)
"Customer-facing analytics, 10K users, real-time"
→ ClickHouse or Druid (low latency, high concurrency)
"Ad-hoc exploration across data lake"
→ Trino (flexible, federated)
"Embedded analytics in application"
→ DuckDB (in-process, no infra)
"Real-time metrics for operations"
→ Druid or Pinot (sub-second, streaming ingestion)
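The mapping above can be sketched as a small routing helper; the thresholds and engine groupings below are illustrative assumptions, not a definitive rule set.

# engine_selector.py - toy helper encoding the use-case mapping above.
# Thresholds and engine groupings are illustrative assumptions.

def recommend_engine(concurrent_users: int, latency_budget_ms: int,
                     needs_realtime: bool, embedded: bool = False) -> str:
    """Return a rough engine category for a given access pattern."""
    if embedded:
        return "DuckDB (in-process, no infra)"
    if latency_budget_ms < 1000 and concurrent_users > 100:
        return "ClickHouse / Druid / Pinot (real-time OLAP)"
    if needs_realtime:
        return "ClickHouse or Druid (streaming ingestion)"
    if concurrent_users <= 100:
        return "Snowflake or BigQuery (managed warehouse)"
    return "Trino over the data lake (federated, ad-hoc)"

print(recommend_engine(10_000, 200, needs_realtime=True))   # customer-facing analytics
print(recommend_engine(10, 5_000, needs_realtime=False))    # executive dashboard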
1.3 Architecture Patterns
OLAP ARCHITECTURE PATTERNS
PATTERN 1: SINGLE WAREHOUSE
────────────────────────────
┌─────────────┐ ┌─────────────┐
│ Sources │───▶│ Snowflake │───▶ Dashboards
└─────────────┘ └─────────────┘
Simple, works for most companies.
Limitation: Cost scales with query volume.
PATTERN 2: LAKE + WAREHOUSE
────────────────────────────
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Sources │───▶│ Data Lake │───▶│ Warehouse │───▶ Dashboards
└─────────────┘ │ (S3) │ │ (Snowflake) │
└─────────────┘ └─────────────┘
│
▼
┌─────────────┐
│ Trino │───▶ Ad-hoc queries
└─────────────┘
Lake for cheap storage, warehouse for performance.
PATTERN 3: LAMBDA QUERY LAYER
─────────────────────────────
┌─────────────┐ ┌─────────────┐
│ Real-time │───▶│ClickHouse │───▶ Real-time dashboards
│ (Kafka) │ └─────────────┘
└─────────────┘
│
▼
┌─────────────┐ ┌─────────────┐
│ Batch │───▶│ BigQuery │───▶ Historical analysis
│ (S3) │ └─────────────┘
└─────────────┘
Different engines for different latency needs.
PATTERN 4: SEMANTIC LAYER
─────────────────────────
┌─────────────┐
│ Sources │
└──────┬──────┘
│
▼
┌─────────────┐
│ Semantic │ (dbt, Cube.js, AtScale)
│ Layer │ Metrics definitions, caching
└──────┬──────┘
│
┌───┴───┐
▼ ▼
┌─────┐ ┌─────┐
│ DW │ │OLAP │
└─────┘ └─────┘
Consistent metrics across query engines.
Chapter 2: Query Optimization Fundamentals
2.1 Why Queries Are Slow
ANATOMY OF A SLOW QUERY
Query:
SELECT category, region, SUM(revenue)
FROM fact_sales f
JOIN dim_product p ON f.product_key = p.product_key
JOIN dim_store s ON f.store_key = s.store_key
WHERE f.date_key BETWEEN 20240101 AND 20240331
GROUP BY category, region
What makes it slow:
1. DATA VOLUME
├── 2.7B rows in fact_sales
├── Even with date filter: 675M rows for Q1
└── Raw scan: 50GB+ of data
2. JOINS
├── fact_sales → dim_product (1M products)
├── fact_sales → dim_store (10K stores)
└── Each join requires shuffle/broadcast
3. AGGREGATION
├── GROUP BY creates many groups
├── Partial aggregates on each node
└── Final merge on coordinator
4. NO OPTIMIZATION
├── No partitioning → full scan
├── No pre-aggregation → compute from scratch
├── No indexes → linear scan
└── No caching → repeat work every query
2.2 Optimization Techniques Overview
QUERY OPTIMIZATION TECHNIQUES
REDUCE DATA SCANNED
├── Partitioning (skip irrelevant files)
├── Clustering/Sorting (data locality)
├── Column pruning (read only needed columns)
├── Predicate pushdown (filter early)
└── Bloom filters (skip blocks without matches)
REDUCE COMPUTATION
├── Pre-aggregation (rollup tables)
├── Materialized views (precomputed results)
├── Approximate algorithms (HyperLogLog, sketches)
└── Sampling (analyze subset)
REDUCE LATENCY
├── Caching (query results, metadata)
├── Concurrency (parallel execution)
├── Vectorization (SIMD operations)
└── Pipelining (stream results)
REDUCE COST
├── Compression (smaller storage/transfer)
├── Tiered storage (hot/warm/cold)
├── Query scheduling (off-peak)
└── Result caching (don't recompute)
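To see how much the "reduce data scanned" levers buy, here is a back-of-envelope sketch using the 2.7B-row table from the opening example; the bytes-per-row and column counts are illustrative assumptions.

# scan_estimator.py - back-of-envelope effect of partition + column pruning.
# Row width and column counts are illustrative assumptions, not measurements.

def estimate_bytes_scanned(total_rows: int, bytes_per_row: int,
                           partition_fraction: float,
                           columns_read: int, total_columns: int) -> dict:
    """Estimate scanned bytes for a columnar table after pruning."""
    full_scan = total_rows * bytes_per_row
    after_partitions = full_scan * partition_fraction                  # skip whole files
    after_columns = after_partitions * (columns_read / total_columns)  # read only needed columns
    return {
        "full_scan_gb": full_scan / 1e9,
        "after_partition_pruning_gb": after_partitions / 1e9,
        "after_column_pruning_gb": after_columns / 1e9,
    }

# The Q1 dashboard query from Chapter 2: ~90 of 365 days, 3 of ~30 columns.
print(estimate_bytes_scanned(
    total_rows=2_700_000_000, bytes_per_row=200,
    partition_fraction=90 / 365, columns_read=3, total_columns=30,
))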
Chapter 3: Partitioning and Clustering
3.1 Partitioning Strategies
# query_optimization/partitioning.py
"""
Partitioning strategies for OLAP tables.
Partitioning divides data into manageable chunks that can be
queried independently. Good partitioning = orders of magnitude
faster queries.
"""
from dataclasses import dataclass
from typing import List, Optional
from datetime import date, timedelta
import hashlib
@dataclass
class PartitionConfig:
"""Configuration for table partitioning."""
column: str
type: str # "range", "hash", "list"
granularity: Optional[str] = None # For time: "day", "month", "year"
PARTITIONING_STRATEGIES = """
TIME-BASED PARTITIONING (Most Common)
─────────────────────────────────────
Daily partitions:
fact_sales/
├── date=2024-01-01/
├── date=2024-01-02/
├── date=2024-01-03/
└── ...
Query: WHERE date BETWEEN '2024-01-01' AND '2024-01-31'
Effect: Read 31 partitions, skip 334 others
Choosing granularity:
├── Daily: Best for recent data queries
├── Monthly: Best for historical analysis
├── Yearly: Best for archival data
└── Hourly: Best for real-time systems (if needed)
Rule of thumb: 100MB - 1GB per partition
HASH PARTITIONING
─────────────────
Distribute by hash of column (e.g., user_id):
fact_user_events/
├── user_hash=0/
├── user_hash=1/
├── ...
└── user_hash=255/
Query: WHERE user_id = 'user_123'
Effect: Read 1 partition (where hash(user_123) lands)
Use when:
├── Point lookups are common
├── Need even distribution
└── No natural time dimension
COMPOSITE PARTITIONING
──────────────────────
Multiple partition levels:
fact_sales/
├── year=2024/
│ ├── month=01/
│ │ ├── region=US/
│ │ ├── region=EU/
│ │ └── region=APAC/
│ └── month=02/
└── year=2023/
Query: WHERE year = 2024 AND month = 1 AND region = 'US'
Effect: Read single leaf partition
Warning: Can lead to too many small partitions!
"""
class PartitionPruner:
"""
Determines which partitions to read for a query.
"""
def __init__(self, partition_config: PartitionConfig):
self.config = partition_config
def get_partitions_for_query(
self,
filters: dict,
all_partitions: List[str]
) -> List[str]:
"""
Return only partitions needed for query filters.
"""
if self.config.column not in filters:
# No filter on partition column - read all
return all_partitions
filter_value = filters[self.config.column]
if self.config.type == "range":
return self._prune_range(filter_value, all_partitions)
elif self.config.type == "list":
return self._prune_list(filter_value, all_partitions)
elif self.config.type == "hash":
return self._prune_hash(filter_value, all_partitions)
return all_partitions
def _prune_range(self, filter_value, partitions: List[str]) -> List[str]:
"""Prune partitions for range filter."""
# filter_value is tuple (start, end)
start, end = filter_value
matching = []
for p in partitions:
partition_value = self._extract_partition_value(p)
if start <= partition_value <= end:
matching.append(p)
return matching
def _prune_list(self, filter_value, partitions: List[str]) -> List[str]:
"""Prune partitions for IN list filter."""
# filter_value is list of values
matching = []
for p in partitions:
partition_value = self._extract_partition_value(p)
if partition_value in filter_value:
matching.append(p)
return matching
    def _prune_hash(self, filter_value, partitions: List[str]) -> List[str]:
        """Prune partitions for hash-partitioned table."""
        # Use a stable hash so we pick the same bucket the writer did.
        # (Python's built-in hash() is salted per process, so it is unsafe here.)
        digest = hashlib.md5(str(filter_value).encode()).hexdigest()
        hash_bucket = int(digest, 16) % len(partitions)
        return [partitions[hash_bucket]]
def _extract_partition_value(self, partition_path: str):
"""Extract partition value from path."""
# e.g., "date=2024-01-15" -> "2024-01-15"
return partition_path.split("=")[1]
def estimate_partition_savings(
total_partitions: int,
partitions_scanned: int,
avg_partition_size_gb: float
) -> dict:
"""Estimate savings from partition pruning."""
full_scan_gb = total_partitions * avg_partition_size_gb
actual_scan_gb = partitions_scanned * avg_partition_size_gb
return {
"full_scan_gb": full_scan_gb,
"actual_scan_gb": actual_scan_gb,
"savings_gb": full_scan_gb - actual_scan_gb,
"savings_pct": (1 - partitions_scanned / total_partitions) * 100,
"partitions_skipped": total_partitions - partitions_scanned
}
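A short usage sketch of the pruner and savings estimator above; the partition list and per-partition size are made up for illustration.

# Usage sketch for PartitionPruner / estimate_partition_savings.
config = PartitionConfig(column="date", type="range", granularity="day")
pruner = PartitionPruner(config)

# Daily partitions named like "date=2024-01-01" (values invented).
all_partitions = [f"date=2024-{m:02d}-{d:02d}"
                  for m in range(1, 13) for d in range(1, 29)]

needed = pruner.get_partitions_for_query(
    filters={"date": ("2024-01-01", "2024-01-31")},
    all_partitions=all_partitions,
)
print(f"Scanning {len(needed)} of {len(all_partitions)} partitions")

print(estimate_partition_savings(
    total_partitions=len(all_partitions),
    partitions_scanned=len(needed),
    avg_partition_size_gb=0.5,   # assume ~500MB per daily partition
))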
3.2 Clustering and Sorting
# query_optimization/clustering.py
"""
Clustering (sorting) within partitions for better query performance.
Partitioning decides WHICH files to read.
Clustering decides HOW data is organized WITHIN files.
"""
CLUSTERING_EXPLAINED = """
CLUSTERING (Z-ORDERING) VISUALIZATION
Without clustering:
File 1: [A, C, B, A, D, C, B, A, C, D]
File 2: [B, A, D, C, A, B, C, D, A, B]
Query: WHERE category = 'A'
→ Must read ALL files (A is scattered everywhere)
With clustering on category:
File 1: [A, A, A, A, A, B, B, B, B, B]
File 2: [C, C, C, C, D, D, D, D, D, D]
Query: WHERE category = 'A'
→ Read only File 1 (A is grouped together)
Z-ORDERING (Multi-column clustering):
Without Z-ordering:
Sorted by date only
Query on date + product: Good on date, scattered on product
With Z-ordering on (date, product):
Data interleaved by both columns using Z-curve
Query on date + product: Good locality for both
Visualization of Z-curve:
product
↑
4 │ 10 11 14 15
3 │  8  9 12 13
2 │  2  3  6  7
1 │  0  1  4  5
  └──────────────→ date
     1  2  3  4
Z-order visits: 0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15
Query: date IN (1,2) AND product IN (1,2)
→ Values 0,1,2,3 are close together!
"""
from typing import List


class ClusteringConfig:
"""Configure clustering for a table."""
def __init__(
self,
columns: List[str],
method: str = "sort", # "sort", "zorder", "hilbert"
auto_cluster: bool = True
):
self.columns = columns
self.method = method
self.auto_cluster = auto_cluster
# BigQuery clustering example
BIGQUERY_CLUSTERING = """
-- Create clustered table in BigQuery
CREATE TABLE project.dataset.fact_sales
PARTITION BY DATE(order_timestamp)
CLUSTER BY product_category, customer_region
AS SELECT * FROM source_table;
-- Query benefits from both partition AND cluster pruning
SELECT product_category, SUM(revenue)
FROM project.dataset.fact_sales
WHERE DATE(order_timestamp) = '2024-01-15' -- Partition pruning
AND customer_region = 'US-WEST' -- Cluster pruning
GROUP BY product_category;
"""
# Delta Lake Z-ordering example
DELTA_ZORDER = """
-- Optimize Delta table with Z-ordering
OPTIMIZE fact_sales
ZORDER BY (product_id, customer_id);
-- This helps queries that filter on either or both columns
SELECT * FROM fact_sales
WHERE product_id = 'P123'
AND customer_id = 'C456';
"""
# ClickHouse ordering
CLICKHOUSE_ORDER = """
-- ClickHouse uses ORDER BY for clustering
CREATE TABLE fact_sales (
date Date,
category String,
region String,
revenue Float64
)
ENGINE = MergeTree()
PARTITION BY toYYYYMM(date)
ORDER BY (category, region, date); -- Data sorted by these columns
-- Queries on category, region are very fast
SELECT category, SUM(revenue)
FROM fact_sales
WHERE category = 'Electronics'
GROUP BY category;
"""
Chapter 4: Pre-Aggregation and Materialized Views
4.1 When to Pre-Aggregate
PRE-AGGREGATION DECISION FRAMEWORK
┌─────────────────────┐
│ Is the query pattern │
│ predictable? │
└──────────┬──────────┘
│
┌─────────────────┴─────────────────┐
│ │
YES NO
│ │
▼ ▼
┌───────────────────┐ ┌───────────────────┐
│ Is it queried │ │ Use columnar │
│ frequently? │ │ storage + indexes │
└─────────┬─────────┘ └───────────────────┘
│
┌─────────────┴─────────────┐
│ │
YES (>10/day) NO (<10/day)
│ │
▼ ▼
┌───────────────────┐ ┌───────────────────┐
│ PRE-AGGREGATE │ │ On-demand compute │
│ (Rollup table or │ │ is probably fine │
│ materialized view)│ │ │
└───────────────────┘ └───────────────────┘
EXAMPLES:
Daily revenue by category (executive dashboard)
├── Queried: 100+ times/day by 20 executives
├── Pattern: Always last 30/90/365 days
├── Decision: PRE-AGGREGATE (daily rollup table)
└── Result: 50ms instead of 45 seconds
Revenue by product by hour by region (ad-hoc exploration)
├── Queried: 2-3 times/month by analyst
├── Pattern: Different dimensions each time
├── Decision: DON'T pre-aggregate
└── Result: 45 seconds is acceptable for rare ad-hoc
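The same decision flow, condensed into a helper; the >10 queries/day threshold comes from the diagram above, everything else is an illustrative assumption.

# preagg_decision.py - encodes the decision framework above.

def should_preaggregate(pattern_is_predictable: bool, queries_per_day: int) -> str:
    """Rough guide: pre-aggregate only predictable, frequently-run queries."""
    if not pattern_is_predictable:
        return "Use columnar storage + indexes; compute on demand"
    if queries_per_day > 10:
        return "Pre-aggregate (rollup table or materialized view)"
    return "On-demand compute is probably fine"

print(should_preaggregate(True, 100))   # executive dashboard -> pre-aggregate
print(should_preaggregate(False, 3))    # ad-hoc exploration  -> on demand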
4.2 Rollup Tables
# query_optimization/rollups.py
"""
Rollup table implementation for pre-aggregation.
Rollups trade storage for query speed.
"""
from dataclasses import dataclass
from typing import List, Dict, Any, Optional
from datetime import date
@dataclass
class RollupDefinition:
"""Defines a rollup table."""
name: str
source_table: str
dimensions: List[str] # GROUP BY columns
measures: Dict[str, str] # column -> aggregation
refresh_schedule: str
retention_days: int = 365
# Common rollup patterns
ROLLUP_PATTERNS = """
ROLLUP HIERARCHY PATTERN
────────────────────────
Create multiple rollups at different granularities:
Level 0: Raw facts (grain: transaction)
fact_sales: 10B rows, 2TB
Level 1: Hourly rollup
agg_hourly_sales: 8.7M rows (365 days × 24 hours × 1000 combos)
Columns: hour, category, region, SUM(revenue), COUNT(orders)
Level 2: Daily rollup
agg_daily_sales: 365K rows (365 days × 1000 combos)
Columns: date, category, region, SUM(revenue), COUNT(orders)
Level 3: Monthly rollup
agg_monthly_sales: 12K rows (12 months × 1000 combos)
Columns: month, category, region, SUM(revenue), COUNT(orders)
QUERY ROUTING:
├── "Last 24 hours by hour" → Level 1 (hourly)
├── "Last 90 days by day" → Level 2 (daily)
├── "Last 2 years by month" → Level 3 (monthly)
└── "Drill into specific transactions" → Level 0 (raw)
CUBE ROLLUP PATTERN
───────────────────
Pre-compute all dimension combinations:
SELECT
COALESCE(category, 'ALL') as category,
COALESCE(region, 'ALL') as region,
COALESCE(channel, 'ALL') as channel,
SUM(revenue) as revenue,
COUNT(*) as orders
FROM fact_sales
WHERE date = '2024-01-15'
GROUP BY CUBE(category, region, channel);
This creates rows for:
├── (Electronics, US, Web) - specific
├── (Electronics, US, ALL) - channel total
├── (Electronics, ALL, ALL) - category total
├── (ALL, ALL, ALL) - grand total
└── ... every real-value/'ALL' combination (2³ = 8 grouping sets; 3×3×3 = 27 rows if each dimension has two values)
Query "revenue by category" → filter WHERE region='ALL' AND channel='ALL'
Instant response from pre-computed row!
"""
class RollupManager:
"""
Manages rollup table creation and refresh.
"""
def __init__(self, db_connection):
self.db = db_connection
self.rollups: Dict[str, RollupDefinition] = {}
def create_rollup(self, rollup: RollupDefinition):
"""Create a new rollup table."""
# Build CREATE TABLE statement
dimension_cols = ", ".join(rollup.dimensions)
measure_cols = ", ".join([
f"{agg}({col}) as {col}_{agg.lower()}"
for col, agg in rollup.measures.items()
])
sql = f"""
CREATE TABLE {rollup.name} AS
SELECT
{dimension_cols},
{measure_cols},
CURRENT_TIMESTAMP as refreshed_at
FROM {rollup.source_table}
GROUP BY {dimension_cols}
"""
self.db.execute(sql)
self.rollups[rollup.name] = rollup
def refresh_rollup(
self,
rollup_name: str,
incremental: bool = True,
target_date: date = None
):
"""
Refresh a rollup table.
Incremental: Only refresh specific date partition
Full: Rebuild entire rollup
"""
rollup = self.rollups[rollup_name]
if incremental and target_date:
# Delete and re-insert for specific date
self.db.execute(f"""
DELETE FROM {rollup.name}
WHERE date = '{target_date}'
""")
dimension_cols = ", ".join(rollup.dimensions)
measure_cols = ", ".join([
f"{agg}({col}) as {col}_{agg.lower()}"
for col, agg in rollup.measures.items()
])
self.db.execute(f"""
INSERT INTO {rollup.name}
SELECT
{dimension_cols},
{measure_cols},
CURRENT_TIMESTAMP as refreshed_at
FROM {rollup.source_table}
WHERE date = '{target_date}'
GROUP BY {dimension_cols}
""")
else:
# Full refresh - recreate
self.db.execute(f"DROP TABLE IF EXISTS {rollup.name}")
self.create_rollup(rollup)
def get_best_rollup_for_query(
self,
dimensions_needed: List[str],
time_granularity: str
) -> Optional[str]:
"""
Find the most efficient rollup for a query.
Returns rollup name or None if should use base table.
"""
candidates = []
for name, rollup in self.rollups.items():
# Check if rollup has all needed dimensions
if all(d in rollup.dimensions for d in dimensions_needed):
# Check if granularity matches
if self._matches_granularity(rollup, time_granularity):
candidates.append((name, len(rollup.dimensions)))
if not candidates:
return None
# Return rollup with fewest dimensions (most aggregated)
candidates.sort(key=lambda x: x[1])
return candidates[0][0]
def _matches_granularity(self, rollup: RollupDefinition, granularity: str) -> bool:
"""Check if rollup supports the requested time granularity."""
granularity_hierarchy = ["hour", "day", "week", "month", "quarter", "year"]
rollup_granularity = "day" # Default
for dim in rollup.dimensions:
if dim in granularity_hierarchy:
rollup_granularity = dim
break
# Rollup must be at same or finer granularity
return granularity_hierarchy.index(rollup_granularity) <= \
granularity_hierarchy.index(granularity)
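A usage sketch for the classes above, assuming a db_connection object with an execute() method as RollupManager expects; it defines the daily rollup from the hierarchy pattern and routes a 90-day dashboard query to it.

# Usage sketch for RollupDefinition / RollupManager (db_connection assumed).
daily_rollup = RollupDefinition(
    name="agg_daily_sales",
    source_table="fact_sales",
    dimensions=["date", "category", "region"],
    measures={"revenue": "SUM", "order_id": "COUNT"},
    refresh_schedule="hourly",
)

manager = RollupManager(db_connection)
manager.create_rollup(daily_rollup)

# "Last 90 days by day, by category and region" -> served by the daily rollup.
best = manager.get_best_rollup_for_query(
    dimensions_needed=["date", "category", "region"],
    time_granularity="day",
)
print(best)  # "agg_daily_sales"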
4.3 Materialized Views
# query_optimization/materialized_views.py
"""
Materialized view strategies for different databases.
"""
MATERIALIZED_VIEW_PATTERNS = """
MATERIALIZED VIEWS VS ROLLUP TABLES
Materialized Views:
├── Auto-refresh (database manages)
├── Query optimizer can use automatically
├── May have limitations on supported queries
└── Less control over refresh timing
Rollup Tables:
├── Manual refresh (you control)
├── Must explicitly query the rollup
├── Full SQL flexibility
└── More control, more responsibility
SNOWFLAKE MATERIALIZED VIEWS
────────────────────────────
CREATE MATERIALIZED VIEW mv_daily_revenue AS
SELECT
DATE_TRUNC('day', order_timestamp) as date,
product_category,
SUM(revenue) as total_revenue,
COUNT(*) as order_count
FROM fact_sales
GROUP BY 1, 2;
-- Snowflake automatically:
-- 1. Keeps view in sync with base table
-- 2. Uses view when query matches pattern
-- 3. Charges for storage and maintenance
BIGQUERY MATERIALIZED VIEWS
───────────────────────────
CREATE MATERIALIZED VIEW project.dataset.mv_daily_revenue
OPTIONS (
enable_refresh = true,
refresh_interval_minutes = 60
)
AS
SELECT
DATE(order_timestamp) as date,
product_category,
SUM(revenue) as total_revenue,
COUNT(*) as order_count
FROM project.dataset.fact_sales
GROUP BY 1, 2;
-- BigQuery automatically:
-- 1. Incrementally refreshes
-- 2. Rewrites queries to use MV
-- 3. Combines MV + base table for fresh data
CLICKHOUSE MATERIALIZED VIEWS
─────────────────────────────
-- ClickHouse MVs are triggers that populate a target table
CREATE MATERIALIZED VIEW mv_daily_revenue
ENGINE = SummingMergeTree()
ORDER BY (date, category)
AS
SELECT
toDate(order_timestamp) as date,
category,
sum(revenue) as total_revenue,
count() as order_count
FROM fact_sales
GROUP BY date, category;
-- ClickHouse:
-- 1. Populates MV on INSERT to source
-- 2. Uses SummingMergeTree to merge aggregates
-- 3. No automatic query rewrite - query MV directly
"""
from typing import Dict


class MaterializedViewManager:
"""
Manages materialized views with freshness tracking.
"""
def __init__(self, db_connection):
self.db = db_connection
self.views: Dict[str, Dict] = {}
def create_view(
self,
name: str,
query: str,
refresh_interval_minutes: int = 60
):
"""Create a materialized view."""
# Create the view
self.db.execute(f"""
CREATE MATERIALIZED VIEW {name} AS
{query}
""")
self.views[name] = {
"query": query,
"refresh_interval": refresh_interval_minutes,
"last_refresh": None,
"row_count": 0
}
def get_freshness(self, name: str) -> Dict:
"""Get view freshness information."""
if name not in self.views:
return {"error": "View not found"}
view = self.views[name]
# Query for actual freshness
result = self.db.execute(f"""
SELECT
COUNT(*) as row_count,
MAX(refreshed_at) as last_refresh
FROM {name}
""")
return {
"name": name,
"last_refresh": result["last_refresh"],
"row_count": result["row_count"],
"refresh_interval_minutes": view["refresh_interval"],
"is_stale": self._is_stale(result["last_refresh"], view["refresh_interval"])
}
def _is_stale(self, last_refresh, interval_minutes: int) -> bool:
"""Check if view is stale."""
if not last_refresh:
return True
from datetime import datetime, timedelta
threshold = datetime.utcnow() - timedelta(minutes=interval_minutes)
return last_refresh < threshold
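A usage sketch for MaterializedViewManager, again assuming a db_connection with an execute() method; note that get_freshness() expects the view to expose a refreshed_at column, as the rollup tables in 4.2 do.

# Usage sketch for MaterializedViewManager (db_connection assumed).
mv_manager = MaterializedViewManager(db_connection)

mv_manager.create_view(
    name="mv_daily_revenue",
    query="""
        SELECT
            DATE(order_timestamp) AS date,
            product_category,
            SUM(revenue) AS total_revenue,
            COUNT(*) AS order_count
        FROM fact_sales
        GROUP BY 1, 2
    """,
    refresh_interval_minutes=60,
)

# get_freshness() assumes the view exposes a refreshed_at column.
print(mv_manager.get_freshness("mv_daily_revenue"))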
Chapter 5: Caching Strategies
5.1 Multi-Layer Caching
CACHING LAYERS FOR ANALYTICS
┌────────────────────────────────────────────────────────────────────────┐
│ CACHING ARCHITECTURE │
│ │
│ Layer 1: CDN / Edge Cache │
│ ├── Cache: Static dashboard HTML/JS │
│ ├── TTL: Hours to days │
│ └── Hit rate: 95%+ for static assets │
│ │
│ Layer 2: API Response Cache (Redis) │
│ ├── Cache: Query results as JSON │
│ ├── Key: Hash of query + parameters │
│ ├── TTL: Minutes to hours │
│ └── Hit rate: 60-80% for popular queries │
│ │
│ Layer 3: Query Result Cache (Database) │
│ ├── Cache: Query results in DB memory │
│ ├── Managed by: Database query cache │
│ ├── TTL: Until invalidation │
│ └── Hit rate: Varies by database │
│ │
│ Layer 4: Block/Page Cache (Storage) │
│ ├── Cache: Data blocks in memory │
│ ├── Managed by: OS or database │
│ └── Hit rate: High for repeated scans │
│ │
└────────────────────────────────────────────────────────────────────────┘
CACHE INVALIDATION STRATEGIES
1. TIME-BASED (TTL)
├── Simple: Expire after X minutes
├── Good for: Dashboards that can be slightly stale
└── Risk: Serving outdated data
2. EVENT-BASED
├── Invalidate when source data changes
├── Good for: Data that changes infrequently
└── Risk: Complex to implement correctly
3. VERSION-BASED
├── Include data version in cache key
├── Good for: Point-in-time queries
└── Risk: Cache bloat with many versions
4. HYBRID
├── TTL + event-based invalidation
├── Best of both worlds
└── Most common in production
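A minimal sketch of the hybrid strategy: results carry a TTL, and the cache key also embeds a data-version value that is bumped when the source table is refreshed, so stale entries simply stop being referenced. The version source here is an in-memory stand-in.

# hybrid_invalidation.py - TTL plus version-based keys (illustrative sketch).
import hashlib
import json
import time

# In practice the data version could be the last partition refresh timestamp;
# here it is just an in-memory value bumped when the source table changes.
DATA_VERSION = {"fact_sales": "2024-01-15T06:00:00Z"}

_store: dict = {}  # key -> (expires_at, result)

def cache_key(query: str, params: dict, table: str) -> str:
    payload = json.dumps({"q": query, "p": params, "v": DATA_VERSION[table]},
                         sort_keys=True)
    return hashlib.sha256(payload.encode()).hexdigest()[:16]

def get_or_compute(query: str, params: dict, table: str, ttl_s: int, compute):
    key = cache_key(query, params, table)
    hit = _store.get(key)
    if hit and hit[0] > time.time():          # TTL still valid
        return hit[1]
    result = compute()                        # miss: recompute and cache
    _store[key] = (time.time() + ttl_s, result)
    return result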
5.2 Query Result Caching
# query_optimization/caching.py
"""
Query result caching implementation.
"""
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Dict, Any, Optional
import hashlib
import json
import redis
@dataclass
class CacheConfig:
"""Cache configuration."""
default_ttl_seconds: int = 300 # 5 minutes
max_result_size_mb: int = 10
redis_url: str = "redis://localhost:6379"
class QueryCache:
"""
Caches query results for fast repeated access.
"""
def __init__(self, config: CacheConfig):
self.config = config
self.redis = redis.from_url(config.redis_url)
# Metrics
self.hits = 0
self.misses = 0
def get(self, query: str, params: Dict = None) -> Optional[Dict]:
"""
Get cached result for query.
Returns None if not cached or expired.
"""
cache_key = self._make_key(query, params)
cached = self.redis.get(cache_key)
if cached:
self.hits += 1
result = json.loads(cached)
result["_cache"] = {
"hit": True,
"cached_at": result.get("_cached_at"),
"key": cache_key[:20] + "..."
}
return result
self.misses += 1
return None
def set(
self,
query: str,
params: Dict,
result: Dict,
ttl_seconds: int = None
):
"""
Cache a query result.
"""
cache_key = self._make_key(query, params)
ttl = ttl_seconds or self.config.default_ttl_seconds
# Add metadata
result["_cached_at"] = datetime.utcnow().isoformat()
# Check size
result_json = json.dumps(result)
size_mb = len(result_json) / (1024 * 1024)
if size_mb > self.config.max_result_size_mb:
# Too large to cache
return False
self.redis.setex(cache_key, ttl, result_json)
return True
    def invalidate(self, pattern: str = None, query: str = None):
        """
        Invalidate cached results.
        pattern: Redis key pattern (e.g., "query:sales:*")
        query: Specific query to invalidate (all parameter variants)
        """
        if query:
            # Keys share a prefix derived from the query text alone,
            # so this matches every cached parameter variant of the query.
            keys = self.redis.keys(f"{self._query_prefix(query)}*")
            if keys:
                self.redis.delete(*keys)
        elif pattern:
            keys = self.redis.keys(pattern)
            if keys:
                self.redis.delete(*keys)

    def _query_prefix(self, query: str) -> str:
        """Key prefix derived from the normalized query text only."""
        normalized = " ".join(query.lower().split())
        query_hash = hashlib.sha256(normalized.encode()).hexdigest()[:16]
        return f"query:{query_hash}:"

    def _make_key(self, query: str, params: Dict = None) -> str:
        """Generate cache key: query-derived prefix + hash of params."""
        params_hash = hashlib.sha256(
            json.dumps(params or {}, sort_keys=True).encode()
        ).hexdigest()[:16]
        return f"{self._query_prefix(query)}{params_hash}"
def get_stats(self) -> Dict:
"""Get cache statistics."""
total = self.hits + self.misses
return {
"hits": self.hits,
"misses": self.misses,
"hit_rate": self.hits / total if total > 0 else 0,
"total_requests": total
}
class SmartQueryCache(QueryCache):
"""
Smart caching with query-specific TTLs and warming.
"""
def __init__(self, config: CacheConfig):
super().__init__(config)
# Query-specific TTL rules
self.ttl_rules = {
"realtime": 60, # 1 minute for real-time queries
"hourly": 300, # 5 minutes for hourly data
"daily": 3600, # 1 hour for daily data
"historical": 86400, # 24 hours for historical
}
def get_ttl_for_query(self, query: str, params: Dict) -> int:
"""Determine appropriate TTL based on query characteristics."""
query_lower = query.lower()
# Real-time queries (today's data)
if "today" in query_lower or "current" in query_lower:
return self.ttl_rules["realtime"]
# Check date range in params
if params:
if "date" in params:
date_param = params["date"]
# If querying old data, cache longer
if isinstance(date_param, str) and date_param < "2024-01-01":
return self.ttl_rules["historical"]
# Default to daily TTL
return self.ttl_rules["daily"]
async def warm_cache(self, queries: list):
"""
Pre-warm cache with common queries.
Run during off-peak hours.
"""
for query_def in queries:
query = query_def["query"]
params = query_def.get("params", {})
# Execute query and cache result
result = await self._execute_query(query, params)
ttl = self.get_ttl_for_query(query, params)
self.set(query, params, result, ttl)
async def _execute_query(self, query: str, params: Dict) -> Dict:
"""Execute query against database."""
# Implementation depends on database
pass
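A usage sketch for QueryCache, assuming a Redis instance is reachable at the default URL in CacheConfig; the query, parameters, and result rows are made up.

# Usage sketch for QueryCache (assumes a local Redis at the default URL).
cache = QueryCache(CacheConfig(default_ttl_seconds=300))

query = "SELECT category, SUM(revenue) FROM agg_daily_sales GROUP BY category"
params = {"start_date": "2024-01-01", "end_date": "2024-03-31"}

result = cache.get(query, params)
if result is None:
    result = {"data": [{"category": "Electronics", "revenue": 1_234_567}]}  # pretend DB call
    cache.set(query, params, result)

print(cache.get_stats())  # {"hits": ..., "misses": ..., "hit_rate": ...}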
Chapter 6: Multi-Tenant Analytics
6.1 Tenant Isolation Strategies
MULTI-TENANT ANALYTICS PATTERNS
PATTERN 1: ROW-LEVEL SECURITY
─────────────────────────────
Single table with tenant_id column:
fact_sales:
tenant_id | date | product | revenue
──────────┼──────┼─────────┼────────
tenant_a | ... | ... | ...
tenant_b | ... | ... | ...
Query filter:
SELECT * FROM fact_sales WHERE tenant_id = :current_tenant
Pros:
├── Simple to implement
├── Easy to add new tenants
└── Shared resources = lower cost
Cons:
├── Risk of data leakage (forgot WHERE clause)
├── Noisy neighbor (one tenant's queries affect others)
└── Scaling limits
PATTERN 2: SCHEMA PER TENANT
────────────────────────────
Separate schema for each tenant:
tenant_a.fact_sales
tenant_b.fact_sales
tenant_c.fact_sales
Query:
SELECT * FROM {tenant_schema}.fact_sales
Pros:
├── Strong isolation
├── Easy per-tenant customization
└── Simple backup/restore per tenant
Cons:
├── Schema proliferation
├── Harder to query across tenants
└── More maintenance
PATTERN 3: DATABASE PER TENANT
──────────────────────────────
Completely separate databases:
analytics_tenant_a (ClickHouse cluster 1)
analytics_tenant_b (ClickHouse cluster 2)
Pros:
├── Complete isolation
├── Independent scaling
└── Compliance-friendly (data residency)
Cons:
├── Highest cost
├── Most operational complexity
└── No cross-tenant analytics
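Pattern 1 is safest when the tenant filter is enforced by the database itself rather than by hand-written WHERE clauses (the code in 6.2 makes the same point). A PostgreSQL-style row-level-security sketch; the policy and setting names are assumptions.

# Row-level security for Pattern 1 (PostgreSQL syntax; names are illustrative).
POSTGRES_RLS = """
-- Enforce tenant isolation in the database, not in application SQL.
ALTER TABLE fact_sales ENABLE ROW LEVEL SECURITY;

CREATE POLICY tenant_isolation ON fact_sales
    USING (tenant_id = current_setting('app.current_tenant'));

-- The application sets the tenant once per session/transaction:
-- SET app.current_tenant = 'tenant_a';
-- After that, every query on fact_sales is filtered automatically.
"""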
6.2 Resource Management
# query_optimization/multi_tenant.py
"""
Multi-tenant query management with fair resource allocation.
"""
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Dict, Optional
import asyncio

# QueryCache is defined in query_optimization/caching.py (Chapter 5)
from query_optimization.caching import QueryCache
@dataclass
class TenantQuota:
"""Resource quota for a tenant."""
max_concurrent_queries: int = 10
max_scanned_bytes_per_day: int = 100 * 1024**3 # 100GB
max_query_timeout_seconds: int = 300
priority: int = 1 # Higher = more priority
class TenantResourceManager:
"""
Manages query resources across tenants fairly.
"""
def __init__(self):
self.quotas: Dict[str, TenantQuota] = {}
self.usage: Dict[str, Dict] = {}
self.active_queries: Dict[str, int] = {}
def register_tenant(self, tenant_id: str, quota: TenantQuota):
"""Register a tenant with their quota."""
self.quotas[tenant_id] = quota
self.usage[tenant_id] = {
"bytes_scanned_today": 0,
"queries_today": 0,
"last_reset": datetime.utcnow().date()
}
self.active_queries[tenant_id] = 0
async def acquire_query_slot(self, tenant_id: str) -> bool:
"""
Try to acquire a query slot for tenant.
Returns False if quota exceeded.
"""
self._maybe_reset_daily_usage(tenant_id)
quota = self.quotas.get(tenant_id)
if not quota:
return False
# Check concurrent query limit
if self.active_queries[tenant_id] >= quota.max_concurrent_queries:
return False
# Check daily byte limit
if self.usage[tenant_id]["bytes_scanned_today"] >= quota.max_scanned_bytes_per_day:
return False
self.active_queries[tenant_id] += 1
return True
def release_query_slot(self, tenant_id: str, bytes_scanned: int):
"""Release query slot and record usage."""
self.active_queries[tenant_id] -= 1
self.usage[tenant_id]["bytes_scanned_today"] += bytes_scanned
self.usage[tenant_id]["queries_today"] += 1
def get_query_timeout(self, tenant_id: str) -> int:
"""Get query timeout for tenant."""
quota = self.quotas.get(tenant_id)
return quota.max_query_timeout_seconds if quota else 60
def _maybe_reset_daily_usage(self, tenant_id: str):
"""Reset usage counters if new day."""
today = datetime.utcnow().date()
if self.usage[tenant_id]["last_reset"] < today:
self.usage[tenant_id] = {
"bytes_scanned_today": 0,
"queries_today": 0,
"last_reset": today
}
def get_usage_report(self, tenant_id: str) -> Dict:
"""Get usage report for tenant."""
self._maybe_reset_daily_usage(tenant_id)
quota = self.quotas.get(tenant_id)
usage = self.usage.get(tenant_id, {})
return {
"tenant_id": tenant_id,
"today": {
"bytes_scanned": usage.get("bytes_scanned_today", 0),
"bytes_limit": quota.max_scanned_bytes_per_day if quota else 0,
"bytes_remaining": max(0,
(quota.max_scanned_bytes_per_day if quota else 0) -
usage.get("bytes_scanned_today", 0)
),
"queries_executed": usage.get("queries_today", 0)
},
"active_queries": self.active_queries.get(tenant_id, 0),
"max_concurrent": quota.max_concurrent_queries if quota else 0
}
class QueryRouter:
"""
Routes queries to appropriate resources based on tenant and query type.
"""
def __init__(
self,
resource_manager: TenantResourceManager,
query_cache: QueryCache
):
self.resources = resource_manager
self.cache = query_cache
# Query executors by type
self.executors = {}
async def execute_query(
self,
tenant_id: str,
query: str,
params: Dict = None
) -> Dict:
"""
Execute query with tenant isolation and resource management.
"""
# Check cache first
cached = self.cache.get(f"{tenant_id}:{query}", params)
if cached:
return cached
# Acquire resources
if not await self.resources.acquire_query_slot(tenant_id):
return {
"error": "Resource quota exceeded",
"retry_after_seconds": 60
}
try:
# Get timeout for tenant
timeout = self.resources.get_query_timeout(tenant_id)
# Execute with timeout
result = await asyncio.wait_for(
self._execute(tenant_id, query, params),
timeout=timeout
)
# Cache result
self.cache.set(f"{tenant_id}:{query}", params, result)
# Release resources
bytes_scanned = result.get("_metadata", {}).get("bytes_scanned", 0)
self.resources.release_query_slot(tenant_id, bytes_scanned)
return result
except asyncio.TimeoutError:
self.resources.release_query_slot(tenant_id, 0)
return {"error": f"Query timeout after {timeout}s"}
except Exception as e:
self.resources.release_query_slot(tenant_id, 0)
raise
async def _execute(self, tenant_id: str, query: str, params: Dict) -> Dict:
"""Execute query with tenant context."""
# Add tenant filter to query
safe_query = self._add_tenant_filter(query, tenant_id)
# Route to appropriate executor
executor = self._get_executor(query)
return await executor.execute(safe_query, params)
def _add_tenant_filter(self, query: str, tenant_id: str) -> str:
"""
Add tenant filter to query for row-level security.
In production, use database-level RLS instead.
"""
        # Simplified approach - inject the tenant predicate right after WHERE.
        # WARNING: for illustration only - use parameterized queries and
        # database-level row-level security in production!
        upper = query.upper()
        if "WHERE" in upper:
            idx = upper.index("WHERE") + len("WHERE")
            return query[:idx] + f" tenant_id = '{tenant_id}' AND" + query[idx:]
        return query + f" WHERE tenant_id = '{tenant_id}'"
def _get_executor(self, query: str):
"""Get appropriate executor based on query type."""
query_lower = query.lower()
if "sum(" in query_lower or "count(" in query_lower:
return self.executors.get("olap", self.executors["default"])
return self.executors["default"]
Part II: Implementation
Chapter 7: Production Query Service
7.1 Complete Query Service
# query_optimization/query_service.py
"""
Production query service with all optimizations.
"""
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, Any, List, Optional
import logging

# Building blocks from earlier chapters (same query_optimization package)
from query_optimization.caching import QueryCache
from query_optimization.rollups import RollupManager
from query_optimization.multi_tenant import TenantResourceManager

logger = logging.getLogger(__name__)
@dataclass
class QueryServiceConfig:
"""Query service configuration."""
cache_enabled: bool = True
cache_ttl_seconds: int = 300
max_result_rows: int = 10000
default_timeout_seconds: int = 30
enable_query_rewriting: bool = True
class QueryService:
"""
Production query service with caching, optimization, and monitoring.
"""
def __init__(
self,
config: QueryServiceConfig,
cache: QueryCache,
rollup_manager: RollupManager,
resource_manager: TenantResourceManager,
db_connection
):
self.config = config
self.cache = cache
self.rollups = rollup_manager
self.resources = resource_manager
self.db = db_connection
# Metrics
self.query_count = 0
self.cache_hits = 0
self.rollup_hits = 0
self.avg_latency_ms = 0
async def execute(
self,
tenant_id: str,
query: str,
params: Dict = None,
options: Dict = None
) -> Dict:
"""
Execute query with full optimization pipeline.
Pipeline:
1. Check cache
2. Acquire resources
3. Rewrite query (use rollups)
4. Execute
5. Cache result
6. Return with metadata
"""
start_time = datetime.utcnow()
self.query_count += 1
options = options or {}
result_metadata = {
"query_id": f"q_{self.query_count}_{start_time.timestamp()}",
"tenant_id": tenant_id,
"cache_hit": False,
"rollup_used": None,
"bytes_scanned": 0
}
try:
# Step 1: Check cache
if self.config.cache_enabled and not options.get("skip_cache"):
cache_key = f"{tenant_id}:{query}"
cached = self.cache.get(cache_key, params)
if cached:
self.cache_hits += 1
result_metadata["cache_hit"] = True
result_metadata["latency_ms"] = self._elapsed_ms(start_time)
return self._format_response(cached, result_metadata)
# Step 2: Acquire resources
if not await self.resources.acquire_query_slot(tenant_id):
return self._quota_exceeded_response(tenant_id)
try:
# Step 3: Query rewriting
optimized_query = query
if self.config.enable_query_rewriting:
optimized_query, rollup_used = self._try_rewrite_to_rollup(
query, params
)
if rollup_used:
self.rollup_hits += 1
result_metadata["rollup_used"] = rollup_used
# Step 4: Add tenant filter
safe_query = self._add_tenant_filter(optimized_query, tenant_id)
# Step 5: Execute with timeout
timeout = options.get("timeout", self.config.default_timeout_seconds)
result = await self._execute_query(safe_query, params, timeout)
result_metadata["bytes_scanned"] = result.get("_bytes_scanned", 0)
result_metadata["rows_returned"] = len(result.get("data", []))
# Step 6: Cache result
if self.config.cache_enabled:
ttl = self._get_cache_ttl(query, params)
self.cache.set(f"{tenant_id}:{query}", params, result, ttl)
# Release resources
self.resources.release_query_slot(
tenant_id,
result_metadata["bytes_scanned"]
)
result_metadata["latency_ms"] = self._elapsed_ms(start_time)
self._update_avg_latency(result_metadata["latency_ms"])
return self._format_response(result, result_metadata)
except Exception as e:
self.resources.release_query_slot(tenant_id, 0)
raise
except Exception as e:
logger.error(f"Query execution failed: {e}", exc_info=True)
result_metadata["latency_ms"] = self._elapsed_ms(start_time)
result_metadata["error"] = str(e)
return self._error_response(e, result_metadata)
def _try_rewrite_to_rollup(
self,
query: str,
params: Dict
) -> tuple:
"""
Try to rewrite query to use a rollup table.
Returns (rewritten_query, rollup_name) or (original_query, None)
"""
# Parse query to understand dimensions and granularity
dimensions = self._extract_dimensions(query)
granularity = self._extract_time_granularity(query, params)
# Find best rollup
rollup_name = self.rollups.get_best_rollup_for_query(
dimensions, granularity
)
if rollup_name:
# Rewrite query to use rollup
rewritten = self._rewrite_query_for_rollup(query, rollup_name)
return rewritten, rollup_name
return query, None
def _extract_dimensions(self, query: str) -> List[str]:
"""Extract GROUP BY dimensions from query."""
# Simplified parsing - use proper SQL parser in production
query_upper = query.upper()
if "GROUP BY" not in query_upper:
return []
group_by_idx = query_upper.index("GROUP BY")
group_by_clause = query[group_by_idx + 8:].split("ORDER BY")[0].split("LIMIT")[0]
dimensions = [d.strip() for d in group_by_clause.split(",")]
return dimensions
def _extract_time_granularity(self, query: str, params: Dict) -> str:
"""Determine time granularity of query."""
query_lower = query.lower()
if "hour" in query_lower or "hourly" in query_lower:
return "hour"
elif "day" in query_lower or "daily" in query_lower:
return "day"
elif "week" in query_lower or "weekly" in query_lower:
return "week"
elif "month" in query_lower or "monthly" in query_lower:
return "month"
return "day" # Default
def _rewrite_query_for_rollup(self, query: str, rollup_name: str) -> str:
"""Rewrite query to use rollup table."""
# Replace fact table with rollup
# Simplified - use proper SQL rewriting in production
rewritten = query.replace("fact_sales", rollup_name)
rewritten = rewritten.replace("fact_orders", rollup_name)
return rewritten
def _add_tenant_filter(self, query: str, tenant_id: str) -> str:
"""Add tenant isolation filter."""
# In production, use database-level RLS
# This is a simplified approach
if "WHERE" in query.upper():
# Insert after WHERE
where_idx = query.upper().index("WHERE")
return (
query[:where_idx + 5] +
f" tenant_id = '{tenant_id}' AND" +
query[where_idx + 5:]
)
else:
# Add WHERE clause
# Find safe insertion point
for keyword in ["GROUP BY", "ORDER BY", "LIMIT", ";"]:
if keyword in query.upper():
idx = query.upper().index(keyword)
return (
query[:idx] +
f" WHERE tenant_id = '{tenant_id}' " +
query[idx:]
)
return query + f" WHERE tenant_id = '{tenant_id}'"
async def _execute_query(
self,
query: str,
params: Dict,
timeout: int
) -> Dict:
"""Execute query against database."""
# Add LIMIT if not present
if "LIMIT" not in query.upper():
query = query + f" LIMIT {self.config.max_result_rows}"
# Execute with parameterized query
result = await self.db.execute(query, params, timeout=timeout)
return result
def _get_cache_ttl(self, query: str, params: Dict) -> int:
"""Determine cache TTL based on query characteristics."""
query_lower = query.lower()
# Real-time queries: short TTL
if "today" in query_lower or "last_hour" in query_lower:
return 60
# Historical queries: long TTL
if params and params.get("end_date", "").startswith("2023"):
return 3600
return self.config.cache_ttl_seconds
def _elapsed_ms(self, start: datetime) -> int:
"""Calculate elapsed time in milliseconds."""
return int((datetime.utcnow() - start).total_seconds() * 1000)
def _update_avg_latency(self, latency_ms: int):
"""Update rolling average latency."""
# Simple exponential moving average
alpha = 0.1
self.avg_latency_ms = alpha * latency_ms + (1 - alpha) * self.avg_latency_ms
def _format_response(self, result: Dict, metadata: Dict) -> Dict:
"""Format successful response."""
return {
"data": result.get("data", []),
"metadata": metadata,
"status": "success"
}
def _error_response(self, error: Exception, metadata: Dict) -> Dict:
"""Format error response."""
return {
"data": [],
"metadata": metadata,
"status": "error",
"error": {
"type": type(error).__name__,
"message": str(error)
}
}
def _quota_exceeded_response(self, tenant_id: str) -> Dict:
"""Format quota exceeded response."""
usage = self.resources.get_usage_report(tenant_id)
return {
"data": [],
"metadata": {"tenant_id": tenant_id},
"status": "error",
"error": {
"type": "QuotaExceeded",
"message": "Query quota exceeded",
"usage": usage,
"retry_after_seconds": 60
}
}
def get_stats(self) -> Dict:
"""Get service statistics."""
return {
"total_queries": self.query_count,
"cache_hit_rate": self.cache_hits / self.query_count if self.query_count > 0 else 0,
"rollup_hit_rate": self.rollup_hits / self.query_count if self.query_count > 0 else 0,
"avg_latency_ms": self.avg_latency_ms,
"cache_stats": self.cache.get_stats()
}
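A wiring sketch showing how the pieces from Chapters 4-6 plug into QueryService; db_connection is an assumed async client exposing execute(query, params, timeout=...), and the query text is illustrative.

# Wiring sketch for QueryService (db_connection is an assumed async client).
import asyncio

async def main():
    cache = QueryCache(CacheConfig())
    rollups = RollupManager(db_connection)
    resources = TenantResourceManager()
    resources.register_tenant("tenant_a", TenantQuota())

    service = QueryService(
        config=QueryServiceConfig(cache_enabled=True),
        cache=cache,
        rollup_manager=rollups,
        resource_manager=resources,
        db_connection=db_connection,
    )

    response = await service.execute(
        tenant_id="tenant_a",
        query="SELECT category, SUM(revenue) FROM fact_sales "
              "WHERE date >= '2024-01-01' GROUP BY category",
    )
    print(response["metadata"])

asyncio.run(main())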
Part III: Real-World Application
Chapter 8: Case Studies
8.1 Amplitude's Query Engine
AMPLITUDE'S ANALYTICS QUERY ARCHITECTURE
Challenge:
├── 50K+ customers
├── Trillions of events
├── Sub-second query responses
└── Complex behavioral analytics (funnels, retention)
Architecture:
┌────────────────────────────────────────────────────────────────────────┐
│ AMPLITUDE NOVA ENGINE │
│ │
│ Ingestion: │
│ ├── Real-time events → Kafka → Druid (hot data) │
│ └── Historical → S3 → Trino (cold data) │
│ │
│ Query Optimization: │
│ ├── Pre-computed metrics per customer │
│ ├── Materialized views for common charts │
│ ├── Multi-level caching (edge, API, database) │
│ └── Query routing: hot → Druid, cold → Trino │
│ │
│ Multi-tenancy: │
│ ├── Logical isolation (tenant_id column) │
│ ├── Per-tenant query quotas │
│ ├── Priority queues (paid tiers get priority) │
│ └── Resource pools per customer size │
│ │
│ Key Innovation: BEHAVIORAL PRE-COMPUTATION │
│ ├── Pre-compute common metrics (DAU, WAU, MAU) │
│ ├── Pre-build funnel snapshots nightly │
│ ├── Retention matrices updated hourly │
│ └── Custom dashboards compile to optimized queries │
│ │
└────────────────────────────────────────────────────────────────────────┘
Results:
├── p50 latency: 200ms
├── p99 latency: 2s
├── Queries/day: 100M+
└── Cost efficiency: 10x improvement over naive approach
8.2 Mixpanel's JQL Engine
MIXPANEL'S JQL (JavaScript Query Language)
Challenge:
├── Flexible query language for analysts
├── Interactive exploration (fast iteration)
└── Massive scale (billions of events)
Solution:
┌────────────────────────────────────────────────────────────────────────┐
│ MIXPANEL ARIADNE │
│ │
│ Query Language: │
│ ├── JQL: JavaScript-like syntax for flexibility │
│ ├── Compiles to optimized execution plan │
│ └── Sandboxed execution for safety │
│ │
│ Storage: │
│ ├── Custom columnar format (not Parquet) │
│ ├── Per-project data isolation │
│ ├── Time-partitioned with custom indexing │
│ └── Compression optimized for event data │
│ │
│ Query Execution: │
│ ├── Query planner rewrites to optimal form │
│ ├── Predicate pushdown to storage layer │
│ ├── Parallel execution across partitions │
│ └── Streaming results (don't wait for full result) │
│ │
│ Optimization Tricks: │
│ ├── Sampling for exploration (fast approximate) │
│ ├── Full scan for reports (accurate) │
│ ├── Progressive result loading │
│ └── Query similarity detection for caching │
│ │
└────────────────────────────────────────────────────────────────────────┘
Chapter 9: Common Mistakes
9.1 Query Optimization Mistakes
QUERY OPTIMIZATION ANTI-PATTERNS
❌ MISTAKE 1: Over-Caching
Wrong:
# Cache everything for 24 hours
cache.set(query, result, ttl=86400)
Problem:
Stale data shown to users
Cache invalidation nightmares
Memory bloat
Right:
# TTL based on data freshness requirements
if is_realtime_query(query):
cache.set(query, result, ttl=60)
elif is_historical_query(query):
cache.set(query, result, ttl=3600)
❌ MISTAKE 2: Wrong Rollup Granularity
Wrong:
# One rollup to rule them all
CREATE TABLE agg_everything AS
SELECT date, category, region, store, product, customer_segment...
GROUP BY ALL
Problem:
Rollup is as big as fact table
No query benefit
Maintenance overhead
Right:
# Purpose-specific rollups
agg_daily_category -- Executive dashboard
agg_hourly_region -- Operations
agg_daily_product_top100 -- Product team
❌ MISTAKE 3: No Query Timeouts
Wrong:
result = db.execute(user_query) # Wait forever
Problem:
Runaway queries consume resources
Other users blocked
Cost explosion
Right:
result = db.execute(
user_query,
timeout_seconds=30,
max_bytes_scanned=10_000_000_000
)
❌ MISTAKE 4: Ignoring Query Patterns
Wrong:
# Optimize based on intuition
"Let's add an index on product_id"
Problem:
Index might not be used
Wasted storage and maintenance
Right:
# Analyze actual query patterns
SELECT query_text, count(*), avg(duration_ms)
FROM query_log
GROUP BY query_text
ORDER BY count(*) DESC
LIMIT 20
-- Then optimize top queries
9.2 Cost Management Mistakes
COST MANAGEMENT ANTI-PATTERNS
❌ MISTAKE 1: No Cost Visibility
Wrong:
# No idea what queries cost
Users run expensive queries freely
Problem:
$50K/month surprise bills
No accountability
Can't optimize
Right:
# Track cost per query, per tenant
Show estimated cost before execution
Alert on expensive queries
Dashboard for cost attribution
❌ MISTAKE 2: Scanning Full Tables
Wrong:
SELECT * FROM fact_sales WHERE product_id = 'P123'
-- No partition filter!
Problem:
Scans 10TB instead of 10GB
100x more expensive
Right:
SELECT * FROM fact_sales
WHERE date >= '2024-01-01' -- Partition pruning
AND product_id = 'P123'
❌ MISTAKE 3: SELECT *
Wrong:
SELECT * FROM fact_sales -- 50 columns
Problem:
Reads unnecessary columns
Network overhead
Memory waste
Right:
SELECT date, revenue, quantity -- Only needed columns
FROM fact_sales
Part IV: Interview Preparation
Chapter 10: Interview Tips
10.1 When to Discuss Query Optimization
QUERY OPTIMIZATION INTERVIEW TRIGGERS
"The dashboard is slow"
→ Caching, pre-aggregation, partitioning
"We're paying too much for BigQuery/Snowflake"
→ Cost optimization, rollups, tiered storage
"Design a multi-tenant analytics platform"
→ Tenant isolation, resource management, fair queuing
"How would you handle 10K concurrent dashboard users?"
→ Caching layers, pre-computation, database choice
"Users complain queries take too long"
→ Query analysis, indexing, materialized views
10.2 Key Phrases
TALKING ABOUT QUERY OPTIMIZATION
"For dashboard queries, I'd implement a multi-layer caching
strategy: CDN for static assets, Redis for query results,
and database-level caching. The key is matching TTL to
data freshness requirements."
"Pre-aggregation is crucial for analytics at scale. I'd
analyze query patterns to identify the top 20 queries
that account for 80% of load, then create rollup tables
for those specific patterns."
"For multi-tenant analytics, I'd use row-level security
with tenant_id filtering, combined with per-tenant
resource quotas. This prevents noisy neighbor issues
while keeping costs manageable."
"The choice of OLAP database depends on the access pattern.
For real-time dashboards with high concurrency, ClickHouse
or Druid. For ad-hoc exploration, BigQuery or Snowflake.
Often you need both."
Chapter 11: Practice Problems
Problem 1: Dashboard Performance
Setup: Your executive dashboard takes 45 seconds to load. It shows:
- Revenue by region by day (last 90 days)
- Top 10 products by revenue
- Conversion funnel by channel
Data: 5B events, 10TB total.
Questions:
- How would you get load time under 1 second?
- How do you keep data fresh while maintaining speed?
- What's your cache invalidation strategy?
Approach:
- Pre-aggregate all three views as rollup tables
- Refresh rollups every hour (or on schedule)
- Cache rendered dashboard for 5 minutes
- Use streaming for "today" portion
Problem 2: Multi-Tenant Cost Control
Setup: You have 500 tenants sharing a BigQuery cluster. Three tenants are running expensive queries that cost $10K/month each, while most tenants cost $100/month.
Questions:
- How do you identify the expensive queries?
- How do you implement fair usage without blocking legitimate use?
- How do you communicate limits to tenants?
Approach:
- Query logging with cost attribution per tenant
- Tiered quotas based on plan (bytes/day, concurrency)
- Pre-query cost estimation with user warning
- Usage dashboard visible to tenants
Chapter 12: Sample Interview Dialogue
Interviewer: "Our analytics queries are too slow and too expensive. How would you fix this?"
You: "I'd approach this systematically. First, I need to understand the current state. What's the query latency distribution? What's driving costs? Which queries are most frequent?"
Interviewer: "p50 is 5 seconds, p99 is 60 seconds. Costs are $80K/month on BigQuery. The top 10 queries account for 70% of both time and cost."
You: "That's actually good news — concentrated pain means concentrated opportunity. For those top 10 queries, I'd do three things:
First, pre-aggregation. If these are dashboard queries hitting the same patterns repeatedly, I'd create materialized views or rollup tables. A query that scans 1TB becomes a query that scans 1GB.
Second, caching. For queries that don't need up-to-the-second freshness, cache results in Redis. A dashboard that 50 people load every morning doesn't need 50 BigQuery scans — one scan, 49 cache hits.
Third, partitioning and clustering. Make sure tables are partitioned by date and clustered by common filter columns. This is table stakes for BigQuery cost control.
For the long tail of queries, I'd implement query governance: byte scan limits, timeout limits, and cost estimation before execution. Users should know 'this query will cost $50 and take 2 minutes' before they run it."
Interviewer: "What about the freshness vs speed trade-off?"
You: "Great question. I'd segment by use case:
Real-time dashboards (operations): Streaming ingestion to a real-time OLAP like Druid or ClickHouse. Sub-second queries, always fresh. More expensive but worth it for ops.
Business dashboards (executives): Pre-computed views refreshed hourly or daily. They're okay with 'as of this morning' data. Cheap and fast.
Ad-hoc exploration (analysts): Query the raw data when needed. They expect slower queries and accept the cost for flexibility.
The key is matching the architecture to the use case, not trying to make one system do everything optimally."
Summary
DAY 5 SUMMARY: QUERY LAYER AND OPTIMIZATION
OLAP DATABASE CHOICE
├── Cloud warehouses: Snowflake, BigQuery (simple, managed)
├── Real-time OLAP: ClickHouse, Druid (fast, concurrent)
├── Lake query engines: Trino (flexible, federated)
└── Match database to access pattern
OPTIMIZATION TECHNIQUES
├── Partitioning: Skip irrelevant files
├── Clustering: Group related data
├── Pre-aggregation: Compute once, query fast
├── Caching: Don't recompute unchanged data
└── Query rewriting: Use rollups automatically
PRE-AGGREGATION
├── Analyze query patterns first
├── Create purpose-specific rollups
├── Hierarchy: hourly → daily → monthly
├── Cube pattern for drill-down
└── Refresh strategy matters
CACHING STRATEGY
├── Multi-layer: CDN → API → Database
├── TTL based on freshness needs
├── Invalidation: time or event-based
├── Warm cache during off-peak
└── Monitor hit rates
MULTI-TENANCY
├── Row-level security: Simple, shared
├── Schema per tenant: Isolated, more work
├── Database per tenant: Complete isolation, highest cost
├── Resource quotas: Fair usage, prevent noisy neighbor
└── Cost attribution: Know who's spending what
Key Takeaways
QUERY OPTIMIZATION KEY TAKEAWAYS
1. KNOW YOUR QUERY PATTERNS
Can't optimize what you don't measure
2. PRE-COMPUTE WHAT'S PREDICTABLE
Rollups for dashboards, raw for ad-hoc
3. CACHE AGGRESSIVELY, INVALIDATE CAREFULLY
Right TTL is critical
4. PARTITION EVERYTHING
Date partitioning is table stakes
5. MATCH DATABASE TO USE CASE
No one database does it all
GOLDEN RULES:
├── Analyze before optimizing
├── 20% of queries cause 80% of cost
├── Pre-aggregate predictable patterns
├── Cache with appropriate TTL
└── Monitor, iterate, improve
Week 8 Complete!
WEEK 8 SUMMARY: ANALYTICS PIPELINE
Day 1: EVENT INGESTION
├── Event schema design
├── Kafka as backbone
├── Validation and enrichment
└── Exactly-once delivery
Day 2: STREAMING VS BATCH
├── Lambda vs Kappa architecture
├── Flink for streaming
├── Spark for batch
└── Watermarks and state
Day 3: DATA MODELING
├── Star schema
├── Fact and dimension tables
├── SCD Type 2
└── Columnar storage
Day 4: LATE DATA
├── Event time vs processing time
├── Watermarks deep dive
├── Versioned results
└── Reprocessing patterns
Day 5: QUERY LAYER
├── OLAP databases
├── Pre-aggregation
├── Caching strategies
└── Multi-tenant management
You now understand the complete analytics pipeline
from event capture to fast dashboard queries!
What's Next
The Week 8 Capstone will integrate all these concepts into designing a complete analytics system for a video streaming platform.
CAPSTONE PREVIEW: STREAMING ANALYTICS PLATFORM
Design Netflix-style analytics:
├── Track video plays, pauses, completions
├── Real-time "trending now" dashboard
├── Historical viewing patterns
├── Content performance for creators
├── A/B test analysis
└── Handle 10M concurrent viewers
This will test everything from Week 8:
├── Event ingestion at scale
├── Streaming + batch processing
├── Star schema for viewing data
├── Late data from mobile apps
└── Fast queries for dashboards
End of Week 8, Day 5
Next: Week 8 Capstone — Real-Time Analytics for Streaming Platform