
Week 8 — Day 2: Streaming vs Batch Processing

System Design Mastery Series — Analytics Pipeline Week


Preface

Yesterday, we built an event ingestion pipeline that reliably captures 100M+ events per day into Kafka. Now we have a firehose of data.

THE PROCESSING DILEMMA

Events are flowing into Kafka at 1,000/second:
├── Product views
├── Searches
├── Cart updates
├── Orders
└── Payments

Business asks three questions:

Question 1: "What's our revenue RIGHT NOW?"
└── Need: Real-time, updates every second
└── Tolerance: Can be 1-2% off

Question 2: "What was our revenue YESTERDAY?"
└── Need: Exact number for finance
└── Tolerance: Must be 100% accurate

Question 3: "What's trending THIS HOUR?"
└── Need: Near real-time, updates every minute
└── Tolerance: Some lag is okay

One architecture cannot optimally serve all three.
This is the streaming vs batch dilemma.

Today, we'll learn when to stream, when to batch, and how to do both.


Part I: Foundations

Chapter 1: The Two Worlds of Data Processing

1.1 Batch Processing: The Original Approach

BATCH PROCESSING

Process data in large, discrete chunks:

┌─────────────────────────────────────────────────────────────────────────┐
│                      BATCH PROCESSING TIMELINE                          │
│                                                                         │
│  Events:  ──●──●──●──●──●──●──●──●──●──●──●──●──●──●──●──●──●──●──      │
│            │                              │                             │
│            └──────────── Batch 1 ─────────┘                             │
│                           │                                             │
│                           ▼                                             │
│                    ┌─────────────┐                                      │
│                    │   Process   │                                      │
│                    │  (Spark)    │                                      │
│                    └──────┬──────┘                                      │
│                           │                                             │
│                           ▼                                             │
│                    ┌─────────────┐                                      │
│                    │   Output    │                                      │
│                    │  (Table)    │                                      │
│                    └─────────────┘                                      │
│                                                                         │
│  Latency: Hours (wait for batch to complete)                            │
│  Accuracy: High (all data processed together)                           │
│  Complexity: Lower (simpler programming model)                          │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘

WHEN TO USE BATCH:
├── Historical analysis ("last year's trends")
├── Machine learning training
├── End-of-day financial reconciliation
├── Data warehouse loading
└── Complex aggregations across large datasets
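
To make the contrast concrete, here is a minimal batch sketch in plain Python
(the file path and event fields are hypothetical): read a whole day of events,
then aggregate them in one pass.

    import json
    from collections import defaultdict

    def daily_revenue(events_file: str) -> dict:
        """Batch job: read the full day's events, then aggregate in one pass."""
        revenue_by_day = defaultdict(float)
        with open(events_file) as f:
            for line in f:                               # one JSON event per line
                event = json.loads(line)
                if event["event_type"] == "order.placed":
                    day = event["event_time"][:10]       # "YYYY-MM-DD"
                    revenue_by_day[day] += event["payload"]["total_amount"]
        return dict(revenue_by_day)

    # Run once per day from a scheduler, e.g. over yesterday's export:
    # daily_revenue("events-2024-01-15.jsonl")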

1.2 Stream Processing: The Real-Time Approach

STREAM PROCESSING

Process data continuously as it arrives:

┌────────────────────────────────────────────────────────────────────────┐
│                     STREAM PROCESSING TIMELINE                         │
│                                                                        │
│  Events:  ──●──●──●──●──●──●──●──●──●──●──●──●──●──●──●──●──●──●──     │
│            │  │  │  │  │  │  │  │  │  │  │  │  │  │  │  │  │  │        │
│            ▼  ▼  ▼  ▼  ▼  ▼  ▼  ▼  ▼  ▼  ▼  ▼  ▼  ▼  ▼  ▼  ▼  ▼        │
│         ┌───────────────────────────────────────────────────────┐      │
│         │              Stream Processor (Flink)                 │      │
│         │  ┌─────┐  ┌─────┐  ┌─────┐  ┌─────┐                   │      │
│         │  │State│  │State│  │State│  │State│   (per-key state) │      │
│         │  └─────┘  └─────┘  └─────┘  └─────┘                   │      │
│         └───────────────────────────────────────────────────────┘      │
│                              │                                         │
│                              ▼ (continuous output)                     │
│                       ┌─────────────┐                                  │
│                       │  Real-time  │                                  │
│                       │   Views     │                                  │
│                       └─────────────┘                                  │
│                                                                        │
│  Latency: Milliseconds to seconds                                      │
│  Accuracy: Approximate (may miss late data)                            │
│  Complexity: Higher (state management, exactly-once)                   │
│                                                                        │
└────────────────────────────────────────────────────────────────────────┘

WHEN TO USE STREAMING:
├── Real-time dashboards
├── Fraud detection (immediate action needed)
├── Live recommendations
├── Alerting and monitoring
└── Session analytics
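
A minimal streaming counterpart (a sketch, assuming a consume() generator that
yields decoded events from Kafka): state is updated per event, so results are
available the moment an event arrives.

    from collections import defaultdict

    def run_revenue_stream(consume):
        """Stream job: update running totals as each event arrives."""
        revenue_by_day = defaultdict(float)      # in-memory state (per-key in real systems)
        for event in consume():                  # unbounded iterator over Kafka events
            if event["event_type"] == "order.placed":
                day = event["event_time"][:10]
                revenue_by_day[day] += event["payload"]["total_amount"]
                yield day, revenue_by_day[day]   # continuously emit updated totals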

1.3 The Trade-offs

STREAMING VS BATCH: THE FUNDAMENTAL TRADE-OFF

                    STREAMING              BATCH
                    ─────────              ─────
Latency             Seconds                Hours
Throughput          Lower                  Higher
Accuracy            Approximate*           Exact
Late data           Complex                Natural
State               In-memory              Disk-based
Cost                Higher (always-on)     Lower (periodic)
Complexity          Higher                 Lower
Recovery            Checkpoints            Re-run job

* Approximate because late-arriving events may be missed

THE TRUTH:
Most systems need BOTH streaming AND batch.
The question is: how do you combine them?

Chapter 2: Lambda Architecture

2.1 The Original Hybrid Approach

LAMBDA ARCHITECTURE

Nathan Marz's original design (2011):
"Have both batch and streaming, merge results at query time"

┌────────────────────────────────────────────────────────────────────────┐
│                       LAMBDA ARCHITECTURE                              │
│                                                                        │
│                         ┌─────────────┐                                │
│                         │   Events    │                                │
│                         │  (Kafka)    │                                │
│                         └──────┬──────┘                                │
│                                │                                       │
│              ┌─────────────────┼─────────────────┐                     │
│              │                 │                 │                     │
│              ▼                 │                 ▼                     │
│       ┌─────────────┐          │          ┌─────────────┐              │
│       │   BATCH     │          │          │   SPEED     │              │
│       │   LAYER     │          │          │   LAYER     │              │
│       │             │          │          │             │              │
│       │  ┌───────┐  │          │          │  ┌───────┐  │              │
│       │  │ Spark │  │          │          │  │ Flink │  │              │
│       │  │  Job  │  │          │          │  │Stream │  │              │
│       │  └───┬───┘  │          │          │  └───┬───┘  │              │
│       │      │      │          │          │      │      │              │
│       │      ▼      │          │          │      ▼      │              │
│       │  ┌───────┐  │          │          │  ┌───────┐  │              │
│       │  │ Batch │  │          │          │  │ Real- │  │              │
│       │  │ Views │  │          │          │  │ time  │  │              │
│       │  │(daily)│  │          │          │  │ Views │  │              │
│       │  └───┬───┘  │          │          │  └───┬───┘  │              │
│       └──────┼──────┘          │          └──────┼──────┘              │
│              │                 │                 │                     │
│              └─────────────────┼─────────────────┘                     │
│                                │                                       │
│                                ▼                                       │
│                    ┌───────────────────────┐                           │
│                    │     SERVING LAYER     │                           │
│                    │  (Merge batch + real) │                           │
│                    └───────────────────────┘                           │
│                                                                        │
└────────────────────────────────────────────────────────────────────────┘

HOW IT WORKS:
1. BATCH LAYER: Processes all historical data, produces accurate views
   - Runs periodically (hourly/daily)
   - Overwrites previous batch views
   - Source of truth for accuracy

2. SPEED LAYER: Processes only recent data in real-time
   - Fills the gap since last batch
   - Approximate but current
   - Discarded when batch catches up

3. SERVING LAYER: Merges both at query time
   - Batch view (yesterday's totals) + Speed view (today so far)
   - Query: batch_result + realtime_result

2.2 Lambda Implementation

# architectures/lambda_architecture.py

"""
Lambda Architecture implementation.

Combines batch accuracy with streaming speed.
"""

from dataclasses import dataclass
from datetime import date, datetime
from typing import Dict, Any, Optional

from pyspark.sql import functions as F


@dataclass
class MetricResult:
    """Result from either batch or speed layer."""
    value: float
    timestamp: datetime
    is_batch: bool
    coverage_start: datetime
    coverage_end: datetime


class BatchLayer:
    """
    Batch processing layer.
    
    Processes all historical data with Spark.
    Runs periodically (e.g., hourly).
    """
    
    def __init__(self, spark_session, data_lake_path: str):
        self.spark = spark_session
        self.data_path = data_lake_path
        self.last_batch_time: Optional[datetime] = None
    
    def run_batch_job(self, end_time: datetime) -> Dict[str, MetricResult]:
        """
        Run batch aggregation up to end_time.
        
        This processes ALL data from the beginning,
        producing the authoritative "batch views".
        """
        
        # Read all raw events from the data lake
        events_df = self.spark.read.parquet(
            f"{self.data_path}/events/"
        ).filter(
            f"event_time < '{end_time.isoformat()}'"
        )
        
        # Compute aggregations
        metrics = {}
        
        # Daily revenue
        daily_revenue = events_df.filter(
            F.col("event_type") == "order.placed"
        ).groupBy(
            F.to_date("event_time").alias("date")
        ).agg(
            F.sum("payload.total_amount").alias("revenue"),
            F.count("*").alias("order_count")
        ).collect()
        
        for row in daily_revenue:
            metrics[f"revenue_{row.date}"] = MetricResult(
                value=row.revenue,
                timestamp=end_time,
                is_batch=True,
                coverage_start=datetime.combine(row.date, datetime.min.time()),
                coverage_end=datetime.combine(row.date, datetime.max.time())
            )
        
        # DAU (Daily Active Users)
        dau = events_df.groupBy(
            F.to_date("event_time").alias("date")
        ).agg(
            F.countDistinct("user_id").alias("dau")
        ).collect()
        
        for row in dau:
            metrics[f"dau_{row.date}"] = MetricResult(
                value=row.dau,
                timestamp=end_time,
                is_batch=True,
                coverage_start=datetime.combine(row.date, datetime.min.time()),
                coverage_end=datetime.combine(row.date, datetime.max.time())
            )
        
        self.last_batch_time = end_time
        
        return metrics
    
    def get_batch_view(self, metric_name: str) -> Optional[MetricResult]:
        """Get the latest batch view for a metric."""
        # In practice, read from batch view storage (e.g., Cassandra, Redis)
        pass


class SpeedLayer:
    """
    Speed/Streaming layer.
    
    Processes only events since last batch.
    Provides real-time updates.
    """
    
    def __init__(self, flink_env, kafka_source: str):
        self.env = flink_env
        self.kafka_source = kafka_source
        self.realtime_state: Dict[str, MetricResult] = {}
    
    def start_streaming(self, batch_watermark: datetime):
        """
        Start streaming from batch watermark.
        
        Only processes events AFTER the last batch completed.
        """
        
        # In Flink, this would be a DataStream job:
        # 
        # stream = env.add_source(KafkaSource(...))
        #     .filter(lambda e: e.event_time > batch_watermark)
        #     .key_by(lambda e: e.event_type)
        #     .window(TumblingEventTimeWindows.of(Time.minutes(1)))
        #     .aggregate(MetricAggregator())
        #     .add_sink(RealtimeViewSink())
        
        pass
    
    def get_realtime_view(
        self,
        metric_name: str,
        since: datetime
    ) -> Optional[MetricResult]:
        """
        Get real-time aggregation since a timestamp.
        
        This fills the gap between last batch and now.
        """
        
        # In practice, query from real-time store (e.g., Druid, Redis)
        return self.realtime_state.get(metric_name)


class ServingLayer:
    """
    Serving layer that merges batch and speed results.
    """
    
    def __init__(self, batch_layer: BatchLayer, speed_layer: SpeedLayer):
        self.batch = batch_layer
        self.speed = speed_layer
    
    def query_metric(self, metric_name: str, date: date) -> float:
        """
        Query a metric by merging batch and real-time.
        
        Logic:
        1. Get batch view (covers up to last batch time)
        2. Get speed view (covers batch time to now)
        3. Combine appropriately
        """
        
        today = datetime.utcnow().date()
        
        if date < today:
            # Historical date: batch only (complete)
            batch_result = self.batch.get_batch_view(f"{metric_name}_{date}")
            return batch_result.value if batch_result else 0
        
        elif date == today:
            # Today: might need batch + realtime
            batch_result = self.batch.get_batch_view(f"{metric_name}_{date}")
            
            if batch_result:
                # Batch has partial day, add realtime since batch
                realtime_result = self.speed.get_realtime_view(
                    f"{metric_name}_{date}",
                    since=batch_result.coverage_end
                )
                
                batch_value = batch_result.value
                realtime_value = realtime_result.value if realtime_result else 0
                
                return batch_value + realtime_value
            
            else:
                # No batch yet today, realtime only
                realtime_result = self.speed.get_realtime_view(
                    f"{metric_name}_{date}",
                    since=datetime.combine(date, datetime.min.time())
                )
                return realtime_result.value if realtime_result else 0
        
        else:
            # Future date: no data
            return 0


# =============================================================================
# Example Queries
# =============================================================================

LAMBDA_QUERY_EXAMPLES = """
LAMBDA ARCHITECTURE QUERIES

Query: "What's today's revenue?"

┌────────────────────────────────────────────────────────────────────────┐
│                                                                        │
│  Timeline:                                                             │
│  ────────────────────────────────────────────────────────────────────  │
│  00:00        06:00        12:00        Now (14:30)                    │
│    │            │            │            │                            │
│    └────────────┴────────────┘            │                            │
│           Batch View                      │                            │
│         (Ran at 12:00)                    │                            │
│         Revenue: $50,000                  │                            │
│                            │              │                            │
│                            └──────────────┘                            │
│                             Speed View                                 │
│                            (12:00 to Now)                              │
│                            Revenue: $12,000                            │
│                                                                        │
│  Total Revenue = $50,000 + $12,000 = $62,000                           │
│                                                                        │
└────────────────────────────────────────────────────────────────────────┘

Query: "What was yesterday's revenue?"

┌────────────────────────────────────────────────────────────────────────┐
│                                                                        │
│  Batch View for Yesterday: $150,000 (complete, authoritative)          │
│                                                                        │
│  Speed View: Not needed (batch has full day)                           │
│                                                                        │
│  Total Revenue = $150,000                                              │
│                                                                        │
└────────────────────────────────────────────────────────────────────────┘
"""

2.3 Lambda Pros and Cons

LAMBDA ARCHITECTURE: TRADE-OFFS

PROS:
├── Accuracy: Batch layer is source of truth
├── Latency: Speed layer provides real-time
├── Fault tolerance: Batch can recompute everything
├── Late data: Handled in next batch run
└── Mature: Well-understood, battle-tested

CONS:
├── Complexity: Two codebases (batch + streaming)
├── Consistency: Different results from same logic
├── Maintenance: Two systems to maintain and debug
├── Divergence: Batch and streaming logic can drift
└── Cost: Running two processing systems

THE BIG PROBLEM:
"Now you have two codebases doing the same thing."
- Write aggregation in Spark (batch)
- Write same aggregation in Flink (stream)
- Hope they produce the same results
- They often don't.

Chapter 3: Kappa Architecture

3.1 Streaming Only

KAPPA ARCHITECTURE

Jay Kreps' simplification (2014):
"Everything is a stream. Replay for reprocessing."

┌────────────────────────────────────────────────────────────────────────┐
│                       KAPPA ARCHITECTURE                               │
│                                                                        │
│                    ┌──────────────────────┐                            │
│                    │   Immutable Event    │                            │
│                    │        Log           │                            │
│                    │      (Kafka)         │                            │
│                    │                      │                            │
│                    │  All events stored   │                            │
│                    │  with long retention │                            │
│                    └──────────┬───────────┘                            │
│                               │                                        │
│                               ▼                                        │
│                    ┌──────────────────────┐                            │
│                    │   Stream Processor   │                            │
│                    │      (Flink)         │                            │
│                    │                      │                            │
│                    │  Single codebase     │                            │
│                    │  for all processing  │                            │
│                    └──────────┬───────────┘                            │
│                               │                                        │
│                               ▼                                        │
│                    ┌──────────────────────┐                            │
│                    │    Serving Layer     │                            │
│                    │   (Druid/ClickHouse) │                            │
│                    └──────────────────────┘                            │
│                                                                        │
│  KEY INSIGHT:                                                          │
│  If you need to reprocess (bug fix, new logic):                        │
│  1. Start new stream processor reading from beginning                  │
│  2. Process all historical events                                      │
│  3. Switch to new output when caught up                                │
│  4. Delete old output                                                  │
│                                                                        │
└────────────────────────────────────────────────────────────────────────┘

REPROCESSING IN KAPPA:

V1 (current)                    V2 (new logic)
────────────                    ──────────────
Processing live  ───────────▶   Start from beginning
     │                               │
     │                               │ Catching up...
     │                               │
     ▼                               ▼
┌─────────┐                    ┌─────────┐
│Output V1│                    │Output V2│
└─────────┘                    └─────────┘
     │                               │
     │         ◀──── Switch ─────    │
     │           (when caught up)    │
     ▼                               
┌─────────┐                    
│ Delete  │                    
│   V1    │                    
└─────────┘                    

3.2 Kappa Implementation

# architectures/kappa_architecture.py

"""
Kappa Architecture implementation.

Single streaming codebase for all processing.
Reprocessing via replay from event log.
"""

from dataclasses import dataclass
from datetime import datetime
from typing import Dict, Any, Optional, Callable
from enum import Enum
import asyncio


class ProcessorVersion(Enum):
    """Processor version for blue-green deployments."""
    V1 = "v1"
    V2 = "v2"


@dataclass
class ProcessorConfig:
    """Configuration for stream processor."""
    version: ProcessorVersion
    kafka_brokers: list
    input_topic: str
    output_table: str
    consumer_group: str
    start_offset: str = "earliest"  # or "latest" or timestamp


class KappaProcessor:
    """
    Unified stream processor for Kappa architecture.
    
    Key principle: One codebase, used for both
    real-time processing and historical reprocessing.
    """
    
    def __init__(self, config: ProcessorConfig):
        self.config = config
        self.aggregators: Dict[str, Callable] = {}
        self.state: Dict[str, Any] = {}
    
    def register_aggregation(
        self,
        name: str,
        aggregator: Callable
    ):
        """Register an aggregation function."""
        self.aggregators[name] = aggregator
    
    async def process_event(self, event: Dict) -> Dict[str, Any]:
        """
        Process a single event through all aggregators.
        
        This same code handles:
        1. Live events (normal operation)
        2. Historical events (reprocessing)
        """
        
        results = {}
        
        for name, aggregator in self.aggregators.items():
            result = await aggregator(event, self.state)
            if result:
                results[name] = result
        
        return results
    
    async def run(self):
        """
        Main processing loop.
        
        Consumes from Kafka, processes events, outputs to serving layer.
        """
        
        # In practice, use Flink/Kafka Streams
        # This is simplified pseudocode
        
        consumer = self._create_consumer()
        
        async for event in consumer:
            try:
                results = await self.process_event(event)
                
                for metric_name, value in results.items():
                    await self._write_to_serving_layer(
                        metric_name,
                        value,
                        event.get("event_time")
                    )
                    
            except Exception as e:
                await self._handle_error(event, e)
    
    def _create_consumer(self):
        """Create Kafka consumer with appropriate offset."""
        # Start from earliest for reprocessing
        # Start from latest for caught-up live processing
        pass
    
    async def _write_to_serving_layer(
        self,
        metric: str,
        value: Any,
        timestamp: datetime
    ):
        """Write result to serving layer (e.g., Druid, ClickHouse)."""
        pass
    
    async def _handle_error(self, event: Dict, error: Exception):
        """Handle processing error."""
        # Log error, send to DLQ, continue
        pass


class KappaOrchestrator:
    """
    Manages Kappa processor deployments and reprocessing.
    """
    
    def __init__(self):
        self.active_version: Optional[ProcessorVersion] = None
        self.processors: Dict[ProcessorVersion, KappaProcessor] = {}
    
    async def deploy_new_version(
        self,
        new_version: ProcessorVersion,
        processor: KappaProcessor
    ):
        """
        Deploy new processor version with reprocessing.
        
        Steps:
        1. Start new processor from beginning of log
        2. Wait for it to catch up to live
        3. Switch traffic to new version
        4. Stop old version
        """
        
        # Start new processor (reads from earliest)
        self.processors[new_version] = processor
        asyncio.create_task(processor.run())
        
        # Wait for catch-up
        await self._wait_for_catchup(processor)
        
        # Atomic switch
        old_version = self.active_version
        self.active_version = new_version
        
        # Stop old processor
        if old_version:
            await self._stop_processor(old_version)
    
    async def _wait_for_catchup(self, processor: KappaProcessor):
        """Wait for processor to catch up to live data."""
        
        while True:
            lag = await self._get_consumer_lag(processor)
            
            if lag < 1000:  # Less than 1000 events behind
                break
            
            await asyncio.sleep(10)
    
    async def _get_consumer_lag(self, processor: KappaProcessor) -> int:
        """Get consumer lag (how far behind live)."""
        # Query Kafka for consumer group lag
        pass
    
    async def _stop_processor(self, version: ProcessorVersion):
        """Gracefully stop a processor version."""
        pass


# =============================================================================
# Aggregation Functions
# =============================================================================

async def revenue_aggregator(
    event: Dict,
    state: Dict
) -> Optional[Dict]:
    """
    Aggregate revenue by day.
    
    Same code works for:
    - Live events: Updates today's revenue
    - Reprocessing: Recomputes historical revenue
    """
    
    if event.get("event_type") != "order.placed":
        return None
    
    event_time = datetime.fromisoformat(event["event_time"])
    date_key = event_time.strftime("%Y-%m-%d")
    amount = event.get("payload", {}).get("total_amount", 0)
    
    # Update state
    if date_key not in state:
        state[date_key] = {"revenue": 0, "orders": 0}
    
    state[date_key]["revenue"] += amount
    state[date_key]["orders"] += 1
    
    return {
        "date": date_key,
        "revenue": state[date_key]["revenue"],
        "orders": state[date_key]["orders"]
    }


async def active_users_aggregator(
    event: Dict,
    state: Dict
) -> Optional[Dict]:
    """Aggregate unique active users by day."""
    
    event_time = datetime.fromisoformat(event["event_time"])
    date_key = event_time.strftime("%Y-%m-%d")
    user_id = event.get("user_id")
    
    if not user_id:
        return None
    
    # Track unique users
    users_key = f"users_{date_key}"
    if users_key not in state:
        state[users_key] = set()
    
    state[users_key].add(user_id)
    
    return {
        "date": date_key,
        "dau": len(state[users_key])
    }
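

# =============================================================================
# Usage sketch (illustrative): wiring the processor together. Broker addresses,
# topic names, and the sample event below are placeholders.
# =============================================================================

def build_processor() -> KappaProcessor:
    config = ProcessorConfig(
        version=ProcessorVersion.V1,
        kafka_brokers=["localhost:9092"],
        input_topic="analytics.events",
        output_table="realtime_metrics",
        consumer_group="kappa-v1",
        start_offset="earliest",  # replay the full log on first deploy
    )
    processor = KappaProcessor(config)
    processor.register_aggregation("revenue", revenue_aggregator)
    processor.register_aggregation("active_users", active_users_aggregator)
    return processor


async def demo_single_event():
    """Push one synthetic event through the registered aggregators (no Kafka needed)."""
    processor = build_processor()
    event = {
        "event_type": "order.placed",
        "event_time": "2024-01-15T10:30:00",
        "user_id": "u-123",
        "payload": {"total_amount": 49.99},
    }
    results = await processor.process_event(event)
    # results["revenue"] -> {"date": "2024-01-15", "revenue": 49.99, "orders": 1}
    return results

# To run: asyncio.run(demo_single_event())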

3.3 Kappa Pros and Cons

KAPPA ARCHITECTURE: TRADE-OFFS

PROS:
├── Simplicity: One codebase for everything
├── Consistency: Same logic, same results
├── Maintenance: One system to maintain
├── Flexibility: Easy to reprocess with new logic
└── Modern: Fits streaming-first world

CONS:
├── Reprocessing time: Slow for large history
├── Storage: Must keep all events in Kafka
├── Ordering: Must handle out-of-order carefully
├── State: Managing large state is complex
└── Maturity: Fewer battle-tested examples

THE BIG QUESTION:
"Can you reprocess years of data fast enough?"

If you have:
- 1 year of data
- 1B events
- Reprocessing at 100K/sec

Reprocessing time: 10,000 seconds ≈ 3 hours

For many systems, this is acceptable.
For some, it's not.
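
A quick back-of-the-envelope helper (a sketch; plug in your own numbers) makes
this check easy to repeat for your data volume:

    def reprocessing_hours(total_events: int, events_per_second: float) -> float:
        """Rough time to replay the full event log at a given throughput."""
        return total_events / events_per_second / 3600

    # 1B events at 100K events/sec -> 10,000 seconds, roughly the 3 hours above
    print(reprocessing_hours(1_000_000_000, 100_000))  # 2.77...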

Chapter 4: Choosing Your Architecture

4.1 Decision Framework

LAMBDA VS KAPPA: DECISION FRAMEWORK

┌────────────────────────────────────────────────────────────────────────┐
│                    CHOOSE LAMBDA WHEN:                                 │
│                                                                        │
│  ✓ Historical data is huge (PB scale)                                  │
│  ✓ Batch processing is significantly cheaper                           │
│  ✓ You already have Spark/Hadoop infrastructure                        │
│  ✓ Complex ML training that's inherently batch                         │
│  ✓ Regulatory requirements for batch reconciliation                    │
│  ✓ Team is more experienced with batch                                 │
│                                                                        │
└────────────────────────────────────────────────────────────────────────┘

┌────────────────────────────────────────────────────────────────────────┐
│                    CHOOSE KAPPA WHEN:                                  │
│                                                                        │
│  ✓ Real-time is primary use case                                       │
│  ✓ Data volume allows reasonable reprocessing                          │
│  ✓ Simplicity is valued over optimization                              │
│  ✓ Team is experienced with streaming                                  │
│  ✓ Starting fresh (no legacy batch)                                    │
│  ✓ Events naturally fit streaming model                                │
│                                                                        │
└────────────────────────────────────────────────────────────────────────┘

┌────────────────────────────────────────────────────────────────────────┐
│                    CONSIDER HYBRID WHEN:                               │
│                                                                        │
│  ✓ Different use cases need different approaches                       │
│  ✓ Some metrics need exact accuracy (batch)                            │
│  ✓ Some metrics need real-time (stream)                                │
│  ✓ Can afford complexity for optimization                              │
│                                                                        │
└────────────────────────────────────────────────────────────────────────┘

4.2 Modern Reality: The Unified Approach

THE MODERN TREND: UNIFIED BATCH + STREAM

Tools that unify both:
├── Apache Beam: Write once, run on Flink or Spark
├── Apache Flink: Batch is just bounded streaming
├── Databricks: Delta Lake with streaming + batch
└── Snowflake: Snowpipe (stream) + batch loading

THE IDEA:
Write your processing logic ONCE.
The engine runs it as stream or batch as needed.

Example with Apache Beam:

    # Same pipeline code for batch and stream (illustrative)
    events = pipeline | ReadFromKafka(...)  # or a bounded file source for batch
    
    revenue = (
        events
        | Filter(lambda e: e.type == 'order.placed')
        | WindowInto(FixedWindows(60 * 60))               # 1-hour windows
        | CombineGlobally(sum_revenue).without_defaults()
    )
    
    revenue | WriteToBigQuery(table)

Run as streaming: continuous processing
Run as batch: process all files, terminate

This is where the industry is heading.

Part II: Implementation

Chapter 5: Stream Processing with Flink

5.1 Streams, Windows, and Aggregations

# streaming/flink_processor.py

"""
Apache Flink stream processing implementation.

Flink concepts:
- DataStream: Unbounded stream of records
- Operators: Transformations (map, filter, keyBy, window)
- State: Per-key state for aggregations
- Checkpoints: Fault tolerance via periodic snapshots
- Watermarks: Tracking event time progress
"""

from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Dict, Any, Optional, Iterator
from abc import ABC, abstractmethod


# =============================================================================
# Core Abstractions (Simplified Flink-like API)
# =============================================================================

@dataclass
class Event:
    """An event in the stream."""
    event_id: str
    event_type: str
    event_time: datetime
    user_id: Optional[str]
    payload: Dict[str, Any]


class DataStream:
    """
    Represents an unbounded stream of events.
    """
    
    def __init__(self, source: Iterator[Event]):
        self._source = source
        self._transformations = []
    
    def filter(self, predicate) -> 'DataStream':
        """Filter events based on predicate."""
        self._transformations.append(('filter', predicate))
        return self
    
    def map(self, mapper) -> 'DataStream':
        """Transform each event."""
        self._transformations.append(('map', mapper))
        return self
    
    def key_by(self, key_selector) -> 'KeyedStream':
        """Partition stream by key for stateful processing."""
        return KeyedStream(self, key_selector)


class KeyedStream:
    """
    Stream partitioned by key.
    
    Enables:
    - Stateful processing per key
    - Windowed aggregations per key
    - Exactly-once per-key guarantees
    """
    
    def __init__(self, stream: DataStream, key_selector):
        self.stream = stream
        self.key_selector = key_selector
    
    def window(self, window_assigner: 'WindowAssigner') -> 'WindowedStream':
        """Apply windowing to the keyed stream."""
        return WindowedStream(self, window_assigner)
    
    def process(self, processor: 'KeyedProcessFunction') -> DataStream:
        """Apply stateful processing function."""
        # Execute processor with per-key state
        pass


class WindowAssigner(ABC):
    """Assigns events to windows."""
    
    @abstractmethod
    def assign_windows(self, event: Event) -> list:
        pass


class TumblingEventTimeWindows(WindowAssigner):
    """
    Fixed-size, non-overlapping windows based on event time.
    
    Example: 1-hour windows
    [00:00-01:00), [01:00-02:00), [02:00-03:00), ...
    """
    
    def __init__(self, size: timedelta):
        self.size = size
    
    def assign_windows(self, event: Event) -> list:
        window_start = self._get_window_start(event.event_time)
        window_end = window_start + self.size
        return [(window_start, window_end)]
    
    def _get_window_start(self, timestamp: datetime) -> datetime:
        """Get the start of the window containing timestamp."""
        epoch = datetime(1970, 1, 1)
        elapsed = (timestamp - epoch).total_seconds()
        window_seconds = self.size.total_seconds()
        window_start_seconds = (elapsed // window_seconds) * window_seconds
        return epoch + timedelta(seconds=window_start_seconds)
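
# Quick check of window assignment (illustrative): with one-hour tumbling
# windows, an event at 10:37 falls in exactly one window, [10:00, 11:00).
#
#   assigner = TumblingEventTimeWindows(size=timedelta(hours=1))
#   evt = Event("e1", "order.placed", datetime(2024, 1, 15, 10, 37), "u1", {})
#   assigner.assign_windows(evt)
#   # -> [(datetime(2024, 1, 15, 10, 0), datetime(2024, 1, 15, 11, 0))]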


class SlidingEventTimeWindows(WindowAssigner):
    """
    Overlapping windows that slide.
    
    Example: 1-hour windows, sliding every 15 minutes
    [00:00-01:00), [00:15-01:15), [00:30-01:30), ...
    """
    
    def __init__(self, size: timedelta, slide: timedelta):
        self.size = size
        self.slide = slide
    
    def assign_windows(self, event: Event) -> list:
        windows = []
        timestamp = event.event_time
        
        # An event belongs to multiple overlapping windows: start from the
        # latest window start <= timestamp and walk backwards one slide at a
        # time while the window still contains the event
        start = self._get_last_window_start(timestamp)
        while start + self.size > timestamp:
            windows.append((start, start + self.size))
            start = start - self.slide
        
        return windows
    
    def _get_last_window_start(self, timestamp: datetime) -> datetime:
        """Get the latest window start that is <= timestamp."""
        epoch = datetime(1970, 1, 1)
        elapsed = (timestamp - epoch).total_seconds()
        slide_seconds = self.slide.total_seconds()
        
        # Align down to the nearest slide boundary
        aligned = (elapsed // slide_seconds) * slide_seconds
        return epoch + timedelta(seconds=aligned)


class SessionWindows(WindowAssigner):
    """
    Windows based on activity sessions.
    
    A session ends when there's a gap > threshold.
    """
    
    def __init__(self, gap: timedelta):
        self.gap = gap
    
    def assign_windows(self, event: Event) -> list:
        # Sessions are merged dynamically as events arrive
        # Initial window is [event_time, event_time + gap)
        return [(event.event_time, event.event_time + self.gap)]


# =============================================================================
# Windowed Aggregation
# =============================================================================

class WindowedStream:
    """
    Keyed stream with windowing applied.
    """
    
    def __init__(self, keyed_stream: KeyedStream, assigner: WindowAssigner):
        self.keyed_stream = keyed_stream
        self.assigner = assigner
    
    def aggregate(self, aggregator: 'AggregateFunction') -> DataStream:
        """
        Apply aggregation function to each window.
        
        Flink incrementally aggregates as events arrive,
        not waiting for window to complete.
        """
        pass
    
    def reduce(self, reducer) -> DataStream:
        """Reduce events in window to single value."""
        pass


class AggregateFunction(ABC):
    """
    Incremental aggregation function.
    
    Maintains an accumulator that's updated per event.
    """
    
    @abstractmethod
    def create_accumulator(self) -> Any:
        """Create initial accumulator."""
        pass
    
    @abstractmethod
    def add(self, value: Any, accumulator: Any) -> Any:
        """Add value to accumulator."""
        pass
    
    @abstractmethod
    def get_result(self, accumulator: Any) -> Any:
        """Get result from accumulator."""
        pass
    
    @abstractmethod
    def merge(self, acc1: Any, acc2: Any) -> Any:
        """Merge two accumulators (for session windows)."""
        pass


class RevenueAggregator(AggregateFunction):
    """Aggregate revenue in a window."""
    
    def create_accumulator(self) -> Dict:
        return {"total": 0, "count": 0}
    
    def add(self, order: Dict, acc: Dict) -> Dict:
        acc["total"] += order.get("amount", 0)
        acc["count"] += 1
        return acc
    
    def get_result(self, acc: Dict) -> Dict:
        return {
            "revenue": acc["total"],
            "orders": acc["count"],
            "avg_order": acc["total"] / acc["count"] if acc["count"] > 0 else 0
        }
    
    def merge(self, acc1: Dict, acc2: Dict) -> Dict:
        return {
            "total": acc1["total"] + acc2["total"],
            "count": acc1["count"] + acc2["count"]
        }
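

# =============================================================================
# Usage sketch (illustrative): how the pieces above fit together. Note that
# WindowedStream.aggregate is stubbed in this simplified API, so the runnable
# part is the accumulator lifecycle itself.
#
# Pipeline shape:
#   stream.key_by(lambda e: e.payload["category"]) \
#         .window(TumblingEventTimeWindows(timedelta(hours=1))) \
#         .aggregate(RevenueAggregator())
# =============================================================================

def demo_revenue_aggregator() -> Dict:
    """Incrementally fold two orders into one window result."""
    agg = RevenueAggregator()
    acc = agg.create_accumulator()
    acc = agg.add({"amount": 30.0}, acc)
    acc = agg.add({"amount": 50.0}, acc)
    return agg.get_result(acc)  # {"revenue": 80.0, "orders": 2, "avg_order": 40.0}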

5.2 Watermarks and Late Data

# streaming/watermarks.py

"""
Watermarks for handling event time in streaming.

The Problem:
Events arrive out of order. When can we close a window?

Example:
  Real time    →  10:00  10:01  10:02  10:03  10:04
  Events arrive:  [A-9:58] [B-9:55] [C-10:01] [D-9:59] [E-10:02]
  
  When can we emit the [9:00-10:00) window?
  We might receive a 9:xx event at any time!

Solution: WATERMARKS
  "I believe all events with time <= W have arrived"
  When watermark passes window end, window can fire.
"""

from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional


@dataclass
class Watermark:
    """
    Represents progress of event time.
    
    Watermark(t) means: "No more events with event_time <= t expected"
    """
    timestamp: datetime


class WatermarkStrategy:
    """
    Strategy for generating watermarks.
    """
    
    def __init__(self, max_out_of_orderness: timedelta):
        """
        Args:
            max_out_of_orderness: How late events can be.
                If events can be up to 5 minutes late,
                watermark = max_event_time - 5 minutes
        """
        self.max_out_of_orderness = max_out_of_orderness
        self.current_max_timestamp: Optional[datetime] = None
    
    def on_event(self, event_time: datetime) -> Optional[Watermark]:
        """Update watermark based on new event."""
        
        if self.current_max_timestamp is None:
            self.current_max_timestamp = event_time
        else:
            self.current_max_timestamp = max(
                self.current_max_timestamp,
                event_time
            )
        
        # Watermark = max seen - allowed lateness
        watermark_time = self.current_max_timestamp - self.max_out_of_orderness
        
        return Watermark(watermark_time)
    
    def get_current_watermark(self) -> Optional[Watermark]:
        """Get current watermark."""
        if self.current_max_timestamp:
            return Watermark(
                self.current_max_timestamp - self.max_out_of_orderness
            )
        return None


class LateDataHandler:
    """
    Handles events that arrive after their window closed.
    """
    
    def __init__(
        self,
        allowed_lateness: timedelta,
        late_output_tag: str = "late-data"
    ):
        """
        Args:
            allowed_lateness: How long to keep window state after closing
            late_output_tag: Tag for side output of late events
        """
        self.allowed_lateness = allowed_lateness
        self.late_output_tag = late_output_tag
    
    def is_late(
        self,
        event_time: datetime,
        watermark: datetime,
        window_end: datetime
    ) -> bool:
        """Check if event is late for its window."""
        
        # Window hasn't closed yet
        if watermark < window_end:
            return False
        
        # Window closed, but within allowed lateness
        if watermark < window_end + self.allowed_lateness:
            return False  # Can still update window
        
        # Too late, drop or side output
        return True


WATERMARK_EXPLANATION = """
WATERMARKS VISUALIZED

Events arriving (real time on X, event time shown):

Real time:   ────────────────────────────────────▶
             10:00  10:01  10:02  10:03  10:04  10:05

Events:        A      B      C      D      E      F
Event times: 9:58   9:55  10:01   9:59  10:02  10:04

Max lateness: 5 minutes

Watermarks:
  After A: WM = 9:58 - 5min = 9:53
  After B: WM = 9:58 - 5min = 9:53 (B is old, doesn't advance)
  After C: WM = 10:01 - 5min = 9:56
  After D: WM = 10:01 - 5min = 9:56 (D is old)
  After E: WM = 10:02 - 5min = 9:57
  After F: WM = 10:04 - 5min = 9:59

Window [9:00-10:00) can fire when watermark >= 10:00
This happens... never in this example!

Only when we see an event with time >= 10:05 will the
watermark reach 10:00 and the window fire.


LATE DATA:

If window [9:00-10:00) already fired, and then event
with time 9:50 arrives, it's LATE.

Options:
1. DROP: Ignore it (simplest, loses data)
2. SIDE OUTPUT: Send to separate stream for special handling
3. REFIRE: Update window output (complex, not always supported)
4. ALLOWED LATENESS: Keep window state longer
"""

5.3 State Management

# streaming/state_management.py

"""
State management in stream processing.

State is the memory of past events needed for current processing.

Examples:
- Running sum: Need previous sum
- Distinct count: Need set of seen IDs
- Session windows: Need events in current session
- Joins: Need buffer of events from both streams
"""

from dataclasses import dataclass
from typing import Dict, Any, Optional, Generic, TypeVar
from abc import ABC, abstractmethod
from datetime import datetime, timedelta
import pickle


T = TypeVar('T')


class StateBackend(ABC):
    """
    Backend for storing stream processing state.
    
    Options:
    - Memory: Fast, lost on failure
    - RocksDB: Larger-than-memory, persistent locally
    - Remote (Redis, etc.): Shared across nodes
    """
    
    @abstractmethod
    def get(self, key: str) -> Optional[Any]:
        pass
    
    @abstractmethod
    def put(self, key: str, value: Any):
        pass
    
    @abstractmethod
    def delete(self, key: str):
        pass
    
    @abstractmethod
    def checkpoint(self) -> bytes:
        """Serialize all state for checkpointing."""
        pass
    
    @abstractmethod
    def restore(self, data: bytes):
        """Restore state from checkpoint."""
        pass


class InMemoryStateBackend(StateBackend):
    """Simple in-memory state backend."""
    
    def __init__(self):
        self._state: Dict[str, Any] = {}
    
    def get(self, key: str) -> Optional[Any]:
        return self._state.get(key)
    
    def put(self, key: str, value: Any):
        self._state[key] = value
    
    def delete(self, key: str):
        self._state.pop(key, None)
    
    def checkpoint(self) -> bytes:
        return pickle.dumps(self._state)
    
    def restore(self, data: bytes):
        self._state = pickle.loads(data)


class RocksDBStateBackend(StateBackend):
    """
    RocksDB-backed state for larger-than-memory state.
    
    Used in production Flink deployments.
    """
    
    def __init__(self, db_path: str):
        # In practice: import rocksdb; self.db = rocksdb.DB(db_path)
        self.db_path = db_path
        self._db = {}  # Simplified
    
    def get(self, key: str) -> Optional[Any]:
        value_bytes = self._db.get(key.encode())
        if value_bytes:
            return pickle.loads(value_bytes)
        return None
    
    def put(self, key: str, value: Any):
        self._db[key.encode()] = pickle.dumps(value)
    
    def delete(self, key: str):
        self._db.pop(key.encode(), None)
    
    def checkpoint(self) -> bytes:
        # In practice: Use RocksDB snapshots
        return pickle.dumps(dict(self._db))
    
    def restore(self, data: bytes):
        self._db = pickle.loads(data)


class ValueState(Generic[T]):
    """
    State holding a single value per key.
    
    Example: Current sum for a user
    """
    
    def __init__(self, backend: StateBackend, name: str, key: str):
        self.backend = backend
        self.state_key = f"{name}:{key}"
    
    def value(self) -> Optional[T]:
        return self.backend.get(self.state_key)
    
    def update(self, value: T):
        self.backend.put(self.state_key, value)
    
    def clear(self):
        self.backend.delete(self.state_key)


class ListState(Generic[T]):
    """
    State holding a list of values per key.
    
    Example: All events in a session
    """
    
    def __init__(self, backend: StateBackend, name: str, key: str):
        self.backend = backend
        self.state_key = f"{name}:{key}"
    
    def get(self) -> list:
        return self.backend.get(self.state_key) or []
    
    def add(self, value: T):
        current = self.get()
        current.append(value)
        self.backend.put(self.state_key, current)
    
    def clear(self):
        self.backend.delete(self.state_key)


class MapState(Generic[T]):
    """
    State holding a map of values per key.
    
    Example: Count per category for a user
    """
    
    def __init__(self, backend: StateBackend, name: str, key: str):
        self.backend = backend
        self.state_key = f"{name}:{key}"
    
    def get(self, map_key: str) -> Optional[T]:
        current = self.backend.get(self.state_key) or {}
        return current.get(map_key)
    
    def put(self, map_key: str, value: T):
        current = self.backend.get(self.state_key) or {}
        current[map_key] = value
        self.backend.put(self.state_key, current)
    
    def keys(self) -> list:
        current = self.backend.get(self.state_key) or {}
        return list(current.keys())
    
    def clear(self):
        self.backend.delete(self.state_key)


# =============================================================================
# Stateful Processing Example
# =============================================================================

class UserSessionProcessor:
    """
    Stateful processor that tracks user sessions.
    
    A session ends after 30 minutes of inactivity.
    """
    
    SESSION_TIMEOUT = timedelta(minutes=30)
    
    def __init__(self, state_backend: StateBackend):
        self.backend = state_backend
    
    def process_event(self, user_id: str, event: Dict) -> Optional[Dict]:
        """
        Process event, maintain session state, emit session on close.
        """
        
        # Get session state for this user
        session_state = ValueState[Dict](self.backend, "session", user_id)
        timer_state = ValueState[datetime](self.backend, "timer", user_id)
        
        event_time = datetime.fromisoformat(event["event_time"])
        current_session = session_state.value()
        
        result = None
        
        if current_session is None:
            # New session
            current_session = {
                "user_id": user_id,
                "start_time": event_time.isoformat(),
                "events": [],
                "event_count": 0
            }
        
        else:
            # Check if session timed out
            last_timer = timer_state.value()
            if last_timer and event_time > last_timer:
                # Session closed, emit it
                result = {
                    "type": "session_complete",
                    "session": current_session
                }
                
                # Start new session
                current_session = {
                    "user_id": user_id,
                    "start_time": event_time.isoformat(),
                    "events": [],
                    "event_count": 0
                }
        
        # Add event to session
        current_session["events"].append(event)
        current_session["event_count"] += 1
        current_session["last_event_time"] = event_time.isoformat()
        
        # Update state
        session_state.update(current_session)
        timer_state.update(event_time + self.SESSION_TIMEOUT)
        
        return result
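

# =============================================================================
# Usage sketch (illustrative, synthetic events)
# =============================================================================

def demo_sessions():
    """Two events 45 minutes apart: the first session closes when the second arrives."""
    processor = UserSessionProcessor(InMemoryStateBackend())
    
    first = processor.process_event(
        "u-123", {"event_type": "page.viewed", "event_time": "2024-01-15T10:00:00"}
    )
    # first is None: a session has just started
    
    second = processor.process_event(
        "u-123", {"event_type": "page.viewed", "event_time": "2024-01-15T10:45:00"}
    )
    # second["type"] == "session_complete": the 45-minute gap exceeded the
    # 30-minute timeout, so the previous session is emitted and a new one begins
    return first, second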

Chapter 6: Batch Processing with Spark

6.1 Spark for Analytics

# batch/spark_processor.py

"""
Apache Spark batch processing for analytics.

Spark concepts:
- DataFrame: Distributed table of data
- Transformations: Lazy operations (filter, map, join)
- Actions: Trigger computation (collect, write)
- Partitioning: How data is distributed
- Caching: Keep intermediate results in memory
"""

from dataclasses import dataclass
from datetime import datetime, date, timedelta
from typing import Dict, Any, List, Optional
import json

from pyspark.sql import functions as F


@dataclass
class SparkConfig:
    """Spark application configuration."""
    app_name: str
    master: str = "local[*]"  # or "yarn", "k8s://..."
    executor_memory: str = "4g"
    executor_cores: int = 4
    num_executors: int = 10


class AnalyticsBatchJob:
    """
    Batch analytics job using Spark.
    
    Processes historical data to compute accurate metrics.
    """
    
    def __init__(self, spark_session):
        self.spark = spark_session
    
    def compute_daily_metrics(
        self,
        events_path: str,
        output_path: str,
        target_date: date
    ) -> Dict[str, Any]:
        """
        Compute all daily metrics for a specific date.
        
        This is the batch layer of Lambda architecture.
        """
        
        # Read events for the day
        events = self.spark.read.parquet(events_path).filter(
            f"date(event_time) = '{target_date}'"
        )
        
        # Cache for multiple aggregations
        events.cache()
        
        metrics = {}
        
        # Revenue metrics
        revenue_row = events.filter(
            F.col("event_type") == "order.placed"
        ).agg(
            F.sum("payload.total_amount").alias("revenue"),
            F.count("*").alias("order_count")
        ).collect()[0]
        
        metrics["revenue"] = revenue_row["revenue"] or 0
        metrics["order_count"] = revenue_row["order_count"] or 0
        
        # User metrics
        user_row = events.agg(
            F.countDistinct("user_id").alias("dau")
        ).collect()[0]
        
        metrics["dau"] = user_row["dau"] or 0
        
        # Funnel metrics
        metrics["funnel"] = self._compute_funnel(events)
        
        # Product metrics
        metrics["top_products"] = self._compute_top_products(events)
        
        # Write results
        self._write_metrics(output_path, target_date, metrics)
        
        return metrics
    
    def _compute_funnel(self, events) -> Dict[str, int]:
        """Compute conversion funnel."""
        
        funnel = {}
        
        # View → Cart → Purchase funnel
        funnel["product_views"] = events.filter(
            "event_type = 'product.viewed'"
        ).select("user_id").distinct().count()
        
        funnel["add_to_cart"] = events.filter(
            "event_type = 'cart.updated' AND payload.action = 'add'"
        ).select("user_id").distinct().count()
        
        funnel["purchases"] = events.filter(
            "event_type = 'order.placed'"
        ).select("user_id").distinct().count()
        
        # Conversion rates
        if funnel["product_views"] > 0:
            funnel["view_to_cart_rate"] = (
                funnel["add_to_cart"] / funnel["product_views"]
            )
            funnel["view_to_purchase_rate"] = (
                funnel["purchases"] / funnel["product_views"]
            )
        
        if funnel["add_to_cart"] > 0:
            funnel["cart_to_purchase_rate"] = (
                funnel["purchases"] / funnel["add_to_cart"]
            )
        
        return funnel
    
    def _compute_top_products(self, events, limit: int = 100) -> List[Dict]:
        """Compute top products by various metrics."""
        
        product_stats = events.filter(
            F.col("event_type").isin("product.viewed", "order.placed")
        ).groupBy(
            F.col("payload.product_id").alias("product_id")
        ).agg(
            F.count("*").alias("event_count"),
            F.sum("payload.total_amount").alias("revenue")
        ).orderBy(
            F.col("event_count").desc()
        ).limit(limit).collect()
        
        return [
            {
                "product_id": row["product_id"],
                "view_count": row["event_count"],
                "revenue": row["revenue"] or 0
            }
            for row in product_stats
        ]
    
    def _write_metrics(
        self,
        output_path: str,
        target_date: date,
        metrics: Dict
    ):
        """Write metrics to output storage."""
        
        # Write to partitioned parquet (metrics serialized as JSON so the
        # row has a simple, stable schema)
        self.spark.createDataFrame([{
            "date": target_date.isoformat(),
            "metrics": json.dumps(metrics),
            "computed_at": datetime.utcnow().isoformat()
        }]).write.mode("overwrite").parquet(
            f"{output_path}/date={target_date}"
        )
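

# Usage sketch (illustrative; paths are placeholders). In production this would
# typically be triggered once per day by a scheduler such as Airflow:
#
#   from pyspark.sql import SparkSession
#
#   spark = SparkSession.builder.appName("daily-analytics").getOrCreate()
#   job = AnalyticsBatchJob(spark)
#   job.compute_daily_metrics(
#       events_path="s3://analytics/events/",
#       output_path="s3://analytics/daily-metrics/",
#       target_date=date.today() - timedelta(days=1),   # yesterday
#   )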


class IncrementalBatchJob:
    """
    Incremental batch job that processes only new data.
    
    More efficient than full recomputation.
    """
    
    def __init__(self, spark_session):
        self.spark = spark_session
    
    def process_incremental(
        self,
        events_path: str,
        state_path: str,
        output_path: str,
        watermark: datetime
    ):
        """
        Process only events since last watermark.
        
        Merge with existing state for complete picture.
        """
        
        # Read new events only
        new_events = self.spark.read.parquet(events_path).filter(
            f"event_time > '{watermark.isoformat()}'"
        )
        
        # Read existing aggregates
        existing = self.spark.read.parquet(state_path)
        
        # Compute incremental daily aggregates with stable column names
        # (assumes `from pyspark.sql import functions as F` at the module top)
        incremental = new_events.groupBy(
            F.to_date("event_time").alias("date")
        ).agg(
            F.sum("payload.total_amount").alias("revenue"),
            F.count("*").alias("order_count")
        )
        
        # Merge: add the incremental totals onto the existing per-day totals
        # (existing state is expected to share the date/revenue/order_count schema)
        merged = existing.unionByName(incremental).groupBy("date").agg(
            F.sum("revenue").alias("revenue"),
            F.sum("order_count").alias("order_count")
        )
        
        # Write updated state. Note: Spark refuses to overwrite a path the current
        # plan still reads from, so in practice write to a new versioned state path
        # (or checkpoint the DataFrame first); simplified here.
        merged.write.mode("overwrite").parquet(state_path)
        
        # Also write to serving layer
        merged.write.mode("overwrite").parquet(output_path)
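
A minimal driver sketch for the incremental job follows; the schedule, the S3 paths, and the idea of pulling the last watermark from a metadata store are illustrative assumptions rather than part of the job above.

# incremental_driver.py (illustrative sketch)

from datetime import datetime, timedelta
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("incremental-daily-metrics").getOrCreate()

job = IncrementalBatchJob(spark)

# In practice the last successful watermark would come from a metadata table;
# here we simply reprocess the last 24 hours.
last_watermark = datetime.utcnow() - timedelta(hours=24)

job.process_incremental(
    events_path="s3://analytics/events/",
    state_path="s3://analytics/state/daily_revenue/",
    output_path="s3://analytics/serving/daily_revenue/",
    watermark=last_watermark
)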

6.2 Spark Structured Streaming

# batch/spark_streaming.py

"""
Spark Structured Streaming for unified batch + stream.

Structured Streaming treats streams as unbounded tables,
using the same DataFrame API as batch.
"""

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *


class UnifiedAnalyticsProcessor:
    """
    Unified processor that works for both batch and streaming.
    
    Same code, different execution modes.
    """
    
    def __init__(self, spark: SparkSession):
        self.spark = spark
    
    def create_streaming_source(self, kafka_brokers: str, topic: str):
        """Create streaming DataFrame from Kafka."""
        
        return self.spark.readStream \
            .format("kafka") \
            .option("kafka.bootstrap.servers", kafka_brokers) \
            .option("subscribe", topic) \
            .option("startingOffsets", "latest") \
            .load() \
            .select(
                from_json(
                    col("value").cast("string"),
                    self._get_event_schema()
                ).alias("event")
            ).select("event.*")
    
    def create_batch_source(self, path: str):
        """Create batch DataFrame from files."""
        
        return self.spark.read.parquet(path)
    
    def compute_revenue_metrics(self, events_df):
        """
        Compute revenue metrics.
        
        Works identically for batch and streaming DataFrames!
        """
        
        return events_df \
            .filter(col("event_type") == "order.placed") \
            .withWatermark("event_time", "1 hour") \
            .groupBy(
                window("event_time", "1 hour"),
                "payload.category"
            ) \
            .agg(
                sum("payload.total_amount").alias("revenue"),
                count("*").alias("order_count"),
                avg("payload.total_amount").alias("avg_order_value")
            )
    
    def write_streaming(self, df, output_path: str, checkpoint_path: str):
        """Write streaming DataFrame with checkpointing."""
        
        return df.writeStream \
            .format("parquet") \
            .option("path", output_path) \
            .option("checkpointLocation", checkpoint_path) \
            .outputMode("append") \
            .trigger(processingTime="1 minute") \
            .start()
    
    def write_batch(self, df, output_path: str):
        """Write batch DataFrame."""
        
        df.write \
            .mode("overwrite") \
            .partitionBy("date") \
            .parquet(output_path)
    
    def _get_event_schema(self):
        """Define event schema."""
        
        return StructType([
            StructField("event_id", StringType()),
            StructField("event_type", StringType()),
            StructField("event_time", TimestampType()),
            StructField("user_id", StringType()),
            StructField("payload", MapType(StringType(), StringType()))
        ])


# =============================================================================
# Usage Examples
# =============================================================================

USAGE_EXAMPLES = """
UNIFIED BATCH + STREAMING

# Same processing logic for both!

processor = UnifiedAnalyticsProcessor(spark)

# STREAMING MODE
stream_df = processor.create_streaming_source("kafka:9092", "events")
revenue = processor.compute_revenue_metrics(stream_df)
query = processor.write_streaming(revenue, "/output/stream", "/checkpoint")

# BATCH MODE  
batch_df = processor.create_batch_source("/data/events")
revenue = processor.compute_revenue_metrics(batch_df)
processor.write_batch(revenue, "/output/batch")


THE POWER:
- Same aggregation code
- Same business logic
- Different execution modes
- Consistent results
"""

Part III: Real-World Application

Chapter 7: Case Studies

7.1 Netflix: Lambda to Kappa Transition

NETFLIX ANALYTICS EVOLUTION

BEFORE (Lambda-ish):
├── Batch: Spark on HDFS, daily jobs
├── Streaming: Flink for real-time
├── Problem: Two codebases, divergent results
└── Pain: "Why is real-time different from batch?"

AFTER (Kappa-ish):
├── Single: Flink for everything
├── Kafka: Long retention for replay
├── Result: One codebase, consistent
└── Reprocess: Replay from Kafka

KEY LEARNINGS:
1. Unified codebase reduced bugs by 60%
2. Onboarding time cut in half
3. Schema changes deploy once, not twice
4. Cost slightly higher but worth simplicity

7.2 Uber: Lambda at Scale

UBER ANALYTICS ARCHITECTURE

They kept Lambda because:
├── Historical data: Petabytes
├── Reprocessing: Would take days in Flink
├── ML training: Inherently batch
└── Cost: Batch on spot instances is cheaper

Their approach:
├── Streaming: Apache Flink for real-time
├── Batch: Apache Spark on YARN
├── Unified: Apache Beam for some jobs
└── Storage: Apache Hudi (streaming writes, batch-readable tables)

KEY INSIGHT:
"We use Lambda not because we want to,
but because the alternatives are worse at our scale."

At Uber's scale (trillions of events):
├── Kappa reprocessing: 1+ week
├── Lambda batch: 6 hours daily
└── Decision: Lambda wins on practicality

Chapter 8: Common Mistakes

8.1 Streaming Mistakes

STREAMING ANTI-PATTERNS

❌ MISTAKE 1: Ignoring Late Data

Wrong:
  # Just drop late events
  events.filter("event_time > watermark")

Problem:
  Mobile events routinely arrive late
  Dropping them = wrong metrics

Right:
  # Keep windows open for late arrivals with a watermark
  events.withWatermark("event_time", "1 hour")

  # In Flink, very-late events can additionally be routed to a side
  # output (sideOutputLateData) and reconciled in a separate path
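
To make the Spark side concrete, here is a minimal Structured Streaming sketch; it assumes an `events` streaming DataFrame with the `event_time` and `payload` columns used elsewhere in this chapter. Events arriving up to one hour late still land in their original window, because the watermark keeps that window's state open.

# Minimal sketch: hourly revenue that tolerates up to 1 hour of lateness.
from pyspark.sql.functions import col, sum, window

hourly_revenue = (
    events
    .withWatermark("event_time", "1 hour")             # keep window state open for 1h
    .groupBy(window(col("event_time"), "1 hour"))
    .agg(sum(col("payload.total_amount").cast("double")).alias("revenue"))
)

# Events later than the watermark are dropped by Spark; if they matter,
# widen the watermark or reconcile them in a nightly batch pass.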


❌ MISTAKE 2: Unbounded State

Wrong:
  # Keep all sessions forever
  state[user_id] = session

Problem:
  State grows forever
  Eventually OOM

Right:
  # Use state TTL
  state.set_ttl(timedelta(days=7))
  
  # Or use timers to clean up
  timer.register_cleanup(session_end + timedelta(hours=1))
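
For Flink keyed state, the same idea looks roughly like the PyFlink sketch below; the descriptor name and the 7-day TTL are illustrative values, not settings from the pipeline above.

# Sketch: expire per-user session state after 7 days so keyed state
# cannot grow without bound.
from pyflink.common import Time
from pyflink.common.typeinfo import Types
from pyflink.datastream.state import StateTtlConfig, ValueStateDescriptor

ttl = (StateTtlConfig
       .new_builder(Time.days(7))
       .set_update_type(StateTtlConfig.UpdateType.OnCreateAndWrite)
       .set_state_visibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
       .build())

session_state = ValueStateDescriptor("session", Types.PICKLED_BYTE_ARRAY())
session_state.enable_time_to_live(ttl)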


❌ MISTAKE 3: Not Checkpointing

Wrong:
  # No checkpoints configured
  stream.process(handler)

Problem:
  Crash = start from scratch
  Data loss or reprocessing

Right:
  stream
    .enable_checkpointing(interval=60_000)  # Every minute
    .checkpoint_storage("s3://checkpoints/")
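
With PyFlink, for example, periodic checkpointing is a one-line setting; the interval and storage path below are illustrative.

# Sketch: checkpoint every minute so a restart resumes from the last
# snapshot instead of reprocessing from scratch.
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
env.enable_checkpointing(60_000)   # interval in ms; exactly-once is the default mode

# Durable checkpoint storage is typically set in flink-conf.yaml, e.g.
#   state.checkpoints.dir: s3://checkpoints/analytics/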

8.2 Batch Mistakes

BATCH ANTI-PATTERNS

❌ MISTAKE 1: No Idempotency

Wrong:
  # Append to output on every run
  df.write.mode("append").parquet(output)
  
  # Re-run = duplicates!

Problem:
  Job fails mid-write, retry creates duplicates

Right:
  # Overwrite partitions atomically
  df.write.mode("overwrite").partitionBy("date").parquet(output)
  
  # Or use Delta Lake MERGE for idempotent upserts (join key illustrative)
  DeltaTable.forPath(spark, output).alias("t").merge(
      df.alias("s"), "t.order_id = s.order_id"
  ).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
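
Another option for safe re-runs is Spark's dynamic partition overwrite mode, sketched below; the `daily_metrics` DataFrame and the output path are illustrative.

# Sketch of an idempotent re-run: with dynamic mode, only the partitions
# recomputed in this run are replaced, not the whole table.
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

(daily_metrics
    .write
    .mode("overwrite")
    .partitionBy("date")
    .parquet("s3://analytics/daily_metrics/"))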


❌ MISTAKE 2: Ignoring Skew

Wrong:
  # Group by a skewed key (a few celebrity users own most of the events)
  events.groupBy("user_id").count()
  
  # One executor gets 90% of the data!

Problem:
  One task takes hours, others finish in minutes

Right:
  # Salt the key and aggregate in two stages
  events
    .withColumn("salt", (rand() * 10).cast("int"))
    .groupBy("user_id", "salt")    # spreads each hot user across 10 tasks
    .count()
    .groupBy("user_id")            # second pass merges the salted partials
    .sum("count")

Part IV: Interview Preparation

Chapter 9: Interview Tips

9.1 When to Discuss Stream vs Batch

TOPIC TRIGGERS:

"Design a real-time dashboard"
  → Primarily streaming
  → Discuss: windowing, approximate vs exact

"Design an analytics platform"
  → Both streaming and batch
  → Discuss: Lambda vs Kappa trade-offs

"Design a recommendation system"
  → ML training is batch
  → Serving might use streaming for freshness

"Design fraud detection"
  → Streaming for real-time alerts
  → Batch for model training

"Design a data warehouse"
  → Primarily batch
  → Maybe streaming for near-real-time

9.2 Key Phrases

TALKING ABOUT STREAM VS BATCH:

"For this use case, I'd consider whether we need real-time 
results or if hourly/daily is acceptable. Real-time adds 
complexity with state management, exactly-once, and late data."

"I'd use a Lambda architecture here because we have both 
real-time needs and accuracy requirements. The batch layer 
handles late data and corrections, while streaming provides 
immediate visibility."

"For this volume, I'd lean toward Kappa since reprocessing 
100GB from Kafka is fast enough. The single codebase is 
worth more than the slight efficiency of batch-only."

"The key trade-off is latency vs accuracy vs cost. 
Streaming gives seconds of latency but higher cost. 
Batch gives hours of latency but lower cost and easier 
handling of corrections."

Chapter 10: Practice Problems

Problem 1: Real-Time Leaderboard

Setup: Design a gaming leaderboard that shows:

  • Top 100 players globally (updates every second)
  • Your rank (updates every action)
  • Historical rankings (end of day snapshots)

Questions:

  1. Stream, batch, or both? Why?
  2. How do you handle millions of score updates/second?
  3. How do you efficiently compute "your rank" among millions?

Hints:

  • Streaming for real-time updates
  • Redis sorted sets for O(log N) rank lookups (see the sketch below)
  • Batch for historical snapshots
  • Sample or approximate for the global leaderboard
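
To make the Redis hint concrete, here is a minimal redis-py sketch; the key and member names are assumptions.

# Hypothetical leaderboard sketch using a Redis sorted set.
import redis

r = redis.Redis(host="localhost", port=6379)

# Streaming path: update a player's score as events arrive
r.zadd("leaderboard:global", {"player:42": 9175})

# Top 100 players, highest score first
top_100 = r.zrevrange("leaderboard:global", 0, 99, withscores=True)

# This player's rank (0-based), an O(log N) lookup
my_rank = r.zrevrank("leaderboard:global", "player:42")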

Problem 2: Ad Attribution

Setup: Track which ad led to a purchase:

  • User sees ad at time T1
  • User purchases at time T2 (within 7 days)
  • Attribute purchase to ad

Questions:

  1. How do you join ad views with purchases?
  2. What if the purchase event arrives before the ad view?
  3. Streaming or batch for attribution?

Hints:

  • Stream-stream join with state (see the sketch below)
  • Watermarks to handle out-of-order arrival
  • Buffering needed for the 7-day window
  • Maybe batch for accuracy, stream for estimates
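
A minimal Structured Streaming sketch of such a stream-stream join is shown below; the DataFrame names, columns, and watermark values are assumptions for illustration.

# Sketch: attribute a purchase to ad views by the same user within 7 days.
from pyspark.sql.functions import expr

views = ad_views.withWatermark("view_time", "7 days").alias("v")
buys = purchases.withWatermark("purchase_time", "1 hour").alias("p")

attributed = buys.join(
    views,
    expr("""
        v.user_id = p.user_id
        AND p.purchase_time >= v.view_time
        AND p.purchase_time <= v.view_time + interval 7 days
    """),
    "inner"
)

# Spark keeps roughly 7 days of ad-view state for this join, which is why
# the hints above suggest batch for exact attribution and streaming for estimates.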

Chapter 11: Sample Interview Dialogue

Interviewer: "Design an analytics system for our e-commerce platform."

You: "Let me start by understanding the requirements. What types of analytics are needed, and what's the latency requirement?"

Interviewer: "We need real-time revenue dashboards for executives, and daily reports for finance. Also, conversion funnel analysis."

You: "Got it. This is a classic case where we need both real-time and batch processing. Let me propose an architecture.

For the streaming layer, I'd use Flink consuming from Kafka. It would compute real-time aggregates like current revenue, active users, and approximate conversion funnels. These feed into Druid for real-time dashboards.

For the batch layer, I'd use Spark running hourly. It would compute exact metrics including complete funnel analysis with proper session handling, accurate revenue with all late events included, and the data that finance needs.

The key insight is that some metrics need real-time approximation, while others need daily accuracy. Revenue for executives can be 'roughly right now' — they don't need penny-perfect accuracy at 2 PM. But the end-of-day report must be exact."

Interviewer: "Why not just use streaming for everything?"

You: "Great question. We could, but there are trade-offs:

First, late data. E-commerce has refunds, corrections, and late mobile events. In streaming, we'd need to handle allowed lateness, retractions, and updates — adding complexity.

Second, cost. Streaming is always-on. Batch can use spot instances and run only when needed.

Third, accuracy vs speed. For the finance report, we want to wait for all late data. With streaming, we'd need to recompute or issue corrections.

However, if simplicity is paramount and the scale is manageable, Kappa with long Kafka retention could work. We could reprocess nightly to handle corrections. It depends on whether the team is more comfortable with Spark or Flink."


Summary

DAY 2 SUMMARY: STREAMING VS BATCH

BATCH PROCESSING
├── Process data in large chunks
├── High throughput, high latency
├── Accurate (all data processed together)
├── Cheaper (periodic, uses spot instances)
└── Simpler programming model

STREAM PROCESSING
├── Process data continuously
├── Lower throughput, low latency
├── Approximate (late data challenges)
├── More expensive (always on)
└── Complex (state, exactly-once, watermarks)

LAMBDA ARCHITECTURE
├── Batch layer + Speed layer + Serving layer
├── Batch for accuracy, streaming for speed
├── Two codebases (main disadvantage)
└── Best when: Huge history, mixed requirements

KAPPA ARCHITECTURE
├── Streaming only + Replay for reprocessing
├── Single codebase
├── Replay must be fast enough
└── Best when: Moderate history, simplicity valued

MODERN TREND
├── Unified frameworks (Beam, Flink, Spark Structured Streaming)
├── Write once, run as batch or stream
└── Industry moving toward unified APIs

Key Takeaways

STREAMING VS BATCH KEY TAKEAWAYS

1. THEY SOLVE DIFFERENT PROBLEMS
   Stream: "What's happening NOW?"
   Batch: "What happened EXACTLY?"

2. LATE DATA IS THE KILLER
   Streaming makes late data hard
   Batch handles it naturally

3. SIMPLICITY HAS VALUE
   Kappa's single codebase often wins
   Unless scale forces Lambda

4. COST MATTERS
   Streaming: Always on = always paying
   Batch: Pay only when running

5. THE INDUSTRY IS CONVERGING
   Unified APIs blur the line
   Same code for both modes

GOLDEN RULES:
├── Start with batch (simpler)
├── Add streaming only when needed
├── Use watermarks for late data
├── Checkpoint everything
└── One codebase when possible

What's Next

Tomorrow, we'll dive into Data Modeling for Analytics — designing schemas that make queries fast over billions of rows.

TOMORROW'S PREVIEW: DATA MODELING

Questions we'll answer:
├── What is a star schema?
├── Fact tables vs dimension tables?
├── How to handle slowly changing dimensions?
├── When to pre-aggregate?
└── Partitioning for analytics performance?

We'll design the data warehouse that our
streaming and batch jobs write to.

End of Week 8, Day 2

Next: Day 3 — Data Modeling for Analytics