Week 8 Capstone: Real-Time Analytics for Video Streaming Platform
A Complete System Design Interview Integrating All Week 8 Concepts
The Interview Begins
You walk into the interview room at a major streaming company. The interviewer, a Principal Engineer, greets you and gestures to the whiteboard.
Interviewer: "Thanks for coming in. Today we're going to design the analytics platform for our video streaming service. We have 150 million subscribers watching content across mobile, web, smart TVs, and gaming consoles. We need to track everything - what people watch, when they pause, what they search for - and turn that into actionable insights."
They write on the whiteboard:
────────────────────────────────────────────────────────────────────────────
 Design: Real-Time Analytics for Video Streaming

 Context:
 - 150M subscribers globally
 - 10M peak concurrent viewers
 - Content library: 50K titles (movies, series, documentaries)
 - Devices: Mobile (40%), Smart TV (35%), Web (15%), Gaming (10%)

 Requirements:
 1. Track all viewing events (play, pause, seek, complete)
 2. Real-time "Trending Now" dashboard (updates every minute)
 3. Content performance analytics for creators
 4. A/B test analysis for product experiments
 5. Historical viewing patterns for recommendations
────────────────────────────────────────────────────────────────────────────
Interviewer: "Take a few minutes to think about this, ask clarifying questions, and then walk me through your design. We have about 45 minutes."
Phase 1: Requirements Clarification (5 minutes)
Before diving in, you take a breath and start asking questions.
Your Questions
You: "This is a great problem. Before I start designing, I'd like to clarify a few requirements. First, what's the expected event volume? How many events per second should we plan for?"
Interviewer: "At peak, we see about 10 million concurrent viewers. Each viewer generates roughly 1 event every 10 seconds on average - heartbeats, seeks, quality changes. So peak is around 1 million events per second."
You: "That's significant scale. For the 'Trending Now' dashboard, what's the freshness requirement? You mentioned updates every minute - is that the business requirement or current state?"
Interviewer: "Business wants it under 1 minute. Ideally 30 seconds or less. We compete with social media for attention, so trending content needs to surface fast."
You: "Understood. For historical analytics and recommendations, what's the retention requirement? How far back do we need to query?"
Interviewer: "We need at least 2 years of viewing history for recommendations. For operational analytics, 90 days of detailed data is sufficient. Aggregated data we keep indefinitely."
You: "For the device mix you mentioned - mobile is 40%. Mobile apps often have connectivity issues. What's an acceptable lateness for events from mobile devices?"
Interviewer: "Good catch. We see mobile events arriving anywhere from real-time to several hours late when users were on a plane or subway. We need to handle that gracefully."
You: "One more question - for A/B testing, how many concurrent experiments do you typically run, and what's the analysis latency requirement?"
Interviewer: "We run 50-100 experiments at any time. For analysis, daily aggregates are fine - we don't need real-time experiment results."
You: "Perfect. Let me summarize the requirements as I understand them."
Functional Requirements
1. EVENT TRACKING
- Play, pause, resume, seek, stop, complete events
- Quality change events (buffering, resolution switches)
- Search and browse events
- Heartbeat events (every 10 seconds during playback)
2. REAL-TIME DASHBOARDS
- "Trending Now": updated every 30-60 seconds
- Live viewership counts per title
- Geographic distribution of viewers
- Quality metrics (buffering rate, errors)
3. CONTENT ANALYTICS (for creators/studios)
- Total views, unique viewers, completion rate
- Audience demographics and geography
- Comparison to similar content
- Performance over time (daily/weekly/monthly)
4. A/B TEST ANALYSIS
- Track experiment assignments
- Calculate metrics per variant
- Statistical significance computation
- Daily refresh acceptable
5. HISTORICAL ANALYTICS
- 2 years of viewing data for recommendations
- 90 days detailed, older aggregated
- Ad-hoc queries by analysts
Non-Functional Requirements
1. SCALE
- Peak: 1M events/second (10M concurrent viewers)
- Daily: 50B events
- Storage: ~3TB/day raw (compressed), ~10GB/day aggregated
2. LATENCY
- Event ingestion: < 1 second acknowledgment
- Trending dashboard: < 60 second freshness
- Historical queries: < 30 seconds for 90-day range
3. RELIABILITY
- No event loss (at-least-once delivery)
- Late data handling for mobile (up to 24 hours)
- 99.9% availability for dashboards
4. DATA QUALITY
- Deduplication of replayed events
- Handle out-of-order events correctly
- Clear distinction between real-time and final numbers
Phase 2: Back of the Envelope Estimation (5 minutes)
You: "Let me work through the numbers to validate scale assumptions."
Traffic Estimation
EVENT VOLUME
Peak concurrent viewers: 10,000,000 viewers
Events per viewer: 1 event / 10 seconds
Peak event rate: 1,000,000 events/second
Daily pattern:
├── Peak hours (4 hours): 1M events/sec × 4 hours = 14.4B events
├── High hours (8 hours): 500K events/sec × 8 hours = 14.4B events
├── Normal hours (12 hours): 200K events/sec × 12 hours = 8.6B events
└── Total daily: ~40-50B events
Event breakdown:
├── Heartbeats (70%): 35B/day (high volume, low value)
├── Playback events (20%): 10B/day (play, pause, seek, complete)
├── Browse/Search (8%): 4B/day
└── Quality events (2%): 1B/day
Storage Estimation
RAW EVENT STORAGE
Event size (average):
├── Event metadata: 100 bytes (event_id, type, timestamp)
├── User context: 50 bytes (user_id, device_id, session_id)
├── Content context: 50 bytes (content_id, episode_id)
├── Playback state: 50 bytes (position, duration, quality)
├── Device info: 50 bytes (device_type, os, app_version)
└── Total per event: ~300 bytes
Daily raw storage:
├── 50B events × 300 bytes = 15TB uncompressed
├── With compression (5x): 3TB/day
├── With replication (3x): 9TB/day effective
└── 90-day retention: 810TB
Aggregated storage:
├── Hourly aggregates: ~10GB/day
├── Daily aggregates: ~1GB/day
└── 2-year retention of daily aggregates: ~750GB
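These figures can be sanity-checked mechanically. A quick script using only the assumptions above (event rates, 300-byte events, 5x compression, 3x replication):

```python
# Sanity check for the traffic and storage estimates above.
daily_events = (
    1_000_000 * 4 * 3600    # peak hours
    + 500_000 * 8 * 3600    # high hours
    + 200_000 * 12 * 3600   # normal hours
)
print(f"Daily events: {daily_events / 1e9:.2f}B")  # 37.44B; plan for ~50B headroom

raw_tb = 50e9 * 300 / 1e12         # 50B events x 300 bytes, uncompressed
compressed_tb = raw_tb / 5         # ~5x columnar compression
replicated_tb = compressed_tb * 3  # 3x replication
print(f"Raw: {raw_tb:.0f}TB/day, compressed: {compressed_tb:.0f}TB/day, "
      f"90-day effective: {replicated_tb * 90:.0f}TB")
```

Rounding daily events up from ~37B to 50B buys headroom for growth and uneven peaks, which is why the summary uses the larger figure.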
Key Metrics Summary
──────────────────────────────────────────────────────────────────────────
 ESTIMATION SUMMARY

 TRAFFIC
 ├── Peak events: 1,000,000 /second
 ├── Daily events: 50 billion
 └── Monthly events: 1.5 trillion

 STORAGE
 ├── Daily raw (compressed): 3 TB
 ├── 90-day raw: 270 TB
 ├── Daily aggregates: 10 GB
 └── Total system: ~1 PB

 INFRASTRUCTURE (rough)
 ├── Kafka brokers: 50+ (for 1M events/sec)
 ├── Flink workers: 100+ (for streaming)
 ├── ClickHouse nodes: 20+ (for real-time OLAP)
 └── Spark cluster: 500+ cores (for batch)
──────────────────────────────────────────────────────────────────────────
Phase 3: High-Level Design (10 minutes)
You: "Now let me sketch out the high-level architecture."
System Architecture
HIGH-LEVEL ARCHITECTURE

CLIENT DEVICES
  Mobile (40%) │ Smart TV (35%) │ Web (15%) │ Gaming (10%) │ Other
        │
        ▼
INGESTION LAYER
  CDN/Edge (Regional) ──▶ API Gateway (Regional) ──▶ Collectors (Regional)
        │
        ▼
MESSAGE QUEUE (Kafka Cluster)
  Topics: events.playback │ events.browse │ events.quality │ events.dead_letter
        │
        ├─────────────────────────────────┐
        ▼                                 ▼
STREAMING LAYER                     BATCH LAYER
  Flink Cluster                       Spark Cluster
  ├── Aggregations                    ├── Daily ETL
  ├── Trending calc                   ├── Late data
  ├── Sessionization                  ├── A/B analysis
  └── Late handling                   └── ML features
        │                                 │
        ▼                                 ▼
SERVING LAYER
  ClickHouse (real-time):  trending, live counts, quality metrics
  BigQuery   (historical): ad-hoc analytics, A/B tests
  Redis      (cache):      dashboard results, hot data
  S3         (data lake):  raw events, Parquet archives
        │
        ▼
QUERY LAYER: Analytics API
  ├── /trending       (ClickHouse, 30s cache)
  ├── /content/{id}   (BigQuery, 5min cache)
  ├── /experiments    (BigQuery, 1hr cache)
  └── /explore        (BigQuery, no cache)
Component Breakdown
You: "Let me walk through each component..."
1. Ingestion Layer
Purpose: Receive events from all client devices reliably at scale.
Key responsibilities:
- Accept events from 10M+ concurrent clients
- Validate event schema and reject malformed events
- Buffer events for reliability
- Route to appropriate Kafka topics
Technology choice: Regional API Gateways + Event Collectors because we need low-latency ingestion globally and can validate/enrich events close to users.
2. Message Queue (Kafka)
Purpose: Durable, ordered event storage and distribution.
Key responsibilities:
- Buffer 1M+ events/second
- Retain events for replay (7 days)
- Distribute to streaming and batch consumers
- Handle backpressure gracefully
Topic design:
- events.playback: play, pause, seek, complete (partitioned by user_id)
- events.browse: search, navigation events
- events.quality: buffering, errors, quality changes
- events.dead_letter: failed validations for inspection
3. Streaming Layer (Flink)
Purpose: Real-time processing for dashboards and alerting.
Key responsibilities:
- Calculate trending content (1-minute windows)
- Maintain live viewer counts per title
- Detect quality issues in real-time
- Handle late data with watermarks
4. Batch Layer (Spark)
Purpose: Accurate historical analytics and heavy computations.
Key responsibilities:
- Daily aggregation jobs (finalized numbers)
- Late data reprocessing
- A/B test statistical analysis
- ML feature generation
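Since daily aggregates are acceptable, the significance computation can be a plain batch step rather than a streaming one. A minimal two-proportion z-test sketch using only the stdlib (the conversion numbers are illustrative, not from the source):

```python
import math

def two_proportion_z_test(conv_a: int, n_a: int,
                          conv_b: int, n_b: int) -> tuple[float, float]:
    """Return (z, two-sided p-value) comparing conversion rates of two variants."""
    p_a, p_b = conv_a / n_a, conv_b / n_b
    p_pool = (conv_a + conv_b) / (n_a + n_b)    # pooled rate under H0
    se = math.sqrt(p_pool * (1 - p_pool) * (1 / n_a + 1 / n_b))
    if se == 0:
        return 0.0, 1.0
    z = (p_b - p_a) / se
    # Two-sided p-value from the standard normal CDF, via erf
    p_value = 2 * (1 - 0.5 * (1 + math.erf(abs(z) / math.sqrt(2))))
    return z, p_value

# e.g. control: 4,800 completions / 100,000 viewers; variant: 5,200 / 100,000
z, p = two_proportion_z_test(4800, 100_000, 5200, 100_000)
print(f"z={z:.2f}, p={p:.4f}")  # significant at p < 0.05
```

With 50-100 concurrent experiments, a multiple-comparison correction (e.g. Bonferroni) would normally be layered on top; that is omitted here for brevity.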
5. Serving Layer
Purpose: Store processed data for different query patterns.
| Store | Use Case | Query Pattern |
|---|---|---|
| ClickHouse | Real-time dashboards | Sub-second, high concurrency |
| BigQuery | Historical analytics | Complex queries, ad-hoc |
| Redis | Dashboard caching | Millisecond lookups |
| S3 | Raw data lake | Batch processing, ML |
Phase 4: Deep Dives (20 minutes)
Interviewer: "Great high-level design. Let's dive deeper into a few areas. Start with the event schema and ingestion - how would you design the events?"
Deep Dive 1: Event Schema Design (Day 1 Concept)
You: "Event schema design is critical. Bad schemas create debt that compounds over time. Let me show you what I'd propose."
The Problem
SCHEMA DESIGN CHALLENGES
Without careful design:
├── Different teams send different formats
├── Schema changes break downstream consumers
├── Missing fields cause analytics gaps
└── No way to evolve schema safely
We need:
├── Consistent structure across all event types
├── Forward and backward compatibility
├── Rich enough for analytics, lean enough for volume
└── Clear versioning for evolution
The Solution
You: "I'd use an envelope pattern with a common header and type-specific payload."
EVENT SCHEMA DESIGN
EVENT ENVELOPE

{
  // HEADER (common to all events)
  "event_id": "evt_abc123",                     // UUID, idempotency key
  "event_type": "playback.started",             // domain.entity.action
  "event_version": "2.1",                       // schema version
  "event_time": "2024-01-15T14:30:00.123Z",     // when it happened
  "received_time": "2024-01-15T14:30:00.456Z",  // when server got it

  // CONTEXT (who/where)
  "user_id": "usr_12345",
  "device_id": "dev_67890",
  "session_id": "sess_abcdef",
  "device_type": "mobile",                      // mobile, tv, web, gaming
  "app_version": "5.2.1",
  "geo": {
    "country": "US",
    "region": "CA"
  },

  // PAYLOAD (event-specific)
  "payload": {
    "content_id": "mov_789",
    "content_type": "movie",
    "position_seconds": 0,
    "duration_seconds": 7200,
    "quality": "1080p",
    "audio_language": "en",
    "subtitle_language": "es"
  },

  // EXPERIMENTS (A/B test context)
  "experiments": {
    "new_player_ui": "variant_b",
    "recommendation_algo": "control"
  }
}
Event Types
# Event type hierarchy
EVENT_TYPES = {
# Playback events (highest volume)
"playback.started": "User started playing content",
"playback.paused": "User paused playback",
"playback.resumed": "User resumed from pause",
"playback.seeked": "User jumped to different position",
"playback.completed": "User finished content (>90% watched)",
"playback.abandoned": "User stopped before completion",
"playback.heartbeat": "Periodic signal during playback",
# Quality events
"quality.buffering_started": "Playback buffering began",
"quality.buffering_ended": "Playback resumed after buffer",
"quality.resolution_changed": "Video quality changed",
"quality.error": "Playback error occurred",
# Browse events
"browse.search": "User performed search",
"browse.page_viewed": "User viewed a page/row",
"browse.content_clicked": "User clicked on content tile",
"browse.content_added_list": "User added to My List",
# Session events
"session.started": "App opened / session began",
"session.ended": "App closed / session ended"
}
Schema Evolution
# Schema registry for safe evolution
class ViewingEventSchema:
"""
Schema evolution rules:
- SAFE: Add optional fields
- UNSAFE: Remove fields, change types, rename
"""
VERSION = "2.1"
# v1.0: Original schema
# v2.0: Added experiments field
# v2.1: Added subtitle_language to payload
REQUIRED_FIELDS = [
"event_id", "event_type", "event_time",
"user_id", "device_id", "payload"
]
OPTIONAL_FIELDS = [
"session_id", "geo", "experiments",
"received_time", "app_version"
]
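To make those rules concrete, here is a sketch of the validator the ingestion layer might run (EnvelopeValidator and ValidationResult are illustrative names; production systems would typically use Avro or JSON Schema backed by a schema registry). The result shape matches the validation_result.is_valid / .error access pattern used by the collector below.

```python
from dataclasses import dataclass

@dataclass
class ValidationResult:
    is_valid: bool
    error: str = ""

class EnvelopeValidator:
    """Checks the envelope's required fields and basic payload type."""
    REQUIRED_FIELDS = [
        "event_id", "event_type", "event_time",
        "user_id", "device_id", "payload",
    ]

    def validate(self, event: dict) -> ValidationResult:
        for name in self.REQUIRED_FIELDS:
            if name not in event:
                return ValidationResult(False, f"missing required field: {name}")
        if not isinstance(event["payload"], dict):
            return ValidationResult(False, "payload must be an object")
        # Unknown optional fields pass through: that is what makes
        # "add optional fields" a SAFE evolution.
        return ValidationResult(True)
```

Tolerating unrecognized fields is the key design choice: an old validator keeps accepting v2.1 events that carry fields it has never seen.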
Interviewer: "How do you handle the 1 million events per second throughput?"
Deep Dive 2: High-Throughput Ingestion (Day 1 Concept)
You: "At 1M events/second, we need a carefully designed ingestion pipeline."
Ingestion Architecture
HIGH-THROUGHPUT INGESTION
HIGH-THROUGHPUT INGESTION

REGIONAL INGESTION (5 regions); each region handles ~200K events/sec.
The US-EAST region shown below is mirrored in every region:

 CDN (CloudFront)
       │
       ▼
 API Gateway (ALB + Lambda or ECS)
   - TLS termination
   - Rate limiting per device
   - Request validation
       │
       ▼
 Event Collectors (20 instances)
   - Schema validation
   - Enrichment (geo, device info)
   - Batching (100 events or 100ms)
   - Async produce to Kafka
       │
       ▼
 Regional Kafka Cluster
   - 50 partitions per topic
   - Replication factor 3
   - 7-day retention

Cross-region replication to central data lake
Implementation
# Ingestion service optimized for throughput
from dataclasses import dataclass
from typing import List, Dict
import asyncio
from datetime import datetime
@dataclass
class IngestionConfig:
batch_size: int = 100
batch_timeout_ms: int = 100
max_concurrent_batches: int = 50
kafka_linger_ms: int = 5
kafka_batch_size: int = 16384
class HighThroughputCollector:
"""
Event collector optimized for 200K+ events/second per instance.
Key optimizations:
1. Async everywhere - no blocking
2. Batch events before Kafka produce
3. Schema validation with compiled validators
4. Connection pooling for Kafka
"""
    def __init__(self, config: IngestionConfig, kafka_producer, validator,
                 collector_id: str = "collector-1", region: str = "us-east"):
        self.config = config
        self.producer = kafka_producer
        self.validator = validator
        # Identity used by _enrich(); injected per instance in production
        self.collector_id = collector_id
        self.region = region
# Event batching
self.batch: List[Dict] = []
self.batch_lock = asyncio.Lock()
self.last_flush = datetime.utcnow()
# Metrics
self.events_received = 0
self.events_validated = 0
self.events_produced = 0
self.validation_errors = 0
async def ingest(self, events: List[Dict]) -> Dict:
"""
Ingest a batch of events from client.
Returns acknowledgment immediately after validation.
Kafka produce happens asynchronously.
"""
results = {"accepted": 0, "rejected": 0, "errors": []}
for event in events:
self.events_received += 1
# Fast validation (compiled schema)
validation_result = self.validator.validate(event)
if not validation_result.is_valid:
results["rejected"] += 1
results["errors"].append({
"event_id": event.get("event_id"),
"error": validation_result.error
})
self.validation_errors += 1
continue
# Enrich event
enriched = self._enrich(event)
# Add to batch (non-blocking)
await self._add_to_batch(enriched)
results["accepted"] += 1
self.events_validated += 1
return results
def _enrich(self, event: Dict) -> Dict:
"""Add server-side enrichments."""
event["received_time"] = datetime.utcnow().isoformat() + "Z"
event["collector_id"] = self.collector_id
event["processing_region"] = self.region
# Add derived fields
if "geo" in event and "country" in event["geo"]:
event["geo"]["continent"] = self._country_to_continent(
event["geo"]["country"]
)
return event
async def _add_to_batch(self, event: Dict):
"""Add event to batch, flush if needed."""
async with self.batch_lock:
self.batch.append(event)
should_flush = (
len(self.batch) >= self.config.batch_size or
self._batch_age_ms() >= self.config.batch_timeout_ms
)
if should_flush:
batch_to_send = self.batch
self.batch = []
self.last_flush = datetime.utcnow()
# Fire and forget - don't await
asyncio.create_task(self._produce_batch(batch_to_send))
async def _produce_batch(self, batch: List[Dict]):
"""Produce batch to Kafka."""
# Group by topic
by_topic = {}
for event in batch:
topic = self._get_topic(event["event_type"])
if topic not in by_topic:
by_topic[topic] = []
by_topic[topic].append(event)
# Produce to each topic
for topic, events in by_topic.items():
for event in events:
# Partition by user_id for ordering
partition_key = event.get("user_id", event["event_id"])
await self.producer.send(
topic=topic,
key=partition_key,
value=event
)
self.events_produced += 1
def _get_topic(self, event_type: str) -> str:
"""Route event to appropriate Kafka topic."""
if event_type.startswith("playback."):
return "events.playback"
elif event_type.startswith("quality."):
return "events.quality"
elif event_type.startswith("browse."):
return "events.browse"
else:
return "events.other"
def _batch_age_ms(self) -> int:
"""Get age of current batch in milliseconds."""
return int((datetime.utcnow() - self.last_flush).total_seconds() * 1000)
def _country_to_continent(self, country: str) -> str:
"""Map country to continent."""
# Simplified mapping
mapping = {
"US": "NA", "CA": "NA", "MX": "NA",
"GB": "EU", "DE": "EU", "FR": "EU",
"JP": "AS", "KR": "AS", "IN": "AS",
# ... more mappings
}
return mapping.get(country, "OTHER")
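One thing the collector leaves implicit is deduplication: clients retry on timeout, so the same event_id (the envelope's idempotency key) can arrive more than once. A minimal in-memory sketch; a real deployment would use a shared store such as Redis SET NX with an expiry so all collector instances agree:

```python
import time

class EventDeduplicator:
    """Remembers recently seen event_ids for a TTL window.

    In-memory sketch only: per-process state is fine for a single
    collector, but cross-instance dedup needs a shared store.
    """
    def __init__(self, ttl_seconds: int = 3600, max_entries: int = 100_000):
        self.ttl = ttl_seconds
        self.max_entries = max_entries
        self.seen: dict[str, float] = {}  # event_id -> first-seen time

    def is_duplicate(self, event_id: str) -> bool:
        now = time.monotonic()
        # Opportunistic expiry to bound memory
        if len(self.seen) > self.max_entries:
            self.seen = {k: t for k, t in self.seen.items()
                         if now - t < self.ttl}
        first_seen = self.seen.get(event_id)
        if first_seen is not None and now - first_seen < self.ttl:
            return True
        self.seen[event_id] = now
        return False

dedup = EventDeduplicator()
assert not dedup.is_duplicate("evt_abc123")  # first delivery: accept
assert dedup.is_duplicate("evt_abc123")      # client retry: drop
```

Dropping duplicates here is cheap insurance; downstream consumers still deduplicate by event_id for exactness, since the TTL window is finite.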
Interviewer: "Good. Now tell me about the streaming layer - how do you calculate trending content in real-time?"
Deep Dive 3: Real-Time Trending Calculation (Day 2 Concept)
You: "The trending calculation is a classic streaming problem. We need to balance freshness with accuracy."
The Problem
TRENDING CALCULATION CHALLENGES
Naive approach (count plays in last hour):
├── Favors long content (more heartbeats)
├── Doesn't account for library size
├── New releases always dominate
└── Spikes from bots/fraud not detected
Better approach:
├── Weight by unique viewers, not events
├── Normalize by content age and genre
├── Compare to baseline (is this unusually popular?)
└── Decay older activity (recent matters more)
Trending Algorithm
# Streaming trending calculation with Flink
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Dict, Optional
import math
@dataclass
class TrendingScore:
"""Trending score for a piece of content."""
content_id: str
score: float
unique_viewers_1h: int
unique_viewers_24h: int
velocity: float # Rate of change
percentile_rank: float
updated_at: datetime
class TrendingCalculator:
"""
Calculates trending scores using streaming data.
Algorithm:
1. Count unique viewers in sliding windows
2. Calculate velocity (change in viewership)
3. Normalize by content category baseline
4. Apply time decay for recency
5. Combine into final score
"""
def __init__(self):
# Viewer counts per content per window
self.viewers_1h: Dict[str, set] = {} # Last 1 hour
self.viewers_24h: Dict[str, set] = {} # Last 24 hours
# Historical baselines by category
self.category_baselines: Dict[str, float] = {}
# Previous scores for velocity calculation
self.previous_scores: Dict[str, float] = {}
def process_play_event(self, event: Dict):
"""
Process a playback event and update trending state.
"""
content_id = event["payload"]["content_id"]
user_id = event["user_id"]
# Add to viewer sets (HyperLogLog in production)
if content_id not in self.viewers_1h:
self.viewers_1h[content_id] = set()
self.viewers_24h[content_id] = set()
self.viewers_1h[content_id].add(user_id)
self.viewers_24h[content_id].add(user_id)
def calculate_trending_scores(
self,
content_metadata: Dict[str, Dict]
) -> list[TrendingScore]:
"""
Calculate trending scores for all content.
Called every 30-60 seconds.
"""
scores = []
for content_id, viewers_1h in self.viewers_1h.items():
if len(viewers_1h) < 10: # Minimum threshold
continue
metadata = content_metadata.get(content_id, {})
category = metadata.get("category", "unknown")
# Base score: unique viewers in last hour
base_score = len(viewers_1h)
# Velocity: how fast is viewership growing?
viewers_24h = len(self.viewers_24h.get(content_id, set()))
velocity = self._calculate_velocity(
content_id, base_score, viewers_24h
)
# Normalize by category baseline
baseline = self.category_baselines.get(category, 1000)
normalized_score = base_score / baseline
# Apply velocity boost (trending UP gets bonus)
velocity_multiplier = 1 + (velocity * 0.5) if velocity > 0 else 1
# Final score
final_score = normalized_score * velocity_multiplier
# Store for next velocity calculation
self.previous_scores[content_id] = base_score
scores.append(TrendingScore(
content_id=content_id,
score=final_score,
unique_viewers_1h=base_score,
unique_viewers_24h=viewers_24h,
velocity=velocity,
percentile_rank=0, # Calculated after sorting
updated_at=datetime.utcnow()
))
# Sort and assign percentile ranks
scores.sort(key=lambda x: x.score, reverse=True)
for i, score in enumerate(scores):
score.percentile_rank = 1 - (i / len(scores))
return scores
def _calculate_velocity(
self,
content_id: str,
current_1h: int,
current_24h: int
) -> float:
"""
Calculate velocity (rate of change) of viewership.
Positive = trending up, Negative = trending down
"""
previous = self.previous_scores.get(content_id, current_1h)
if previous == 0:
return 0
# Percentage change from previous calculation
change = (current_1h - previous) / previous
# Also factor in 1h vs 24h ratio
# If 1h viewers >> 24h/24 average, content is spiking
hourly_avg_24h = current_24h / 24
spike_factor = current_1h / hourly_avg_24h if hourly_avg_24h > 0 else 1
return change * spike_factor
def expire_old_data(self, window_cutoff: datetime):
"""
Remove old viewer data from windows.
In production, use Flink's windowing with watermarks.
"""
# Implementation would use event time windows
pass
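The plain set() above is the memory bottleneck at 10M viewers; the "HyperLogLog in production" note can be illustrated with a toy implementation (for exposition only; real systems use Redis PFADD/PFCOUNT or a tested library):

```python
import hashlib
import math

class TinyHyperLogLog:
    """Toy HyperLogLog: ~1KB of registers per counter instead of
    one set entry per unique viewer."""
    def __init__(self, p: int = 10):
        self.p = p                     # 2^p registers
        self.m = 1 << p
        self.registers = [0] * self.m
        self.alpha = 0.7213 / (1 + 1.079 / self.m)

    def add(self, item: str):
        h = int(hashlib.sha1(item.encode()).hexdigest(), 16) & ((1 << 64) - 1)
        idx = h >> (64 - self.p)                       # first p bits pick a register
        rest = h & ((1 << (64 - self.p)) - 1)          # remaining 64-p bits
        rank = (64 - self.p) - rest.bit_length() + 1   # leading zeros + 1
        self.registers[idx] = max(self.registers[idx], rank)

    def count(self) -> int:
        est = self.alpha * self.m ** 2 / sum(2 ** -r for r in self.registers)
        zeros = self.registers.count(0)
        if est <= 2.5 * self.m and zeros:              # small-range correction
            est = self.m * math.log(self.m / zeros)
        return int(est)

hll = TinyHyperLogLog()
for i in range(50_000):
    hll.add(f"usr_{i}")
print(hll.count())  # close to 50,000 (a few percent error is typical)
```

The trade is exactness for memory: with p=10 the standard error is about 3%, which is acceptable for a trending score but not for billing-grade metrics.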
# Flink job definition (pseudo-code)
FLINK_TRENDING_JOB = """
-- Flink SQL for trending calculation
-- Step 1: Deduplicate events by user per content per window
CREATE VIEW unique_viewers AS
SELECT
content_id,
COUNT(DISTINCT user_id) as viewer_count,
TUMBLE_END(event_time, INTERVAL '1' HOUR) as window_end
FROM playback_events
WHERE event_type = 'playback.started'
GROUP BY
content_id,
TUMBLE(event_time, INTERVAL '1' HOUR);
-- Step 2: Calculate velocity using LAG
CREATE VIEW trending_with_velocity AS
SELECT
content_id,
viewer_count,
window_end,
viewer_count - LAG(viewer_count) OVER (
PARTITION BY content_id
ORDER BY window_end
) as velocity
FROM unique_viewers;
-- Step 3: Join with metadata and calculate final score
CREATE VIEW trending_scores AS
SELECT
t.content_id,
t.viewer_count,
t.velocity,
t.viewer_count / m.category_baseline *
(1 + GREATEST(t.velocity, 0) * 0.5) as trending_score,
t.window_end
FROM trending_with_velocity t
JOIN content_metadata m ON t.content_id = m.content_id;
"""
Output to ClickHouse
# Sink trending scores to ClickHouse for dashboards
class TrendingClickHouseSink:
"""
Writes trending scores to ClickHouse for dashboard queries.
"""
async def write_scores(self, scores: list[TrendingScore]):
"""
Write batch of trending scores.
Uses ReplacingMergeTree to handle updates.
"""
values = [
(
s.content_id,
s.score,
s.unique_viewers_1h,
s.unique_viewers_24h,
s.velocity,
s.percentile_rank,
s.updated_at
)
for s in scores
]
await self.client.execute(
"""
INSERT INTO trending_scores
(content_id, score, viewers_1h, viewers_24h,
velocity, percentile_rank, updated_at)
VALUES
""",
values
)
CLICKHOUSE_SCHEMA = """
CREATE TABLE trending_scores (
content_id String,
score Float64,
viewers_1h UInt32,
viewers_24h UInt32,
velocity Float64,
percentile_rank Float64,
updated_at DateTime
)
ENGINE = ReplacingMergeTree(updated_at)
ORDER BY content_id;
-- Materialized view for top 100 trending
CREATE MATERIALIZED VIEW top_trending
ENGINE = Memory
AS
SELECT *
FROM trending_scores
ORDER BY score DESC
LIMIT 100;
"""
Interviewer: "How do you handle late data from mobile devices that were offline?"
Deep Dive 4: Late Data Handling (Day 4 Concept)
You: "Late data is particularly challenging for a streaming service because mobile users often watch offline on planes or subways."
The Problem
LATE DATA SCENARIOS
Scenario 1: Airplane mode viewing
├── User downloads movie at airport (10:00 AM)
├── Watches on 3-hour flight
├── Events buffered on device
├── Lands and syncs at 1:00 PM
└── Events arrive 3 hours late
Scenario 2: Subway commute
├── User starts episode at 8:00 AM
├── Goes underground, loses connectivity
├── Watches 30 minutes, events buffer
├── Emerges at 8:45 AM, events sync
└── Events arrive 45 minutes late
Scenario 3: App in background
├── User pauses and switches apps
├── App is killed by OS
├── Events in memory lost?
└── Need client-side persistence
Impact without handling:
├── Trending misses popular content
├── Completion rates underreported
├── Geographic data skewed to connected areas
└── A/B test results biased toward always-connected users
Solution Architecture
LATE DATA HANDLING ARCHITECTURE
CLIENT-SIDE
  Event Buffer (SQLite on device)
  ├── Persist events immediately on generation
  ├── Retry sending on connectivity
  ├── Include original event_time (not send time)
  └── Delete only after server acknowledgment
        │
        ▼
SERVER-SIDE
  Ingestion Layer
  ├── Accept events regardless of age
  ├── Tag with received_time for lateness tracking
  ├── Route old events (>1hr) to late_events topic
  └── Route recent events to normal topics
        │
        ├──────────────────────────┐
        ▼                          ▼
  Streaming (Flink)          Late Events Queue
  ├── 5-min watermark        ├── Batch process
  ├── On-time results        ├── Daily catchup
  └── "Provisional"          └── "Final" numbers
        │                          │
        ▼                          ▼
  ClickHouse                 BigQuery
  (Real-time view)           (Final view)
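The client-side buffer in the diagram can be sketched with the stdlib sqlite3 module (table and method names are hypothetical):

```python
import json
import sqlite3

class ClientEventBuffer:
    """Persist events at generation time; delete only after server ack."""
    def __init__(self, path: str = ":memory:"):
        self.db = sqlite3.connect(path)
        self.db.execute("""
            CREATE TABLE IF NOT EXISTS pending_events (
                event_id   TEXT PRIMARY KEY,
                event_time TEXT NOT NULL,  -- original event time, not send time
                body       TEXT NOT NULL
            )""")

    def record(self, event: dict):
        # INSERT OR IGNORE: re-recording after a crash/restart is harmless
        self.db.execute(
            "INSERT OR IGNORE INTO pending_events VALUES (?, ?, ?)",
            (event["event_id"], event["event_time"], json.dumps(event)))
        self.db.commit()

    def pending(self, limit: int = 100) -> list[dict]:
        rows = self.db.execute(
            "SELECT body FROM pending_events ORDER BY event_time LIMIT ?",
            (limit,))
        return [json.loads(r[0]) for r in rows]

    def acknowledge(self, event_ids: list[str]):
        self.db.executemany(
            "DELETE FROM pending_events WHERE event_id = ?",
            [(eid,) for eid in event_ids])
        self.db.commit()
```

The upload loop would call pending(), POST the batch, and call acknowledge() only for ids the server confirmed, which is what makes delivery at-least-once end to end.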
Implementation
# Late data handling with versioned results
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Dict, List, Optional
from enum import Enum
class DataFreshness(Enum):
REALTIME = "realtime" # Streaming result, may change
PRELIMINARY = "preliminary" # T+1, most late data included
FINAL = "final" # T+7, official number
@dataclass
class ViewingMetric:
"""A metric with versioning for late data."""
content_id: str
date: str
metric_name: str
value: float
freshness: DataFreshness
computed_at: datetime
event_count: int
late_event_count: int
class LateDataAwareAggregator:
"""
Aggregates viewing metrics with late data handling.
Maintains multiple versions:
- Real-time: Updated every minute, provisional
- Preliminary: Updated T+1, includes most late data
- Final: Updated T+7, official numbers
"""
def __init__(self, metrics_store):
self.store = metrics_store
# Late data tracking
self.lateness_histogram: Dict[str, int] = {}
async def process_event(self, event: Dict):
"""
Process event and update appropriate metrics version.
"""
        # fromisoformat() accepts the trailing "Z" only on Python 3.11+,
        # so normalize it for older runtimes
        event_time = datetime.fromisoformat(
            event["event_time"].replace("Z", "+00:00"))
        received_time = datetime.fromisoformat(
            event["received_time"].replace("Z", "+00:00"))
lateness = received_time - event_time
lateness_bucket = self._get_lateness_bucket(lateness)
self.lateness_histogram[lateness_bucket] = \
self.lateness_histogram.get(lateness_bucket, 0) + 1
event_date = event_time.date().isoformat()
# Determine which version to update
if lateness < timedelta(minutes=5):
# On-time: update real-time
await self._update_realtime(event, event_date)
elif lateness < timedelta(hours=24):
# Moderately late: queue for preliminary
await self._queue_for_preliminary(event, event_date)
else:
# Very late: queue for final processing
await self._queue_for_final(event, event_date)
async def _update_realtime(self, event: Dict, event_date: str):
"""Update real-time metrics (streaming)."""
content_id = event["payload"]["content_id"]
# Increment view count
await self.store.increment(
content_id=content_id,
date=event_date,
metric="views",
freshness=DataFreshness.REALTIME
)
# Update completion if applicable
if event["event_type"] == "playback.completed":
await self.store.increment(
content_id=content_id,
date=event_date,
metric="completions",
freshness=DataFreshness.REALTIME
)
async def _queue_for_preliminary(self, event: Dict, event_date: str):
"""Queue late event for preliminary processing."""
await self.store.queue_late_event(
event=event,
target_freshness=DataFreshness.PRELIMINARY
)
async def _queue_for_final(self, event: Dict, event_date: str):
"""Queue very late event for final processing."""
await self.store.queue_late_event(
event=event,
target_freshness=DataFreshness.FINAL
)
async def run_preliminary_job(self, target_date: str):
"""
Run T+1 job to produce preliminary numbers.
Scheduled to run at 6 AM for previous day.
"""
# Read all events for target_date from data lake
events = await self.store.read_events_for_date(target_date)
# Also read late events that arrived after streaming cutoff
late_events = await self.store.read_late_events(
target_date=target_date,
max_lateness=timedelta(hours=30)
)
all_events = events + late_events
# Compute aggregates
metrics = self._compute_daily_metrics(all_events, target_date)
# Store as preliminary
for metric in metrics:
metric.freshness = DataFreshness.PRELIMINARY
await self.store.save_metric(metric)
async def run_final_job(self, target_date: str):
"""
Run T+7 job to produce final numbers.
Scheduled to run 7 days after target_date.
"""
# Read ALL events including very late ones
events = await self.store.read_events_for_date(
target_date,
include_very_late=True
)
# Compute final aggregates
metrics = self._compute_daily_metrics(events, target_date)
# Store as final (immutable after this)
for metric in metrics:
metric.freshness = DataFreshness.FINAL
await self.store.save_metric(metric)
def _compute_daily_metrics(
self,
events: List[Dict],
date: str
) -> List[ViewingMetric]:
"""Compute all metrics for a day."""
by_content: Dict[str, Dict] = {}
for event in events:
content_id = event["payload"]["content_id"]
if content_id not in by_content:
by_content[content_id] = {
"views": 0,
"unique_viewers": set(),
"completions": 0,
"total_watch_time": 0,
"event_count": 0,
"late_event_count": 0
}
stats = by_content[content_id]
stats["event_count"] += 1
# Check if late
event_time = datetime.fromisoformat(event["event_time"])
received_time = datetime.fromisoformat(event["received_time"])
if (received_time - event_time) > timedelta(minutes=5):
stats["late_event_count"] += 1
if event["event_type"] == "playback.started":
stats["views"] += 1
stats["unique_viewers"].add(event["user_id"])
if event["event_type"] == "playback.completed":
stats["completions"] += 1
if event["event_type"] == "playback.heartbeat":
stats["total_watch_time"] += 10 # 10 second heartbeats
# Convert to metrics
metrics = []
for content_id, stats in by_content.items():
metrics.extend([
ViewingMetric(
content_id=content_id,
date=date,
metric_name="views",
value=stats["views"],
freshness=DataFreshness.PRELIMINARY,
computed_at=datetime.utcnow(),
event_count=stats["event_count"],
late_event_count=stats["late_event_count"]
),
ViewingMetric(
content_id=content_id,
date=date,
metric_name="unique_viewers",
value=len(stats["unique_viewers"]),
freshness=DataFreshness.PRELIMINARY,
computed_at=datetime.utcnow(),
event_count=stats["event_count"],
late_event_count=stats["late_event_count"]
),
ViewingMetric(
content_id=content_id,
date=date,
metric_name="completion_rate",
value=stats["completions"] / stats["views"] if stats["views"] > 0 else 0,
freshness=DataFreshness.PRELIMINARY,
computed_at=datetime.utcnow(),
event_count=stats["event_count"],
late_event_count=stats["late_event_count"]
)
])
return metrics
def _get_lateness_bucket(self, lateness: timedelta) -> str:
"""Bucket lateness for histogram."""
seconds = lateness.total_seconds()
if seconds < 60:
return "<1min"
elif seconds < 300:
return "1-5min"
elif seconds < 3600:
return "5-60min"
elif seconds < 86400:
return "1-24hr"
else:
return ">24hr"
Interviewer: "How would you design the data model for the content analytics that creators see?"
Deep Dive 5: Star Schema for Content Analytics (Day 3 Concept)
You: "For content analytics, I'd use a classic star schema optimized for the queries creators care about."
Data Model
                CONTENT ANALYTICS STAR SCHEMA

                      ┌─────────────────────┐
                      │     dim_content     │
                      │ ─────────────────── │
                      │ content_key (PK)    │
                      │ content_id          │
                      │ title               │
                      │ type (movie/series) │
                      │ genre               │
                      │ rating              │
                      │ release_date        │
                      │ duration_minutes    │
                      │ studio              │
                      │ creator_id          │
                      └──────────┬──────────┘
                                 │
┌─────────────────┐              │              ┌─────────────────┐
│    dim_date     │              │              │   dim_viewer    │
│ ─────────────── │              │              │ ─────────────── │
│ date_key (PK)   │              │              │ viewer_key (PK) │
│ date            │              ▼              │ age_group       │
│ day_of_week     │      ┌───────┴──────┐      │ gender          │
│ month           │      │              │      │ country         │
│ quarter         ├──────┤  fact_views  ├─────►│ subscription    │
│ year            │      │  ─────────   │      │ tenure_months   │
│ is_weekend      │      │  view_key    │      │ device_pref     │
│ is_holiday      │      │  date_key    │      └─────────────────┘
└─────────────────┘      │  content_key │
                         │  viewer_key  │      ┌─────────────────┐
                         │  device_key  │      │   dim_device    │
                         │  geo_key     │      │ ─────────────── │
                         │              │      │ device_key (PK) │
                         │  views       │      │ device_type     │
                         │  unique_vwrs ├─────►│ os              │
                         │  watch_mins  │      │ app_version     │
                         │  completions │      │ screen_size     │
                         │  avg_pct     │      └─────────────────┘
                         └──────┬───────┘
                                │
                                │      ┌─────────────────┐
                                │      │     dim_geo     │
                                │      │ ─────────────── │
                                └─────►│ geo_key (PK)    │
                                       │ country         │
                                       │ region          │
                                       │ city            │
                                       │ timezone        │
                                       └─────────────────┘
Schema Implementation
# Content analytics data model
BIGQUERY_SCHEMA = """
-- =============================================================================
-- Dimension Tables
-- =============================================================================
-- Content dimension (SCD Type 2 for metadata changes)
CREATE TABLE dim_content (
content_key INT64 NOT NULL,
content_id STRING NOT NULL,
title STRING NOT NULL,
content_type STRING NOT NULL, -- 'movie', 'series', 'documentary'
genre STRING,
maturity_rating STRING,
release_date DATE,
duration_minutes INT64,
studio STRING,
creator_id STRING,
-- SCD Type 2 fields
valid_from DATE NOT NULL,
valid_to DATE,
is_current BOOL NOT NULL,
-- Metadata
created_at TIMESTAMP,
updated_at TIMESTAMP
);
-- Date dimension (generated)
CREATE TABLE dim_date (
date_key INT64 NOT NULL, -- YYYYMMDD
date DATE NOT NULL,
day_of_week INT64,
day_name STRING,
month INT64,
month_name STRING,
quarter INT64,
year INT64,
is_weekend BOOL,
is_holiday BOOL
);
-- Viewer dimension (anonymized/bucketed for privacy)
CREATE TABLE dim_viewer (
viewer_key INT64 NOT NULL,
age_group STRING, -- '18-24', '25-34', etc.
gender STRING,
country STRING,
subscription_type STRING, -- 'basic', 'standard', 'premium'
tenure_months INT64,
preferred_device STRING
);
-- Device dimension
CREATE TABLE dim_device (
device_key INT64 NOT NULL,
device_type STRING, -- 'mobile', 'tv', 'web', 'gaming'
operating_system STRING,
app_version_major INT64,
screen_category STRING -- 'small', 'medium', 'large'
);
-- Geography dimension
CREATE TABLE dim_geo (
geo_key INT64 NOT NULL,
country STRING,
region STRING,
timezone STRING
);
-- =============================================================================
-- Fact Tables
-- =============================================================================
-- Daily viewing facts (grain: content × date × viewer_segment × device × geo)
CREATE TABLE fact_daily_views (
date_key INT64 NOT NULL,
content_key INT64 NOT NULL,
viewer_key INT64 NOT NULL,
device_key INT64 NOT NULL,
geo_key INT64 NOT NULL,
-- Measures
view_count INT64,
unique_viewers INT64, -- Approximate, using HLL
watch_minutes INT64,
completion_count INT64,
-- Derived measures
avg_completion_pct FLOAT64,
avg_watch_minutes FLOAT64,
-- Data quality
freshness STRING, -- 'realtime', 'preliminary', 'final'
late_event_pct FLOAT64,
    -- Partitioning and clustering
    -- (BigQuery reserves _PARTITIONDATE as a pseudo-column, so declare an
    -- explicit partition column instead)
    partition_date DATE NOT NULL
)
PARTITION BY partition_date
CLUSTER BY content_key, geo_key;
-- =============================================================================
-- Pre-Aggregated Rollup Tables
-- =============================================================================
-- Daily content summary (most common query)
CREATE TABLE agg_daily_content (
date DATE NOT NULL,
content_id STRING NOT NULL,
total_views INT64,
unique_viewers INT64,
watch_hours FLOAT64,
completions INT64,
completion_rate FLOAT64,
-- Breakdown by device
mobile_views INT64,
tv_views INT64,
web_views INT64,
-- Breakdown by top geos
us_views INT64,
uk_views INT64,
other_views INT64,
freshness STRING,
updated_at TIMESTAMP
)
PARTITION BY date
CLUSTER BY content_id;
-- Weekly content summary
CREATE TABLE agg_weekly_content (
week_start DATE NOT NULL,
content_id STRING NOT NULL,
total_views INT64,
unique_viewers INT64,
watch_hours FLOAT64,
avg_daily_views FLOAT64,
peak_day_views INT64,
freshness STRING,
updated_at TIMESTAMP
);
"""
# Query examples for creator dashboard
CREATOR_DASHBOARD_QUERIES = """
-- Query 1: Content performance summary (last 30 days)
-- Uses agg_daily_content rollup for speed
SELECT
c.title,
c.content_type,
SUM(a.total_views) as total_views,
    SUM(a.unique_viewers) as unique_viewers,  -- approximate: summing daily uniques double-counts repeat viewers
ROUND(SUM(a.watch_hours), 1) as watch_hours,
ROUND(AVG(a.completion_rate) * 100, 1) as avg_completion_pct,
SUM(a.mobile_views) / SUM(a.total_views) * 100 as mobile_pct
FROM agg_daily_content a
JOIN dim_content c ON a.content_id = c.content_id
WHERE a.date >= DATE_SUB(CURRENT_DATE(), INTERVAL 30 DAY)
AND c.creator_id = @creator_id
AND c.is_current = TRUE
GROUP BY c.title, c.content_type
ORDER BY total_views DESC;
-- Query 2: Daily trend for specific content
SELECT
a.date,
a.total_views,
a.unique_viewers,
a.completion_rate,
a.freshness
FROM agg_daily_content a
WHERE a.content_id = @content_id
AND a.date >= DATE_SUB(CURRENT_DATE(), INTERVAL 90 DAY)
ORDER BY a.date;
-- Query 3: Audience breakdown (uses fact table for detail)
SELECT
v.age_group,
v.gender,
g.country,
SUM(f.unique_viewers) as viewers,
AVG(f.avg_completion_pct) as avg_completion
FROM fact_daily_views f
JOIN dim_viewer v ON f.viewer_key = v.viewer_key
JOIN dim_geo g ON f.geo_key = g.geo_key
JOIN dim_content c ON f.content_key = c.content_key
WHERE c.content_id = @content_id
AND f.date_key >= @start_date_key
AND f.date_key <= @end_date_key
GROUP BY v.age_group, v.gender, g.country
ORDER BY viewers DESC;
-- Query 4: Comparison to similar content
WITH my_content AS (
SELECT
SUM(total_views) as views,
AVG(completion_rate) as completion_rate
FROM agg_daily_content
WHERE content_id = @content_id
AND date >= DATE_SUB(CURRENT_DATE(), INTERVAL 30 DAY)
),
similar_content AS (
SELECT
AVG(a.total_views) as avg_views,
AVG(a.completion_rate) as avg_completion_rate
FROM agg_daily_content a
JOIN dim_content c ON a.content_id = c.content_id
WHERE c.genre = @genre
AND c.content_type = @content_type
AND a.date >= DATE_SUB(CURRENT_DATE(), INTERVAL 30 DAY)
)
SELECT
m.views,
m.completion_rate,
s.avg_views as category_avg_views,
s.avg_completion_rate as category_avg_completion,
m.views / s.avg_views as views_vs_category,
m.completion_rate / s.avg_completion_rate as completion_vs_category
FROM my_content m, similar_content s;
"""
Interviewer: "Good. How do you keep the queries fast for creators when there's so much data?"
Deep Dive 6: Query Optimization (Day 5 Concept)
You: "Query optimization for creator analytics requires a multi-layered approach."
Optimization Strategy
QUERY OPTIMIZATION LAYERS

Layer 1: PRE-AGGREGATION
────────────────────────
├── agg_daily_content: Most common queries hit this
├── agg_weekly_content: Week-over-week comparisons
├── Refreshed: Daily at 6 AM (after preliminary job)
└── Result: 90% of queries use rollups, not fact tables

Layer 2: CACHING
────────────────
├── Redis: Dashboard results (5-minute TTL)
├── CDN: Static dashboard assets
├── Query cache: Repeated queries (content_id specific)
└── Result: 80% cache hit rate on dashboards

Layer 3: PARTITIONING
─────────────────────
├── Partition by date (most queries filter by date range)
├── Cluster by content_id (most queries filter by content)
└── Result: Queries scan <1% of data

Layer 4: MATERIALIZED VIEWS
───────────────────────────
├── top_100_by_genre: Pre-computed rankings
├── creator_summary: Aggregated metrics per creator
├── Refreshed: Hourly
└── Result: Instant response for common questions
Caching Implementation
# Multi-layer caching for creator dashboard
import asyncio
from datetime import timedelta

class CreatorDashboardCache:
    """
    Caching strategy for creator dashboard queries.

    Assumes an async Redis client whose get/setex handle (de)serialization
    of the cached dicts (e.g. a thin JSON wrapper around redis.asyncio).
    """
def __init__(self, redis_client, bigquery_client):
self.redis = redis_client
self.bq = bigquery_client
# TTL configuration by query type
self.ttl_config = {
"summary_30d": timedelta(minutes=5), # Updates frequently
"daily_trend": timedelta(minutes=15), # Less frequent
"audience_breakdown": timedelta(hours=1), # Slow-changing
"comparison": timedelta(hours=1), # Slow-changing
}
async def get_content_summary(
self,
creator_id: str,
days: int = 30
) -> dict:
"""
Get content performance summary with caching.
"""
cache_key = f"creator:{creator_id}:summary:{days}d"
# Check cache
cached = await self.redis.get(cache_key)
if cached:
return {
"data": cached,
"cache_hit": True,
"freshness": "cached"
}
# Query BigQuery
result = await self._query_content_summary(creator_id, days)
# Cache result
ttl = self.ttl_config["summary_30d"]
await self.redis.setex(cache_key, ttl.total_seconds(), result)
return {
"data": result,
"cache_hit": False,
"freshness": result.get("freshness", "unknown")
}
async def get_daily_trend(
self,
content_id: str,
days: int = 90
) -> dict:
"""
Get daily viewing trend for content.
"""
cache_key = f"content:{content_id}:trend:{days}d"
cached = await self.redis.get(cache_key)
if cached:
return {"data": cached, "cache_hit": True}
result = await self._query_daily_trend(content_id, days)
ttl = self.ttl_config["daily_trend"]
await self.redis.setex(cache_key, ttl.total_seconds(), result)
return {"data": result, "cache_hit": False}
async def invalidate_for_content(self, content_id: str):
"""
Invalidate all caches for a content item.
Called when new data arrives.
"""
pattern = f"content:{content_id}:*"
keys = await self.redis.keys(pattern)
if keys:
await self.redis.delete(*keys)
async def warm_cache_for_creator(self, creator_id: str):
"""
Pre-warm cache for a creator's common queries.
Called when creator logs into dashboard.
"""
# Fire off all common queries in parallel
await asyncio.gather(
self.get_content_summary(creator_id, 30),
self.get_content_summary(creator_id, 7),
# Don't pre-warm content-specific queries
# (too many possible content_ids)
)
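The TTL-per-query-type behavior can be exercised without a live Redis. The sketch below is a minimal in-memory stand-in for SETEX/GET-style caching with lazy expiry on read (the `TTLCache` class and the injectable clock are my assumptions for testability, not part of the dashboard code above):

```python
import time
from typing import Any, Dict, Optional, Tuple

class TTLCache:
    """Minimal in-memory cache with per-entry TTLs, mimicking Redis SETEX/GET."""

    def __init__(self, clock=time.monotonic):
        self._clock = clock                              # injectable for testing
        self._store: Dict[str, Tuple[float, Any]] = {}   # key -> (expires_at, value)

    def setex(self, key: str, ttl_seconds: float, value: Any) -> None:
        """Store value, expiring ttl_seconds from now."""
        self._store[key] = (self._clock() + ttl_seconds, value)

    def get(self, key: str) -> Optional[Any]:
        """Return the value if present and unexpired; expire lazily on read."""
        entry = self._store.get(key)
        if entry is None:
            return None
        expires_at, value = entry
        if self._clock() >= expires_at:
            del self._store[key]
            return None
        return value

# TTLs mirroring the dashboard config: short for fast-moving summaries.
TTLS = {"summary_30d": 300, "daily_trend": 900, "audience_breakdown": 3600}
```

Swapping this for Redis in tests keeps the TTL logic observable without the round-trip, while production keeps the shared cache.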
Phase 5: Scaling and Edge Cases (5 minutes)
Interviewer: "How would this system scale to 10x the current load?"
Scaling Strategy
You: "Let me walk through scaling each layer."
SCALING TO 10X (10M events/second, 100M concurrent viewers)

INGESTION LAYER
───────────────
Current: 20 collectors per region
10x: 200 collectors per region
     OR use Kinesis/Kafka as first hop (auto-scaling)

Strategy:
├── Horizontal scaling (stateless collectors)
├── Regional Kafka clusters (reduce cross-region traffic)
├── Client-side sampling for heartbeats (10% sample at 10x)
└── Dedicated clusters for high-volume event types
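For the client-side heartbeat sampling mentioned above, the decision should be deterministic per session rather than random per event, so a sampled session keeps all of its heartbeats and per-session watch time stays reconstructable. A sketch of one common hash-based approach (the function and SHA-256 bucketing scheme are my assumptions, not a specified part of the design):

```python
import hashlib

def keep_heartbeat(session_id: str, sample_pct: int = 10) -> bool:
    """
    Deterministically keep sample_pct% of sessions.

    Hashing the session id (instead of random.random() per event) means a
    sampled session contributes ALL of its heartbeats, so its watch time is
    complete and population totals can be scaled up by 100 / sample_pct.
    """
    digest = hashlib.sha256(session_id.encode()).digest()
    bucket = int.from_bytes(digest[:8], "big") % 100
    return bucket < sample_pct
```

Downstream, totals from sampled sessions are multiplied by 100 / sample_pct to estimate the full population; non-heartbeat events (play, pause, complete) are never sampled.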
KAFKA
─────
Current: 50 partitions per topic
10x: 500 partitions per topic

Strategy:
├── More partitions = more parallelism
├── Tiered storage (hot in Kafka, warm in S3)
├── Consider separating heartbeats to dedicated cluster
└── MirrorMaker 2 for cross-region replication

STREAMING (FLINK)
─────────────────
Current: 100 workers
10x: 1000 workers

Strategy:
├── Scale horizontally (Flink scales well)
├── Increase parallelism per operator
├── State backend: RocksDB with incremental checkpoints
├── Consider sampling for trending (don't need every event)
└── Key-based scaling (partition by user_id or content_id)

SERVING (CLICKHOUSE)
────────────────────
Current: 20 nodes
10x: 100+ nodes OR switch to managed service

Strategy:
├── Add more replicas for read scaling
├── Sharding by content_id for write scaling
├── More aggressive pre-aggregation
└── Consider Druid for highest concurrency

COST CONSIDERATION
──────────────────
At 10x scale:
├── Sampling heartbeats: 90% cost reduction on that event type
├── Tiered storage: 50% storage cost reduction
├── Pre-aggregation: 80% query cost reduction
├── Regional processing: 30% egress cost reduction
└── Estimated 10x traffic = 3-4x cost (with optimizations)
Edge Cases
Interviewer: "What edge cases should we handle?"
You: "Several important ones..."
EDGE CASES

1. VIRAL CONTENT
   Problem: Single title gets 10x normal traffic
   Solution:
   ├── Dedicated Kafka partition for hot content
   ├── Pre-computed trending (don't recalculate on every query)
   ├── Aggressive caching (1-second TTL for hot content)
   └── Circuit breaker on per-content queries

2. NEW RELEASE SPIKE
   Problem: Major release causes traffic spike at specific time
   Solution:
   ├── Pre-scale infrastructure before release
   ├── Warm caches before release (predicted queries)
   ├── Feature flags to disable non-critical analytics
   └── Graceful degradation (show cached, mark as stale)

3. DEVICE CLOCK SKEW
   Problem: Device sends events with wrong timestamps
   Solution:
   ├── Server-side received_time is source of truth
   ├── Reject events with future timestamps > 5 minutes
   ├── Accept late events up to 7 days old
   └── Flag suspicious patterns for investigation
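The clock-skew policy above reduces to a small classification step at ingestion; a sketch (the function name and return labels are illustrative):

```python
from datetime import datetime, timedelta

MAX_FUTURE_SKEW = timedelta(minutes=5)   # tolerate small device clock drift
MAX_LATENESS = timedelta(days=7)         # beyond the final-results window

def classify_event_time(event_time: datetime, received_time: datetime) -> str:
    """
    Apply the clock-skew policy, with server received_time as source of truth.

    Returns 'reject' (timestamp in the future beyond allowed skew),
    'drop' (older than the 7-day late-data window), or 'accept'.
    """
    if event_time > received_time + MAX_FUTURE_SKEW:
        return "reject"
    if received_time - event_time > MAX_LATENESS:
        return "drop"
    return "accept"
```

Rejected and dropped events would still be counted (and sampled into a dead letter queue) so the "flag suspicious patterns" step has data to work with.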
4. BOT/FRAUD TRAFFIC
   Problem: Fake views to inflate metrics
   Solution:
   ├── Device fingerprinting
   ├── Behavioral analysis (impossible view patterns)
   ├── Rate limiting per device/account
   └── Separate raw and validated metrics

5. GDPR DELETION
   Problem: User requests data deletion
   Solution:
   ├── Aggregated data is anonymized (OK to keep)
   ├── Raw events must be deleted
   ├── Daily deletion job processes requests
   └── 30-day SLA for completion
Phase 6: Monitoring and Operations
Interviewer: "How would you monitor this system in production?"
Key Metrics
You: "I'd monitor at multiple levels..."
ANALYTICS PLATFORM MONITORING

INGESTION HEALTH
├── Events/second (by region): Target: 200K/region
├── Validation error rate: Alert if > 1%
├── P99 ingestion latency: Alert if > 500ms
└── Dead letter queue depth: Alert if > 10K

KAFKA HEALTH
├── Consumer lag (by topic): Alert if > 1M events
├── Under-replicated partitions: Alert if > 0
├── Produce latency P99: Alert if > 100ms
└── Disk usage: Alert if > 80%

STREAMING HEALTH
├── Flink checkpoint duration: Alert if > 60s
├── Watermark lag: Alert if > 5 minutes
├── Backpressure ratio: Alert if > 0.5
└── State size: Track growth rate

QUERY HEALTH
├── Dashboard P95 latency: Target < 1s
├── Cache hit rate: Target > 80%
├── Query error rate: Alert if > 0.1%
└── Bytes scanned/query: Track for cost

DATA QUALITY
├── Late data percentage: Track by device type
├── Missing events (gap detection): Alert on gaps
├── Schema validation failures: Alert if > 0.1%
└── Duplicate event rate: Track for dedup health

BUSINESS METRICS
├── Trending freshness: Target < 60s
├── Real-time vs Final delta: Track accuracy
├── Creator dashboard usage: Engagement tracking
└── A/B test data completeness: Compare to expected
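Thresholds like these are easiest to keep honest when expressed as data rather than scattered through alerting configs. A sketch of a rule table plus evaluator (the rule names, values, and severities below are illustrative, loosely following the dashboard above):

```python
from dataclasses import dataclass
from typing import Callable, Dict, List

@dataclass
class AlertRule:
    name: str
    threshold: float
    breached: Callable[[float, float], bool]  # (value, threshold) -> bool
    severity: str

# Declarative rule table; a config loader would normally populate this.
RULES = [
    AlertRule("validation_error_rate", 0.01, lambda v, t: v > t, "medium"),
    AlertRule("consumer_lag_events", 1_000_000, lambda v, t: v > t, "high"),
    AlertRule("checkpoint_duration_s", 60, lambda v, t: v > t, "high"),
    AlertRule("cache_hit_rate", 0.80, lambda v, t: v < t, "medium"),  # low is bad
]

def evaluate(metrics: Dict[str, float]) -> List[str]:
    """Return the names of rules breached by the current metric values."""
    return [
        rule.name
        for rule in RULES
        if rule.name in metrics and rule.breached(metrics[rule.name], rule.threshold)
    ]
```

Keeping the comparison direction in the rule (note `cache_hit_rate` alerts on *low* values) avoids a classic monitoring bug where every metric is assumed to be "higher is worse".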
Alerting Strategy
ALERTING TIERS

CRITICAL (PagerDuty - Wake someone up):
├── Event ingestion completely down
├── Kafka cluster unhealthy
├── Flink job failed and not recovering
├── Dashboard error rate > 5%
└── Data loss detected

HIGH (PagerDuty - Business hours):
├── Consumer lag > 10 minutes
├── Late data rate > 20%
├── Query latency P99 > 10s
└── Trending not updating > 5 minutes

MEDIUM (Slack alert):
├── Validation error rate elevated
├── Cache hit rate dropped
├── Single region ingestion issues
└── Schema registry alerts

LOW (Dashboard only):
├── Cost anomalies
├── Traffic pattern changes
└── Performance trends
Interview Conclusion
Interviewer: "Excellent work. You've covered event ingestion, streaming processing, data modeling, late data handling, and query optimization. You clearly understand the trade-offs involved. Any questions for me?"
You: "Thank you! I'd love to hear about the biggest operational challenges you've faced with the current analytics platform, and whether there are any aspects of my design that you think wouldn't work in your environment."
Summary: Week 8 Concepts Applied
Day 1: Event Ingestion at Scale
| Concept | Application |
|---|---|
| Event schema design | Envelope pattern with versioned payloads |
| Schema evolution | Backward-compatible changes with version field |
| Validation pipeline | Fast validation before Kafka produce |
| Throughput optimization | Batching, async produces, regional collectors |
| Dead letter queues | Route failed events for analysis |
Day 2: Streaming vs Batch Processing
| Concept | Application |
|---|---|
| Lambda architecture | Streaming (Flink) for real-time, Batch (Spark) for accuracy |
| Watermarks | 5-minute allowed lateness for streaming |
| Window aggregations | 1-hour tumbling windows for trending |
| Stateful processing | HyperLogLog for unique viewer counts |
| Kappa consideration | Could simplify if reprocessing time acceptable |
Day 3: Data Modeling for Analytics
| Concept | Application |
|---|---|
| Star schema | fact_daily_views with 5 dimension tables |
| SCD Type 2 | Content dimension for metadata changes |
| Pre-aggregation | agg_daily_content, agg_weekly_content rollups |
| Partitioning | By date with clustering on content_id |
| Grain definition | Daily × content × viewer_segment × device × geo |
Day 4: Late-Arriving Data
| Concept | Application |
|---|---|
| Event time vs processing time | Client event_time for analytics, server received_time for lateness |
| Watermark strategies | 5-minute streaming, 24-hour preliminary, 7-day final |
| Versioned results | Real-time, Preliminary (T+1), Final (T+7) |
| Reprocessing jobs | Daily preliminary job, weekly final job |
| Client-side buffering | SQLite persistence on mobile devices |
Day 5: Query Layer and Optimization
| Concept | Application |
|---|---|
| OLAP database selection | ClickHouse for real-time, BigQuery for historical |
| Multi-layer caching | Redis for dashboard results, CDN for assets |
| Pre-aggregation strategy | Daily/weekly rollups for common queries |
| Multi-tenancy | Creator-specific caches, query isolation |
| Cost optimization | Sampling heartbeats, tiered storage, rollups |
Code Patterns Demonstrated
1. HIGH-THROUGHPUT INGESTION
   ├── Async batched produces
   ├── Schema validation with compiled validators
   └── Fire-and-forget with acknowledgment

2. STREAMING AGGREGATION
   ├── Windowed unique counts (HyperLogLog)
   ├── Velocity calculation for trending
   └── State management with cleanup

3. LATE DATA VERSIONING
   ├── Multiple freshness levels
   ├── Scheduled reprocessing jobs
   └── Lateness histogram tracking

4. STAR SCHEMA MODELING
   ├── Fact and dimension tables
   ├── Pre-aggregated rollups
   └── Efficient query patterns

5. QUERY CACHING
   ├── TTL based on query type
   ├── Cache warming on login
   └── Invalidation on data update
Self-Assessment Checklist
After studying this capstone, you should be able to:
- Design event schemas that evolve safely at billion-event scale
- Build high-throughput ingestion pipelines with validation and routing
- Implement real-time aggregations with windowing and watermarks
- Calculate trending scores with velocity and normalization
- Handle late-arriving data from offline mobile devices
- Implement versioned results (real-time, preliminary, final)
- Design star schemas for analytics workloads
- Create pre-aggregation strategies for common query patterns
- Implement multi-layer caching for dashboards
- Scale analytics systems 10x with cost efficiency
- Monitor data pipelines with appropriate alerting
- Discuss trade-offs between freshness, accuracy, and cost
This capstone integrates all concepts from Week 8: Analytics Pipeline of the System Design Mastery Series.
Total Week 8 Content: Preview + 5 Daily Documents + Capstone = ~14,000 lines