Week 8 — Day 2: Streaming vs Batch Processing
System Design Mastery Series — Analytics Pipeline Week
Preface
Yesterday, we built an event ingestion pipeline that reliably captures 100M+ events per day into Kafka. Now we have a firehose of data.
THE PROCESSING DILEMMA
Events are flowing into Kafka at 1,000/second:
├── Product views
├── Searches
├── Cart updates
├── Orders
└── Payments
Business asks three questions:
Question 1: "What's our revenue RIGHT NOW?"
└── Need: Real-time, updates every second
└── Tolerance: Can be 1-2% off
Question 2: "What was our revenue YESTERDAY?"
└── Need: Exact number for finance
└── Tolerance: Must be 100% accurate
Question 3: "What's trending THIS HOUR?"
└── Need: Near real-time, updates every minute
└── Tolerance: Some lag is okay
One architecture cannot optimally serve all three.
This is the streaming vs batch dilemma.
Today, we'll learn when to stream, when to batch, and how to do both.
Part I: Foundations
Chapter 1: The Two Worlds of Data Processing
1.1 Batch Processing: The Original Approach
BATCH PROCESSING
Process data in large, discrete chunks:
┌─────────────────────────────────────────────────────────────────────────┐
│ BATCH PROCESSING TIMELINE │
│ │
│ Events: ──●──●──●──●──●──●──●──●──●──●──●──●──●──●──●──●──●──●── │
│ │ │ │
│ └──────────── Batch 1 ─────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────┐ │
│ │ Process │ │
│ │ (Spark) │ │
│ └──────┬──────┘ │
│ │ │
│ ▼ │
│ ┌─────────────┐ │
│ │ Output │ │
│ │ (Table) │ │
│ └─────────────┘ │
│ │
│ Latency: Hours (wait for batch to complete) │
│ Accuracy: High (all data processed together) │
│ Complexity: Lower (simpler programming model) │
│ │
└─────────────────────────────────────────────────────────────────────────┘
WHEN TO USE BATCH:
├── Historical analysis ("last year's trends")
├── Machine learning training
├── End-of-day financial reconciliation
├── Data warehouse loading
└── Complex aggregations across large datasets
1.2 Stream Processing: The Real-Time Approach
STREAM PROCESSING
Process data continuously as it arrives:
┌────────────────────────────────────────────────────────────────────────┐
│ STREAM PROCESSING TIMELINE │
│ │
│ Events: ──●──●──●──●──●──●──●──●──●──●──●──●──●──●──●──●──●──●── │
│ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │
│ ▼ ▼ ▼ ▼ ▼ ▼ ▼ ▼ ▼ ▼ ▼ ▼ ▼ ▼ ▼ ▼ ▼ ▼ │
│ ┌───────────────────────────────────────────────────────┐ │
│ │ Stream Processor (Flink) │ │
│ │ ┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐ │ │
│ │ │State│ │State│ │State│ │State│ (per-key state) │ │
│ │ └─────┘ └─────┘ └─────┘ └─────┘ │ │
│ └───────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ (continuous output) │
│ ┌─────────────┐ │
│ │ Real-time │ │
│ │ Views │ │
│ └─────────────┘ │
│ │
│ Latency: Milliseconds to seconds │
│ Accuracy: Approximate (may miss late data) │
│ Complexity: Higher (state management, exactly-once) │
│ │
└────────────────────────────────────────────────────────────────────────┘
WHEN TO USE STREAMING:
├── Real-time dashboards
├── Fraud detection (immediate action needed)
├── Live recommendations
├── Alerting and monitoring
└── Session analytics
1.3 The Trade-offs
STREAMING VS BATCH: THE FUNDAMENTAL TRADE-OFF
STREAMING BATCH
───────── ─────
Latency Seconds Hours
Throughput Lower Higher
Accuracy Approximate* Exact
Late data Complex Natural
State In-memory Disk-based
Cost Higher (always-on) Lower (periodic)
Complexity Higher Lower
Recovery Checkpoints Re-run job
* Approximate because late-arriving events may be missed
THE TRUTH:
Most systems need BOTH streaming AND batch.
The question is: how do you combine them?
Chapter 2: Lambda Architecture
2.1 The Original Hybrid Approach
LAMBDA ARCHITECTURE
Nathan Marz's original design (2011):
"Have both batch and streaming, merge results at query time"
┌────────────────────────────────────────────────────────────────────────┐
│ LAMBDA ARCHITECTURE │
│ │
│ ┌─────────────┐ │
│ │ Events │ │
│ │ (Kafka) │ │
│ └──────┬──────┘ │
│ │ │
│ ┌─────────────────┼─────────────────┐ │
│ │ │ │ │
│ ▼ │ ▼ │
│ ┌─────────────┐ │ ┌─────────────┐ │
│ │ BATCH │ │ │ SPEED │ │
│ │ LAYER │ │ │ LAYER │ │
│ │ │ │ │ │ │
│ │ ┌───────┐ │ │ │ ┌───────┐ │ │
│ │ │ Spark │ │ │ │ │ Flink │ │ │
│ │ │ Job │ │ │ │ │Stream │ │ │
│ │ └───┬───┘ │ │ │ └───┬───┘ │ │
│ │ │ │ │ │ │ │ │
│ │ ▼ │ │ │ ▼ │ │
│ │ ┌───────┐ │ │ │ ┌───────┐ │ │
│ │ │ Batch │ │ │ │ │ Real- │ │ │
│ │ │ Views │ │ │ │ │ time │ │ │
│ │ │(daily)│ │ │ │ │ Views │ │ │
│ │ └───┬───┘ │ │ │ └───┬───┘ │ │
│ └──────┼──────┘ │ └──────┼──────┘ │
│ │ │ │ │
│ └─────────────────┼─────────────────┘ │
│ │ │
│ ▼ │
│ ┌───────────────────────┐ │
│ │ SERVING LAYER │ │
│ │ (Merge batch + real) │ │
│ └───────────────────────┘ │
│ │
└────────────────────────────────────────────────────────────────────────┘
HOW IT WORKS:
1. BATCH LAYER: Processes all historical data, produces accurate views
- Runs periodically (hourly/daily)
- Overwrites previous batch views
- Source of truth for accuracy
2. SPEED LAYER: Processes only recent data in real-time
- Fills the gap since last batch
- Approximate but current
- Discarded when batch catches up
3. SERVING LAYER: Merges both at query time
- Batch view (yesterday's totals) + Speed view (today so far)
- Query: batch_result + realtime_result
2.2 Lambda Implementation
# architectures/lambda_architecture.py
"""
Lambda Architecture implementation.
Combines batch accuracy with streaming speed.
"""
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Dict, Any, Optional
from pyspark.sql import functions as F  # used by the batch layer's Spark aggregations
@dataclass
class MetricResult:
"""Result from either batch or speed layer."""
value: float
timestamp: datetime
is_batch: bool
coverage_start: datetime
coverage_end: datetime
class BatchLayer:
"""
Batch processing layer.
Processes all historical data with Spark.
Runs periodically (e.g., hourly).
"""
def __init__(self, spark_session, data_lake_path: str):
self.spark = spark_session
self.data_path = data_lake_path
self.last_batch_time: Optional[datetime] = None
def run_batch_job(self, end_time: datetime) -> Dict[str, MetricResult]:
"""
Run batch aggregation up to end_time.
This processes ALL data from the beginning,
producing the authoritative "batch views".
"""
# Read all raw events from data lake
events_df = self.spark.read.parquet(
f"{self.data_path}/events/"
).filter(
f"event_time < '{end_time.isoformat()}'"
)
# Compute aggregations
metrics = {}
        # Daily revenue
        daily_revenue = events_df.filter(
            F.col("event_type") == "order.placed"
        ).groupBy(
            F.to_date("event_time").alias("date")
        ).agg(
            F.sum("payload.total_amount").alias("revenue"),
            F.count("*").alias("order_count")
        ).collect()
        for row in daily_revenue:
            metrics[f"revenue_{row.date}"] = MetricResult(
                value=row.revenue,
                timestamp=end_time,
                is_batch=True,
                coverage_start=datetime.combine(row.date, datetime.min.time()),
                coverage_end=datetime.combine(row.date, datetime.max.time())
            )
        # DAU (Daily Active Users)
        dau = events_df.groupBy(
            F.to_date("event_time").alias("date")
        ).agg(
            F.countDistinct("user_id").alias("dau")
        ).collect()
        for row in dau:
            metrics[f"dau_{row.date}"] = MetricResult(
                value=row.dau,
                timestamp=end_time,
                is_batch=True,
                coverage_start=datetime.combine(row.date, datetime.min.time()),
                coverage_end=datetime.combine(row.date, datetime.max.time())
            )
self.last_batch_time = end_time
return metrics
def get_batch_view(self, metric_name: str) -> Optional[MetricResult]:
"""Get the latest batch view for a metric."""
# In practice, read from batch view storage (e.g., Cassandra, Redis)
pass
class SpeedLayer:
"""
Speed/Streaming layer.
Processes only events since last batch.
Provides real-time updates.
"""
def __init__(self, flink_env, kafka_source: str):
self.env = flink_env
self.kafka_source = kafka_source
self.realtime_state: Dict[str, MetricResult] = {}
def start_streaming(self, batch_watermark: datetime):
"""
Start streaming from batch watermark.
Only processes events AFTER the last batch completed.
"""
# In Flink, this would be a DataStream job:
#
# stream = env.add_source(KafkaSource(...))
# .filter(lambda e: e.event_time > batch_watermark)
# .key_by(lambda e: e.event_type)
# .window(TumblingEventTimeWindows.of(Time.minutes(1)))
# .aggregate(MetricAggregator())
# .add_sink(RealtimeViewSink())
pass
def get_realtime_view(
self,
metric_name: str,
since: datetime
) -> Optional[MetricResult]:
"""
Get real-time aggregation since a timestamp.
This fills the gap between last batch and now.
"""
# In practice, query from real-time store (e.g., Druid, Redis)
return self.realtime_state.get(metric_name)
class ServingLayer:
"""
Serving layer that merges batch and speed results.
"""
def __init__(self, batch_layer: BatchLayer, speed_layer: SpeedLayer):
self.batch = batch_layer
self.speed = speed_layer
def query_metric(self, metric_name: str, date: datetime.date) -> float:
"""
Query a metric by merging batch and real-time.
Logic:
1. Get batch view (covers up to last batch time)
2. Get speed view (covers batch time to now)
3. Combine appropriately
"""
today = datetime.utcnow().date()
if date < today:
# Historical date: batch only (complete)
batch_result = self.batch.get_batch_view(f"{metric_name}_{date}")
return batch_result.value if batch_result else 0
elif date == today:
# Today: might need batch + realtime
batch_result = self.batch.get_batch_view(f"{metric_name}_{date}")
if batch_result:
# Batch has partial day, add realtime since batch
realtime_result = self.speed.get_realtime_view(
f"{metric_name}_{date}",
since=batch_result.coverage_end
)
batch_value = batch_result.value
realtime_value = realtime_result.value if realtime_result else 0
return batch_value + realtime_value
else:
# No batch yet today, realtime only
realtime_result = self.speed.get_realtime_view(
f"{metric_name}_{date}",
since=datetime.combine(date, datetime.min.time())
)
return realtime_result.value if realtime_result else 0
else:
# Future date: no data
return 0
# =============================================================================
# Example Queries
# =============================================================================
LAMBDA_QUERY_EXAMPLES = """
LAMBDA ARCHITECTURE QUERIES
Query: "What's today's revenue?"
┌────────────────────────────────────────────────────────────────────────┐
│ │
│ Timeline: │
│ ──────────────────────────────────────────────────────────────────── │
│ 00:00 06:00 12:00 Now (14:30) │
│ │ │ │ │ │
│ └────────────┴────────────┘ │ │
│ Batch View │ │
│ (Ran at 12:00) │ │
│ Revenue: $50,000 │ │
│ │ │ │
│ └──────────────┘ │
│ Speed View │
│ (12:00 to Now) │
│ Revenue: $12,000 │
│ │
│ Total Revenue = $50,000 + $12,000 = $62,000 │
│ │
└────────────────────────────────────────────────────────────────────────┘
Query: "What was yesterday's revenue?"
┌────────────────────────────────────────────────────────────────────────┐
│ │
│ Batch View for Yesterday: $150,000 (complete, authoritative) │
│ │
│ Speed View: Not needed (batch has full day) │
│ │
│ Total Revenue = $150,000 │
│ │
└────────────────────────────────────────────────────────────────────────┘
"""
2.3 Lambda Pros and Cons
LAMBDA ARCHITECTURE: TRADE-OFFS
PROS:
├── Accuracy: Batch layer is source of truth
├── Latency: Speed layer provides real-time
├── Fault tolerance: Batch can recompute everything
├── Late data: Handled in next batch run
└── Mature: Well-understood, battle-tested
CONS:
├── Complexity: Two codebases (batch + streaming)
├── Consistency: Different results from same logic
├── Maintenance: Two systems to maintain and debug
├── Divergence: Batch and streaming logic can drift
└── Cost: Running two processing systems
THE BIG PROBLEM:
"Now you have two codebases doing the same thing."
- Write aggregation in Spark (batch)
- Write same aggregation in Flink (stream)
- Hope they produce the same results
- They often don't.
Chapter 3: Kappa Architecture
3.1 Streaming Only
KAPPA ARCHITECTURE
Jay Kreps' simplification (2014):
"Everything is a stream. Replay for reprocessing."
┌────────────────────────────────────────────────────────────────────────┐
│ KAPPA ARCHITECTURE │
│ │
│ ┌──────────────────────┐ │
│ │ Immutable Event │ │
│ │ Log │ │
│ │ (Kafka) │ │
│ │ │ │
│ │ All events stored │ │
│ │ with long retention │ │
│ └──────────┬───────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────┐ │
│ │ Stream Processor │ │
│ │ (Flink) │ │
│ │ │ │
│ │ Single codebase │ │
│ │ for all processing │ │
│ └──────────┬───────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────┐ │
│ │ Serving Layer │ │
│ │ (Druid/ClickHouse) │ │
│ └──────────────────────┘ │
│ │
│ KEY INSIGHT: │
│ If you need to reprocess (bug fix, new logic): │
│ 1. Start new stream processor reading from beginning │
│ 2. Process all historical events │
│ 3. Switch to new output when caught up │
│ 4. Delete old output │
│ │
└────────────────────────────────────────────────────────────────────────┘
REPROCESSING IN KAPPA:
V1 (current) V2 (new logic)
──────────── ──────────────
Processing live ───────────▶ Start from beginning
│ │
│ │ Catching up...
│ │
▼ ▼
┌─────────┐ ┌─────────┐
│Output V1│ │Output V2│
└─────────┘ └─────────┘
│ │
│ ◀──── Switch ───── │
│ (when caught up) │
▼
┌─────────┐
│ Delete │
│ V1 │
└─────────┘
3.2 Kappa Implementation
# architectures/kappa_architecture.py
"""
Kappa Architecture implementation.
Single streaming codebase for all processing.
Reprocessing via replay from event log.
"""
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, Any, Optional, Callable
from enum import Enum
import asyncio
class ProcessorVersion(Enum):
"""Processor version for blue-green deployments."""
V1 = "v1"
V2 = "v2"
@dataclass
class ProcessorConfig:
"""Configuration for stream processor."""
version: ProcessorVersion
kafka_brokers: list
input_topic: str
output_table: str
consumer_group: str
start_offset: str = "earliest" # or "latest" or timestamp
class KappaProcessor:
"""
Unified stream processor for Kappa architecture.
Key principle: One codebase, used for both
real-time processing and historical reprocessing.
"""
def __init__(self, config: ProcessorConfig):
self.config = config
self.aggregators: Dict[str, Callable] = {}
self.state: Dict[str, Any] = {}
def register_aggregation(
self,
name: str,
aggregator: Callable
):
"""Register an aggregation function."""
self.aggregators[name] = aggregator
async def process_event(self, event: Dict) -> Dict[str, Any]:
"""
Process a single event through all aggregators.
This same code handles:
1. Live events (normal operation)
2. Historical events (reprocessing)
"""
results = {}
for name, aggregator in self.aggregators.items():
result = await aggregator(event, self.state)
if result:
results[name] = result
return results
async def run(self):
"""
Main processing loop.
Consumes from Kafka, processes events, outputs to serving layer.
"""
# In practice, use Flink/Kafka Streams
# This is simplified pseudocode
consumer = self._create_consumer()
async for event in consumer:
try:
results = await self.process_event(event)
for metric_name, value in results.items():
await self._write_to_serving_layer(
metric_name,
value,
event.get("event_time")
)
except Exception as e:
await self._handle_error(event, e)
def _create_consumer(self):
"""Create Kafka consumer with appropriate offset."""
# Start from earliest for reprocessing
# Start from latest for caught-up live processing
pass
async def _write_to_serving_layer(
self,
metric: str,
value: Any,
timestamp: datetime
):
"""Write result to serving layer (e.g., Druid, ClickHouse)."""
pass
async def _handle_error(self, event: Dict, error: Exception):
"""Handle processing error."""
# Log error, send to DLQ, continue
pass
class KappaOrchestrator:
"""
Manages Kappa processor deployments and reprocessing.
"""
def __init__(self):
self.active_version: Optional[ProcessorVersion] = None
self.processors: Dict[ProcessorVersion, KappaProcessor] = {}
async def deploy_new_version(
self,
new_version: ProcessorVersion,
processor: KappaProcessor
):
"""
Deploy new processor version with reprocessing.
Steps:
1. Start new processor from beginning of log
2. Wait for it to catch up to live
3. Switch traffic to new version
4. Stop old version
"""
# Start new processor (reads from earliest)
self.processors[new_version] = processor
asyncio.create_task(processor.run())
# Wait for catch-up
await self._wait_for_catchup(processor)
# Atomic switch
old_version = self.active_version
self.active_version = new_version
# Stop old processor
if old_version:
await self._stop_processor(old_version)
async def _wait_for_catchup(self, processor: KappaProcessor):
"""Wait for processor to catch up to live data."""
while True:
lag = await self._get_consumer_lag(processor)
if lag < 1000: # Less than 1000 events behind
break
await asyncio.sleep(10)
async def _get_consumer_lag(self, processor: KappaProcessor) -> int:
"""Get consumer lag (how far behind live)."""
        # Query Kafka for the consumer group's lag; stubbed to "caught up" here
        return 0
async def _stop_processor(self, version: ProcessorVersion):
"""Gracefully stop a processor version."""
pass
# =============================================================================
# Aggregation Functions
# =============================================================================
async def revenue_aggregator(
event: Dict,
state: Dict
) -> Optional[Dict]:
"""
Aggregate revenue by day.
Same code works for:
- Live events: Updates today's revenue
- Reprocessing: Recomputes historical revenue
"""
if event.get("event_type") != "order.placed":
return None
event_time = datetime.fromisoformat(event["event_time"])
date_key = event_time.strftime("%Y-%m-%d")
amount = event.get("payload", {}).get("total_amount", 0)
# Update state
if date_key not in state:
state[date_key] = {"revenue": 0, "orders": 0}
state[date_key]["revenue"] += amount
state[date_key]["orders"] += 1
return {
"date": date_key,
"revenue": state[date_key]["revenue"],
"orders": state[date_key]["orders"]
}
async def active_users_aggregator(
event: Dict,
state: Dict
) -> Optional[Dict]:
"""Aggregate unique active users by day."""
event_time = datetime.fromisoformat(event["event_time"])
date_key = event_time.strftime("%Y-%m-%d")
user_id = event.get("user_id")
if not user_id:
return None
# Track unique users
users_key = f"users_{date_key}"
if users_key not in state:
state[users_key] = set()
state[users_key].add(user_id)
return {
"date": date_key,
"dau": len(state[users_key])
}
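# =============================================================================
# Wiring It Together (illustrative; broker, topic, and table names are placeholders)
# =============================================================================
config = ProcessorConfig(
    version=ProcessorVersion.V1,
    kafka_brokers=["localhost:9092"],
    input_topic="events.raw",
    output_table="realtime_metrics",
    consumer_group="kappa-processor-v1",
)
processor = KappaProcessor(config)
processor.register_aggregation("daily_revenue", revenue_aggregator)
processor.register_aggregation("daily_active_users", active_users_aggregator)
# asyncio.run(processor.run())   # would start consuming once _create_consumer is implemented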
3.3 Kappa Pros and Cons
KAPPA ARCHITECTURE: TRADE-OFFS
PROS:
├── Simplicity: One codebase for everything
├── Consistency: Same logic, same results
├── Maintenance: One system to maintain
├── Flexibility: Easy to reprocess with new logic
└── Modern: Fits streaming-first world
CONS:
├── Reprocessing time: Slow for large history
├── Storage: Must keep all events in Kafka
├── Ordering: Must handle out-of-order carefully
├── State: Managing large state is complex
└── Maturity: Fewer battle-tested examples
THE BIG QUESTION:
"Can you reprocess years of data fast enough?"
If you have:
- 1 year of data
- 1B events
- Reprocessing at 100K/sec
Reprocessing time: 10,000 seconds ≈ 3 hours
For many systems, this is acceptable.
For some, it's not.
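A quick back-of-envelope helper makes it easy to rerun this check for your own volumes (the throughput figures below are illustrative):
def reprocessing_hours(total_events: int, events_per_second: int) -> float:
    """Rough replay time, ignoring startup, state rebuild, and source bottlenecks."""
    return total_events / events_per_second / 3600
print(reprocessing_hours(1_000_000_000, 100_000))      # ~2.8 hours (the example above)
print(reprocessing_hours(1_000_000_000_000, 500_000))  # ~555 hours: replay hurts at trillion-event scale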
Chapter 4: Choosing Your Architecture
4.1 Decision Framework
LAMBDA VS KAPPA: DECISION FRAMEWORK
┌────────────────────────────────────────────────────────────────────────┐
│ CHOOSE LAMBDA WHEN: │
│ │
│ ✓ Historical data is huge (PB scale) │
│ ✓ Batch processing is significantly cheaper │
│ ✓ You already have Spark/Hadoop infrastructure │
│ ✓ Complex ML training that's inherently batch │
│ ✓ Regulatory requirements for batch reconciliation │
│ ✓ Team is more experienced with batch │
│ │
└────────────────────────────────────────────────────────────────────────┘
┌────────────────────────────────────────────────────────────────────────┐
│ CHOOSE KAPPA WHEN: │
│ │
│ ✓ Real-time is primary use case │
│ ✓ Data volume allows reasonable reprocessing │
│ ✓ Simplicity is valued over optimization │
│ ✓ Team is experienced with streaming │
│ ✓ Starting fresh (no legacy batch) │
│ ✓ Events naturally fit streaming model │
│ │
└────────────────────────────────────────────────────────────────────────┘
┌────────────────────────────────────────────────────────────────────────┐
│ CONSIDER HYBRID WHEN: │
│ │
│ ✓ Different use cases need different approaches │
│ ✓ Some metrics need exact accuracy (batch) │
│ ✓ Some metrics need real-time (stream) │
│ ✓ Can afford complexity for optimization │
│ │
└────────────────────────────────────────────────────────────────────────┘
4.2 Modern Reality: The Unified Approach
THE MODERN TREND: UNIFIED BATCH + STREAM
Tools that unify both:
├── Apache Beam: Write once, run on Flink or Spark
├── Apache Flink: Batch is just bounded streaming
├── Databricks: Delta Lake with streaming + batch
└── Snowflake: Snowpipe (stream) + batch loading
THE IDEA:
Write your processing logic ONCE.
The engine runs it as stream or batch as needed.
Example with Apache Beam:
# Same code for batch and stream
events = pipeline | ReadFromKafka(topic) # or ReadFromFiles()
revenue = (
events
| Filter(lambda e: e.type == 'order.placed')
    | WindowInto(FixedWindows(60 * 60))   # 1-hour windows (sizes are in seconds)
| CombineGlobally(sum_revenue)
)
revenue | WriteToBigQuery(table)
Run as streaming: continuous processing
Run as batch: process all files, terminate
This is where the industry is heading.
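For concreteness, here is a slightly fuller sketch of the same pipeline with the Beam Python SDK. The element shape (dicts with event_type and payload keys) is an assumption, and the source is left as a parameter so the identical transform chain can be fed from Kafka (stream) or from files (batch):
import apache_beam as beam
from apache_beam.transforms.window import FixedWindows
def hourly_revenue(events):
    """Same transform chain whether `events` came from a streaming or a batch source."""
    return (
        events
        | "Orders" >> beam.Filter(lambda e: e["event_type"] == "order.placed")
        | "Window" >> beam.WindowInto(FixedWindows(60 * 60))             # 1-hour windows
        | "Amounts" >> beam.Map(lambda e: e["payload"]["total_amount"])
        | "Sum" >> beam.CombineGlobally(sum).without_defaults()          # one sum per window
    )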
Part II: Implementation
Chapter 5: Stream Processing with Flink
5.1 Flink Fundamentals
# streaming/flink_processor.py
"""
Apache Flink stream processing implementation.
Flink concepts:
- DataStream: Unbounded stream of records
- Operators: Transformations (map, filter, keyBy, window)
- State: Per-key state for aggregations
- Checkpoints: Fault tolerance via periodic snapshots
- Watermarks: Tracking event time progress
"""
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Dict, Any, Optional, Iterator
from abc import ABC, abstractmethod
# =============================================================================
# Core Abstractions (Simplified Flink-like API)
# =============================================================================
@dataclass
class Event:
"""An event in the stream."""
event_id: str
event_type: str
event_time: datetime
user_id: Optional[str]
payload: Dict[str, Any]
class DataStream:
"""
Represents an unbounded stream of events.
"""
def __init__(self, source: Iterator[Event]):
self._source = source
self._transformations = []
def filter(self, predicate) -> 'DataStream':
"""Filter events based on predicate."""
self._transformations.append(('filter', predicate))
return self
def map(self, mapper) -> 'DataStream':
"""Transform each event."""
self._transformations.append(('map', mapper))
return self
def key_by(self, key_selector) -> 'KeyedStream':
"""Partition stream by key for stateful processing."""
return KeyedStream(self, key_selector)
class KeyedStream:
"""
Stream partitioned by key.
Enables:
- Stateful processing per key
- Windowed aggregations per key
- Exactly-once per-key guarantees
"""
def __init__(self, stream: DataStream, key_selector):
self.stream = stream
self.key_selector = key_selector
def window(self, window_assigner: 'WindowAssigner') -> 'WindowedStream':
"""Apply windowing to the keyed stream."""
return WindowedStream(self, window_assigner)
def process(self, processor: 'KeyedProcessFunction') -> DataStream:
"""Apply stateful processing function."""
# Execute processor with per-key state
pass
class WindowAssigner(ABC):
"""Assigns events to windows."""
@abstractmethod
def assign_windows(self, event: Event) -> list:
pass
class TumblingEventTimeWindows(WindowAssigner):
"""
Fixed-size, non-overlapping windows based on event time.
Example: 1-hour windows
[00:00-01:00), [01:00-02:00), [02:00-03:00), ...
"""
def __init__(self, size: timedelta):
self.size = size
def assign_windows(self, event: Event) -> list:
window_start = self._get_window_start(event.event_time)
window_end = window_start + self.size
return [(window_start, window_end)]
def _get_window_start(self, timestamp: datetime) -> datetime:
"""Get the start of the window containing timestamp."""
epoch = datetime(1970, 1, 1)
elapsed = (timestamp - epoch).total_seconds()
window_seconds = self.size.total_seconds()
window_start_seconds = (elapsed // window_seconds) * window_seconds
return epoch + timedelta(seconds=window_start_seconds)
class SlidingEventTimeWindows(WindowAssigner):
"""
Overlapping windows that slide.
Example: 1-hour windows, sliding every 15 minutes
[00:00-01:00), [00:15-01:15), [00:30-01:30), ...
"""
def __init__(self, size: timedelta, slide: timedelta):
self.size = size
self.slide = slide
    def assign_windows(self, event: Event) -> list:
        windows = []
        timestamp = event.event_time
        # The event belongs to every sliding window whose span still covers it;
        # start from the most recent slide-aligned window and walk backwards.
        start = self._get_latest_window_start(timestamp)
        while start + self.size > timestamp:
            windows.append((start, start + self.size))
            start = start - self.slide
        return windows
    def _get_latest_window_start(self, timestamp: datetime) -> datetime:
        """Get the most recent slide-aligned window start at or before timestamp."""
        epoch = datetime(1970, 1, 1)
        elapsed = (timestamp - epoch).total_seconds()
        slide_seconds = self.slide.total_seconds()
        # Align to slide boundaries
        aligned = (elapsed // slide_seconds) * slide_seconds
        return epoch + timedelta(seconds=aligned)
class SessionWindows(WindowAssigner):
"""
Windows based on activity sessions.
A session ends when there's a gap > threshold.
"""
def __init__(self, gap: timedelta):
self.gap = gap
def assign_windows(self, event: Event) -> list:
# Sessions are merged dynamically as events arrive
# Initial window is [event_time, event_time + gap)
return [(event.event_time, event.event_time + self.gap)]
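# Quick sanity check of the assigners above (the 10:07 event time is invented).
# A tumbling 1-hour window assigns the event once; a 1-hour window sliding every
# 15 minutes assigns it to four overlapping windows.
demo_event = Event("e1", "product.viewed", datetime(2024, 1, 1, 10, 7), None, {})
print(TumblingEventTimeWindows(timedelta(hours=1)).assign_windows(demo_event))
# -> [(2024-01-01 10:00, 2024-01-01 11:00)]
print(SlidingEventTimeWindows(timedelta(hours=1), timedelta(minutes=15)).assign_windows(demo_event))
# -> windows starting at 10:00, 09:45, 09:30, 09:15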
# =============================================================================
# Windowed Aggregation
# =============================================================================
class WindowedStream:
"""
Keyed stream with windowing applied.
"""
def __init__(self, keyed_stream: KeyedStream, assigner: WindowAssigner):
self.keyed_stream = keyed_stream
self.assigner = assigner
def aggregate(self, aggregator: 'AggregateFunction') -> DataStream:
"""
Apply aggregation function to each window.
Flink incrementally aggregates as events arrive,
not waiting for window to complete.
"""
pass
def reduce(self, reducer) -> DataStream:
"""Reduce events in window to single value."""
pass
class AggregateFunction(ABC):
"""
Incremental aggregation function.
Maintains an accumulator that's updated per event.
"""
@abstractmethod
def create_accumulator(self) -> Any:
"""Create initial accumulator."""
pass
@abstractmethod
def add(self, value: Any, accumulator: Any) -> Any:
"""Add value to accumulator."""
pass
@abstractmethod
def get_result(self, accumulator: Any) -> Any:
"""Get result from accumulator."""
pass
@abstractmethod
def merge(self, acc1: Any, acc2: Any) -> Any:
"""Merge two accumulators (for session windows)."""
pass
class RevenueAggregator(AggregateFunction):
"""Aggregate revenue in a window."""
def create_accumulator(self) -> Dict:
return {"total": 0, "count": 0}
def add(self, order: Dict, acc: Dict) -> Dict:
acc["total"] += order.get("amount", 0)
acc["count"] += 1
return acc
def get_result(self, acc: Dict) -> Dict:
return {
"revenue": acc["total"],
"orders": acc["count"],
"avg_order": acc["total"] / acc["count"] if acc["count"] > 0 else 0
}
def merge(self, acc1: Dict, acc2: Dict) -> Dict:
return {
"total": acc1["total"] + acc2["total"],
"count": acc1["count"] + acc2["count"]
}
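# =============================================================================
# Example: Incremental Aggregation (order amounts invented)
# =============================================================================
agg = RevenueAggregator()
acc = agg.create_accumulator()
for order in [{"amount": 30.0}, {"amount": 70.0}, {"amount": 50.0}]:
    acc = agg.add(order, acc)
print(agg.get_result(acc))   # {'revenue': 150.0, 'orders': 3, 'avg_order': 50.0}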
5.2 Watermarks and Late Data
# streaming/watermarks.py
"""
Watermarks for handling event time in streaming.
The Problem:
Events arrive out of order. When can we close a window?
Example:
Real time → 10:00 10:01 10:02 10:03 10:04
Events arrive: [A-9:58] [B-9:55] [C-10:01] [D-9:59] [E-10:02]
When can we emit the [9:00-10:00) window?
We might receive a 9:xx event at any time!
Solution: WATERMARKS
"I believe all events with time <= W have arrived"
When watermark passes window end, window can fire.
"""
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional
@dataclass
class Watermark:
"""
Represents progress of event time.
Watermark(t) means: "No more events with event_time <= t expected"
"""
timestamp: datetime
class WatermarkStrategy:
"""
Strategy for generating watermarks.
"""
def __init__(self, max_out_of_orderness: timedelta):
"""
Args:
max_out_of_orderness: How late events can be.
If events can be up to 5 minutes late,
watermark = max_event_time - 5 minutes
"""
self.max_out_of_orderness = max_out_of_orderness
self.current_max_timestamp: Optional[datetime] = None
def on_event(self, event_time: datetime) -> Optional[Watermark]:
"""Update watermark based on new event."""
if self.current_max_timestamp is None:
self.current_max_timestamp = event_time
else:
self.current_max_timestamp = max(
self.current_max_timestamp,
event_time
)
# Watermark = max seen - allowed lateness
watermark_time = self.current_max_timestamp - self.max_out_of_orderness
return Watermark(watermark_time)
def get_current_watermark(self) -> Optional[Watermark]:
"""Get current watermark."""
if self.current_max_timestamp:
return Watermark(
self.current_max_timestamp - self.max_out_of_orderness
)
return None
class LateDataHandler:
"""
Handles events that arrive after their window closed.
"""
def __init__(
self,
allowed_lateness: timedelta,
late_output_tag: str = "late-data"
):
"""
Args:
allowed_lateness: How long to keep window state after closing
late_output_tag: Tag for side output of late events
"""
self.allowed_lateness = allowed_lateness
self.late_output_tag = late_output_tag
def is_late(
self,
event_time: datetime,
watermark: datetime,
window_end: datetime
) -> bool:
"""Check if event is late for its window."""
# Window hasn't closed yet
if watermark < window_end:
return False
# Window closed, but within allowed lateness
if watermark < window_end + self.allowed_lateness:
return False # Can still update window
# Too late, drop or side output
return True
WATERMARK_EXPLANATION = """
WATERMARKS VISUALIZED
Events arriving (real time on X, event time shown):
Real time: ────────────────────────────────────▶
10:00 10:01 10:02 10:03 10:04 10:05
Events: A B C D E F
Event times: 9:58 9:55 10:01 9:59 10:02 10:04
Max lateness: 5 minutes
Watermarks:
After A: WM = 9:58 - 5min = 9:53
After B: WM = 9:58 - 5min = 9:53 (B is old, doesn't advance)
After C: WM = 10:01 - 5min = 9:56
After D: WM = 10:01 - 5min = 9:56 (D is old)
After E: WM = 10:02 - 5min = 9:57
After F: WM = 10:04 - 5min = 9:59
Window [9:00-10:00) can fire when watermark >= 10:00
This happens... never in this example!
Only when we see an event with time >= 10:05 will the
watermark reach 10:00 and the window fire.
LATE DATA:
If window [9:00-10:00) already fired, and then event
with time 9:50 arrives, it's LATE.
Options:
1. DROP: Ignore it (simplest, loses data)
2. SIDE OUTPUT: Send to separate stream for special handling
3. REFIRE: Update window output (complex, not always supported)
4. ALLOWED LATENESS: Keep window state longer
"""
5.3 State Management
# streaming/state_management.py
"""
State management in stream processing.
State is the memory of past events needed for current processing.
Examples:
- Running sum: Need previous sum
- Distinct count: Need set of seen IDs
- Session windows: Need events in current session
- Joins: Need buffer of events from both streams
"""
from dataclasses import dataclass
from typing import Dict, Any, Optional, Generic, TypeVar
from abc import ABC, abstractmethod
from datetime import datetime, timedelta
import pickle
T = TypeVar('T')
class StateBackend(ABC):
"""
Backend for storing stream processing state.
Options:
- Memory: Fast, lost on failure
- RocksDB: Larger-than-memory, persistent locally
- Remote (Redis, etc.): Shared across nodes
"""
@abstractmethod
def get(self, key: str) -> Optional[Any]:
pass
@abstractmethod
def put(self, key: str, value: Any):
pass
@abstractmethod
def delete(self, key: str):
pass
@abstractmethod
def checkpoint(self) -> bytes:
"""Serialize all state for checkpointing."""
pass
@abstractmethod
def restore(self, data: bytes):
"""Restore state from checkpoint."""
pass
class InMemoryStateBackend(StateBackend):
"""Simple in-memory state backend."""
def __init__(self):
self._state: Dict[str, Any] = {}
def get(self, key: str) -> Optional[Any]:
return self._state.get(key)
def put(self, key: str, value: Any):
self._state[key] = value
def delete(self, key: str):
self._state.pop(key, None)
def checkpoint(self) -> bytes:
return pickle.dumps(self._state)
def restore(self, data: bytes):
self._state = pickle.loads(data)
class RocksDBStateBackend(StateBackend):
"""
RocksDB-backed state for larger-than-memory state.
Used in production Flink deployments.
"""
def __init__(self, db_path: str):
# In practice: import rocksdb; self.db = rocksdb.DB(db_path)
self.db_path = db_path
self._db = {} # Simplified
def get(self, key: str) -> Optional[Any]:
value_bytes = self._db.get(key.encode())
if value_bytes:
return pickle.loads(value_bytes)
return None
def put(self, key: str, value: Any):
self._db[key.encode()] = pickle.dumps(value)
def delete(self, key: str):
self._db.pop(key.encode(), None)
def checkpoint(self) -> bytes:
# In practice: Use RocksDB snapshots
return pickle.dumps(dict(self._db))
def restore(self, data: bytes):
self._db = pickle.loads(data)
class ValueState(Generic[T]):
"""
State holding a single value per key.
Example: Current sum for a user
"""
def __init__(self, backend: StateBackend, name: str, key: str):
self.backend = backend
self.state_key = f"{name}:{key}"
def value(self) -> Optional[T]:
return self.backend.get(self.state_key)
def update(self, value: T):
self.backend.put(self.state_key, value)
def clear(self):
self.backend.delete(self.state_key)
class ListState(Generic[T]):
"""
State holding a list of values per key.
Example: All events in a session
"""
def __init__(self, backend: StateBackend, name: str, key: str):
self.backend = backend
self.state_key = f"{name}:{key}"
def get(self) -> list:
return self.backend.get(self.state_key) or []
def add(self, value: T):
current = self.get()
current.append(value)
self.backend.put(self.state_key, current)
def clear(self):
self.backend.delete(self.state_key)
class MapState(Generic[T]):
"""
State holding a map of values per key.
Example: Count per category for a user
"""
def __init__(self, backend: StateBackend, name: str, key: str):
self.backend = backend
self.state_key = f"{name}:{key}"
def get(self, map_key: str) -> Optional[T]:
current = self.backend.get(self.state_key) or {}
return current.get(map_key)
def put(self, map_key: str, value: T):
current = self.backend.get(self.state_key) or {}
current[map_key] = value
self.backend.put(self.state_key, current)
def keys(self) -> list:
current = self.backend.get(self.state_key) or {}
return list(current.keys())
def clear(self):
self.backend.delete(self.state_key)
# =============================================================================
# Stateful Processing Example
# =============================================================================
class UserSessionProcessor:
"""
Stateful processor that tracks user sessions.
A session ends after 30 minutes of inactivity.
"""
SESSION_TIMEOUT = timedelta(minutes=30)
def __init__(self, state_backend: StateBackend):
self.backend = state_backend
def process_event(self, user_id: str, event: Dict) -> Optional[Dict]:
"""
Process event, maintain session state, emit session on close.
"""
# Get session state for this user
session_state = ValueState[Dict](self.backend, "session", user_id)
timer_state = ValueState[datetime](self.backend, "timer", user_id)
event_time = datetime.fromisoformat(event["event_time"])
current_session = session_state.value()
result = None
if current_session is None:
# New session
current_session = {
"user_id": user_id,
"start_time": event_time.isoformat(),
"events": [],
"event_count": 0
}
else:
# Check if session timed out
last_timer = timer_state.value()
if last_timer and event_time > last_timer:
# Session closed, emit it
result = {
"type": "session_complete",
"session": current_session
}
# Start new session
current_session = {
"user_id": user_id,
"start_time": event_time.isoformat(),
"events": [],
"event_count": 0
}
# Add event to session
current_session["events"].append(event)
current_session["event_count"] += 1
current_session["last_event_time"] = event_time.isoformat()
# Update state
session_state.update(current_session)
timer_state.update(event_time + self.SESSION_TIMEOUT)
return result
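# =============================================================================
# Example Run (event payloads invented; Flink would keep this state per key)
# =============================================================================
backend = InMemoryStateBackend()
sessions = UserSessionProcessor(backend)
print(sessions.process_event("u1", {"event_time": "2024-01-01T10:00:00"}))  # None (session opened)
print(sessions.process_event("u1", {"event_time": "2024-01-01T10:05:00"}))  # None (same session)
# 45 minutes of silence: the next event closes the previous session
result = sessions.process_event("u1", {"event_time": "2024-01-01T10:50:00"})
print(result["type"], result["session"]["event_count"])   # session_complete 2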
Chapter 6: Batch Processing with Spark
6.1 Spark for Analytics
# batch/spark_processor.py
"""
Apache Spark batch processing for analytics.
Spark concepts:
- DataFrame: Distributed table of data
- Transformations: Lazy operations (filter, map, join)
- Actions: Trigger computation (collect, write)
- Partitioning: How data is distributed
- Caching: Keep intermediate results in memory
"""
from dataclasses import dataclass
from datetime import datetime, date, timedelta
from typing import Dict, Any, List, Optional
from pyspark.sql import functions as F
@dataclass
class SparkConfig:
"""Spark application configuration."""
app_name: str
master: str = "local[*]" # or "yarn", "k8s://..."
executor_memory: str = "4g"
executor_cores: int = 4
num_executors: int = 10
class AnalyticsBatchJob:
"""
Batch analytics job using Spark.
Processes historical data to compute accurate metrics.
"""
def __init__(self, spark_session):
self.spark = spark_session
def compute_daily_metrics(
self,
events_path: str,
output_path: str,
target_date: date
) -> Dict[str, Any]:
"""
Compute all daily metrics for a specific date.
This is the batch layer of Lambda architecture.
"""
# Read events for the day
events = self.spark.read.parquet(events_path).filter(
f"date(event_time) = '{target_date}'"
)
# Cache for multiple aggregations
events.cache()
metrics = {}
        # Revenue metrics
        revenue_row = events.filter(
            F.col("event_type") == "order.placed"
        ).agg(
            F.sum("payload.total_amount").alias("revenue"),
            F.count("*").alias("order_count")
        ).collect()[0]
        metrics["revenue"] = revenue_row["revenue"] or 0
        metrics["order_count"] = revenue_row["order_count"] or 0
        # User metrics
        user_row = events.agg(
            F.countDistinct("user_id").alias("dau")
        ).collect()[0]
        metrics["dau"] = user_row["dau"] or 0
# Funnel metrics
metrics["funnel"] = self._compute_funnel(events)
# Product metrics
metrics["top_products"] = self._compute_top_products(events)
# Write results
self._write_metrics(output_path, target_date, metrics)
return metrics
def _compute_funnel(self, events) -> Dict[str, int]:
"""Compute conversion funnel."""
funnel = {}
# View → Cart → Purchase funnel
funnel["product_views"] = events.filter(
"event_type = 'product.viewed'"
).select("user_id").distinct().count()
funnel["add_to_cart"] = events.filter(
"event_type = 'cart.updated' AND payload.action = 'add'"
).select("user_id").distinct().count()
funnel["purchases"] = events.filter(
"event_type = 'order.placed'"
).select("user_id").distinct().count()
# Conversion rates
if funnel["product_views"] > 0:
funnel["view_to_cart_rate"] = (
funnel["add_to_cart"] / funnel["product_views"]
)
funnel["view_to_purchase_rate"] = (
funnel["purchases"] / funnel["product_views"]
)
if funnel["add_to_cart"] > 0:
funnel["cart_to_purchase_rate"] = (
funnel["purchases"] / funnel["add_to_cart"]
)
return funnel
def _compute_top_products(self, events, limit: int = 100) -> List[Dict]:
"""Compute top products by various metrics."""
        product_stats = events.filter(
            "event_type IN ('product.viewed', 'order.placed')"
        ).groupBy(
            F.col("payload.product_id").alias("product_id")
        ).agg(
            F.count("*").alias("event_count"),
            F.sum("payload.total_amount").alias("revenue")
        ).orderBy(
            F.col("event_count").desc()
        ).limit(limit).collect()
        return [
            {
                "product_id": row["product_id"],
                "event_count": row["event_count"],
                "revenue": row["revenue"] or 0
            }
            for row in product_stats
        ]
def _write_metrics(
self,
output_path: str,
target_date: date,
metrics: Dict
):
"""Write metrics to output storage."""
# Write to partitioned parquet
self.spark.createDataFrame([{
"date": target_date.isoformat(),
"metrics": metrics,
"computed_at": datetime.utcnow().isoformat()
}]).write.mode("overwrite").parquet(
f"{output_path}/date={target_date}"
)
class IncrementalBatchJob:
"""
Incremental batch job that processes only new data.
More efficient than full recomputation.
"""
def __init__(self, spark_session):
self.spark = spark_session
def process_incremental(
self,
events_path: str,
state_path: str,
output_path: str,
watermark: datetime
):
"""
Process only events since last watermark.
Merge with existing state for complete picture.
"""
# Read new events only
new_events = self.spark.read.parquet(events_path).filter(
f"event_time > '{watermark.isoformat()}'"
)
# Read existing aggregates
existing = self.spark.read.parquet(state_path)
        # Compute incremental aggregates over the new events only
        incremental = new_events.groupBy(
            F.to_date("event_time").alias("date")
        ).agg(
            F.sum("payload.total_amount").alias("revenue"),
            F.count("*").alias("event_count")
        )
        # Merge: add incremental to existing (schemas must match: date, revenue, event_count)
        merged = existing.unionByName(incremental).groupBy("date").agg(
            F.sum("revenue").alias("revenue"),
            F.sum("event_count").alias("event_count")
        )
# Write updated state
merged.write.mode("overwrite").parquet(state_path)
# Also write to serving layer
merged.write.mode("overwrite").parquet(output_path)
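# =============================================================================
# Example Invocation (illustrative; paths and app name are placeholders)
# =============================================================================
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("daily-analytics").getOrCreate()
job = AnalyticsBatchJob(spark)
metrics = job.compute_daily_metrics(
    events_path="s3://data-lake/events/",
    output_path="s3://analytics/daily-metrics/",
    target_date=date(2024, 1, 1),
)
print(metrics["revenue"], metrics["order_count"], metrics["dau"])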
6.2 Spark Streaming
# batch/spark_streaming.py
"""
Spark Structured Streaming for unified batch + stream.
Structured Streaming treats streams as unbounded tables,
using the same DataFrame API as batch.
"""
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
class UnifiedAnalyticsProcessor:
"""
Unified processor that works for both batch and streaming.
Same code, different execution modes.
"""
def __init__(self, spark: SparkSession):
self.spark = spark
def create_streaming_source(self, kafka_brokers: str, topic: str):
"""Create streaming DataFrame from Kafka."""
return self.spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", kafka_brokers) \
.option("subscribe", topic) \
.option("startingOffsets", "latest") \
.load() \
.select(
from_json(
col("value").cast("string"),
self._get_event_schema()
).alias("event")
).select("event.*")
def create_batch_source(self, path: str):
"""Create batch DataFrame from files."""
return self.spark.read.parquet(path)
def compute_revenue_metrics(self, events_df):
"""
Compute revenue metrics.
Works identically for batch and streaming DataFrames!
"""
return events_df \
.filter(col("event_type") == "order.placed") \
.withWatermark("event_time", "1 hour") \
.groupBy(
window("event_time", "1 hour"),
"payload.category"
) \
.agg(
sum("payload.total_amount").alias("revenue"),
count("*").alias("order_count"),
avg("payload.total_amount").alias("avg_order_value")
)
def write_streaming(self, df, output_path: str, checkpoint_path: str):
"""Write streaming DataFrame with checkpointing."""
return df.writeStream \
.format("parquet") \
.option("path", output_path) \
.option("checkpointLocation", checkpoint_path) \
.outputMode("append") \
.trigger(processingTime="1 minute") \
.start()
def write_batch(self, df, output_path: str):
"""Write batch DataFrame."""
df.write \
.mode("overwrite") \
.partitionBy("date") \
.parquet(output_path)
def _get_event_schema(self):
"""Define event schema."""
return StructType([
StructField("event_id", StringType()),
StructField("event_type", StringType()),
StructField("event_time", TimestampType()),
StructField("user_id", StringType()),
StructField("payload", MapType(StringType(), StringType()))
])
# =============================================================================
# Usage Examples
# =============================================================================
USAGE_EXAMPLES = """
UNIFIED BATCH + STREAMING
# Same processing logic for both!
processor = UnifiedAnalyticsProcessor(spark)
# STREAMING MODE
stream_df = processor.create_streaming_source("kafka:9092", "events")
revenue = processor.compute_revenue_metrics(stream_df)
query = processor.write_streaming(revenue, "/output/stream", "/checkpoint")
# BATCH MODE
batch_df = processor.create_batch_source("/data/events")
revenue = processor.compute_revenue_metrics(batch_df)
processor.write_batch(revenue, "/output/batch")
THE POWER:
- Same aggregation code
- Same business logic
- Different execution modes
- Consistent results
"""
Part III: Real-World Application
Chapter 7: Case Studies
7.1 Netflix: Lambda to Kappa Transition
NETFLIX ANALYTICS EVOLUTION
BEFORE (Lambda-ish):
├── Batch: Spark on HDFS, daily jobs
├── Streaming: Flink for real-time
├── Problem: Two codebases, divergent results
└── Pain: "Why is real-time different from batch?"
AFTER (Kappa-ish):
├── Single: Flink for everything
├── Kafka: Long retention for replay
├── Result: One codebase, consistent
└── Reprocess: Replay from Kafka
KEY LEARNINGS:
1. Unified codebase reduced bugs by 60%
2. Onboarding time cut in half
3. Schema changes deploy once, not twice
4. Cost slightly higher but worth simplicity
7.2 Uber: Lambda at Scale
UBER ANALYTICS ARCHITECTURE
They kept Lambda because:
├── Historical data: Petabytes
├── Reprocessing: Would take days in Flink
├── ML training: Inherently batch
└── Cost: Batch on spot instances is cheaper
Their approach:
├── Streaming: Apache Flink for real-time
├── Batch: Apache Spark on YARN
├── Unified: Apache Beam for some jobs
└── Storage: Apache Hudi (streaming writes, batch-readable tables)
KEY INSIGHT:
"We use Lambda not because we want to,
but because the alternatives are worse at our scale."
At Uber's scale (trillions of events):
├── Kappa reprocessing: 1+ week
├── Lambda batch: 6 hours daily
└── Decision: Lambda wins on practicality
Chapter 8: Common Mistakes
8.1 Streaming Mistakes
STREAMING ANTI-PATTERNS
❌ MISTAKE 1: Ignoring Late Data
Wrong:
# Just drop late events
events.filter("event_time > watermark")
Problem:
Mobile events routinely arrive late
Dropping them = wrong metrics
Right (Flink-style):
    events
        .key_by(lambda e: e.user_id)
        .window(TumblingEventTimeWindows.of(Time.hours(1)))
        .allowed_lateness(Time.hours(1))          # keep window state open for late updates
        .side_output_late_data(late_events_tag)   # anything later goes to a side output
    # Process the late-events side output in a separate correction path
❌ MISTAKE 2: Unbounded State
Wrong:
# Keep all sessions forever
state[user_id] = session
Problem:
State grows forever
Eventually OOM
Right:
# Use state TTL
state.set_ttl(timedelta(days=7))
# Or use timers to clean up
timer.register_cleanup(session_end + timedelta(hours=1))
❌ MISTAKE 3: Not Checkpointing
Wrong:
# No checkpoints configured
stream.process(handler)
Problem:
Crash = start from scratch
Data loss or reprocessing
Right:
    # Checkpointing is configured on the execution environment, not the stream
    env.enable_checkpointing(60_000)   # checkpoint every minute
    # and point state.checkpoints.dir at durable storage, e.g. s3://checkpoints/
8.2 Batch Mistakes
BATCH ANTI-PATTERNS
❌ MISTAKE 1: No Idempotency
Wrong:
# Append to output on every run
df.write.mode("append").parquet(output)
# Re-run = duplicates!
Problem:
Job fails mid-write, retry creates duplicates
Right:
# Overwrite partitions atomically
df.write.mode("overwrite").partitionBy("date").parquet(output)
    # Or make re-runs idempotent with a Delta Lake MERGE keyed on a unique id
    DeltaTable.forPath(spark, output).alias("t").merge(
        df.alias("s"), "t.event_id = s.event_id"
    ).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
❌ MISTAKE 2: Ignoring Skew
Wrong:
# Group by a skewed key
events.groupBy("celebrity_user_id").count()
# One executor gets 90% of data!
Problem:
One task takes hours, others finish in minutes
Right:
    # Salt the key, then aggregate in two stages
    events
        .withColumn("salt", (rand() * 10).cast("int"))
        .groupBy("user_id", "salt")
        .count()
        .groupBy("user_id")                 # combine the partial counts per salt
        .agg(sum("count").alias("count"))
Part IV: Interview Preparation
Chapter 9: Interview Tips
9.1 When to Discuss Stream vs Batch
TOPIC TRIGGERS:
"Design a real-time dashboard"
→ Primarily streaming
→ Discuss: windowing, approximate vs exact
"Design an analytics platform"
→ Both streaming and batch
→ Discuss: Lambda vs Kappa trade-offs
"Design a recommendation system"
→ ML training is batch
→ Serving might use streaming for freshness
"Design fraud detection"
→ Streaming for real-time alerts
→ Batch for model training
"Design a data warehouse"
→ Primarily batch
→ Maybe streaming for near-real-time
9.2 Key Phrases
TALKING ABOUT STREAM VS BATCH:
"For this use case, I'd consider whether we need real-time
results or if hourly/daily is acceptable. Real-time adds
complexity with state management, exactly-once, and late data."
"I'd use a Lambda architecture here because we have both
real-time needs and accuracy requirements. The batch layer
handles late data and corrections, while streaming provides
immediate visibility."
"For this volume, I'd lean toward Kappa since reprocessing
100GB from Kafka is fast enough. The single codebase is
worth more than the slight efficiency of batch-only."
"The key trade-off is latency vs accuracy vs cost.
Streaming gives seconds of latency but higher cost.
Batch gives hours of latency but lower cost and easier
handling of corrections."
Chapter 10: Practice Problems
Problem 1: Real-Time Leaderboard
Setup: Design a gaming leaderboard that shows:
- Top 100 players globally (updates every second)
- Your rank (updates every action)
- Historical rankings (end of day snapshots)
Questions:
- Stream, batch, or both? Why?
- How do you handle millions of score updates/second?
- How do you efficiently compute "your rank" among millions?
Approach:
- Streaming for real-time updates
- Redis sorted sets for O(log N) rank lookup (sketched below)
- Batch for historical snapshots
- Sample or approximate for global leaderboard
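The Redis sorted-set idea from the hints, sketched with redis-py (the key name and client setup are assumptions):
import redis
r = redis.Redis()
def record_score(player_id: str, score: float):
    r.zadd("leaderboard:global", {player_id: score})
def top_100():
    return r.zrevrange("leaderboard:global", 0, 99, withscores=True)
def my_rank(player_id: str):
    rank = r.zrevrank("leaderboard:global", player_id)   # O(log N)
    return None if rank is None else rank + 1            # 1-based rank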
Problem 2: Ad Attribution
Setup: Track which ad led to a purchase:
- User sees ad at time T1
- User purchases at time T2 (within 7 days)
- Attribute purchase to ad
Questions:
- How do you join ad views with purchases?
- What if the purchase event arrives before the ad view?
- Streaming or batch for attribution?
Approach:
- Stream-stream join with state (toy sketch below)
- Watermarks to handle out-of-order
- Buffering needed for 7-day window
- Maybe batch for accuracy, stream for estimates
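A toy, single-process version of that stateful join; a production pipeline would keep this buffer in Flink keyed state with a 7-day TTL and rely on watermarks for ordering:
from datetime import datetime, timedelta
ATTRIBUTION_WINDOW = timedelta(days=7)
ad_views = {}    # user_id -> list of (ad_id, seen_at)
def on_ad_view(user_id: str, ad_id: str, seen_at: datetime):
    ad_views.setdefault(user_id, []).append((ad_id, seen_at))
def on_purchase(user_id: str, purchased_at: datetime):
    candidates = [
        (ad_id, seen_at) for ad_id, seen_at in ad_views.get(user_id, [])
        if seen_at <= purchased_at <= seen_at + ATTRIBUTION_WINDOW
    ]
    if not candidates:
        return None                                   # or buffer the purchase if views can arrive late
    return max(candidates, key=lambda c: c[1])[0]     # last-touch attribution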
Chapter 11: Sample Interview Dialogue
Interviewer: "Design an analytics system for our e-commerce platform."
You: "Let me start by understanding the requirements. What types of analytics are needed, and what's the latency requirement?"
Interviewer: "We need real-time revenue dashboards for executives, and daily reports for finance. Also, conversion funnel analysis."
You: "Got it. This is a classic case where we need both real-time and batch processing. Let me propose an architecture.
For the streaming layer, I'd use Flink consuming from Kafka. It would compute real-time aggregates like current revenue, active users, and approximate conversion funnels. These feed into Druid for real-time dashboards.
For the batch layer, I'd use Spark running hourly. It would compute exact metrics including complete funnel analysis with proper session handling, accurate revenue with all late events included, and the data that finance needs.
The key insight is that some metrics need real-time approximation, while others need daily accuracy. Revenue for executives can be 'roughly right now' — they don't need penny-perfect accuracy at 2 PM. But the end-of-day report must be exact."
Interviewer: "Why not just use streaming for everything?"
You: "Great question. We could, but there are trade-offs:
First, late data. E-commerce has refunds, corrections, and late mobile events. In streaming, we'd need to handle allowed lateness, retractions, and updates — adding complexity.
Second, cost. Streaming is always-on. Batch can use spot instances and run only when needed.
Third, accuracy vs speed. For the finance report, we want to wait for all late data. With streaming, we'd need to recompute or issue corrections.
However, if simplicity is paramount and the scale is manageable, Kappa with long Kafka retention could work. We could reprocess nightly to handle corrections. It depends on whether the team is more comfortable with Spark or Flink."
Summary
DAY 2 SUMMARY: STREAMING VS BATCH
BATCH PROCESSING
├── Process data in large chunks
├── High throughput, high latency
├── Accurate (all data processed together)
├── Cheaper (periodic, uses spot instances)
└── Simpler programming model
STREAM PROCESSING
├── Process data continuously
├── Lower throughput, low latency
├── Approximate (late data challenges)
├── More expensive (always on)
└── Complex (state, exactly-once, watermarks)
LAMBDA ARCHITECTURE
├── Batch layer + Speed layer + Serving layer
├── Batch for accuracy, streaming for speed
├── Two codebases (main disadvantage)
└── Best when: Huge history, mixed requirements
KAPPA ARCHITECTURE
├── Streaming only + Replay for reprocessing
├── Single codebase
├── Replay must be fast enough
└── Best when: Moderate history, simplicity valued
MODERN TREND
├── Unified frameworks (Beam, Flink, Spark Structured Streaming)
├── Write once, run as batch or stream
└── Industry moving toward unified APIs
Key Takeaways
STREAMING VS BATCH KEY TAKEAWAYS
1. THEY SOLVE DIFFERENT PROBLEMS
Stream: "What's happening NOW?"
Batch: "What happened EXACTLY?"
2. LATE DATA IS THE KILLER
Streaming makes late data hard
Batch handles it naturally
3. SIMPLICITY HAS VALUE
Kappa's single codebase often wins
Unless scale forces Lambda
4. COST MATTERS
Streaming: Always on = always paying
Batch: Pay only when running
5. THE INDUSTRY IS CONVERGING
Unified APIs blur the line
Same code for both modes
GOLDEN RULES:
├── Start with batch (simpler)
├── Add streaming only when needed
├── Use watermarks for late data
├── Checkpoint everything
└── One codebase when possible
What's Next
Tomorrow, we'll dive into Data Modeling for Analytics — designing schemas that make queries fast over billions of rows.
TOMORROW'S PREVIEW: DATA MODELING
Questions we'll answer:
├── What is a star schema?
├── Fact tables vs dimension tables?
├── How to handle slowly changing dimensions?
├── When to pre-aggregate?
└── Partitioning for analytics performance?
We'll design the data warehouse that our
streaming and batch jobs write to.
End of Week 8, Day 2
Next: Day 3 — Data Modeling for Analytics