Himanshu Kukreja

Week 8 — Day 4: Late-Arriving Data and Correctness

System Design Mastery Series — Analytics Pipeline Week


Preface

Yesterday, we built beautiful star schemas with pre-aggregated rollups. Our dashboards are fast. The CFO is happy.

Then Monday morning happens:

THE LATE DATA NIGHTMARE

Friday 6:00 PM: Dashboard shows weekend flash sale revenue
├── Saturday: $1,250,000
├── Sunday: $1,180,000
└── Total: $2,430,000 ✓

Monday 9:00 AM: CFO checks final numbers
├── Saturday: $1,312,000 (+$62,000!)
├── Sunday: $1,245,000 (+$65,000!)
└── Total: $2,557,000 (+$127,000!)

CFO asks: "Why did the numbers change? Which one is right?"

What happened:
├── Mobile app events arrived late (offline purchases)
├── Payment confirmations delayed from processor
├── Timezone issues caused date bucket shifts
├── Batch job picked up events from different windows
└── Manual order entries posted Monday

The dashboard wasn't wrong — it was incomplete.
Late data is the enemy of "real-time" analytics.

Today, we'll learn to handle late-arriving data without losing our minds — or the CFO's trust.


Part I: Foundations

Chapter 1: The Time Problem

1.1 Two Types of Time

EVENT TIME VS PROCESSING TIME

EVENT TIME: When something actually happened
─────────────────────────────────────────
User clicked "Buy" at 2024-01-15 14:30:00 UTC
This is the business-relevant timestamp.

PROCESSING TIME: When we received/processed the event  
─────────────────────────────────────────
Server received the event at 2024-01-15 14:30:05 UTC
This is when our system saw it.

THE GAP:
├── Network latency: milliseconds to seconds
├── Client buffering: seconds to minutes
├── Offline mobile: minutes to days
├── Batch uploads: hours to days
├── Partner data feeds: days to weeks
└── Manual corrections: any time

Example timeline:

Event Time:      14:30:00 ──────────────────────────────────────────────▶
                    │
                    │ User purchases (offline, in subway)
                    │
                    ▼
Processing Time: ──────────────────────── 15:45:00 ──────────────────────▶
                                             │
                                             │ User exits subway,
                                             │ app syncs, event arrives
                                             │ 75 MINUTES LATE!
                                             ▼

Which "hour" does this purchase belong to?
├── Event time: 14:00-15:00 window ← Correct for analytics
└── Processing time: 15:00-16:00 window ← Wrong!
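The difference is easy to demonstrate: bucketing by processing time puts the subway purchase in the wrong hour. A minimal sketch (the `hour_bucket` helper is illustrative, not part of any pipeline):

```python
from datetime import datetime

def hour_bucket(ts: datetime) -> str:
    """Truncate a timestamp to the start of its 1-hour window."""
    return ts.replace(minute=0, second=0, microsecond=0).isoformat()

event_time = datetime(2024, 1, 15, 14, 30)       # user tapped "Buy" (offline)
processing_time = datetime(2024, 1, 15, 15, 45)  # app synced after the subway

hour_bucket(event_time)       # '2024-01-15T14:00:00' <- correct window
hour_bucket(processing_time)  # '2024-01-15T15:00:00' <- wrong window
```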

1.2 Why Late Data Happens

SOURCES OF LATE DATA

1. MOBILE APPS (Most Common)
   ├── Offline usage (subway, airplane, rural)
   ├── Battery optimization (batched sends)
   ├── App backgrounded (delayed sync)
   └── Typical lateness: Minutes to hours

2. IOT DEVICES
   ├── Intermittent connectivity
   ├── Local buffering
   ├── Batch uploads
   └── Typical lateness: Minutes to days

3. PARTNER/VENDOR DATA
   ├── Daily batch files
   ├── API rate limits causing delays
   ├── Different time zones
   └── Typical lateness: Hours to days

4. PAYMENT PROCESSORS
   ├── Async confirmation
   ├── Fraud review delays
   ├── Settlement timing
   └── Typical lateness: Minutes to days

5. MANUAL ENTRIES
   ├── Customer service adjustments
   ├── Refunds and chargebacks
   ├── Data corrections
   └── Typical lateness: Days to weeks

6. SYSTEM ISSUES
   ├── Kafka consumer lag
   ├── Processing failures and retries
   ├── Backfill operations
   └── Typical lateness: Variable

1.3 The Impact on Analytics

HOW LATE DATA BREAKS ANALYTICS

SCENARIO: Hourly revenue dashboard

Without late data handling:

Hour    │ Real-time │ Final (next day) │ Difference
────────┼───────────┼──────────────────┼───────────
14:00   │ $50,000   │ $52,500          │ +5%
15:00   │ $48,000   │ $51,200          │ +6.7%
16:00   │ $55,000   │ $57,800          │ +5.1%

Problems:
├── Misleading real-time decisions
├── Reports don't match
├── Trust erodes ("the numbers are always wrong")
├── Alert thresholds trigger incorrectly
└── YoY comparisons are invalid

The business impact:
├── Flash sale looks less successful than it was
├── Staffing decisions based on wrong traffic patterns
├── Marketing ROI appears lower (late conversions)
└── SLAs with partners based on incomplete data

Chapter 2: Watermarks Deep Dive

2.1 What Are Watermarks?

WATERMARKS: TRACKING EVENT TIME PROGRESS

A watermark is a declaration:
"I believe all events with event_time <= W have arrived"

Watermark(T) means:
├── Events with timestamp <= T are (probably) complete
├── Windows ending at T can (probably) be closed
└── Late events (timestamp < T) need special handling

How watermarks progress:

Events arriving (by processing time):
    ───────────────────────────────────────────────────▶
    │         │         │         │         │
    10:01     10:02     10:03     10:04     10:05
    
Event timestamps:
    9:58      9:55      10:01     9:59      10:03

Max timestamp seen:
    9:58      9:58      10:01     10:01     10:03
    
Watermark (max - 5 min allowed lateness):
    9:53      9:53      9:56      9:56      9:58

The watermark advances as we see newer event times,
but stays behind by the "allowed lateness" buffer.
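The progression above can be reproduced in a few lines. This is a minimal sketch of the bounded-out-of-orderness rule (watermark = max event time seen so far, minus the allowed lateness):

```python
from datetime import datetime, timedelta

def bounded_watermarks(event_times, max_lateness):
    """Yield the watermark after each event: max event time seen - lateness."""
    max_seen = None
    for t in event_times:
        max_seen = t if max_seen is None else max(max_seen, t)
        yield max_seen - max_lateness

day = datetime(2024, 1, 15)
# Event timestamps in arrival order, as in the diagram above
arrivals = [day.replace(hour=9, minute=58), day.replace(hour=9, minute=55),
            day.replace(hour=10, minute=1), day.replace(hour=9, minute=59),
            day.replace(hour=10, minute=3)]

[w.strftime("%H:%M") for w in bounded_watermarks(arrivals, timedelta(minutes=5))]
# ['09:53', '09:53', '09:56', '09:56', '09:58']
```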

2.2 Watermark Strategies

# late_data/watermark_strategies.py

"""
Watermark generation strategies.

Different strategies for different use cases.
"""

from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional, Dict
from abc import ABC, abstractmethod


class WatermarkStrategy(ABC):
    """Base class for watermark strategies."""
    
    @abstractmethod
    def on_event(self, event_time: datetime) -> datetime:
        """Update watermark based on new event."""
        pass
    
    @abstractmethod
    def get_watermark(self) -> datetime:
        """Get current watermark."""
        pass


class BoundedOutOfOrdernessWatermark(WatermarkStrategy):
    """
    Most common strategy: Allow fixed lateness.
    
    Watermark = max_event_time - max_lateness
    
    Use when: Lateness is bounded and predictable
    Example: Mobile app with max 1 hour offline
    """
    
    def __init__(self, max_lateness: timedelta):
        self.max_lateness = max_lateness
        self.max_event_time: Optional[datetime] = None
    
    def on_event(self, event_time: datetime) -> datetime:
        if self.max_event_time is None:
            self.max_event_time = event_time
        else:
            self.max_event_time = max(self.max_event_time, event_time)
        
        return self.get_watermark()
    
    def get_watermark(self) -> datetime:
        if self.max_event_time is None:
            return datetime.min
        return self.max_event_time - self.max_lateness


class PerKeyWatermark(WatermarkStrategy):
    """
    Track watermark per key (e.g., per user, per device).
    
    Global watermark = min(all key watermarks)
    
    Use when: Different sources have different lateness
    Example: Some users always online, some often offline
    """
    
    def __init__(self, max_lateness: timedelta, idle_timeout: timedelta):
        self.max_lateness = max_lateness
        self.idle_timeout = idle_timeout
        self.key_watermarks: Dict[str, datetime] = {}
        self.key_last_seen: Dict[str, datetime] = {}
    
    def on_event(self, event_time: datetime) -> datetime:
        """Satisfy the ABC contract; keyed sources should prefer on_event_for_key()."""
        return self.on_event_for_key("__default__", event_time, datetime.utcnow())
    
    def on_event_for_key(
        self,
        key: str,
        event_time: datetime,
        processing_time: datetime
    ) -> datetime:
        # Update key's max event time
        if key not in self.key_watermarks:
            self.key_watermarks[key] = event_time
        else:
            self.key_watermarks[key] = max(
                self.key_watermarks[key],
                event_time
            )
        
        self.key_last_seen[key] = processing_time
        
        return self.get_watermark()
    
    def get_watermark(self) -> datetime:
        now = datetime.utcnow()
        
        # Filter out idle keys (haven't seen events recently)
        active_watermarks = []
        for key, wm in self.key_watermarks.items():
            last_seen = self.key_last_seen.get(key, now)
            if now - last_seen < self.idle_timeout:
                active_watermarks.append(wm - self.max_lateness)
        
        if not active_watermarks:
            return datetime.min
        
        # Global watermark is minimum of active key watermarks
        return min(active_watermarks)


class PunctuatedWatermark(WatermarkStrategy):
    """
    Watermarks embedded in the data stream.
    
    Use when: Source can guarantee ordering
    Example: Database CDC with transaction IDs
    """
    
    def __init__(self):
        self.current_watermark = datetime.min
    
    def on_event(self, event_time: datetime) -> datetime:
        # Regular events don't advance watermark
        return self.current_watermark
    
    def on_watermark_event(self, watermark_time: datetime) -> datetime:
        # Special watermark events advance the watermark
        self.current_watermark = max(
            self.current_watermark,
            watermark_time
        )
        return self.current_watermark
    
    def get_watermark(self) -> datetime:
        return self.current_watermark


class ProcessingTimeWatermark(WatermarkStrategy):
    """
    Use processing time as watermark (ignore event time).
    
    Use when: Event time doesn't matter or isn't reliable
    Example: Log processing where arrival order is fine
    """
    
    def __init__(self, delay: timedelta = timedelta(seconds=0)):
        self.delay = delay
    
    def on_event(self, event_time: datetime) -> datetime:
        return self.get_watermark()
    
    def get_watermark(self) -> datetime:
        return datetime.utcnow() - self.delay

2.3 Choosing Lateness Tolerance

HOW TO CHOOSE ALLOWED LATENESS

Step 1: Analyze your data
─────────────────────────
Query historical events:

SELECT 
    PERCENTILE_CONT(0.50) WITHIN GROUP (ORDER BY lateness) as p50,
    PERCENTILE_CONT(0.90) WITHIN GROUP (ORDER BY lateness) as p90,
    PERCENTILE_CONT(0.99) WITHIN GROUP (ORDER BY lateness) as p99,
    PERCENTILE_CONT(0.999) WITHIN GROUP (ORDER BY lateness) as p999,
    MAX(lateness) as max_lateness
FROM (
    SELECT 
        EXTRACT(EPOCH FROM (received_time - event_time)) as lateness
    FROM events
    WHERE received_time > event_time  -- Exclude clock issues
);

Example results:
├── p50: 2 seconds
├── p90: 30 seconds
├── p99: 5 minutes
├── p999: 30 minutes
└── max: 72 hours (outliers)

Step 2: Decide on trade-offs
────────────────────────────

Allowed Lateness │ Events Captured │ Window Delay │ Memory
─────────────────┼─────────────────┼──────────────┼────────
30 seconds       │ ~90%            │ Low          │ Low
5 minutes        │ ~99%            │ Medium       │ Medium
30 minutes       │ ~99.9%          │ High         │ High
1 hour           │ ~99.95%         │ Very High    │ Very High

Step 3: Match to business requirements
──────────────────────────────────────

Real-time fraud detection:
├── Lateness tolerance: 30 seconds
├── Rationale: Speed matters more than completeness
└── Late events: Process but don't undo decisions

Daily revenue reporting:
├── Lateness tolerance: 1 hour
├── Rationale: Accuracy matters, some delay okay
└── Late events: Trigger recomputation

Monthly financial close:
├── Lateness tolerance: 7 days
├── Rationale: Must be accurate
└── Late events: Wait for them
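Settings like these often end up in a per-pipeline policy table. A hypothetical sketch matching the trade-offs above (the keys and action names are illustrative, not a fixed API):

```python
from datetime import timedelta

# Hypothetical per-pipeline lateness policies; names are illustrative only.
LATENESS_POLICY = {
    "fraud_detection": {"allowed_lateness": timedelta(seconds=30),
                        "on_late": "side_output"},   # never undo decisions
    "daily_revenue":   {"allowed_lateness": timedelta(hours=1),
                        "on_late": "recompute"},     # trigger recomputation
    "monthly_close":   {"allowed_lateness": timedelta(days=7),
                        "on_late": "wait"},          # hold the window open
}
```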

Chapter 3: Windowing and Late Data

3.1 Window Types and Late Events

HOW WINDOWS HANDLE LATE DATA

TUMBLING WINDOWS (Non-overlapping)
──────────────────────────────────

Windows: [00:00-01:00), [01:00-02:00), [02:00-03:00)

Event arrives at processing time 02:30 with event time 00:45
├── Window [00:00-01:00) already fired (watermark passed 01:00)
├── Event is LATE
└── Options: Drop, side output, or trigger update


SLIDING WINDOWS (Overlapping)
─────────────────────────────

Windows: [00:00-01:00), [00:15-01:15), [00:30-01:30)...

Late event may be late for MULTIPLE windows!
├── Event time 00:45, arrives after 01:30
├── Late for: [00:00-01:00), [00:15-01:15), [00:30-01:30)
└── Potentially many windows need updates


SESSION WINDOWS (Activity-based)
────────────────────────────────

Sessions close after inactivity gap.

Late event may:
├── Extend a closed session
├── Merge two sessions that were separate
├── Create a new session in the past
└── Complexity: Very high for late data handling
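For sliding windows, the set of affected windows can be computed directly. A sketch with a hypothetical helper, using 1-hour windows sliding every 15 minutes as in the example above:

```python
from datetime import datetime, timedelta

def sliding_windows(event_time, size, slide):
    """All [start, start + size) windows that contain event_time."""
    midnight = event_time.replace(hour=0, minute=0, second=0, microsecond=0)
    # Latest window start at or before the event, aligned to the slide.
    start = midnight + ((event_time - midnight) // slide) * slide
    windows = []
    while start + size > event_time:
        windows.append((start, start + size))
        start -= slide
    return list(reversed(windows))

event = datetime(2024, 1, 1, 0, 45)
wins = sliding_windows(event, timedelta(hours=1), timedelta(minutes=15))
# 4 windows contain 00:45:
# [00:00-01:00), [00:15-01:15), [00:30-01:30), [00:45-01:45)

watermark = datetime(2024, 1, 1, 1, 30)
late_for = [w for w in wins if w[1] <= watermark]  # the 3 already-fired windows
```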

3.2 Late Data Handling Strategies

# late_data/handling_strategies.py

"""
Strategies for handling late-arriving data.
"""

from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Dict, Any, List, Optional, Callable
from enum import Enum
import logging

logger = logging.getLogger(__name__)


class LateDataAction(Enum):
    """What to do with late data."""
    DROP = "drop"                    # Ignore late events
    SIDE_OUTPUT = "side_output"      # Route to separate stream
    UPDATE_RESULT = "update_result"  # Update the window result
    RECOMPUTE = "recompute"          # Trigger full recomputation


@dataclass
class LateDataConfig:
    """Configuration for late data handling."""
    allowed_lateness: timedelta
    action: LateDataAction
    side_output_topic: Optional[str] = None
    max_updates_per_window: int = 10


@dataclass
class WindowResult:
    """Result from a window computation."""
    window_start: datetime
    window_end: datetime
    result: Dict[str, Any]
    is_final: bool
    update_count: int = 0
    last_updated: datetime = field(default_factory=datetime.utcnow)


class LateDataHandler:
    """
    Handles late-arriving events in windowed computations.
    """
    
    def __init__(self, config: LateDataConfig):
        self.config = config
        self.window_results: Dict[str, WindowResult] = {}
        self.late_events: List[Dict] = []
        
        # Metrics
        self.on_time_count = 0
        self.late_accepted_count = 0
        self.late_dropped_count = 0
    
    def process_event(
        self,
        event: Dict,
        watermark: datetime,
        aggregate_fn: Callable
    ) -> Optional[Dict]:
        """
        Process an event, handling lateness appropriately.
        """
        
        event_time = datetime.fromisoformat(event["event_time"])
        window_key = self._get_window_key(event_time)
        
        # Check if event is late
        is_late = event_time < watermark
        
        if not is_late:
            # On-time event: normal processing
            self.on_time_count += 1
            return self._update_window(window_key, event, aggregate_fn, is_late=False)
        
        # Event is late
        lateness = watermark - event_time
        
        # Check if within allowed lateness
        if lateness <= self.config.allowed_lateness:
            return self._handle_late_within_tolerance(
                window_key, event, aggregate_fn, lateness
            )
        else:
            return self._handle_late_beyond_tolerance(event, lateness)
    
    def _handle_late_within_tolerance(
        self,
        window_key: str,
        event: Dict,
        aggregate_fn: Callable,
        lateness: timedelta
    ) -> Optional[Dict]:
        """Handle event that's late but within tolerance."""
        
        if self.config.action == LateDataAction.DROP:
            # Dropped, not applied: count it as dropped.
            self.late_dropped_count += 1
            logger.debug(f"Dropping late event (within tolerance): {lateness}")
            return None
        
        self.late_accepted_count += 1
        
        if self.config.action == LateDataAction.SIDE_OUTPUT:
            self.late_events.append({
                "event": event,
                "lateness_seconds": lateness.total_seconds(),
                "action": "side_output"
            })
            return None
        
        if self.config.action == LateDataAction.UPDATE_RESULT:
            # Check update limit
            existing = self.window_results.get(window_key)
            if existing and existing.update_count >= self.config.max_updates_per_window:
                logger.warning(
                    f"Window {window_key} exceeded max updates, dropping event"
                )
                return None
            
            return self._update_window(window_key, event, aggregate_fn, is_late=True)
        
        return None
    
    def _handle_late_beyond_tolerance(
        self,
        event: Dict,
        lateness: timedelta
    ) -> Optional[Dict]:
        """Handle event that's later than allowed tolerance."""
        
        self.late_dropped_count += 1
        
        logger.info(
            f"Dropping very late event: {lateness} exceeds "
            f"{self.config.allowed_lateness}"
        )
        
        # Always side-output very late events for analysis
        self.late_events.append({
            "event": event,
            "lateness_seconds": lateness.total_seconds(),
            "action": "dropped_beyond_tolerance"
        })
        
        return None
    
    def _update_window(
        self,
        window_key: str,
        event: Dict,
        aggregate_fn: Callable,
        is_late: bool
    ) -> Dict:
        """Update or create window result."""
        
        window_start, window_end = self._parse_window_key(window_key)
        
        existing = self.window_results.get(window_key)
        
        if existing:
            # Update existing window
            new_result = aggregate_fn(existing.result, event)
            existing.result = new_result
            existing.update_count += 1
            existing.last_updated = datetime.utcnow()
            existing.is_final = False  # No longer final since it was updated
            
            return {
                "window_key": window_key,
                "result": new_result,
                "is_update": True,
                "is_late_update": is_late,
                "update_count": existing.update_count
            }
        else:
            # New window
            result = aggregate_fn(None, event)
            self.window_results[window_key] = WindowResult(
                window_start=window_start,
                window_end=window_end,
                result=result,
                is_final=False,
                update_count=0
            )
            
            return {
                "window_key": window_key,
                "result": result,
                "is_update": False,
                "is_late_update": is_late,
                "update_count": 0
            }
    
    def finalize_window(self, window_key: str) -> Optional[WindowResult]:
        """Mark a window as final (watermark passed window end)."""
        
        if window_key in self.window_results:
            result = self.window_results[window_key]
            result.is_final = True
            return result
        return None
    
    def _get_window_key(self, event_time: datetime) -> str:
        """Get window key for an event time (1-hour tumbling)."""
        window_start = event_time.replace(minute=0, second=0, microsecond=0)
        window_end = window_start + timedelta(hours=1)
        return f"{window_start.isoformat()}_{window_end.isoformat()}"
    
    def _parse_window_key(self, key: str) -> tuple:
        """Parse window key back to start/end times."""
        start_str, end_str = key.split("_")
        return (
            datetime.fromisoformat(start_str),
            datetime.fromisoformat(end_str)
        )
    
    def get_metrics(self) -> Dict:
        """Get late data handling metrics."""
        total = self.on_time_count + self.late_accepted_count + self.late_dropped_count
        
        return {
            "on_time_count": self.on_time_count,
            "late_accepted_count": self.late_accepted_count,
            "late_dropped_count": self.late_dropped_count,
            "on_time_rate": self.on_time_count / total if total > 0 else 0,
            "late_rate": (self.late_accepted_count + self.late_dropped_count) / total if total > 0 else 0
        }
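The branching in `process_event` boils down to one three-way decision, which is worth testing in isolation. A minimal sketch:

```python
from datetime import datetime, timedelta

def classify(event_time, watermark, allowed_lateness):
    """Three-way lateness decision at the heart of process_event."""
    if event_time >= watermark:
        return "on_time"
    if watermark - event_time <= allowed_lateness:
        return "late_within_tolerance"
    return "late_beyond_tolerance"

wm = datetime(2024, 1, 15, 15, 5)   # current watermark
tol = timedelta(minutes=30)         # allowed lateness

classify(datetime(2024, 1, 15, 15, 10), wm, tol)  # 'on_time'
classify(datetime(2024, 1, 15, 14, 50), wm, tol)  # 'late_within_tolerance'
classify(datetime(2024, 1, 15, 14, 0), wm, tol)   # 'late_beyond_tolerance'
```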

3.3 Allowed Lateness with State Cleanup

# late_data/state_cleanup.py

"""
Managing state with allowed lateness.

Problem: If we keep window state forever to handle late data,
         memory grows unbounded.

Solution: Use allowed lateness + cleanup timers.
"""

from datetime import datetime, timedelta
from typing import Dict, Any, Optional
import logging

logger = logging.getLogger(__name__)


class WindowStateManager:
    """
    Manages window state with cleanup for late data handling.
    
    State lifecycle:
    1. Window created when first event arrives
    2. Window fires when watermark passes window end
    3. State kept for allowed_lateness period (for updates)
    4. State cleaned up after allowed_lateness expires
    """
    
    def __init__(
        self,
        allowed_lateness: timedelta,
        cleanup_interval: timedelta = timedelta(minutes=5)
    ):
        self.allowed_lateness = allowed_lateness
        self.cleanup_interval = cleanup_interval
        
        self.windows: Dict[str, Dict] = {}
        self.cleanup_timers: Dict[str, datetime] = {}
        self.last_cleanup = datetime.utcnow()
    
    def get_or_create_window(
        self,
        window_key: str,
        window_end: datetime
    ) -> Dict:
        """Get existing window state or create new."""
        
        if window_key not in self.windows:
            self.windows[window_key] = {
                "count": 0,
                "sum": 0,
                "created_at": datetime.utcnow(),
                "window_end": window_end,
                "fired": False,
                "update_count": 0
            }
            
            # Set cleanup timer
            cleanup_time = window_end + self.allowed_lateness
            self.cleanup_timers[window_key] = cleanup_time
        
        return self.windows[window_key]
    
    def update_window(self, window_key: str, value: float):
        """Add value to window aggregate."""
        
        if window_key in self.windows:
            self.windows[window_key]["count"] += 1
            self.windows[window_key]["sum"] += value
            
            if self.windows[window_key]["fired"]:
                self.windows[window_key]["update_count"] += 1
    
    def fire_window(self, window_key: str) -> Optional[Dict]:
        """Fire window and return result."""
        
        if window_key not in self.windows:
            return None
        
        window = self.windows[window_key]
        window["fired"] = True
        
        return {
            "window_key": window_key,
            "count": window["count"],
            "sum": window["sum"],
            "avg": window["sum"] / window["count"] if window["count"] > 0 else 0,
            "is_final": False  # May still receive late updates
        }
    
    def maybe_cleanup(self, current_watermark: datetime):
        """
        Clean up expired window states.
        
        Called periodically to free memory.
        """
        
        now = datetime.utcnow()
        if now - self.last_cleanup < self.cleanup_interval:
            return
        
        self.last_cleanup = now
        
        windows_to_remove = []
        
        for window_key, cleanup_time in self.cleanup_timers.items():
            if current_watermark > cleanup_time:
                windows_to_remove.append(window_key)
        
        for window_key in windows_to_remove:
            # Emit final result before cleanup
            if window_key in self.windows:
                window = self.windows[window_key]
                logger.debug(
                    f"Cleaning up window {window_key}, "
                    f"received {window['update_count']} late updates"
                )
                
                del self.windows[window_key]
            
            del self.cleanup_timers[window_key]
        
        if windows_to_remove:
            logger.info(f"Cleaned up {len(windows_to_remove)} expired windows")
    
    def get_state_size(self) -> Dict:
        """Get current state size metrics."""
        return {
            "window_count": len(self.windows),
            "timer_count": len(self.cleanup_timers),
            "total_events": sum(w["count"] for w in self.windows.values())
        }
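The cleanup rule itself reduces to a single predicate: a window's state is safe to drop once the watermark passes window_end + allowed_lateness. A sketch:

```python
from datetime import datetime, timedelta

def state_expired(window_end, allowed_lateness, watermark):
    """True once the watermark has passed the window's cleanup time."""
    return watermark > window_end + allowed_lateness

end = datetime(2024, 1, 15, 15, 0)
tol = timedelta(minutes=30)
state_expired(end, tol, datetime(2024, 1, 15, 15, 20))  # False: late events may still arrive
state_expired(end, tol, datetime(2024, 1, 15, 15, 31))  # True: safe to free the state
```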

Chapter 4: Reprocessing and Corrections

4.1 When to Reprocess

REPROCESSING VS PATCHING

PATCHING (Incremental update)
─────────────────────────────
Apply late events to existing results.

Use when:
├── Late data is small volume (<5% of original)
├── Aggregations are additive (SUM, COUNT)
├── Need quick correction
└── Can handle non-final results

Example:
  Original: revenue = $100,000
  Late event: +$500
  Patched: revenue = $100,500


REPROCESSING (Full recompute)
─────────────────────────────
Recalculate everything from raw events.

Use when:
├── Late data is significant volume
├── Aggregations are non-additive (AVG, MEDIAN, DISTINCT)
├── Need guaranteed accuracy
├── Schema or logic changed
└── Data corruption detected

Example:
  Re-read all events for the day
  Recalculate all aggregations
  Replace existing results

4.2 Implementing Reprocessing

# late_data/reprocessing.py

"""
Reprocessing pipeline for late data and corrections.

Strategies:
1. Micro-batch reprocessing (hourly windows)
2. Daily batch reprocessing
3. On-demand backfill
"""

from dataclasses import dataclass
from datetime import datetime, date, timedelta
from typing import Dict, Any, List, Optional
import logging

logger = logging.getLogger(__name__)


@dataclass
class ReprocessingJob:
    """Defines a reprocessing job."""
    job_id: str
    start_time: datetime
    end_time: datetime
    reason: str
    status: str = "pending"
    created_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None
    
    def __post_init__(self):
        if self.created_at is None:
            self.created_at = datetime.utcnow()


class ReprocessingManager:
    """
    Manages reprocessing of historical data.
    """
    
    def __init__(
        self,
        raw_data_source,
        aggregation_pipeline,
        output_sink
    ):
        self.source = raw_data_source
        self.pipeline = aggregation_pipeline
        self.sink = output_sink
        self.jobs: Dict[str, ReprocessingJob] = {}
    
    async def trigger_reprocessing(
        self,
        start_time: datetime,
        end_time: datetime,
        reason: str
    ) -> ReprocessingJob:
        """
        Trigger reprocessing for a time range.
        """
        
        job_id = f"reprocess_{start_time.strftime('%Y%m%d%H')}_{end_time.strftime('%Y%m%d%H')}"
        
        job = ReprocessingJob(
            job_id=job_id,
            start_time=start_time,
            end_time=end_time,
            reason=reason
        )
        
        self.jobs[job_id] = job
        
        logger.info(
            f"Triggered reprocessing job {job_id}: "
            f"{start_time} to {end_time}, reason: {reason}"
        )
        
        # Execute reprocessing
        await self._execute_reprocessing(job)
        
        return job
    
    async def _execute_reprocessing(self, job: ReprocessingJob):
        """Execute the reprocessing job."""
        
        try:
            job.status = "running"
            
            # Step 1: Read raw events for time range
            events = await self.source.read_events(
                start_time=job.start_time,
                end_time=job.end_time
            )
            
            logger.info(f"Read {len(events)} events for reprocessing")
            
            # Step 2: Run aggregation pipeline
            results = await self.pipeline.process_batch(events)
            
            # Step 3: Write results (replace existing)
            await self.sink.write_results(
                results,
                mode="overwrite",  # Replace, not append
                time_range=(job.start_time, job.end_time)
            )
            
            job.status = "completed"
            job.completed_at = datetime.utcnow()
            
            logger.info(
                f"Reprocessing job {job.job_id} completed, "
                f"processed {len(events)} events"
            )
            
        except Exception as e:
            job.status = "failed"
            logger.error(f"Reprocessing job {job.job_id} failed: {e}")
            raise
    
    async def schedule_daily_reprocessing(self, target_date: date):
        """
        Schedule end-of-day reprocessing to finalize numbers.
        
        Runs after allowed_lateness period to capture late data.
        """
        
        start = datetime.combine(target_date, datetime.min.time())
        end = datetime.combine(target_date + timedelta(days=1), datetime.min.time())
        
        return await self.trigger_reprocessing(
            start_time=start,
            end_time=end,
            reason="scheduled_daily_finalization"
        )


class IncrementalPatcher:
    """
    Applies incremental patches for late data.
    
    More efficient than full reprocessing for small corrections.
    """
    
    def __init__(self, result_store):
        self.store = result_store
    
    async def apply_late_events(
        self,
        late_events: List[Dict],
        aggregations: Dict[str, str]  # column -> aggregation type
    ) -> Dict[str, int]:
        """
        Apply late events as incremental patches.
        
        Only works for additive aggregations!
        """
        
        patches_by_window = {}
        
        for event in late_events:
            window_key = self._get_window_key(event)
            
            if window_key not in patches_by_window:
                patches_by_window[window_key] = {
                    "count": 0,
                    "updates": {}
                }
            
            patches_by_window[window_key]["count"] += 1
            
            for col, agg_type in aggregations.items():
                value = event.get(col, 0)
                
                if agg_type == "SUM":
                    patches_by_window[window_key]["updates"][col] = \
                        patches_by_window[window_key]["updates"].get(col, 0) + value
                
                elif agg_type == "COUNT":
                    patches_by_window[window_key]["updates"][col] = \
                        patches_by_window[window_key]["updates"].get(col, 0) + 1
                
                # Note: AVG, MEDIAN, DISTINCT require reprocessing!
        
        # Apply patches to store
        patched_count = 0
        for window_key, patch in patches_by_window.items():
            await self.store.apply_patch(window_key, patch["updates"])
            patched_count += patch["count"]
        
        return {
            "windows_patched": len(patches_by_window),
            "events_applied": patched_count
        }
    
    def _get_window_key(self, event: Dict) -> str:
        """Get window key from event."""
        event_time = datetime.fromisoformat(event["event_time"])
        return event_time.strftime("%Y-%m-%d-%H")

4.3 Versioned Results

# late_data/versioning.py

"""
Versioned analytics results.

Instead of updating results in place, maintain versions:
- Each version has a timestamp
- Queries can request specific version or latest
- Audit trail of how numbers changed
"""

from dataclasses import dataclass
from datetime import datetime
from typing import Dict, Any, List, Optional
import json


@dataclass
class ResultVersion:
    """A version of an analytics result."""
    version_id: int
    window_key: str
    result: Dict[str, Any]
    computed_at: datetime
    event_count: int
    is_final: bool
    reason: str  # "initial", "late_data", "correction", "reprocessing"


class VersionedResultStore:
    """
    Stores versioned analytics results.
    
    Benefits:
    1. Audit trail: See how numbers changed over time
    2. Debugging: Compare versions to find issues
    3. Consistency: Queries can pin to a version
    4. Rollback: Revert to previous version if needed
    """
    
    def __init__(self, storage_backend):
        self.storage = storage_backend
    
    async def store_result(
        self,
        window_key: str,
        result: Dict[str, Any],
        event_count: int,
        is_final: bool,
        reason: str
    ) -> ResultVersion:
        """
        Store a new version of a result.
        """
        
        # Get next version number (monotonic per window; a real backend
        # would assign this atomically to survive concurrent writers)
        existing = await self.storage.get_versions(window_key)
        next_version = len(existing) + 1
        
        version = ResultVersion(
            version_id=next_version,
            window_key=window_key,
            result=result,
            computed_at=datetime.utcnow(),
            event_count=event_count,
            is_final=is_final,
            reason=reason
        )
        
        await self.storage.save_version(version)
        
        return version
    
    async def get_latest(self, window_key: str) -> Optional[ResultVersion]:
        """Get the most recent version."""
        versions = await self.storage.get_versions(window_key)
        return versions[-1] if versions else None
    
    async def get_final(self, window_key: str) -> Optional[ResultVersion]:
        """Get the final (official) version."""
        versions = await self.storage.get_versions(window_key)
        
        for v in reversed(versions):
            if v.is_final:
                return v
        
        return versions[-1] if versions else None
    
    async def get_at_time(
        self,
        window_key: str,
        as_of: datetime
    ) -> Optional[ResultVersion]:
        """Get the version that was current at a specific time."""
        
        versions = await self.storage.get_versions(window_key)
        
        # Find latest version computed before as_of time
        valid_versions = [
            v for v in versions if v.computed_at <= as_of
        ]
        
        return valid_versions[-1] if valid_versions else None
    
    async def get_version_history(
        self,
        window_key: str
    ) -> List[Dict]:
        """Get full version history for debugging."""
        
        versions = await self.storage.get_versions(window_key)
        
        history = []
        for i, v in enumerate(versions):
            entry = {
                "version": v.version_id,
                "computed_at": v.computed_at.isoformat(),
                "result": v.result,
                "event_count": v.event_count,
                "is_final": v.is_final,
                "reason": v.reason
            }
            
            # Add delta from previous version
            if i > 0:
                prev = versions[i-1]
                entry["delta"] = self._compute_delta(prev.result, v.result)
            
            history.append(entry)
        
        return history
    
    def _compute_delta(
        self,
        old_result: Dict,
        new_result: Dict
    ) -> Dict:
        """Compute difference between two result versions."""
        
        delta = {}
        
        for key in set(old_result.keys()) | set(new_result.keys()):
            old_val = old_result.get(key, 0)
            new_val = new_result.get(key, 0)
            
            if isinstance(old_val, (int, float)) and isinstance(new_val, (int, float)):
                diff = new_val - old_val
                if diff != 0:
                    delta[key] = {
                        "old": old_val,
                        "new": new_val,
                        "diff": diff,
                        "pct_change": (diff / old_val * 100) if old_val != 0 else None
                    }
        
        return delta


VERSION_HISTORY_EXAMPLE = """
VERSION HISTORY FOR WINDOW 2024-01-15-14

Version 1 (Initial):
  computed_at: 2024-01-15T15:00:00Z
  result: {revenue: 50000, orders: 500}
  event_count: 500
  is_final: false
  reason: initial

Version 2 (Late data):
  computed_at: 2024-01-15T16:30:00Z
  result: {revenue: 52500, orders: 525}
  event_count: 525
  is_final: false
  reason: late_data
  delta: {
    revenue: {old: 50000, new: 52500, diff: +2500, pct: +5%},
    orders: {old: 500, new: 525, diff: +25, pct: +5%}
  }

Version 3 (Final):
  computed_at: 2024-01-16T01:00:00Z
  result: {revenue: 53200, orders: 532}
  event_count: 532
  is_final: true
  reason: daily_finalization
  delta: {
    revenue: {old: 52500, new: 53200, diff: +700, pct: +1.3%},
    orders: {old: 525, new: 532, diff: +7, pct: +1.3%}
  }

This shows:
- Initial result was 5% low (mobile offline events)
- Daily finalization added 1.3% more (payment confirmations)
- Total correction: +6.4% from initial to final
"""

Part II: Implementation

Chapter 5: Production Late Data Pipeline

5.1 End-to-End Architecture

LATE DATA HANDLING ARCHITECTURE

┌────────────────────────────────────────────────────────────────────────┐
│                                                                        │
│                         EVENT SOURCES                                  │
│  ┌─────────┐  ┌─────────┐  ┌─────────┐  ┌─────────┐                    │
│  │  Web    │  │ Mobile  │  │  API    │  │ Partners│                    │
│  │(on-time)│  │(late)   │  │(on-time)│  │(batch)  │                    │
│  └────┬────┘  └────┬────┘  └────┬────┘  └────┬────┘                    │
│       │            │            │            │                         │
└───────┼────────────┼────────────┼────────────┼─────────────────────────┘
        │            │            │            │
        └────────────┴──────┬─────┴────────────┘
                            │
                            ▼
┌────────────────────────────────────────────────────────────────────────┐
│                        INGESTION LAYER                                 │
│                                                                        │
│  ┌────────────────────────────────────────────────────────────────┐    │
│  │                    Kafka (events topic)                        │    │
│  │   Partition by user_id for ordering                            │    │
│  │   7-day retention for replay                                   │    │
│  └────────────────────────────────────────────────────────────────┘    │
│                            │                                           │
└────────────────────────────┼───────────────────────────────────────────┘
                             │
                             ▼
┌───────────────────────────────────────────────────────────────────────┐
│                    STREAM PROCESSING LAYER                            │
│                                                                       │
│  ┌───────────────────────────────────────────────────────────────┐    │
│  │                      Flink Job                                │    │
│  │  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐         │    │
│  │  │  Watermark   │  │   Window     │  │    Late      │         │    │
│  │  │  Generator   │──│  Aggregator  │──│   Handler    │         │    │
│  │  │ (5min late)  │  │ (1hr tumble) │  │(side output) │         │    │
│  │  └──────────────┘  └──────────────┘  └──────────────┘         │    │
│  └───────────────────────────────────────────────────────────────┘    │
│                   │                               │                   │
│                   ▼                               ▼                   │
│            ┌────────────┐                 ┌────────────┐              │
│            │  On-time   │                 │   Late     │              │
│            │  Results   │                 │  Events    │              │
│            └─────┬──────┘                 └─────┬──────┘              │
│                  │                              │                     │
└──────────────────┼──────────────────────────────┼─────────────────────┘
                   │                              │
                   ▼                              ▼
┌───────────────────────────────────────────────────────────────────────┐
│                         SERVING LAYER                                 │
│                                                                       │
│  ┌──────────────────────┐        ┌──────────────────────┐             │
│  │    Druid (Real-time) │        │  Late Events Queue   │             │
│  │   Versioned results  │        │  (for patching)      │             │
│  └──────────┬───────────┘        └──────────┬───────────┘             │
│             │                               │                         │
└─────────────┼───────────────────────────────┼─────────────────────────┘
              │                               │
              │                               ▼
              │            ┌─────────────────────────────────┐
              │            │      Batch Reprocessing         │
              │            │  (Daily finalization job)       │
              │            │  Spark: Read raw → Aggregate    │
              │            │         → Overwrite results     │
              │            └─────────────────────────────────┘
              │                               │
              └───────────────┬───────────────┘
                              │
                              ▼
                    ┌─────────────────────┐
                    │    Dashboard API    │
                    │  - Latest version   │
                    │  - Final version    │
                    │  - As-of version    │
                    └─────────────────────┘

5.2 Complete Implementation

# late_data/production_pipeline.py

"""
Production late data handling pipeline.

Combines streaming for real-time with batch for accuracy.
"""

from dataclasses import dataclass, field
from datetime import datetime, timedelta, date
from typing import Dict, Any, List, Optional
import asyncio
import logging

logger = logging.getLogger(__name__)


@dataclass
class PipelineConfig:
    """Configuration for the late data pipeline."""
    
    # Watermark settings
    allowed_lateness: timedelta = timedelta(minutes=5)
    watermark_idle_timeout: timedelta = timedelta(minutes=1)
    
    # Window settings
    window_size: timedelta = timedelta(hours=1)
    
    # Reprocessing settings
    daily_reprocessing_hour: int = 2  # 2 AM
    reprocessing_lookback: timedelta = timedelta(days=1)
    
    # Output settings
    realtime_output: str = "druid://metrics"
    batch_output: str = "s3://warehouse/metrics"
    late_events_output: str = "kafka://late-events"


class LateDataPipeline:
    """
    Complete pipeline for handling late data in analytics.
    """
    
    def __init__(
        self,
        config: PipelineConfig,
        kafka_source,
        flink_env,
        spark_session,
        result_store: VersionedResultStore
    ):
        self.config = config
        self.kafka = kafka_source
        self.flink = flink_env
        self.spark = spark_session
        self.results = result_store
        
        # Components
        self.watermark_strategy = BoundedOutOfOrdernessWatermark(
            config.allowed_lateness
        )
        self.late_handler = LateDataHandler(
            LateDataConfig(
                allowed_lateness=config.allowed_lateness,
                action=LateDataAction.SIDE_OUTPUT,
                side_output_topic=config.late_events_output
            )
        )
        self.patcher = IncrementalPatcher(result_store)
        self.reprocessor = ReprocessingManager(
            raw_data_source=kafka_source,
            aggregation_pipeline=self,
            output_sink=result_store
        )
    
    async def start_streaming(self):
        """
        Start the streaming pipeline.
        
        This runs continuously, processing events as they arrive.
        """
        
        logger.info("Starting streaming pipeline")
        
        # Read from Kafka
        events = await self.kafka.read_stream("events")
        
        # Process events with watermarks
        async for event in events:
            await self._process_streaming_event(event)
    
    async def _process_streaming_event(self, event: Dict):
        """Process a single event in streaming mode."""
        
        event_time = datetime.fromisoformat(event["event_time"])
        
        # Update watermark
        watermark = self.watermark_strategy.on_event(event_time)
        
        # Process through late handler
        result = self.late_handler.process_event(
            event=event,
            watermark=watermark,
            aggregate_fn=self._aggregate_event
        )
        
        if result:
            # Store result version
            await self.results.store_result(
                window_key=result["window_key"],
                result=result["result"],
                event_count=result.get("event_count", 1),
                is_final=False,
                reason="late_data" if result["is_late_update"] else "streaming"
            )
    
    def _aggregate_event(
        self,
        current: Optional[Dict],
        event: Dict
    ) -> Dict:
        """Aggregate function for events."""
        
        if current is None:
            current = {
                "revenue": 0,
                "orders": 0,
                "items": 0
            }
        
        if event.get("event_type") == "order.placed":
            current["revenue"] += event.get("payload", {}).get("total", 0)
            current["orders"] += 1
            current["items"] += event.get("payload", {}).get("item_count", 0)
        
        return current
    
    async def run_daily_finalization(self, target_date: date):
        """
        Run daily finalization batch job.
        
        This reprocesses the full day to produce final numbers.
        """
        
        logger.info(f"Running daily finalization for {target_date}")
        
        start_time = datetime.combine(target_date, datetime.min.time())
        end_time = start_time + timedelta(days=1)
        
        # Read all events for the day from raw storage
        from pyspark.sql import functions as F
        
        events_df = self.spark.read.parquet(
            f"s3://data-lake/events/date={target_date}"
        )
        
        # Aggregate by hour; the explicit time filter guards against
        # events that landed in the wrong date partition
        results = (
            events_df
            .filter(F.col("event_type") == "order.placed")
            .filter(
                (F.col("event_time") >= F.lit(start_time))
                & (F.col("event_time") < F.lit(end_time))
            )
            .groupBy(F.date_trunc("hour", F.col("event_time")).alias("window_start"))
            .agg(
                F.sum("payload.total").alias("revenue"),
                F.count("*").alias("orders"),
                F.sum("payload.item_count").alias("items"),
            )
            .collect()
        )
        
        # Store final results
        for row in results:
            window_key = row["window_start"].strftime("%Y-%m-%d-%H")
            
            await self.results.store_result(
                window_key=window_key,
                result={
                    "revenue": row["revenue"],
                    "orders": row["orders"],
                    "items": row["items"]
                },
                event_count=row["orders"],
                is_final=True,
                reason="daily_finalization"
            )
        
        logger.info(
            f"Daily finalization complete for {target_date}, "
            f"finalized {len(results)} hourly windows"
        )
    
    async def apply_late_event_patches(self):
        """
        Process queued late events as patches.
        
        Runs periodically to update results with late data
        without full reprocessing.
        """
        
        late_events = self.late_handler.late_events
        
        if not late_events:
            return
        
        logger.info(f"Applying {len(late_events)} late event patches")
        
        # Apply as incremental patches
        result = await self.patcher.apply_late_events(
            late_events=[e["event"] for e in late_events],
            aggregations={
                "revenue": "SUM",
                "orders": "COUNT",
                "items": "SUM"
            }
        )
        
        # Clear processed events
        self.late_handler.late_events = []
        
        logger.info(
            f"Applied patches to {result['windows_patched']} windows, "
            f"{result['events_applied']} events"
        )
    
    async def get_metrics_for_dashboard(
        self,
        window_key: str,
        version_type: str = "latest"  # "latest", "final", or ISO timestamp
    ) -> Dict:
        """
        Get metrics for dashboard display.
        
        Supports different version types:
        - latest: Most recent computation (may change)
        - final: Official finalized number (stable)
        - timestamp: As-of a specific time (for auditing)
        """
        
        if version_type == "latest":
            version = await self.results.get_latest(window_key)
        elif version_type == "final":
            version = await self.results.get_final(window_key)
        else:
            # Treat anything else as an ISO timestamp
            try:
                as_of = datetime.fromisoformat(version_type)
            except ValueError:
                return {"error": f"Invalid version_type: {version_type}"}
            version = await self.results.get_at_time(window_key, as_of)
        
        if not version:
            return {"error": "No data found"}
        
        return {
            "window_key": window_key,
            "result": version.result,
            "computed_at": version.computed_at.isoformat(),
            "is_final": version.is_final,
            "version": version.version_id,
            "event_count": version.event_count,
            # Include staleness indicator
            "staleness": (datetime.utcnow() - version.computed_at).total_seconds()
        }
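The streaming path leans on `BoundedOutOfOrdernessWatermark` from earlier in the series. As a refresher, a minimal stand-in with the same `on_event` contract — the watermark trails the maximum event time seen by the allowed lateness, and out-of-order events never move it backwards:

```python
from datetime import datetime, timedelta
from typing import Optional

class BoundedWatermark:
    """Minimal bounded out-of-orderness watermark (hypothetical stand-in)."""

    def __init__(self, allowed_lateness: timedelta):
        self.allowed_lateness = allowed_lateness
        self.max_event_time: Optional[datetime] = None

    def on_event(self, event_time: datetime) -> datetime:
        # Track the max event time seen; the watermark is that minus the bound
        if self.max_event_time is None or event_time > self.max_event_time:
            self.max_event_time = event_time
        return self.max_event_time - self.allowed_lateness

wm = BoundedWatermark(timedelta(minutes=5))
wm.on_event(datetime(2024, 1, 15, 14, 30))
# A 14:20 event arriving late: watermark holds at 14:30 - 5min = 14:25
assert wm.on_event(datetime(2024, 1, 15, 14, 20)) == datetime(2024, 1, 15, 14, 25)
```

Any event older than the returned watermark is "late" by definition, which is the test the `LateDataHandler` applies before routing to the side output.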

5.3 Dashboard Integration

# late_data/dashboard_api.py

"""
API for dashboards to query late-data-aware metrics.
"""

from fastapi import FastAPI, Query
from datetime import datetime, date
from typing import Optional

app = FastAPI(title="Analytics API")


@app.get("/metrics/{metric_name}")
async def get_metric(
    metric_name: str,
    window: str = Query(..., description="Window key (e.g., 2024-01-15-14)"),
    version: str = Query(
        "latest",
        description="Version: 'latest', 'final', or ISO timestamp"
    ),
    include_history: bool = Query(
        False,
        description="Include version history"
    )
):
    """
    Get a metric value with late-data awareness.
    
    Returns:
    - result: The metric values
    - is_final: Whether this is the finalized number
    - staleness: Seconds since last computation
    - versions: (optional) Full version history
    """
    
    pipeline = get_pipeline()  # Dependency injection
    
    result = await pipeline.get_metrics_for_dashboard(
        window_key=window,
        version_type=version
    )
    
    if include_history:
        result["history"] = await pipeline.results.get_version_history(window)
    
    return result


@app.get("/metrics/{metric_name}/compare")
async def compare_versions(
    metric_name: str,
    window: str,
    version_a: str = "latest",
    version_b: str = "final"
):
    """
    Compare two versions of a metric.
    
    Useful for seeing real-time vs final discrepancy.
    """
    
    pipeline = get_pipeline()
    
    result_a = await pipeline.get_metrics_for_dashboard(window, version_a)
    result_b = await pipeline.get_metrics_for_dashboard(window, version_b)
    
    comparison = {
        "version_a": result_a,
        "version_b": result_b,
        "delta": {}
    }
    
    # Calculate delta
    for key in result_a.get("result", {}):
        val_a = result_a["result"].get(key, 0)
        val_b = result_b["result"].get(key, 0)
        
        if isinstance(val_a, (int, float)):
            comparison["delta"][key] = {
                "absolute": val_b - val_a,
                "percentage": ((val_b - val_a) / val_a * 100) if val_a else None
            }
    
    return comparison


@app.get("/data-quality/lateness")
async def get_lateness_metrics(
    start_date: date,
    end_date: date
):
    """
    Get data lateness metrics for monitoring.
    
    Shows how much data arrives late and how late.
    """
    
    pipeline = get_pipeline()
    handler = pipeline.late_handler
    
    metrics = handler.get_metrics()
    
    return {
        "period": {
            "start": start_date.isoformat(),
            "end": end_date.isoformat()
        },
        "on_time_rate": metrics["on_time_rate"],
        "late_rate": metrics["late_rate"],
        "late_accepted": metrics["late_accepted_count"],
        "late_dropped": metrics["late_dropped_count"],
        "recommendations": _get_recommendations(metrics)
    }


def _get_recommendations(metrics: dict) -> list:
    """Generate recommendations based on lateness metrics."""
    
    recommendations = []
    
    if metrics["late_rate"] > 0.10:  # >10% late
        recommendations.append({
            "severity": "high",
            "message": "High late data rate. Consider increasing allowed lateness.",
            "current_late_rate": f"{metrics['late_rate']*100:.1f}%"
        })
    
    if metrics["late_dropped_count"] > 0:
        recommendations.append({
            "severity": "medium",
            "message": f"{metrics['late_dropped_count']} events dropped as too late. "
                      "Review late event patterns."
        })
    
    return recommendations

Part III: Real-World Application

Chapter 6: Case Studies

6.1 LinkedIn's Lambda+ Architecture

LINKEDIN'S APPROACH TO LATE DATA

Challenge:
├── 500B+ events/day
├── Events from mobile can be hours late
└── Business needs both real-time and accurate daily numbers

Solution: Lambda+ Architecture

┌────────────────────────────────────────────────────────────────────────┐
│                    LINKEDIN LAMBDA+ ARCHITECTURE                       │
│                                                                        │
│  Speed Layer (Real-time):                                              │
│  ├── Samza for streaming aggregations                                  │
│  ├── 5-minute allowed lateness                                         │
│  ├── Results marked as "provisional"                                   │
│  └── Updates dashboards immediately                                    │
│                                                                        │
│  Batch Layer (Accuracy):                                               │
│  ├── Spark jobs run T+1 day (24h after)                                │
│  ├── Processes ALL events including late                               │
│  ├── Results marked as "final"                                         │
│  └── Used for reporting, ML training                                   │
│                                                                        │
│  Reconciliation Layer (Trust):                                         │
│  ├── Compares speed vs batch results                                   │
│  ├── Alerts if discrepancy > threshold                                 │
│  ├── Auto-adjusts allowed lateness based on patterns                   │
│  └── Generates data quality reports                                    │
│                                                                        │
│  Key Innovation: AUTOMATED LATENESS TUNING                             │
│  ├── Analyze historical lateness distribution                          │
│  ├── Set allowed_lateness to capture 99% of events                     │
│  ├── Per-event-type tuning (mobile vs web)                             │
│  └── Continuous adjustment based on trends                             │
│                                                                        │
└────────────────────────────────────────────────────────────────────────┘
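The automated lateness tuning idea reduces to a few lines: measure the lateness distribution (arrival time minus event time) per source, then set allowed lateness at its 99th percentile. A hypothetical sketch with illustrative numbers, not LinkedIn's actual implementation:

```python
from datetime import timedelta
from typing import List

def p99_lateness(lateness_seconds: List[float]) -> timedelta:
    """99th percentile of observed lateness (arrival_time - event_time)."""
    ordered = sorted(lateness_seconds)
    idx = min(len(ordered) - 1, int(len(ordered) * 0.99))
    return timedelta(seconds=ordered[idx])

# Illustrative distributions: web is almost never late, mobile often is
web_lateness = [0.5] * 990 + [120.0] * 10
mobile_lateness = [60.0] * 800 + [7200.0] * 200

assert p99_lateness(web_lateness) == timedelta(seconds=120)  # ~2 minutes
assert p99_lateness(mobile_lateness) == timedelta(hours=2)   # tune per source
```

Rerunning this over a sliding window of recent events gives the "continuous adjustment" behavior: the bound tightens when sources get faster and widens when they degrade.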

6.2 Uber's Correction Pattern

UBER'S LATE DATA CORRECTION PATTERN

Challenge:
├── Trip events can be delayed (driver phone offline)
├── Payment events arrive later (processing time)
├── Financial reports must be accurate

Solution: T+1 and T+7 Reports

┌────────────────────────────────────────────────────────────────────────┐
│                                                                        │
│  REPORTING TIERS                                                       │
│                                                                        │
│  Real-time (T+0):                                                      │
│  ├── Shows: "Today so far"                                             │
│  ├── Completeness: ~95%                                                │
│  ├── Use case: Operations dashboard                                    │
│  └── Label: "Live (updating)"                                          │
│                                                                        │
│  Preliminary (T+1):                                                    │
│  ├── Shows: "Yesterday"                                                │
│  ├── Completeness: ~99%                                                │
│  ├── Use case: Daily standup, operations review                        │
│  └── Label: "Preliminary"                                              │
│                                                                        │
│  Final (T+7):                                                          │
│  ├── Shows: "Last week"                                                │
│  ├── Completeness: ~99.9%                                              │
│  ├── Use case: Finance, compliance, ML training                        │
│  └── Label: "Final"                                                    │
│                                                                        │
│  CORRECTION WORKFLOW:                                                  │
│                                                                        │
│  Day 0: Real-time shows $10M                                           │
│  Day 1: T+1 job runs, updates to $10.2M (+2%)                          │
│  Day 7: T+7 job runs, finalizes at $10.25M (+0.5%)                     │
│  Day 8+: Immutable (any changes are "adjustments" in new period)       │
│                                                                        │
└────────────────────────────────────────────────────────────────────────┘
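The tier labels above boil down to a cutoff check on window age. A hypothetical sketch (the T+1/T+7 thresholds mirror the diagram; the function name is ours):

```python
from datetime import date

def report_tier(window_date: date, today: date) -> str:
    """Label a day's numbers by age, per the T+1 / T+7 tiers above."""
    age_days = (today - window_date).days
    if age_days <= 0:
        return "Live (updating)"
    if age_days < 7:
        return "Preliminary"
    return "Final"

assert report_tier(date(2024, 1, 15), date(2024, 1, 15)) == "Live (updating)"
assert report_tier(date(2024, 1, 15), date(2024, 1, 16)) == "Preliminary"
assert report_tier(date(2024, 1, 15), date(2024, 1, 22)) == "Final"
```

The label belongs in the API response itself, not just the UI, so every downstream consumer inherits the same definition of "final".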

Chapter 7: Common Mistakes

7.1 Late Data Anti-Patterns

LATE DATA ANTI-PATTERNS

❌ MISTAKE 1: Ignoring Late Data

Wrong:
  # Just use processing time
  df.groupBy(window(col("processing_time"), "1 hour")).sum()

Problem:
  Mobile offline events land in wrong hour
  Numbers shift unpredictably

Right:
  # Use event time with watermarks
  df.withWatermark("event_time", "5 minutes")
    .groupBy(window(col("event_time"), "1 hour"))
    .sum()


❌ MISTAKE 2: Infinite Allowed Lateness

Wrong:
  allowed_lateness = timedelta(days=365)  # Wait forever!

Problem:
  State grows unbounded → OOM
  Windows never close → no results

Right:
  # Match lateness to business reality
  allowed_lateness = timedelta(hours=1)  # For web
  allowed_lateness = timedelta(hours=24)  # For mobile


❌ MISTAKE 3: No Versioning

Wrong:
  # Overwrite results in place
  result_table.update(window_key, new_value)

Problem:
  No audit trail
  Can't explain changes
  "Why did yesterday's number change?"

Right:
  # Version all results
  result_table.insert_version(
      window_key, new_value, 
      version=next_version,
      reason="late_data"
  )


❌ MISTAKE 4: Same Lateness for Everything

Wrong:
  # One size fits all
  ALLOWED_LATENESS = "1 hour"  # For all events

Problem:
  Web events: 99.9% arrive in 1 minute (over-buffered)
  Mobile events: 20% arrive after 1 hour (under-buffered)

Right:
  # Per-source lateness configuration
  WEB_LATENESS = "5 minutes"
  MOBILE_LATENESS = "4 hours"
  PARTNER_LATENESS = "24 hours"

7.2 Dashboard Mistakes

DASHBOARD ANTI-PATTERNS

❌ MISTAKE 1: Showing "Real-time" Without Caveat

Wrong:
  Dashboard title: "Revenue Today: $1,234,567"

Problem:
  User thinks this is final
  Makes decisions on incomplete data
  Loses trust when it changes

Right:
  Dashboard title: "Revenue Today: $1,234,567"
  Subtitle: "Live • Updates every minute • Final at midnight"


❌ MISTAKE 2: No Comparison to Final

Wrong:
  Show only latest number
  No way to see what changed

Problem:
  User can't assess data quality
  No understanding of late data impact

Right:
  "Today (live): $1,234,567"
  "Yesterday (final): $1,456,789"
  "Yesterday (at this time): $1,380,000" ← For comparison


❌ MISTAKE 3: Alerting on Non-Final Data

Wrong:
  if revenue < threshold:
      send_alert("Revenue is low!")

Problem:
  Alert fires, then number corrects
  Alert fatigue
  Team ignores real issues

Right:
  if is_final and revenue < threshold:
      send_alert("Final revenue below threshold")
  elif not is_final and revenue < threshold * 0.9:
      # Only alert if significantly below
      send_alert("Warning: Live revenue tracking low")

Part IV: Interview Preparation

Chapter 8: Interview Tips

8.1 When to Discuss Late Data

LATE DATA INTERVIEW TRIGGERS

"Design a real-time analytics system"
  → "How do we handle events that arrive late?"

"Design a dashboard for [X]"
  → "Should we show live or final numbers?"

"Why might the numbers not match?"
  → "Late data is a common cause..."

"How do you ensure data accuracy?"
  → "Versioning and finalization process..."

"Design event tracking for mobile app"
  → "Mobile events often arrive late due to offline usage..."

8.2 Key Phrases

TALKING ABOUT LATE DATA

"Late data is inevitable in distributed systems. Mobile apps
go offline, payment confirmations are async, partner feeds
are batched. The key is designing for it from the start."

"I'd use event time with bounded out-of-orderness watermarks.
Based on the lateness distribution — which we'd measure —
we'd set allowed lateness to capture 99% of events."

"For dashboards, I'd version all results and clearly label
whether data is 'live' or 'final'. Users need to know what
they're looking at. Real-time is valuable, but so is accuracy."

"Late events beyond our tolerance go to a side output for
later processing. Nothing is dropped silently — we either
process it or track it for analysis."

"We'd run a daily finalization job that reprocesses the full
day's data. This produces the 'official' numbers that finance
uses. The streaming numbers are provisional."

Chapter 9: Practice Problems

Problem 1: Mobile Gaming Analytics

Setup: Design analytics for a mobile game where:

  • Users play offline (airplane mode, subway)
  • Events buffer on device for up to 24 hours
  • Need real-time leaderboards AND accurate daily reports

Questions:

  1. What watermark strategy would you use?
  2. How do you handle a user who goes offline for 2 days?
  3. How do you reconcile real-time vs daily leaderboards?

Approach:

  • Separate allowed lateness for leaderboard vs reports
  • Per-user watermarks with idle timeout
  • Leaderboard: Show provisional, update on sync
  • Daily: T+1 finalization job

Problem 2: Ad Attribution

Setup: Attribute ad conversions where:

  • Ad impression → (up to 30 days) → Conversion
  • Conversions may arrive days after they happen
  • Revenue attribution must be accurate for billing

Questions:

  1. How long do you keep attribution state?
  2. How do you handle attribution windows reopening?
  3. When do you finalize attribution for billing?

Approach:

  • Keep impression state for 30+ days
  • Attribution is mutable until billing close
  • Monthly billing close is "final" cutoff
  • Corrections after billing are adjustments

Chapter 10: Sample Interview Dialogue

Interviewer: "We're seeing our dashboard numbers change after the fact. Users are losing trust. How would you fix this?"

You: "This is a classic late data problem. Let me understand first — what kinds of numbers are changing, and by how much?"

Interviewer: "Daily revenue. Sometimes it increases by 5-10% the next day."

You: "That's significant. The 5-10% is likely from mobile app events arriving late, payment confirmations, or batch data feeds. First, I'd measure the actual lateness distribution — what percentage arrives within 1 hour, 4 hours, 24 hours.

Based on that data, I'd implement a multi-pronged approach:

First, watermarks in streaming with appropriate allowed lateness. If 99% of events arrive within 4 hours, set that as the watermark lag. Events arriving after that go to a side output for later processing.

Second, versioned results. Instead of updating in place, store each computation as a new version. This creates an audit trail and lets users understand how numbers evolve.

Third, clear labeling on dashboards. Show 'Live (updating)' for streaming results and 'Final' for batch-processed results. The T+1 batch job reprocesses the full day to produce the official number.

Fourth, educate users. Provide a comparison view showing 'Today at this time yesterday' alongside 'Yesterday final'. This sets expectations that live numbers will increase."

Interviewer: "What if some users really need real-time and others need final?"

You: "Great point. I'd expose both through the API with a version parameter. Dashboards for operations show 'latest' — they need speed. Finance reports request 'final' — they need accuracy.

We can also add a 'completeness estimate' based on historical patterns. If we typically see 95% of events within the first hour, we can show 'Today: $1M (estimated 95% complete)'. This gives users both the number and the confidence level."
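The "completeness estimate" from the dialogue is easy to prototype. A minimal sketch, assuming we've measured a historical completion curve (hours elapsed → fraction of the day's events that have typically arrived); the curve values below are invented for illustration:

```python
def completeness_estimate(hours_elapsed: float, completion_curve: dict) -> float:
    """Return the historically expected fraction of events arrived
    after `hours_elapsed`, using the largest curve point not exceeding
    the elapsed time. Curve values are illustrative."""
    eligible = [h for h in completion_curve if h <= hours_elapsed]
    if not eligible:
        return 0.0
    return completion_curve[max(eligible)]

# Hypothetical curve measured from past lateness data
curve = {1: 0.95, 4: 0.99, 24: 0.999, 48: 1.0}

observed_revenue = 1_000_000
frac = completeness_estimate(2.5, curve)
projected = observed_revenue / frac
print(f"Today: ${observed_revenue:,} (~{frac:.0%} complete, "
      f"projected final ${projected:,.0f})")
```

In practice you'd compute the curve per segment (mobile vs web, region, payment method), since lateness distributions differ sharply between them.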


Summary

DAY 4 SUMMARY: LATE-ARRIVING DATA

TWO TYPES OF TIME
├── Event time: When it happened (business truth)
├── Processing time: When we saw it
└── Gap can be seconds to days

WATERMARKS
├── Track event time progress
├── "All events <= W have (probably) arrived"
├── Bounded out-of-orderness most common
└── Set based on measured lateness distribution

HANDLING STRATEGIES
├── DROP: Ignore late events (simple, loses data)
├── SIDE OUTPUT: Route for separate processing
├── UPDATE: Modify window result (complex, accurate)
└── REPROCESS: Full recomputation (expensive, exact)
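The four strategies above reduce to a single routing decision at ingestion time. A minimal sketch (function name and thresholds are assumptions): on-time events feed the live aggregate, tolerably late events update the still-open window, and very late events go to a side output for reprocessing.

```python
def route_event(event_ts: float, watermark: float, allowed_lateness: float) -> str:
    """Decide how to handle an event relative to the current watermark.
    Illustrative only; real frameworks make this decision per window."""
    if event_ts >= watermark:
        return "aggregate"        # on time: normal live aggregation
    if watermark - event_ts <= allowed_lateness:
        return "update-window"    # late, but within allowed lateness
    return "side-output"          # too late: route for reprocessing
```

Note that "drop" is just a side output nobody reads, which is exactly why the golden rule below says nothing should be dropped silently.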

VERSIONING
├── Never update in place
├── Store each computation as version
├── Support "latest", "final", "as-of" queries
└── Audit trail for debugging
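The versioning pattern summarized above can be captured in a few lines. A minimal sketch (class and method names are hypothetical): every recomputation appends a version, and "latest", "final", and "as-of" are just different reads over the same history.

```python
class VersionedResults:
    """Sketch of append-only result versioning: never update in place."""

    def __init__(self):
        self._versions = []  # list of (computed_at, value, is_final)

    def append(self, computed_at, value, is_final=False):
        self._versions.append((computed_at, value, is_final))

    def latest(self):
        """Most recent computation, provisional or not."""
        return self._versions[-1][1]

    def final(self):
        """Most recent finalized value, or None if not yet finalized."""
        finals = [v for _, v, f in self._versions if f]
        return finals[-1] if finals else None

    def as_of(self, when):
        """What a dashboard showed at a past point in time."""
        eligible = [v for t, v, _ in self._versions if t <= when]
        return eligible[-1] if eligible else None
```

Replaying the preface's Saturday revenue through this structure shows why the audit trail matters: `as_of(Friday 6 PM)` returns the provisional $1,250,000 while `final()` returns $1,312,000, and both answers are correct for what they claim.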

PRODUCTION PATTERNS
├── Streaming for real-time (provisional)
├── Daily batch for finalization (official)
├── T+1 reports vs T+7 reports
└── Clear labeling on dashboards

Key Takeaways

LATE DATA KEY TAKEAWAYS

1. LATE DATA IS INEVITABLE
   Design for it from day one

2. MEASURE YOUR LATENESS
   Can't set watermarks without data

3. VERSION EVERYTHING
   Audit trail saves debugging time

4. LABEL YOUR DATA
   Users must know live vs final

5. FINALIZE EXPLICITLY
   Batch job produces official numbers

GOLDEN RULES:
├── Event time, not processing time
├── Watermarks matched to reality
├── Nothing dropped silently
├── Versions, not overwrites
└── "Live" and "Final" are both valid

What's Next

Tomorrow, we'll tackle Query Layer and Optimization — making queries over billions of rows return in milliseconds.

TOMORROW'S PREVIEW: QUERY OPTIMIZATION

Questions we'll answer:
├── Which OLAP database should I use?
├── How do materialized views help?
├── When to pre-aggregate vs compute on-the-fly?
├── How to handle multi-tenant query isolation?
└── How to manage query costs at scale?

We have the data modeled and late data handled.
Now we make it FAST.

End of Week 8, Day 4

Next: Day 5 — Query Layer and Optimization