Himanshu Kukreja

Week 8 — Day 3: Data Modeling for Analytics

System Design Mastery Series — Analytics Pipeline Week


Preface

Yesterday, we learned when to use streaming versus batch processing. Our Flink and Spark jobs now produce aggregated metrics. But where do those metrics go, and how do we store them so queries stay fast?

THE QUERY PERFORMANCE PROBLEM

Your batch job computed daily revenue by category:
├── 365 days × 1000 categories = 365,000 rows
├── Query: "Show revenue by category for 2024"
└── Result: 50ms ✓ Fast!

But the CEO asks:
├── "Break it down by country" (200 countries)
├── "And by customer segment" (10 segments)
├── "And by channel" (5 channels)

Now you have:
├── 365 × 1000 × 200 × 10 × 5 = 3.65 BILLION rows
├── Same query now takes 45 seconds
└── Dashboard unusable

The problem isn't compute — it's how you model the data.

Today, we'll learn to design data models that make analytics queries fast, even over billions of rows.


Part I: Foundations

Chapter 1: OLTP vs OLAP

1.1 Two Different Worlds

OLTP (Online Transaction Processing)
────────────────────────────────────
Purpose: Run the business
Examples: Orders, payments, user signups

Characteristics:
├── Many small transactions
├── Read and write single rows
├── Normalized (3NF) to avoid duplication
├── Optimized for write speed
└── Row-oriented storage

Query patterns:
├── "Get order #12345"
├── "Update user's email"
├── "Insert new payment"
└── Touches few rows, many columns


OLAP (Online Analytical Processing)
────────────────────────────────────
Purpose: Understand the business
Examples: Revenue reports, funnel analysis, trends

Characteristics:
├── Few large queries
├── Read many rows, few columns
├── Denormalized for query speed
├── Optimized for read speed
└── Column-oriented storage

Query patterns:
├── "Total revenue by month for 2024"
├── "Conversion rate by country"
├── "Top 100 products by sales"
└── Touches many rows, few columns

1.2 Why Different Models?

THE NORMALIZATION TRADE-OFF

OLTP: Normalized (3NF)
──────────────────────

orders:
  order_id | user_id | total | created_at

users:
  user_id | name | country_id | segment_id

countries:
  country_id | name | region

segments:
  segment_id | name | description


Query: "Revenue by country"

SELECT c.name, SUM(o.total)
FROM orders o
JOIN users u ON o.user_id = u.user_id
JOIN countries c ON u.country_id = c.country_id
GROUP BY c.name

Problems:
├── Multiple JOINs (expensive at scale)
├── Each JOIN shuffles data
├── Index lookups for each row
└── Gets slower as data grows


OLAP: Denormalized (Star Schema)
────────────────────────────────

fact_orders:
  order_id | user_id | country | segment | total | date

Query: "Revenue by country"

SELECT country, SUM(total)
FROM fact_orders
GROUP BY country

Benefits:
├── No JOINs needed
├── Single table scan
├── Columnar storage efficient
└── Scales linearly with data

Chapter 2: Star Schema

2.1 The Core Concept

STAR SCHEMA STRUCTURE

                        ┌─────────────────┐
                        │  dim_product    │
                        │ ─────────────── │
                        │ product_key (PK)│
                        │ product_id      │
                        │ name            │
                        │ category        │
                        │ subcategory     │
                        │ brand           │
                        │ price           │
                        └────────┬────────┘
                                 │
┌─────────────────┐              │              ┌─────────────────┐
│   dim_date      │              │              │  dim_customer   │
│ ─────────────── │              │              │ ─────────────── │
│ date_key (PK)   │              │              │ customer_key(PK)│
│ date            │              │              │ customer_id     │
│ day             │      ┌───────┴───────┐      │ name            │
│ month           │      │               │      │ email           │
│ quarter         │◄─────┤  fact_sales   ├─────►│ segment         │
│ year            │      │ ───────────── │      │ tier            │
│ day_of_week     │      │ sale_key (PK) │      │ signup_date     │
│ is_weekend      │      │ date_key (FK) │      │ country         │
│ is_holiday      │      │ product_key   │      │ region          │
└─────────────────┘      │ customer_key  │      └─────────────────┘
                         │ store_key     │
                         │ quantity      │
                         │ unit_price    │
                         │ discount      │
                         │ revenue       │
┌─────────────────┐      │ profit        │
│   dim_store     │      └───────┬───────┘
│ ─────────────── │              │
│ store_key (PK)  │              │
│ store_id        │◄─────────────┘
│ name            │
│ city            │
│ state           │
│ country         │
│ region          │
│ store_type      │
└─────────────────┘


WHY IT'S CALLED A "STAR":
The fact table sits in the center with the dimension tables around it,
so the relationship diagram looks like a star.

KEY PRINCIPLES:
1. FACT TABLE: Contains measures (numbers you aggregate)
   - revenue, quantity, discount, profit
   - One row per transaction/event
   - Foreign keys to dimension tables

2. DIMENSION TABLES: Contain attributes (things you group by)
   - product details, customer info, date attributes
   - Denormalized (no further joins needed)
   - Descriptive text fields

2.2 Implementing Star Schema

# modeling/star_schema.py

"""
Star schema implementation for e-commerce analytics.

This model supports queries like:
- Revenue by product category by month
- Top customers by segment by region
- Sales trends by day of week
"""

from dataclasses import dataclass
from datetime import date, datetime
from typing import Optional, List
from decimal import Decimal


# =============================================================================
# Dimension Tables
# =============================================================================

@dataclass
class DimDate:
    """
    Date dimension with pre-computed attributes.
    
    Generated once, rarely changes.
    Enables fast date-based filtering and grouping.
    """
    date_key: int  # Surrogate key (YYYYMMDD format)
    date: date
    day: int
    month: int
    month_name: str
    quarter: int
    year: int
    day_of_week: int  # 0=Monday, 6=Sunday
    day_name: str
    week_of_year: int
    is_weekend: bool
    is_holiday: bool
    is_last_day_of_month: bool
    fiscal_year: int
    fiscal_quarter: int


@dataclass
class DimProduct:
    """
    Product dimension with hierarchy.
    
    Category → Subcategory → Product
    """
    product_key: int  # Surrogate key
    product_id: str  # Business key
    product_name: str
    category: str
    subcategory: str
    brand: str
    supplier: str
    unit_price: Decimal
    unit_cost: Decimal
    is_active: bool
    
    # SCD Type 2 fields (for tracking changes)
    valid_from: date
    valid_to: Optional[date]
    is_current: bool


@dataclass  
class DimCustomer:
    """
    Customer dimension with segmentation.
    """
    customer_key: int  # Surrogate key
    customer_id: str  # Business key
    first_name: str
    last_name: str
    email: str
    phone: Optional[str]
    
    # Demographics
    age_group: str  # "18-25", "26-35", etc.
    gender: Optional[str]
    
    # Segmentation
    segment: str  # "Premium", "Standard", "Budget"
    tier: str  # "Gold", "Silver", "Bronze"
    lifetime_value_band: str  # "High", "Medium", "Low"
    
    # Geography
    city: str
    state: str
    country: str
    region: str  # "North America", "Europe", etc.
    postal_code: str
    
    # Dates
    first_purchase_date: date
    signup_date: date
    
    # SCD fields
    valid_from: date
    valid_to: Optional[date]
    is_current: bool


@dataclass
class DimStore:
    """
    Store/channel dimension.
    """
    store_key: int
    store_id: str
    store_name: str
    store_type: str  # "Online", "Retail", "Warehouse"
    city: str
    state: str
    country: str
    region: str
    timezone: str
    opened_date: date
    manager_name: str
    square_footage: Optional[int]


# =============================================================================
# Fact Table
# =============================================================================

@dataclass
class FactSales:
    """
    Sales fact table.
    
    Grain: One row per line item in an order.
    
    Contains:
    - Foreign keys to all dimensions
    - Measures (numeric values to aggregate)
    - Degenerate dimensions (order_id - no separate table needed)
    """
    # Surrogate key
    sale_key: int
    
    # Foreign keys to dimensions
    date_key: int
    product_key: int
    customer_key: int
    store_key: int
    
    # Degenerate dimension (stored in fact, no lookup needed)
    order_id: str
    line_item_number: int
    
    # Measures (things we SUM, AVG, COUNT)
    quantity: int
    unit_price: Decimal
    unit_cost: Decimal
    discount_amount: Decimal
    tax_amount: Decimal
    
    # Derived measures (pre-calculated for performance)
    gross_revenue: Decimal  # quantity * unit_price
    net_revenue: Decimal    # gross_revenue - discount_amount
    profit: Decimal         # net_revenue - (quantity * unit_cost)
    
    # Timestamps
    order_timestamp: datetime
    ship_timestamp: Optional[datetime]


# =============================================================================
# Schema Creation SQL
# =============================================================================

STAR_SCHEMA_DDL = """
-- Date Dimension (generate for all dates needed)
CREATE TABLE dim_date (
    date_key        INTEGER PRIMARY KEY,  -- YYYYMMDD
    date            DATE NOT NULL,
    day             SMALLINT NOT NULL,
    month           SMALLINT NOT NULL,
    month_name      VARCHAR(10) NOT NULL,
    quarter         SMALLINT NOT NULL,
    year            SMALLINT NOT NULL,
    day_of_week     SMALLINT NOT NULL,
    day_name        VARCHAR(10) NOT NULL,
    week_of_year    SMALLINT NOT NULL,
    is_weekend      BOOLEAN NOT NULL,
    is_holiday      BOOLEAN NOT NULL,
    fiscal_year     SMALLINT NOT NULL,
    fiscal_quarter  SMALLINT NOT NULL
);

-- Product Dimension
CREATE TABLE dim_product (
    product_key     SERIAL PRIMARY KEY,
    product_id      VARCHAR(50) NOT NULL,
    product_name    VARCHAR(200) NOT NULL,
    category        VARCHAR(100) NOT NULL,
    subcategory     VARCHAR(100) NOT NULL,
    brand           VARCHAR(100),
    supplier        VARCHAR(100),
    unit_price      DECIMAL(10,2) NOT NULL,
    unit_cost       DECIMAL(10,2) NOT NULL,
    is_active       BOOLEAN NOT NULL DEFAULT TRUE,
    valid_from      DATE NOT NULL,
    valid_to        DATE,
    is_current      BOOLEAN NOT NULL DEFAULT TRUE
);

CREATE INDEX idx_product_id ON dim_product(product_id);
CREATE INDEX idx_product_category ON dim_product(category, subcategory);

-- Customer Dimension  
CREATE TABLE dim_customer (
    customer_key    SERIAL PRIMARY KEY,
    customer_id     VARCHAR(50) NOT NULL,
    first_name      VARCHAR(100),
    last_name       VARCHAR(100),
    email           VARCHAR(200),
    segment         VARCHAR(50) NOT NULL,
    tier            VARCHAR(20) NOT NULL,
    city            VARCHAR(100),
    state           VARCHAR(100),
    country         VARCHAR(100) NOT NULL,
    region          VARCHAR(50) NOT NULL,
    first_purchase_date DATE,
    valid_from      DATE NOT NULL,
    valid_to        DATE,
    is_current      BOOLEAN NOT NULL DEFAULT TRUE
);

CREATE INDEX idx_customer_id ON dim_customer(customer_id);
CREATE INDEX idx_customer_segment ON dim_customer(segment, tier);
CREATE INDEX idx_customer_region ON dim_customer(region, country);

-- Store Dimension
CREATE TABLE dim_store (
    store_key       SERIAL PRIMARY KEY,
    store_id        VARCHAR(50) NOT NULL,
    store_name      VARCHAR(200) NOT NULL,
    store_type      VARCHAR(50) NOT NULL,
    city            VARCHAR(100),
    state           VARCHAR(100),
    country         VARCHAR(100) NOT NULL,
    region          VARCHAR(50) NOT NULL,
    timezone        VARCHAR(50),
    opened_date     DATE
);

-- Sales Fact Table
CREATE TABLE fact_sales (
    sale_key        BIGSERIAL PRIMARY KEY,
    date_key        INTEGER NOT NULL REFERENCES dim_date(date_key),
    product_key     INTEGER NOT NULL REFERENCES dim_product(product_key),
    customer_key    INTEGER NOT NULL REFERENCES dim_customer(customer_key),
    store_key       INTEGER NOT NULL REFERENCES dim_store(store_key),
    order_id        VARCHAR(50) NOT NULL,
    line_item_number SMALLINT NOT NULL,
    quantity        INTEGER NOT NULL,
    unit_price      DECIMAL(10,2) NOT NULL,
    discount_amount DECIMAL(10,2) NOT NULL DEFAULT 0,
    tax_amount      DECIMAL(10,2) NOT NULL DEFAULT 0,
    gross_revenue   DECIMAL(12,2) NOT NULL,
    net_revenue     DECIMAL(12,2) NOT NULL,
    profit          DECIMAL(12,2) NOT NULL,
    order_timestamp TIMESTAMP NOT NULL
);

-- Critical indexes for fact table
CREATE INDEX idx_fact_date ON fact_sales(date_key);
CREATE INDEX idx_fact_product ON fact_sales(product_key);
CREATE INDEX idx_fact_customer ON fact_sales(customer_key);
CREATE INDEX idx_fact_order ON fact_sales(order_id);

-- Composite indexes for common query patterns
CREATE INDEX idx_fact_date_product ON fact_sales(date_key, product_key);
CREATE INDEX idx_fact_date_store ON fact_sales(date_key, store_key);
"""

2.3 Star Schema Queries

# modeling/star_queries.py

"""
Example queries on star schema.

These demonstrate why star schema is efficient for analytics.
"""

EXAMPLE_QUERIES = """
-- =============================================================================
-- Query 1: Revenue by category by month
-- =============================================================================
-- Simple two-table join (fact + product dimension)

SELECT 
    p.category,
    d.year,
    d.month,
    SUM(f.net_revenue) as revenue,
    COUNT(DISTINCT f.order_id) as orders
FROM fact_sales f
JOIN dim_product p ON f.product_key = p.product_key
JOIN dim_date d ON f.date_key = d.date_key
WHERE d.year = 2024
GROUP BY p.category, d.year, d.month
ORDER BY d.year, d.month, revenue DESC;


-- =============================================================================
-- Query 2: Top 10 products by revenue this quarter
-- =============================================================================

SELECT 
    p.product_name,
    p.category,
    SUM(f.net_revenue) as revenue,
    SUM(f.quantity) as units_sold,
    SUM(f.profit) as profit
FROM fact_sales f
JOIN dim_product p ON f.product_key = p.product_key
JOIN dim_date d ON f.date_key = d.date_key
WHERE d.year = 2024 AND d.quarter = 4
GROUP BY p.product_key, p.product_name, p.category
ORDER BY revenue DESC
LIMIT 10;


-- =============================================================================
-- Query 3: Customer segment analysis
-- =============================================================================

SELECT 
    c.segment,
    c.tier,
    c.region,
    COUNT(DISTINCT c.customer_key) as customers,
    SUM(f.net_revenue) as revenue,
    SUM(f.net_revenue) / COUNT(DISTINCT c.customer_key) as revenue_per_customer,
    SUM(f.quantity) / COUNT(DISTINCT f.order_id) as avg_items_per_order
FROM fact_sales f
JOIN dim_customer c ON f.customer_key = c.customer_key
JOIN dim_date d ON f.date_key = d.date_key
WHERE d.year = 2024
GROUP BY c.segment, c.tier, c.region
ORDER BY revenue DESC;


-- =============================================================================
-- Query 4: Year-over-year comparison
-- =============================================================================

WITH current_year AS (
    SELECT 
        p.category,
        SUM(f.net_revenue) as revenue_2024
    FROM fact_sales f
    JOIN dim_product p ON f.product_key = p.product_key
    JOIN dim_date d ON f.date_key = d.date_key
    WHERE d.year = 2024
    GROUP BY p.category
),
previous_year AS (
    SELECT 
        p.category,
        SUM(f.net_revenue) as revenue_2023
    FROM fact_sales f
    JOIN dim_product p ON f.product_key = p.product_key
    JOIN dim_date d ON f.date_key = d.date_key
    WHERE d.year = 2023
    GROUP BY p.category
)
SELECT 
    c.category,
    c.revenue_2024,
    p.revenue_2023,
    (c.revenue_2024 - p.revenue_2023) / p.revenue_2023 * 100 as yoy_growth_pct
FROM current_year c
JOIN previous_year p ON c.category = p.category
ORDER BY yoy_growth_pct DESC;


-- =============================================================================
-- Query 5: Conversion funnel by store type
-- =============================================================================

SELECT 
    s.store_type,
    COUNT(DISTINCT f.order_id) as orders,
    COUNT(DISTINCT f.customer_key) as unique_customers,
    SUM(f.net_revenue) as revenue,
    SUM(f.net_revenue) / COUNT(DISTINCT f.order_id) as avg_order_value,
    SUM(f.discount_amount) / SUM(f.gross_revenue) * 100 as discount_rate
FROM fact_sales f
JOIN dim_store s ON f.store_key = s.store_key
JOIN dim_date d ON f.date_key = d.date_key
WHERE d.year = 2024
GROUP BY s.store_type
ORDER BY revenue DESC;
"""

Chapter 3: Slowly Changing Dimensions (SCD)

3.1 The Problem

THE CHANGING DIMENSION PROBLEM

A customer's attributes change over time:

January: Customer John
├── Segment: "Standard"
├── Tier: "Bronze"
└── Country: "USA"

June: Customer John (upgraded!)
├── Segment: "Premium"    ← Changed!
├── Tier: "Gold"          ← Changed!
└── Country: "USA"

Question: When we query Q1 revenue by segment,
should John count as "Standard" or "Premium"?

Answer: It depends on the business question!

Option A: Current value
  "How much did our current Premium customers spend in Q1?"
  John counts as Premium (even for Q1 purchases)

Option B: Historical value
  "What was the revenue split by segment in Q1?"
  John counts as Standard for Q1 purchases

Both are valid! We need to support both.

3.2 SCD Types

SLOWLY CHANGING DIMENSION TYPES

TYPE 0: Retain Original
──────────────────────
Never change. Keep the first value forever.
Use for: Immutable attributes (signup date, original source)


TYPE 1: Overwrite
─────────────────
Simply update the value. Lose history.

Before:  customer_id=123, segment="Standard"
After:   customer_id=123, segment="Premium"

Pros: Simple, no extra storage
Cons: No history, can't analyze changes


TYPE 2: Add New Row (Most Common)
─────────────────────────────────
Create new row for each change. Track validity period.

customer_key | customer_id | segment   | valid_from | valid_to   | is_current
1            | 123         | Standard  | 2024-01-01 | 2024-05-31 | FALSE
2            | 123         | Premium   | 2024-06-01 | NULL       | TRUE

Pros: Full history, accurate point-in-time analysis
Cons: More storage, more complex queries


TYPE 3: Add New Column
──────────────────────
Track current and previous value only.

customer_id | current_segment | previous_segment
123         | Premium         | Standard

Pros: Simple to query current vs previous
Cons: Only one level of history


TYPE 6: Hybrid (1 + 2 + 3)
──────────────────────────
Combine approaches for flexibility.

customer_key | customer_id | segment  | current_segment | valid_from | valid_to
1            | 123         | Standard | Premium         | 2024-01-01 | 2024-05-31
2            | 123         | Premium  | Premium         | 2024-06-01 | NULL

Can query historical value (segment) OR current value (current_segment)

3.3 Implementing SCD Type 2

# modeling/scd_type2.py

"""
Slowly Changing Dimension Type 2 implementation.

Maintains full history of dimension changes.
"""

from dataclasses import dataclass
from datetime import date, datetime
from typing import Optional, Dict, Any, List
import hashlib


@dataclass
class DimensionRecord:
    """A record in an SCD Type 2 dimension."""
    surrogate_key: int
    business_key: str
    attributes: Dict[str, Any]
    valid_from: date
    valid_to: Optional[date]
    is_current: bool
    hash_value: str  # For change detection


class SCDType2Manager:
    """
    Manages SCD Type 2 dimension updates.
    
    Handles:
    - New records (INSERT)
    - Changed records (close old, open new)
    - Unchanged records (no action)
    - Deleted records (close, don't delete)
    """
    
    def __init__(self, tracked_columns: List[str]):
        """
        Args:
            tracked_columns: Columns that trigger new version on change
        """
        self.tracked_columns = tracked_columns
    
    def compute_hash(self, attributes: Dict[str, Any]) -> str:
        """
        Compute hash of tracked columns for change detection.
        """
        values = [str(attributes.get(col, "")) for col in self.tracked_columns]
        combined = "|".join(values)
        return hashlib.md5(combined.encode()).hexdigest()
    
    def process_updates(
        self,
        current_records: List[DimensionRecord],
        incoming_records: List[Dict[str, Any]],
        effective_date: date
    ) -> Dict[str, List]:
        """
        Process dimension updates using SCD Type 2 logic.
        
        Returns dict with:
        - 'inserts': New records to insert
        - 'updates': Existing records to update (close)
        - 'unchanged': Records that don't need changes
        """
        
        # Build lookup of current records by business key
        current_by_key = {
            r.business_key: r for r in current_records if r.is_current
        }
        
        inserts = []
        updates = []
        unchanged = []
        
        for incoming in incoming_records:
            business_key = incoming["business_key"]
            new_hash = self.compute_hash(incoming)
            
            if business_key not in current_by_key:
                # New record - INSERT
                inserts.append({
                    **incoming,
                    "valid_from": effective_date,
                    "valid_to": None,
                    "is_current": True,
                    "hash_value": new_hash
                })
            
            else:
                current = current_by_key[business_key]
                
                if current.hash_value != new_hash:
                    # Changed - close old, insert new
                    updates.append({
                        "surrogate_key": current.surrogate_key,
                        "valid_to": effective_date,
                        "is_current": False
                    })
                    
                    inserts.append({
                        **incoming,
                        "valid_from": effective_date,
                        "valid_to": None,
                        "is_current": True,
                        "hash_value": new_hash
                    })
                
                else:
                    # Unchanged
                    unchanged.append(current.surrogate_key)
        
        return {
            "inserts": inserts,
            "updates": updates,
            "unchanged": unchanged
        }


# =============================================================================
# SQL for SCD Type 2 Merge
# =============================================================================

SCD_TYPE2_MERGE_SQL = """
-- SCD Type 2 maintenance as three set-based steps
-- (databases with MERGE support can combine these into a single statement)

-- Step 1: Close changed/deleted records
UPDATE dim_customer
SET valid_to = :effective_date,
    is_current = FALSE
WHERE customer_id IN (
    SELECT c.customer_id
    FROM dim_customer c
    JOIN staging_customer s ON c.customer_id = s.customer_id
    WHERE c.is_current = TRUE
      AND c.hash_value != s.hash_value
);

-- Step 2: Insert new versions of changed records
INSERT INTO dim_customer (
    customer_id, name, segment, tier, country,
    valid_from, valid_to, is_current, hash_value
)
SELECT 
    s.customer_id, s.name, s.segment, s.tier, s.country,
    :effective_date, NULL, TRUE, s.hash_value
FROM staging_customer s
JOIN dim_customer c ON s.customer_id = c.customer_id
WHERE c.valid_to = :effective_date;  -- Just closed

-- Step 3: Insert completely new records
INSERT INTO dim_customer (
    customer_id, name, segment, tier, country,
    valid_from, valid_to, is_current, hash_value
)
SELECT 
    s.customer_id, s.name, s.segment, s.tier, s.country,
    :effective_date, NULL, TRUE, s.hash_value
FROM staging_customer s
WHERE NOT EXISTS (
    SELECT 1 FROM dim_customer c 
    WHERE c.customer_id = s.customer_id
);
"""


# =============================================================================
# Querying SCD Type 2 Dimensions
# =============================================================================

SCD_TYPE2_QUERIES = """
-- Query 1: Get current customer data
SELECT * 
FROM dim_customer 
WHERE is_current = TRUE;

-- Query 2: Get customer data as of a specific date
SELECT * 
FROM dim_customer 
WHERE valid_from <= '2024-06-15' 
  AND (valid_to IS NULL OR valid_to > '2024-06-15');

-- Query 3: Revenue by segment with point-in-time accuracy
-- Each sale joins to the customer record that was valid at sale time
SELECT 
    c.segment,
    SUM(f.net_revenue) as revenue
FROM fact_sales f
JOIN dim_customer c ON f.customer_key = c.customer_key
    -- The fact table stores the surrogate key that was valid at transaction time
GROUP BY c.segment;

-- Query 4: Customer history
SELECT 
    customer_id,
    segment,
    tier,
    valid_from,
    valid_to
FROM dim_customer
WHERE customer_id = 'cust_123'
ORDER BY valid_from;
"""

Chapter 4: Columnar Storage

4.1 Row vs Column Storage

ROW-ORIENTED STORAGE (OLTP)
───────────────────────────

How data is stored on disk:

Row 1: [id=1, name="John", country="USA", revenue=100]
Row 2: [id=2, name="Jane", country="UK",  revenue=200]
Row 3: [id=3, name="Bob",  country="USA", revenue=150]

Query: SELECT SUM(revenue) FROM customers

Process:
├── Read Row 1 (all columns) → extract revenue
├── Read Row 2 (all columns) → extract revenue  
├── Read Row 3 (all columns) → extract revenue
└── Sum: 100 + 200 + 150 = 450

Problem: We read name, country too (wasted I/O!)


COLUMN-ORIENTED STORAGE (OLAP)
──────────────────────────────

How data is stored on disk:

id column:      [1, 2, 3]
name column:    ["John", "Jane", "Bob"]
country column: ["USA", "UK", "USA"]
revenue column: [100, 200, 150]

Query: SELECT SUM(revenue) FROM customers

Process:
├── Read only revenue column: [100, 200, 150]
└── Sum: 450

Benefits:
├── 75% less I/O (read 1 column, not 4)
├── Better compression (similar values together)
├── SIMD-friendly (vectorized operations)
└── Cache-efficient (sequential access)
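
The same idea is easy to see with Parquet and pyarrow: a column-aware reader pulls only the bytes for the columns you ask for. A minimal sketch, assuming a local sales.parquet file with a revenue column:

# Columnar read sketch (assumes a local "sales.parquet" with a "revenue" column)
import pyarrow.parquet as pq
import pyarrow.compute as pc

# Only the revenue column is read from disk; name, country, etc. are skipped
table = pq.read_table("sales.parquet", columns=["revenue"])

total = pc.sum(table["revenue"]).as_py()
print(f"total revenue: {total}")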

4.2 Columnar Formats

POPULAR COLUMNAR FORMATS

PARQUET (Apache)
────────────────
├── Row groups (default 128MB)
├── Column chunks within row groups
├── Multiple compression codecs (Snappy, GZIP, LZ4)
├── Rich type system (nested types)
├── Predicate pushdown (skip irrelevant row groups)
└── Industry standard for data lakes

ORC (Apache)
────────────
├── Stripes (default 64MB)
├── Lightweight indexes for skip
├── Better for Hive/Presto
├── Slightly better compression than Parquet
└── More mature ACID support

DELTA LAKE (Databricks)
───────────────────────
├── Parquet files + transaction log
├── ACID transactions on data lake
├── Time travel (query historical versions)
├── Schema evolution
└── Merge/Update/Delete support

ICEBERG (Apache)
────────────────
├── Open table format
├── Hidden partitioning
├── Schema evolution
├── Time travel
└── Supported by many engines (Spark, Flink, Trino)

4.3 Parquet Optimization

# modeling/parquet_optimization.py

"""
Parquet file optimization for analytics.

Key factors:
1. Row group size
2. Compression codec
3. Column ordering
4. Statistics collection
"""

from typing import List, Dict, Any
import pyarrow as pa
import pyarrow.parquet as pq


class ParquetWriter:
    """
    Optimized Parquet writer for analytics workloads.
    """
    
    def __init__(
        self,
        row_group_size: int = 1_000_000,  # rows per row group (pyarrow counts rows, not bytes); tune toward ~128MB groups
        compression: str = "snappy",
        enable_statistics: bool = True
    ):
        self.row_group_size = row_group_size
        self.compression = compression
        self.enable_statistics = enable_statistics
    
    def write_table(
        self,
        table: pa.Table,
        output_path: str,
        partition_cols: List[str] = None
    ):
        """
        Write table to Parquet with optimizations.
        """
        
        # Sort by partition columns for better compression
        if partition_cols:
            sort_keys = [(col, "ascending") for col in partition_cols]
            table = table.sort_by(sort_keys)
        
        pq.write_table(
            table,
            output_path,
            row_group_size=self.row_group_size,
            compression=self.compression,
            write_statistics=self.enable_statistics,
            # Use dictionary encoding for low-cardinality columns
            use_dictionary=True,
            # Write column indexes for predicate pushdown
            write_page_index=True
        )
    
    def write_partitioned(
        self,
        table: pa.Table,
        output_path: str,
        partition_cols: List[str]
    ):
        """
        Write partitioned Parquet dataset.
        
        Partitioning enables partition pruning:
        WHERE date = '2024-01-15' reads only that partition.
        """
        
        pq.write_to_dataset(
            table,
            output_path,
            partition_cols=partition_cols,
            compression=self.compression,
            row_group_size=self.row_group_size
        )


PARQUET_BEST_PRACTICES = """
PARQUET OPTIMIZATION CHECKLIST

1. ROW GROUP SIZE
   ├── Default: 128MB
   ├── Smaller (64MB): More parallelism, more overhead
   ├── Larger (256MB): Better compression, less parallelism
   └── Match to executor memory / parallelism

2. COMPRESSION
   ├── Snappy: Fast, moderate compression (default)
   ├── LZ4: Faster, less compression
   ├── GZIP/ZSTD: Slower, better compression
   └── Choose based on I/O vs CPU trade-off

3. COLUMN ORDERING
   ├── Put frequently filtered columns first
   ├── Group related columns together
   └── Low-cardinality columns benefit from dictionary encoding

4. FILE SIZE
   ├── Target: 256MB - 1GB per file
   ├── Too small: Excessive metadata overhead
   ├── Too large: Less parallelism
   └── Coalesce small files periodically

5. PARTITIONING
   ├── Partition by query filter columns (date, region)
   ├── Avoid over-partitioning (>10K partitions)
   ├── Balance partition size (aim for 256MB-1GB each)
   └── Use Z-ordering for multi-column filters
"""

Chapter 5: Partitioning and Pre-Aggregation

5.1 Partitioning Strategies

PARTITIONING FOR ANALYTICS

Goal: Let queries skip irrelevant data

PARTITION BY DATE (Most Common)
───────────────────────────────

fact_sales/
├── date=2024-01-01/
│   ├── part-00000.parquet
│   └── part-00001.parquet
├── date=2024-01-02/
│   └── part-00000.parquet
├── date=2024-01-03/
│   └── ...
└── ...

Query: WHERE date = '2024-01-15'
Effect: Reads only 1 partition, skips 364 others

MULTI-LEVEL PARTITIONING
─────────────────────────

fact_sales/
├── year=2024/
│   ├── month=01/
│   │   ├── day=01/
│   │   ├── day=02/
│   │   └── ...
│   ├── month=02/
│   └── ...
└── year=2023/
    └── ...

Query: WHERE year = 2024 AND month = 1
Effect: Reads only January 2024 (~31 partitions)

PARTITION + SORT (Z-Ordering)
─────────────────────────────

For multiple filter columns (date AND product_id):

Physical layout uses space-filling curves to cluster
related values together.

Query: WHERE date = '2024-01-15' AND product_id = 'P123'
Effect: Minimal file reads due to co-location
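
Query engines exploit this layout automatically. As a concrete sketch, pyarrow's dataset API prunes hive-style partitions based on a filter (paths and column names below are illustrative):

# Partition-pruning sketch with pyarrow.dataset (paths/columns illustrative)
import pyarrow.dataset as ds

dataset = ds.dataset("fact_sales/", format="parquet", partitioning="hive")

# Only files under date=2024-01-15/ are opened; all other partitions are skipped
# (assumes the partition key is read back as a string)
jan_15 = dataset.to_table(
    filter=ds.field("date") == "2024-01-15",
    columns=["net_revenue"],
)
print(jan_15.num_rows)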

5.2 Pre-Aggregation (Rollups)

# modeling/pre_aggregation.py

"""
Pre-aggregation (Rollups) for fast queries.

Instead of aggregating at query time:
  SELECT date, SUM(revenue) FROM fact_sales GROUP BY date
  → Scans 1B rows

Query from pre-aggregated table:
  SELECT date, revenue FROM daily_revenue
  → Scans 365 rows
"""

from dataclasses import dataclass
from typing import List, Dict, Any
from datetime import date


@dataclass
class RollupDefinition:
    """Defines a pre-aggregation rollup."""
    name: str
    source_table: str
    target_table: str
    dimensions: List[str]  # GROUP BY columns
    measures: Dict[str, str]  # column: aggregation
    refresh_schedule: str  # Cron expression


# Define common rollups
ROLLUPS = [
    RollupDefinition(
        name="daily_revenue_by_category",
        source_table="fact_sales",
        target_table="agg_daily_category",
        dimensions=["date_key", "category"],
        measures={
            "net_revenue": "SUM",
            "quantity": "SUM",
            "order_count": "COUNT DISTINCT order_id",
            "customer_count": "COUNT DISTINCT customer_key"
        },
        refresh_schedule="0 1 * * *"  # Daily at 1 AM
    ),
    
    RollupDefinition(
        name="hourly_revenue_by_store",
        source_table="fact_sales",
        target_table="agg_hourly_store",
        dimensions=["date_key", "hour", "store_key"],
        measures={
            "net_revenue": "SUM",
            "quantity": "SUM",
            "order_count": "COUNT DISTINCT order_id"
        },
        refresh_schedule="0 * * * *"  # Hourly
    ),
    
    RollupDefinition(
        name="monthly_customer_metrics",
        source_table="fact_sales",
        target_table="agg_monthly_customer",
        dimensions=["year", "month", "customer_key", "segment"],
        measures={
            "net_revenue": "SUM",
            "order_count": "COUNT",
            "avg_order_value": "AVG(net_revenue)"
        },
        refresh_schedule="0 2 1 * *"  # Monthly on 1st at 2 AM
    )
]
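

A RollupDefinition only describes a rollup; the refresh SQL still has to come from somewhere. Below is a minimal, illustrative generator (it assumes the source table or view already exposes the GROUP BY columns, and the measure spellings follow the ROLLUPS list above; the orchestrator in Chapter 6 is assumed to use something like it). A real warehouse job would be more careful.

def build_refresh_sql(rollup: RollupDefinition) -> str:
    """Render a simple refresh query from a rollup definition (sketch only)."""
    select_parts = list(rollup.dimensions)
    for column, agg in rollup.measures.items():
        if agg.upper().startswith("COUNT DISTINCT"):
            # e.g. "COUNT DISTINCT order_id" -> COUNT(DISTINCT order_id) AS order_count
            select_parts.append(f"COUNT(DISTINCT {agg.split()[-1]}) AS {column}")
        elif "(" in agg:
            # full expression given, e.g. "AVG(net_revenue)"
            select_parts.append(f"{agg} AS {column}")
        elif agg.upper() == "COUNT":
            select_parts.append(f"COUNT(*) AS {column}")
        else:
            # bare aggregation name, e.g. "SUM"
            select_parts.append(f"{agg}({column}) AS {column}")
    return (
        f"INSERT INTO {rollup.target_table}\n"
        f"SELECT {', '.join(select_parts)}\n"
        f"FROM {rollup.source_table}\n"
        f"WHERE date_key = :date_key\n"
        f"GROUP BY {', '.join(rollup.dimensions)}"
    )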


ROLLUP_DDL = """
-- Pre-aggregated table for daily revenue by category
CREATE TABLE agg_daily_category (
    date_key        INTEGER NOT NULL,
    category        VARCHAR(100) NOT NULL,
    net_revenue     DECIMAL(15,2) NOT NULL,
    quantity        BIGINT NOT NULL,
    order_count     BIGINT NOT NULL,
    customer_count  BIGINT NOT NULL,
    last_updated    TIMESTAMP NOT NULL DEFAULT NOW(),
    PRIMARY KEY (date_key, category)
);

-- Refresh query
INSERT INTO agg_daily_category
SELECT 
    f.date_key,
    p.category,
    SUM(f.net_revenue) as net_revenue,
    SUM(f.quantity) as quantity,
    COUNT(DISTINCT f.order_id) as order_count,
    COUNT(DISTINCT f.customer_key) as customer_count,
    NOW() as last_updated
FROM fact_sales f
JOIN dim_product p ON f.product_key = p.product_key
WHERE f.date_key = :target_date_key
GROUP BY f.date_key, p.category
ON CONFLICT (date_key, category) 
DO UPDATE SET
    net_revenue = EXCLUDED.net_revenue,
    quantity = EXCLUDED.quantity,
    order_count = EXCLUDED.order_count,
    customer_count = EXCLUDED.customer_count,
    last_updated = NOW();


-- Query from rollup (fast!)
SELECT category, SUM(net_revenue)
FROM agg_daily_category
WHERE date_key BETWEEN 20240101 AND 20241231
GROUP BY category;
-- Scans 365 * ~100 categories = 36,500 rows
-- vs 1B rows in fact_sales
"""


ROLLUP_HIERARCHY = """
ROLLUP CUBE PATTERN

For flexible drill-down, pre-compute multiple granularities:

┌─────────────────────────────────────────────────────────────┐
│                    AGGREGATION HIERARCHY                    │
│                                                             │
│  Level 0: Total                                             │
│           │                                                 │
│  Level 1: ├── By Year                                       │
│           │   ├── By Month                                  │
│           │   │   ├── By Day                                │
│           │   │   └── By Week                               │
│           │   └── By Quarter                                │
│           │                                                 │
│  Cross:   ├── By Year × Category                            │
│           ├── By Month × Category × Region                  │
│           └── By Day × Product                              │
│                                                             │
└─────────────────────────────────────────────────────────────┘

Pre-compute common combinations:
├── Time only: Year, Quarter, Month, Week, Day
├── Time × Category: For product analysis
├── Time × Region: For geographic analysis
├── Time × Customer Segment: For marketing
└── CUBE: All combinations (expensive but complete)
"""

5.3 Materialized Views

# modeling/materialized_views.py

"""
Materialized Views for automatic pre-aggregation.

Better than manual rollups because:
1. Automatic refresh
2. Query optimizer can use them transparently
3. Built-in freshness tracking
"""

MATERIALIZED_VIEW_PATTERNS = """
-- =============================================================================
-- Pattern 1: Simple aggregation view
-- =============================================================================

CREATE MATERIALIZED VIEW mv_daily_revenue AS
SELECT 
    date_key,
    SUM(net_revenue) as revenue,
    COUNT(*) as transaction_count
FROM fact_sales
GROUP BY date_key
WITH DATA;

-- Refresh options
REFRESH MATERIALIZED VIEW mv_daily_revenue;  -- Full refresh
REFRESH MATERIALIZED VIEW CONCURRENTLY mv_daily_revenue;  -- No exclusive lock (PostgreSQL: requires a unique index on the view)


-- =============================================================================
-- Pattern 2: Join materialized into view
-- =============================================================================

CREATE MATERIALIZED VIEW mv_product_performance AS
SELECT 
    p.category,
    p.subcategory,
    p.brand,
    d.year,
    d.month,
    SUM(f.net_revenue) as revenue,
    SUM(f.quantity) as units,
    SUM(f.profit) as profit
FROM fact_sales f
JOIN dim_product p ON f.product_key = p.product_key
JOIN dim_date d ON f.date_key = d.date_key
-- No is_current filter needed: the surrogate-key join already resolves to the product version valid at sale time
GROUP BY p.category, p.subcategory, p.brand, d.year, d.month
WITH DATA;

-- Index for fast queries
CREATE INDEX idx_mv_product_year 
ON mv_product_performance(year, month);


-- =============================================================================
-- Pattern 3: Incremental materialized view (database-specific)
-- =============================================================================

-- In BigQuery: 
CREATE MATERIALIZED VIEW mv_hourly_revenue
OPTIONS (
    enable_refresh = true,
    refresh_interval_minutes = 60
)
AS
SELECT 
    TIMESTAMP_TRUNC(order_timestamp, HOUR) as hour,
    SUM(net_revenue) as revenue
FROM fact_sales
GROUP BY 1;

-- BigQuery automatically updates incrementally

-- In Snowflake:
CREATE MATERIALIZED VIEW mv_hourly_revenue
AS
SELECT 
    DATE_TRUNC('HOUR', order_timestamp) as hour,
    SUM(net_revenue) as revenue
FROM fact_sales
GROUP BY 1;

-- Snowflake maintains incrementally if possible
"""


class MaterializedViewManager:
    """
    Manages materialized view lifecycle.
    """
    
    def __init__(self, db_connection):
        self.db = db_connection
        self.views: Dict[str, Dict] = {}
    
    def create_view(
        self,
        name: str,
        query: str,
        refresh_schedule: str = None,
        indexes: List[str] = None
    ):
        """Create a materialized view."""
        
        sql = f"CREATE MATERIALIZED VIEW {name} AS {query} WITH DATA"
        self.db.execute(sql)
        
        if indexes:
            for idx_cols in indexes:
                idx_name = f"idx_{name}_{idx_cols.replace(', ', '_')}"
                self.db.execute(
                    f"CREATE INDEX {idx_name} ON {name}({idx_cols})"
                )
        
        self.views[name] = {
            "query": query,
            "refresh_schedule": refresh_schedule,
            "last_refresh": None
        }
    
    def refresh_view(self, name: str, concurrent: bool = True):
        """Refresh a materialized view."""
        
        if concurrent:
            sql = f"REFRESH MATERIALIZED VIEW CONCURRENTLY {name}"
        else:
            sql = f"REFRESH MATERIALIZED VIEW {name}"
        
        self.db.execute(sql)
        self.views[name]["last_refresh"] = datetime.utcnow()
    
    def get_freshness(self, name: str) -> Dict:
        """Get view freshness information."""
        
        view = self.views.get(name)
        if not view:
            return {"error": "View not found"}
        
        last_refresh = view.get("last_refresh")
        if not last_refresh:
            return {"status": "never_refreshed"}
        
        age = datetime.utcnow() - last_refresh
        
        return {
            "last_refresh": last_refresh.isoformat(),
            "age_seconds": age.total_seconds(),
            "status": "fresh" if age.total_seconds() < 3600 else "stale"
        }
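

A short usage sketch of the manager above; the _EchoDB stand-in just prints the SQL it would run, so you can see the generated statements without a real database.

# Usage sketch for MaterializedViewManager; _EchoDB is a stand-in connection
class _EchoDB:
    def execute(self, sql):
        print(sql)

mv_manager = MaterializedViewManager(db_connection=_EchoDB())

mv_manager.create_view(
    name="mv_daily_revenue",
    query="SELECT date_key, SUM(net_revenue) AS revenue FROM fact_sales GROUP BY date_key",
    refresh_schedule="0 * * * *",   # hourly (informational only in this sketch)
    indexes=["date_key"],
)

mv_manager.refresh_view("mv_daily_revenue", concurrent=True)
print(mv_manager.get_freshness("mv_daily_revenue"))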

Part II: Implementation

Chapter 6: Building the Data Model

6.1 ETL Pipeline for Star Schema

# modeling/etl_pipeline.py

"""
ETL pipeline to build star schema from source data.

Flow:
1. Extract from source systems
2. Transform into dimension and fact tables
3. Load into data warehouse
"""

from dataclasses import dataclass
from datetime import date, datetime, timedelta
from typing import Dict, Any, List, Optional
import logging

# Defined earlier in this series (assumes modeling/ is a package)
from modeling.scd_type2 import SCDType2Manager
from modeling.pre_aggregation import ROLLUPS, build_refresh_sql

logger = logging.getLogger(__name__)


@dataclass
class ETLConfig:
    """ETL job configuration."""
    source_db_url: str
    target_db_url: str
    batch_date: date
    full_refresh: bool = False


class DimensionLoader:
    """
    Loads dimension tables with SCD Type 2 handling.
    """
    
    def __init__(self, target_db, tracked_columns: Dict[str, List[str]]):
        self.db = target_db
        self.tracked_columns = tracked_columns
    
    def load_product_dimension(self, products_df, batch_date: date):
        """
        Load product dimension with Type 2 SCD.
        """
        
        logger.info(f"Loading product dimension for {batch_date}")
        
        # Get current records
        current = self.db.read_table("dim_product", "is_current = TRUE")
        
        # Compute changes
        scd_manager = SCDType2Manager(
            self.tracked_columns.get("product", ["name", "category", "price"])
        )
        
        changes = scd_manager.process_updates(
            current_records=current,
            incoming_records=products_df.to_records(),
            effective_date=batch_date
        )
        
        # Apply changes
        if changes["updates"]:
            self.db.update_batch("dim_product", changes["updates"])
            logger.info(f"Closed {len(changes['updates'])} changed products")
        
        if changes["inserts"]:
            self.db.insert_batch("dim_product", changes["inserts"])
            logger.info(f"Inserted {len(changes['inserts'])} product versions")
        
        return {
            "inserted": len(changes["inserts"]),
            "updated": len(changes["updates"]),
            "unchanged": len(changes["unchanged"])
        }
    
    def load_customer_dimension(self, customers_df, batch_date: date):
        """Load customer dimension."""
        # Similar pattern to product
        pass
    
    def load_date_dimension(self, start_date: date, end_date: date):
        """
        Generate and load date dimension.
        
        Usually done once, extended as needed.
        """
        
        dates = []
        current = start_date
        
        while current <= end_date:
            dates.append({
                "date_key": int(current.strftime("%Y%m%d")),
                "date": current,
                "day": current.day,
                "month": current.month,
                "month_name": current.strftime("%B"),
                "quarter": (current.month - 1) // 3 + 1,
                "year": current.year,
                "day_of_week": current.weekday(),
                "day_name": current.strftime("%A"),
                "week_of_year": current.isocalendar()[1],
                "is_weekend": current.weekday() >= 5,
                "is_holiday": False,  # Add holiday logic
                "fiscal_year": current.year,  # Adjust for fiscal calendar
                "fiscal_quarter": (current.month - 1) // 3 + 1
            })
            current = current + timedelta(days=1)
        
        self.db.upsert_batch("dim_date", dates, key_columns=["date_key"])
        
        return {"dates_loaded": len(dates)}


class FactLoader:
    """
    Loads fact tables from source transactions.
    """
    
    def __init__(self, target_db):
        self.db = target_db
    
    def load_sales_fact(
        self,
        orders_df,
        batch_date: date,
        dimension_lookups: Dict
    ):
        """
        Load sales fact table.
        
        Steps:
        1. Join with dimensions to get surrogate keys
        2. Calculate derived measures
        3. Insert into fact table
        """
        
        logger.info(f"Loading sales fact for {batch_date}")
        
        facts = []
        
        for order in orders_df.to_records():
            # Look up dimension keys
            date_key = int(order["order_date"].strftime("%Y%m%d"))
            
            product_key = dimension_lookups["product"].get(
                order["product_id"], self._get_unknown_key("product")
            )
            
            customer_key = dimension_lookups["customer"].get(
                order["customer_id"], self._get_unknown_key("customer")
            )
            
            store_key = dimension_lookups["store"].get(
                order["store_id"], self._get_unknown_key("store")
            )
            
            # Calculate measures
            gross_revenue = order["quantity"] * order["unit_price"]
            net_revenue = gross_revenue - order.get("discount", 0)
            cost = order["quantity"] * order.get("unit_cost", 0)
            profit = net_revenue - cost
            
            facts.append({
                "date_key": date_key,
                "product_key": product_key,
                "customer_key": customer_key,
                "store_key": store_key,
                "order_id": order["order_id"],
                "line_item_number": order.get("line_number", 1),
                "quantity": order["quantity"],
                "unit_price": order["unit_price"],
                "discount_amount": order.get("discount", 0),
                "tax_amount": order.get("tax", 0),
                "gross_revenue": gross_revenue,
                "net_revenue": net_revenue,
                "profit": profit,
                "order_timestamp": order["order_timestamp"]
            })
        
        # Batch insert
        self.db.insert_batch("fact_sales", facts)
        
        logger.info(f"Loaded {len(facts)} sales facts")
        
        return {"facts_loaded": len(facts)}
    
    def _get_unknown_key(self, dimension: str) -> int:
        """Get the 'unknown' dimension key for missing lookups."""
        # Convention: -1 or a specific unknown record
        return -1
    
    def build_dimension_lookups(self, batch_date: date) -> Dict:
        """
        Build lookup dictionaries for dimension foreign keys.
        
        Returns mapping from business key → surrogate key
        for each dimension, using the record valid on batch_date.
        """
        
        lookups = {}
        
        # Product lookup (current records)
        products = self.db.query("""
            SELECT product_id, product_key 
            FROM dim_product 
            WHERE is_current = TRUE
        """)
        lookups["product"] = {
            r["product_id"]: r["product_key"] for r in products
        }
        
        # Customer lookup (current records)
        customers = self.db.query("""
            SELECT customer_id, customer_key 
            FROM dim_customer 
            WHERE is_current = TRUE
        """)
        lookups["customer"] = {
            r["customer_id"]: r["customer_key"] for r in customers
        }
        
        # Store lookup
        stores = self.db.query("SELECT store_id, store_key FROM dim_store")
        lookups["store"] = {
            r["store_id"]: r["store_key"] for r in stores
        }
        
        return lookups


class ETLOrchestrator:
    """
    Orchestrates the full ETL pipeline.
    """
    
    def __init__(
        self,
        source_db,
        target_db,
        dim_loader: DimensionLoader,
        fact_loader: FactLoader
    ):
        self.source = source_db
        self.target = target_db
        self.dimensions = dim_loader
        self.facts = fact_loader
    
    def run_daily_etl(self, batch_date: date) -> Dict:
        """
        Run complete daily ETL pipeline.
        """
        
        results = {"batch_date": batch_date.isoformat()}
        
        try:
            # Step 1: Load dimensions (order matters for lookups)
            logger.info("Loading dimensions...")
            
            products = self.source.extract_products(batch_date)
            results["products"] = self.dimensions.load_product_dimension(
                products, batch_date
            )
            
            customers = self.source.extract_customers(batch_date)
            results["customers"] = self.dimensions.load_customer_dimension(
                customers, batch_date
            )
            
            # Step 2: Build lookups
            logger.info("Building dimension lookups...")
            lookups = self.facts.build_dimension_lookups(batch_date)
            
            # Step 3: Load facts
            logger.info("Loading facts...")
            orders = self.source.extract_orders(batch_date)
            results["facts"] = self.facts.load_sales_fact(
                orders, batch_date, lookups
            )
            
            # Step 4: Update aggregations
            logger.info("Updating rollups...")
            results["rollups"] = self._refresh_rollups(batch_date)
            
            results["status"] = "success"
            
        except Exception as e:
            logger.error(f"ETL failed: {e}", exc_info=True)
            results["status"] = "failed"
            results["error"] = str(e)
        
        return results
    
    def _refresh_rollups(self, batch_date: date) -> Dict:
        """Refresh materialized views / rollup tables."""
        
        rollups_refreshed = []
        
        for rollup in ROLLUPS:
            # RollupDefinition carries no refresh SQL of its own; generate it
            # from the definition (see the build_refresh_sql sketch in Chapter 5)
            self.target.execute(
                build_refresh_sql(rollup),
                {"date_key": int(batch_date.strftime("%Y%m%d"))}
            )
            rollups_refreshed.append(rollup.name)
        
        return {"refreshed": rollups_refreshed}

Part III: Real-World Application

Chapter 7: Case Studies

7.1 Airbnb's Data Model

AIRBNB'S DIMENSIONAL MODEL

FACT TABLES:
├── fact_bookings (grain: one row per booking)
├── fact_searches (grain: one row per search)
├── fact_reviews (grain: one row per review)
└── fact_messages (grain: one row per message)

DIMENSION TABLES:
├── dim_listing (property details, host info)
├── dim_guest (guest profile, verification)
├── dim_host (host profile, superhost status)
├── dim_location (city, neighborhood, market)
├── dim_date (calendar, holidays, events)
└── dim_time (hour, minute, day part)

KEY DESIGN DECISIONS:
1. Separate host and guest dimensions
   - Same person can be both
   - Different attributes matter

2. Location dimension with hierarchy
   - Country → State → City → Neighborhood
   - Enables drill-down in reports

3. Date + Time dimensions
   - Date for daily aggregates
   - Time for booking time analysis

4. Slowly changing host/guest dimensions
   - Track superhost status changes
   - Analyze impact of status on bookings

7.2 Uber's Metrics Layer

UBER'S ANALYTICS ARCHITECTURE

Challenge:
├── 100+ metrics (bookings, revenue, driver hours, etc.)
├── Multiple dimensions (city, product, time)
├── Consistency across dashboards and reports
└── Self-service for analysts

Solution: METRICS LAYER

┌─────────────────────────────────────────────────────────────┐
│                      METRICS LAYER                          │
│                                                             │
│  Metric Definitions:                                        │
│  ├── gross_bookings = SUM(fare_amount)                      │
│  ├── net_revenue = gross_bookings - promotions - refunds    │
│  ├── trips = COUNT(DISTINCT trip_id)                        │
│  └── driver_hours = SUM(online_hours)                       │
│                                                             │
│  Dimensions:                                                │
│  ├── time: hour, day, week, month, quarter, year            │
│  ├── geography: country, city, zone                         │
│  ├── product: UberX, UberXL, UberBlack                      │
│  └── user_type: new, returning, power_user                  │
│                                                             │
│  Pre-computed Cubes:                                        │
│  ├── hourly × city × product                                │
│  ├── daily × city × product × user_type                     │
│  └── weekly × country × product                             │
│                                                             │
└─────────────────────────────────────────────────────────────┘

Benefits:
├── Single source of truth for each metric
├── Consistent calculations everywhere
├── Pre-computed for fast queries
└── Self-service via metrics catalog
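
One way to make such definitions concrete is to keep them in code and render SQL from a single place. The sketch below mirrors the metric names above but is purely illustrative (the classes and the fact_trips source are assumptions, not Uber's actual implementation).

# Illustrative metrics-layer sketch (names mirror the example above)
from dataclasses import dataclass
from typing import List

@dataclass(frozen=True)
class MetricDefinition:
    name: str
    expression: str   # aggregation over fact-table columns

METRICS = {
    "gross_bookings": MetricDefinition("gross_bookings", "SUM(fare_amount)"),
    "trips": MetricDefinition("trips", "COUNT(DISTINCT trip_id)"),
    "driver_hours": MetricDefinition("driver_hours", "SUM(online_hours)"),
}

def metric_sql(metric_name: str, group_by: List[str], source: str = "fact_trips") -> str:
    """Every dashboard renders the metric from the same definition."""
    metric = METRICS[metric_name]
    cols = ", ".join(group_by)
    return (
        f"SELECT {cols}, {metric.expression} AS {metric.name}\n"
        f"FROM {source}\n"
        f"GROUP BY {cols}"
    )

print(metric_sql("gross_bookings", ["city", "product"]))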

Chapter 8: Common Mistakes

8.1 Modeling Mistakes

DATA MODELING ANTI-PATTERNS

❌ MISTAKE 1: Over-Normalization

Wrong:
  -- 5 JOINs to get product category
  SELECT category_name
  FROM products p
  JOIN product_types pt ON p.type_id = pt.id
  JOIN subcategories sc ON pt.subcategory_id = sc.id
  JOIN categories c ON sc.category_id = c.id
  JOIN category_groups cg ON c.group_id = cg.id

Problem: Slow queries, complex ETL

Right:
  -- Denormalized in dimension
  SELECT category
  FROM dim_product
  WHERE product_id = '123'


❌ MISTAKE 2: Wrong Grain

Wrong:
  -- Fact table at order level
  fact_orders: order_id, total_amount, item_count
  
  -- Can't analyze by product!

Problem: Can't drill down to line items

Right:
  -- Fact table at line item level
  fact_order_lines: order_id, line_id, product_id, quantity, amount
  
  -- Can aggregate up to orders when needed


❌ MISTAKE 3: Missing Surrogate Keys

Wrong:
  -- Using business keys directly
  fact_sales:
    customer_id = "CUST123"  -- Business key
    
  -- If the customer's attributes change, which version was valid at sale time?

Problem: Can't track dimension history

Right:
  -- Using surrogate keys
  fact_sales:
    customer_key = 42  -- Links to version valid at sale time


❌ MISTAKE 4: Ignoring NULL Handling

Wrong:
  -- NULLs in dimensions
  dim_customer:
    customer_id = NULL  -- Anonymous purchases
    
Problem: JOINs may lose rows, GROUP BY issues

Right:
  -- Unknown row for NULLs
  dim_customer:
    customer_key = -1, customer_id = "UNKNOWN", name = "Unknown"
    
  -- NULL business keys map to unknown surrogate key

8.2 Performance Mistakes

PERFORMANCE ANTI-PATTERNS

❌ MISTAKE 1: No Partitioning

Wrong:
  -- 3 years of data in one table
  SELECT SUM(revenue) FROM fact_sales WHERE date = '2024-01-15'
  -- Scans 1 billion rows!

Right:
  -- Partitioned by date
  fact_sales/date=2024-01-15/
  -- Scans only that partition


❌ MISTAKE 2: Over-Partitioning

Wrong:
  -- Partition by customer_id
  fact_sales/customer_id=C1/
  fact_sales/customer_id=C2/
  -- 10 million partitions!

Problem: Metadata overhead, small files

Right:
  -- Partition by date, maybe region
  fact_sales/date=2024-01-15/region=US/
  -- Reasonable partition count


❌ MISTAKE 3: No Rollups for Common Queries

Wrong:
  -- Every dashboard query scans fact table
  SELECT date, SUM(revenue) FROM fact_sales GROUP BY date
  -- 60 seconds every time

Right:
  -- Pre-aggregated daily rollup
  SELECT date, revenue FROM agg_daily_revenue
  -- 50 milliseconds

Part IV: Interview Preparation

Chapter 9: Interview Tips

9.1 When to Discuss Data Modeling

DATA MODELING INTERVIEW TRIGGERS

"Design an analytics system"
  → Star schema for the data warehouse
  → Discuss dimensions and facts

"How would you make this dashboard fast?"
  → Partitioning and pre-aggregation
  → Materialized views

"Users report the numbers don't match"
  → SCD Type 2 for dimension history
  → Point-in-time accuracy

"Design a metrics platform"
  → Metrics layer architecture
  → Rollup strategy

9.2 Key Phrases

TALKING ABOUT DATA MODELING

"I'd use a star schema with the sales fact table at the line-item
grain. This gives us flexibility to aggregate to any level —
orders, customers, products, time periods."

"For the product dimension, I'd implement SCD Type 2 to track
price changes. When we report historical revenue, we want the
price that was actually charged, not today's price."

"To make the dashboard fast, I'd pre-aggregate at the day-category
level. That reduces a billion-row scan to thousands of rows.
We refresh this rollup hourly."

"The date dimension is denormalized with day-of-week, is-weekend,
quarter, and fiscal year. This lets us filter and group by any
time attribute without complex date math in queries."

Chapter 10: Practice Problems

Problem 1: E-commerce Data Model

Setup: Design the dimensional model for an e-commerce platform:

  • Products with categories, brands, and prices that change
  • Customers with segments that upgrade/downgrade
  • Orders with multiple items, shipping, and returns
  • Multiple sales channels (web, mobile, marketplace)

Questions:

  1. What's the grain of your fact table?
  2. How do you handle returns and refunds?
  3. How do you track product price changes historically?

Answers:

  • Grain: Line item level (not order level)
  • Returns: Separate fact table or negative quantities
  • Price changes: SCD Type 2 on the product dimension
  • Capture the price actually charged in the fact table too

Problem 2: SaaS Subscription Analytics

Setup: Design analytics for a B2B SaaS with:

  • Subscription plans (monthly/annual, tiers)
  • Usage metrics (API calls, storage, users)
  • Revenue recognition (MRR, ARR, expansion, churn)

Questions:

  1. What fact tables do you need?
  2. How do you calculate MRR?
  3. How do you track plan changes?

Answers:

  • Fact tables: subscriptions, usage, revenue events
  • MRR: Point-in-time subscription value (monthly snapshot)
  • Plan changes: SCD Type 2 on the subscription dimension
  • Consider a snapshot fact table for MRR

Chapter 11: Sample Interview Dialogue

Interviewer: "We have slow dashboard queries. How would you speed them up?"

You: "Let me understand the current setup first. What's the data model, and what kinds of queries are slow?"

Interviewer: "We have a normalized operational database. Dashboards join 5-6 tables and aggregate millions of rows. Some queries take 30 seconds."

You: "That's a classic problem. Normalized models are great for OLTP but painful for analytics. I'd recommend a dimensional model — specifically a star schema.

First, I'd identify the key fact table. For an e-commerce dashboard, that's probably sales at the line-item grain. It would have foreign keys to dimensions and numeric measures like quantity and revenue.

Then dimension tables for product, customer, time, and location. These are denormalized — all product attributes in one table, no further joins needed.

This alone eliminates most joins. But for a dashboard hitting millions of rows, I'd add pre-aggregation. Common queries like 'revenue by category by month' can read from a rollup table with 365 × 100 rows instead of 100 million.

Finally, partitioning the fact table by date means queries for specific time ranges only scan relevant data. 'Last 7 days' reads 7 partitions, not the whole table."

Interviewer: "What about customers who change segments over time?"

You: "Great question. That's a slowly changing dimension. I'd use SCD Type 2 — when a customer's segment changes, we close the old record and create a new one with different valid dates.

The fact table stores the surrogate key that was valid at transaction time. So when we query 'Q1 revenue by segment', each sale is attributed to the segment the customer was in when they bought, not their current segment.

This is crucial for accurate historical analysis. Without it, if a customer upgrades to Premium today, all their historical purchases would suddenly count as Premium — which would be misleading."


Summary

DAY 3 SUMMARY: DATA MODELING FOR ANALYTICS

STAR SCHEMA
├── Fact tables: Measures (revenue, quantity)
├── Dimension tables: Attributes (product, customer, date)
├── Denormalized for fast queries
├── Surrogate keys for history tracking
└── Grain defines the level of detail

SLOWLY CHANGING DIMENSIONS
├── Type 0: Retain original
├── Type 1: Overwrite (no history)
├── Type 2: Add row (full history) ← Most common
├── Type 3: Add column (limited history)
└── Hash-based change detection

COLUMNAR STORAGE
├── Read only needed columns
├── Better compression
├── Parquet, ORC, Delta, Iceberg
└── Predicate pushdown for filtering

PARTITIONING
├── Partition by query filter columns
├── Date partitioning most common
├── Avoid over-partitioning
└── Enables partition pruning

PRE-AGGREGATION
├── Rollup tables for common queries
├── Materialized views for automation
├── Trade storage for speed
└── Refresh strategy matters

Key Takeaways

DATA MODELING KEY TAKEAWAYS

1. OLTP ≠ OLAP
   Different access patterns need different models

2. DENORMALIZE FOR READS
   Star schema trades storage for query speed

3. GRAIN IS FOUNDATIONAL
   Define the lowest level of detail first

4. TRACK DIMENSION HISTORY
   SCD Type 2 enables accurate historical analysis

5. PRE-COMPUTE COMMON QUERIES
   Rollups make dashboards fast

GOLDEN RULES:
├── One fact table per business process
├── Surrogate keys for all dimensions
├── Denormalize dimensions (no snowflake)
├── Partition fact tables by date
└── Pre-aggregate for dashboard queries

What's Next

Tomorrow, we'll tackle Late-Arriving Data and Correctness — handling events that arrive after their window closed, and maintaining accurate analytics despite late data.

TOMORROW'S PREVIEW: LATE DATA

Questions we'll answer:
├── What happens when events arrive late?
├── How do watermarks work in practice?
├── When should we recompute vs patch?
├── How do we version our analytics?
└── What's the "correction" pattern?

Late data is the nightmare of real-time analytics.
Tomorrow we learn to wake up from it.

End of Week 8, Day 3

Next: Day 4 — Late-Arriving Data and Correctness