Week 8 — Day 4: Late-Arriving Data and Correctness
System Design Mastery Series — Analytics Pipeline Week
Preface
Yesterday, we built beautiful star schemas with pre-aggregated rollups. Our dashboards are fast. The CFO is happy.
Then Monday morning happens:
THE LATE DATA NIGHTMARE
Friday 6:00 PM: Dashboard shows weekend flash sale revenue
├── Saturday: $1,250,000
├── Sunday: $1,180,000
└── Total: $2,430,000 ✓
Monday 9:00 AM: CFO checks final numbers
├── Saturday: $1,312,000 (+$62,000!)
├── Sunday: $1,245,000 (+$65,000!)
└── Total: $2,557,000 (+$127,000!)
CFO asks: "Why did the numbers change? Which one is right?"
What happened:
├── Mobile app events arrived late (offline purchases)
├── Payment confirmations delayed from processor
├── Timezone issues caused date bucket shifts
├── Batch job picked up events from different windows
└── Manual order entries posted Monday
The dashboard wasn't wrong — it was incomplete.
Late data is the enemy of "real-time" analytics.
Today, we'll learn to handle late-arriving data without losing our minds — or the CFO's trust.
Part I: Foundations
Chapter 1: The Time Problem
1.1 Two Types of Time
EVENT TIME VS PROCESSING TIME
EVENT TIME: When something actually happened
─────────────────────────────────────────
User clicked "Buy" at 2024-01-15 14:30:00 UTC
This is the business-relevant timestamp.
PROCESSING TIME: When we received/processed the event
─────────────────────────────────────────
Server received the event at 2024-01-15 14:30:05 UTC
This is when our system saw it.
THE GAP:
├── Network latency: milliseconds to seconds
├── Client buffering: seconds to minutes
├── Offline mobile: minutes to days
├── Batch uploads: hours to days
├── Partner data feeds: days to weeks
└── Manual corrections: any time
Example timeline:
Event Time: 14:30:00 ──────────────────────────────────────────────▶
│
│ User purchases (offline, in subway)
│
▼
Processing Time: ──────────────────────── 15:45:00 ──────────────────────▶
│
│ User exits subway,
│ app syncs, event arrives
│ 75 MINUTES LATE!
▼
Which "hour" does this purchase belong to?
├── Event time: 14:00-15:00 window ← Correct for analytics
└── Processing time: 15:00-16:00 window ← Wrong!
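The gap is easy to demonstrate: bucket the same purchase once by event time and once by processing time. A minimal sketch using the timestamps from the timeline above (`hour_bucket` is an illustrative helper, not a library function):

```python
from datetime import datetime

def hour_bucket(ts: datetime) -> str:
    """Truncate a timestamp to its 1-hour tumbling window."""
    return ts.replace(minute=0, second=0, microsecond=0).strftime("%H:00-%H:59")

# The subway purchase from the timeline above
event_time = datetime(2024, 1, 15, 14, 30)       # user tapped "Buy"
processing_time = datetime(2024, 1, 15, 15, 45)  # event finally arrived

print(hour_bucket(event_time))       # 14:00-14:59  <- correct window
print(hour_bucket(processing_time))  # 15:00-15:59  <- wrong window

lateness = processing_time - event_time
print(lateness)                      # 1:15:00 (75 minutes late)
```

Aggregating by arrival time silently shifts this revenue into the wrong hour; aggregating by event time requires the late-data machinery the rest of this chapter builds.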
1.2 Why Late Data Happens
SOURCES OF LATE DATA
1. MOBILE APPS (Most Common)
├── Offline usage (subway, airplane, rural)
├── Battery optimization (batched sends)
├── App backgrounded (delayed sync)
└── Typical lateness: Minutes to hours
2. IOT DEVICES
├── Intermittent connectivity
├── Local buffering
├── Batch uploads
└── Typical lateness: Minutes to days
3. PARTNER/VENDOR DATA
├── Daily batch files
├── API rate limits causing delays
├── Different time zones
└── Typical lateness: Hours to days
4. PAYMENT PROCESSORS
├── Async confirmation
├── Fraud review delays
├── Settlement timing
└── Typical lateness: Minutes to days
5. MANUAL ENTRIES
├── Customer service adjustments
├── Refunds and chargebacks
├── Data corrections
└── Typical lateness: Days to weeks
6. SYSTEM ISSUES
├── Kafka consumer lag
├── Processing failures and retries
├── Backfill operations
└── Typical lateness: Variable
1.3 The Impact on Analytics
HOW LATE DATA BREAKS ANALYTICS
SCENARIO: Hourly revenue dashboard
Without late data handling:
Hour │ Real-time │ Final (next day) │ Difference
────────┼───────────┼──────────────────┼───────────
14:00 │ $50,000 │ $52,500 │ +5%
15:00 │ $48,000 │ $51,200 │ +6.7%
16:00 │ $55,000 │ $57,800 │ +5.1%
Problems:
├── Misleading real-time decisions
├── Reports don't match
├── Trust erodes ("the numbers are always wrong")
├── Alert thresholds trigger incorrectly
└── YoY comparisons are invalid
The business impact:
├── Flash sale looks less successful than it was
├── Staffing decisions based on wrong traffic patterns
├── Marketing ROI appears lower (late conversions)
└── SLAs with partners based on incomplete data
Chapter 2: Watermarks Deep Dive
2.1 What Are Watermarks?
WATERMARKS: TRACKING EVENT TIME PROGRESS
A watermark is a declaration:
"I believe all events with event_time <= W have arrived"
Watermark(W) means:
├── Events with timestamp <= W are (probably) complete
├── Windows ending at W can (probably) be closed
└── Late events (timestamp < W, arriving after the watermark) need special handling
How watermarks progress:
Events arriving (by processing time):
───────────────────────────────────────────────────▶
│ │ │ │ │
10:01 10:02 10:03 10:04 10:05
Event timestamps:
9:58 9:55 10:01 9:59 10:03
Max timestamp seen:
9:58 9:58 10:01 10:01 10:03
Watermark (max - 5 min allowed lateness):
9:53 9:53 9:56 9:56 9:58
The watermark advances as we see newer event times,
but stays behind by the "allowed lateness" buffer.
2.2 Watermark Strategies
# late_data/watermark_strategies.py
"""
Watermark generation strategies.
Different strategies for different use cases.
"""
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional, Dict
from abc import ABC, abstractmethod
class WatermarkStrategy(ABC):
"""Base class for watermark strategies."""
@abstractmethod
def on_event(self, event_time: datetime) -> datetime:
"""Update watermark based on new event."""
pass
@abstractmethod
def get_watermark(self) -> datetime:
"""Get current watermark."""
pass
class BoundedOutOfOrdernessWatermark(WatermarkStrategy):
"""
Most common strategy: Allow fixed lateness.
Watermark = max_event_time - max_lateness
Use when: Lateness is bounded and predictable
Example: Mobile app with max 1 hour offline
"""
def __init__(self, max_lateness: timedelta):
self.max_lateness = max_lateness
self.max_event_time: Optional[datetime] = None
def on_event(self, event_time: datetime) -> datetime:
if self.max_event_time is None:
self.max_event_time = event_time
else:
self.max_event_time = max(self.max_event_time, event_time)
return self.get_watermark()
def get_watermark(self) -> datetime:
if self.max_event_time is None:
return datetime.min
return self.max_event_time - self.max_lateness
class PerKeyWatermark(WatermarkStrategy):
"""
Track watermark per key (e.g., per user, per device).
Global watermark = min(all key watermarks)
Use when: Different sources have different lateness
Example: Some users always online, some often offline
"""
def __init__(self, max_lateness: timedelta, idle_timeout: timedelta):
self.max_lateness = max_lateness
self.idle_timeout = idle_timeout
self.key_watermarks: Dict[str, datetime] = {}
self.key_last_seen: Dict[str, datetime] = {}
def on_event_for_key(
self,
key: str,
event_time: datetime,
processing_time: datetime
) -> datetime:
# Update key's max event time
if key not in self.key_watermarks:
self.key_watermarks[key] = event_time
else:
self.key_watermarks[key] = max(
self.key_watermarks[key],
event_time
)
self.key_last_seen[key] = processing_time
return self.get_watermark()
def get_watermark(self) -> datetime:
now = datetime.utcnow()
# Filter out idle keys (haven't seen events recently)
active_watermarks = []
for key, wm in self.key_watermarks.items():
last_seen = self.key_last_seen.get(key, now)
if now - last_seen < self.idle_timeout:
active_watermarks.append(wm - self.max_lateness)
if not active_watermarks:
return datetime.min
# Global watermark is minimum of active key watermarks
return min(active_watermarks)
class PunctuatedWatermark(WatermarkStrategy):
"""
Watermarks embedded in the data stream.
Use when: Source can guarantee ordering
Example: Database CDC with transaction IDs
"""
def __init__(self):
self.current_watermark = datetime.min
def on_event(self, event_time: datetime) -> datetime:
# Regular events don't advance watermark
return self.current_watermark
def on_watermark_event(self, watermark_time: datetime) -> datetime:
# Special watermark events advance the watermark
self.current_watermark = max(
self.current_watermark,
watermark_time
)
return self.current_watermark
def get_watermark(self) -> datetime:
return self.current_watermark
class ProcessingTimeWatermark(WatermarkStrategy):
"""
Use processing time as watermark (ignore event time).
Use when: Event time doesn't matter or isn't reliable
Example: Log processing where arrival order is fine
"""
def __init__(self, delay: timedelta = timedelta(seconds=0)):
self.delay = delay
def on_event(self, event_time: datetime) -> datetime:
return self.get_watermark()
def get_watermark(self) -> datetime:
return datetime.utcnow() - self.delay
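The section 2.1 progression can be replayed through the bounded out-of-orderness rule (watermark = max event time seen so far, minus the lateness buffer). A self-contained sketch, using the same timestamps as the diagram rather than the class above:

```python
from datetime import datetime, timedelta

def replay_watermarks(event_times, max_lateness):
    """Watermark after each event: max event time seen so far minus the buffer."""
    watermarks, max_seen = [], None
    for t in event_times:
        max_seen = t if max_seen is None else max(max_seen, t)
        watermarks.append(max_seen - max_lateness)
    return watermarks

day = datetime(2024, 1, 15)
ts = lambda h, m: day.replace(hour=h, minute=m)

# Event timestamps in arrival order, from the section 2.1 diagram
events = [ts(9, 58), ts(9, 55), ts(10, 1), ts(9, 59), ts(10, 3)]
wms = replay_watermarks(events, timedelta(minutes=5))
print([w.strftime("%H:%M") for w in wms])
# ['09:53', '09:53', '09:56', '09:56', '09:58'] — matches the diagram
```

Note how the 9:55 and 9:59 events never move the watermark: only newer event times advance it, which is exactly why a watermark can never regress.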
2.3 Choosing Lateness Tolerance
HOW TO CHOOSE ALLOWED LATENESS
Step 1: Analyze your data
─────────────────────────
Query historical events:
SELECT
PERCENTILE_CONT(0.50) WITHIN GROUP (ORDER BY lateness) as p50,
PERCENTILE_CONT(0.90) WITHIN GROUP (ORDER BY lateness) as p90,
PERCENTILE_CONT(0.99) WITHIN GROUP (ORDER BY lateness) as p99,
PERCENTILE_CONT(0.999) WITHIN GROUP (ORDER BY lateness) as p999,
MAX(lateness) as max_lateness
FROM (
SELECT
EXTRACT(EPOCH FROM (received_time - event_time)) as lateness
FROM events
WHERE received_time > event_time -- Exclude clock issues
);
Example results:
├── p50: 2 seconds
├── p90: 30 seconds
├── p99: 5 minutes
├── p999: 30 minutes
└── max: 72 hours (outliers)
Step 2: Decide on trade-offs
────────────────────────────
Allowed Lateness │ Events Captured │ Window Delay │ Memory
────────────────┼─────────────────┼──────────────┼────────
30 seconds │ ~90% │ Low │ Low
5 minutes │ ~99% │ Medium │ Medium
30 minutes │ ~99.9% │ High │ High
1 hour │ ~99.95% │ Very High │ Very High
Step 3: Match to business requirements
──────────────────────────────────────
Real-time fraud detection:
├── Lateness tolerance: 30 seconds
├── Rationale: Speed matters more than completeness
└── Late events: Process but don't undo decisions
Daily revenue reporting:
├── Lateness tolerance: 1 hour
├── Rationale: Accuracy matters, some delay okay
└── Late events: Trigger recomputation
Monthly financial close:
├── Lateness tolerance: 7 days
├── Rationale: Must be accurate
└── Late events: Wait for them
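Step 1's percentiles map directly to Step 2's capture rates: pick a tolerance, count the fraction of events that arrive within it. A self-contained sketch with a synthetic lateness sample (illustrative numbers, shaped roughly like the percentiles above):

```python
from datetime import timedelta

def capture_rate(lateness_seconds, tolerance: timedelta) -> float:
    """Fraction of events that arrive within the allowed lateness."""
    limit = tolerance.total_seconds()
    return sum(1 for s in lateness_seconds if s <= limit) / len(lateness_seconds)

# Synthetic sample (seconds): mostly near-instant, with a thin tail of
# minutes-to-hours stragglers, mimicking the p50/p90/p99 shape above
sample = [2] * 900 + [30] * 90 + [300] * 9 + [7200]

for tol in [timedelta(seconds=30), timedelta(minutes=5), timedelta(hours=1)]:
    print(tol, f"{capture_rate(sample, tol):.1%}")
```

Running this query shape against your own `received_time - event_time` distribution turns the tolerance decision from a guess into a measured trade-off.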
Chapter 3: Windowing and Late Data
3.1 Window Types and Late Events
HOW WINDOWS HANDLE LATE DATA
TUMBLING WINDOWS (Non-overlapping)
──────────────────────────────────
Windows: [00:00-01:00), [01:00-02:00), [02:00-03:00)
Event arrives at processing time 02:30 with event time 00:45
├── Window [00:00-01:00) already fired (watermark passed 01:00)
├── Event is LATE
└── Options: Drop, side output, or trigger update
SLIDING WINDOWS (Overlapping)
─────────────────────────────
Windows: [00:00-01:00), [00:15-01:15), [00:30-01:30)...
Late event may be late for MULTIPLE windows!
├── Event time 00:45, arrives after 01:30
├── Late for: [00:00-01:00), [00:15-01:15), [00:30-01:30)
└── Potentially many windows need updates
SESSION WINDOWS (Activity-based)
────────────────────────────────
Sessions close after inactivity gap.
Late event may:
├── Extend a closed session
├── Merge two sessions that were separate
├── Create a new session in the past
└── Complexity: Very high for late data handling
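How many windows an event belongs to determines how many results one late event can invalidate. A self-contained sketch of tumbling vs sliding assignment (`tumbling_window` and `sliding_windows` are illustrative helpers, not a framework API):

```python
from datetime import datetime, timedelta

def tumbling_window(ts, size):
    """An event belongs to exactly one tumbling window."""
    epoch = datetime(1970, 1, 1)
    offset = (ts - epoch) % size          # timedelta % timedelta (Python 3.7+)
    start = ts - offset
    return [(start, start + size)]

def sliding_windows(ts, size, slide):
    """An event belongs to every sliding window covering it: size/slide of them."""
    windows = []
    start = tumbling_window(ts, slide)[0][0] - size + slide
    while start <= ts:
        windows.append((start, start + size))
        start += slide
    return windows

t = datetime(2024, 1, 15, 0, 45)  # the 00:45 event from above
print(len(tumbling_window(t, timedelta(hours=1))))                         # 1
print(len(sliding_windows(t, timedelta(hours=1), timedelta(minutes=15))))  # 4
```

One late tumbling-window event means one correction; the same event under 1-hour/15-minute sliding windows touches four results, and session windows can cascade further still.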
3.2 Late Data Handling Strategies
# late_data/handling_strategies.py
"""
Strategies for handling late-arriving data.
"""
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Dict, Any, List, Optional, Callable
from enum import Enum
import logging
logger = logging.getLogger(__name__)
class LateDataAction(Enum):
"""What to do with late data."""
DROP = "drop" # Ignore late events
SIDE_OUTPUT = "side_output" # Route to separate stream
UPDATE_RESULT = "update_result" # Update the window result
RECOMPUTE = "recompute" # Trigger full recomputation
@dataclass
class LateDataConfig:
"""Configuration for late data handling."""
allowed_lateness: timedelta
action: LateDataAction
side_output_topic: Optional[str] = None
max_updates_per_window: int = 10
@dataclass
class WindowResult:
"""Result from a window computation."""
window_start: datetime
window_end: datetime
result: Dict[str, Any]
is_final: bool
update_count: int = 0
last_updated: datetime = field(default_factory=datetime.utcnow)
class LateDataHandler:
"""
Handles late-arriving events in windowed computations.
"""
def __init__(self, config: LateDataConfig):
self.config = config
self.window_results: Dict[str, WindowResult] = {}
self.late_events: List[Dict] = []
# Metrics
self.on_time_count = 0
self.late_accepted_count = 0
self.late_dropped_count = 0
def process_event(
self,
event: Dict,
watermark: datetime,
aggregate_fn: Callable
) -> Optional[Dict]:
"""
Process an event, handling lateness appropriately.
"""
        event_time = datetime.fromisoformat(event["event_time"])
        window_key = self._get_window_key(event_time)
# Check if event is late
is_late = event_time < watermark
if not is_late:
# On-time event: normal processing
self.on_time_count += 1
return self._update_window(window_key, event, aggregate_fn, is_late=False)
# Event is late
lateness = watermark - event_time
# Check if within allowed lateness
if lateness <= self.config.allowed_lateness:
return self._handle_late_within_tolerance(
window_key, event, aggregate_fn, lateness
)
else:
return self._handle_late_beyond_tolerance(event, lateness)
def _handle_late_within_tolerance(
self,
window_key: str,
event: Dict,
aggregate_fn: Callable,
lateness: timedelta
) -> Optional[Dict]:
"""Handle event that's late but within tolerance."""
self.late_accepted_count += 1
if self.config.action == LateDataAction.DROP:
logger.debug(f"Dropping late event (within tolerance): {lateness}")
return None
elif self.config.action == LateDataAction.SIDE_OUTPUT:
self.late_events.append({
"event": event,
"lateness_seconds": lateness.total_seconds(),
"action": "side_output"
})
return None
elif self.config.action == LateDataAction.UPDATE_RESULT:
# Check update limit
existing = self.window_results.get(window_key)
if existing and existing.update_count >= self.config.max_updates_per_window:
logger.warning(
f"Window {window_key} exceeded max updates, dropping event"
)
return None
return self._update_window(window_key, event, aggregate_fn, is_late=True)
return None
def _handle_late_beyond_tolerance(
self,
event: Dict,
lateness: timedelta
) -> Optional[Dict]:
"""Handle event that's later than allowed tolerance."""
self.late_dropped_count += 1
logger.info(
f"Dropping very late event: {lateness} exceeds "
f"{self.config.allowed_lateness}"
)
# Always side-output very late events for analysis
self.late_events.append({
"event": event,
"lateness_seconds": lateness.total_seconds(),
"action": "dropped_beyond_tolerance"
})
return None
def _update_window(
self,
window_key: str,
event: Dict,
aggregate_fn: Callable,
is_late: bool
) -> Dict:
"""Update or create window result."""
window_start, window_end = self._parse_window_key(window_key)
existing = self.window_results.get(window_key)
if existing:
# Update existing window
new_result = aggregate_fn(existing.result, event)
existing.result = new_result
existing.update_count += 1
existing.last_updated = datetime.utcnow()
existing.is_final = False # No longer final since it was updated
return {
"window_key": window_key,
"result": new_result,
"is_update": True,
"is_late_update": is_late,
"update_count": existing.update_count
}
else:
# New window
result = aggregate_fn(None, event)
self.window_results[window_key] = WindowResult(
window_start=window_start,
window_end=window_end,
result=result,
is_final=False,
update_count=0
)
return {
"window_key": window_key,
"result": result,
"is_update": False,
"is_late_update": is_late,
"update_count": 0
}
def finalize_window(self, window_key: str) -> Optional[WindowResult]:
"""Mark a window as final (watermark passed window end)."""
if window_key in self.window_results:
result = self.window_results[window_key]
result.is_final = True
return result
return None
def _get_window_key(self, event_time: datetime) -> str:
"""Get window key for an event time (1-hour tumbling)."""
window_start = event_time.replace(minute=0, second=0, microsecond=0)
window_end = window_start + timedelta(hours=1)
return f"{window_start.isoformat()}_{window_end.isoformat()}"
def _parse_window_key(self, key: str) -> tuple:
"""Parse window key back to start/end times."""
start_str, end_str = key.split("_")
return (
datetime.fromisoformat(start_str),
datetime.fromisoformat(end_str)
)
def get_metrics(self) -> Dict:
"""Get late data handling metrics."""
total = self.on_time_count + self.late_accepted_count + self.late_dropped_count
return {
"on_time_count": self.on_time_count,
"late_accepted_count": self.late_accepted_count,
"late_dropped_count": self.late_dropped_count,
"on_time_rate": self.on_time_count / total if total > 0 else 0,
"late_rate": (self.late_accepted_count + self.late_dropped_count) / total if total > 0 else 0
}
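The three-way routing inside `process_event` reduces to a single decision on how far behind the watermark an event sits. A condensed, self-contained sketch (`classify` is an illustrative helper, not part of the class above):

```python
from datetime import datetime, timedelta

def classify(event_time: datetime, watermark: datetime,
             allowed_lateness: timedelta) -> str:
    """Route an event by how far it trails the watermark."""
    if event_time >= watermark:
        return "on_time"            # normal window processing
    lateness = watermark - event_time
    if lateness <= allowed_lateness:
        return "late_accepted"      # update result / side-output, per config
    return "late_dropped"           # beyond tolerance: side-output for analysis

wm = datetime(2024, 1, 15, 15, 0)
tol = timedelta(minutes=5)
print(classify(datetime(2024, 1, 15, 15, 2), wm, tol))   # on_time
print(classify(datetime(2024, 1, 15, 14, 57), wm, tol))  # late_accepted
print(classify(datetime(2024, 1, 15, 14, 30), wm, tol))  # late_dropped
```

Everything else in the handler — update limits, side outputs, metrics — hangs off which of these three branches an event lands in.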
3.3 Allowed Lateness with State Cleanup
# late_data/state_cleanup.py
"""
Managing state with allowed lateness.
Problem: If we keep window state forever to handle late data,
memory grows unbounded.
Solution: Use allowed lateness + cleanup timers.
"""
from datetime import datetime, timedelta
from typing import Dict, Any, Optional
import logging
logger = logging.getLogger(__name__)
class WindowStateManager:
"""
Manages window state with cleanup for late data handling.
State lifecycle:
1. Window created when first event arrives
2. Window fires when watermark passes window end
3. State kept for allowed_lateness period (for updates)
4. State cleaned up after allowed_lateness expires
"""
def __init__(
self,
allowed_lateness: timedelta,
cleanup_interval: timedelta = timedelta(minutes=5)
):
self.allowed_lateness = allowed_lateness
self.cleanup_interval = cleanup_interval
self.windows: Dict[str, Dict] = {}
self.cleanup_timers: Dict[str, datetime] = {}
self.last_cleanup = datetime.utcnow()
def get_or_create_window(
self,
window_key: str,
window_end: datetime
) -> Dict:
"""Get existing window state or create new."""
if window_key not in self.windows:
self.windows[window_key] = {
"count": 0,
"sum": 0,
"created_at": datetime.utcnow(),
"window_end": window_end,
"fired": False,
"update_count": 0
}
# Set cleanup timer
cleanup_time = window_end + self.allowed_lateness
self.cleanup_timers[window_key] = cleanup_time
return self.windows[window_key]
def update_window(self, window_key: str, value: float):
"""Add value to window aggregate."""
if window_key in self.windows:
self.windows[window_key]["count"] += 1
self.windows[window_key]["sum"] += value
if self.windows[window_key]["fired"]:
self.windows[window_key]["update_count"] += 1
def fire_window(self, window_key: str) -> Optional[Dict]:
"""Fire window and return result."""
if window_key not in self.windows:
return None
window = self.windows[window_key]
window["fired"] = True
return {
"window_key": window_key,
"count": window["count"],
"sum": window["sum"],
"avg": window["sum"] / window["count"] if window["count"] > 0 else 0,
"is_final": False # May still receive late updates
}
def maybe_cleanup(self, current_watermark: datetime):
"""
Clean up expired window states.
Called periodically to free memory.
"""
now = datetime.utcnow()
if now - self.last_cleanup < self.cleanup_interval:
return
self.last_cleanup = now
windows_to_remove = []
for window_key, cleanup_time in self.cleanup_timers.items():
if current_watermark > cleanup_time:
windows_to_remove.append(window_key)
for window_key in windows_to_remove:
# Emit final result before cleanup
if window_key in self.windows:
window = self.windows[window_key]
logger.debug(
f"Cleaning up window {window_key}, "
f"received {window['update_count']} late updates"
)
del self.windows[window_key]
del self.cleanup_timers[window_key]
if windows_to_remove:
logger.info(f"Cleaned up {len(windows_to_remove)} expired windows")
def get_state_size(self) -> Dict:
"""Get current state size metrics."""
return {
"window_count": len(self.windows),
"timer_count": len(self.cleanup_timers),
"total_events": sum(w["count"] for w in self.windows.values())
}
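The cleanup rule in `maybe_cleanup` boils down to one comparison: a window's state can be freed once the watermark passes `window_end + allowed_lateness`, because no in-tolerance late event can still arrive for it. A minimal sketch (`cleanup_due` is an illustrative helper):

```python
from datetime import datetime, timedelta

def cleanup_due(window_end: datetime, watermark: datetime,
                allowed_lateness: timedelta) -> bool:
    """State is safe to free once no in-tolerance late event can still arrive."""
    return watermark > window_end + allowed_lateness

end = datetime(2024, 1, 15, 15, 0)      # window [14:00, 15:00)
tol = timedelta(minutes=30)
print(cleanup_due(end, datetime(2024, 1, 15, 15, 15), tol))  # False: grace period
print(cleanup_due(end, datetime(2024, 1, 15, 15, 31), tol))  # True: free the state
```

This is why lateness tolerance and memory footprint trade off directly: every extra minute of tolerance is an extra minute every fired window's state must be retained.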
Chapter 4: Reprocessing and Corrections
4.1 When to Reprocess
REPROCESSING VS PATCHING
PATCHING (Incremental update)
─────────────────────────────
Apply late events to existing results.
Use when:
├── Late data is small volume (<5% of original)
├── Aggregations are additive (SUM, COUNT)
├── Need quick correction
└── Can handle non-final results
Example:
Original: revenue = $100,000
Late event: +$500
Patched: revenue = $100,500
REPROCESSING (Full recompute)
─────────────────────────────
Recalculate everything from raw events.
Use when:
├── Late data is significant volume
├── Aggregations are non-additive (AVG, MEDIAN, DISTINCT)
├── Need guaranteed accuracy
├── Schema or logic changed
└── Data corruption detected
Example:
Re-read all events for the day
Recalculate all aggregations
Replace existing results
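The additive/non-additive distinction in one worked example: patching SUM matches a full recompute exactly, while naively patching AVG does not (illustrative numbers):

```python
# SUM is additive: the patched total equals a full recompute.
original_events = [100, 250, 150]
late_events = [75, 25]

patched_sum = sum(original_events) + sum(late_events)
recomputed_sum = sum(original_events + late_events)
print(patched_sum == recomputed_sum)  # True: patching is safe for SUM

# AVG is not additive: you cannot combine the old average with the
# late events' average and get the true average.
old_avg = sum(original_events) / len(original_events)   # 166.67
late_avg = sum(late_events) / len(late_events)          # 50.0
true_avg = sum(original_events + late_events) / 5       # 120.0
naive_patch = (old_avg + late_avg) / 2                  # 108.33 — wrong
print(round(true_avg, 2), round(naive_patch, 2))
```

(AVG becomes patchable if you store sum and count separately and recombine them — which is exactly why the window state in Chapter 3 tracks both. MEDIAN and DISTINCT have no such decomposition and genuinely require reprocessing.)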
4.2 Implementing Reprocessing
# late_data/reprocessing.py
"""
Reprocessing pipeline for late data and corrections.
Strategies:
1. Micro-batch reprocessing (hourly windows)
2. Daily batch reprocessing
3. On-demand backfill
"""
from dataclasses import dataclass
from datetime import datetime, date, timedelta
from typing import Dict, Any, List, Optional
import logging
logger = logging.getLogger(__name__)
@dataclass
class ReprocessingJob:
"""Defines a reprocessing job."""
job_id: str
start_time: datetime
end_time: datetime
reason: str
status: str = "pending"
    created_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None
def __post_init__(self):
if self.created_at is None:
self.created_at = datetime.utcnow()
class ReprocessingManager:
"""
Manages reprocessing of historical data.
"""
def __init__(
self,
raw_data_source,
aggregation_pipeline,
output_sink
):
self.source = raw_data_source
self.pipeline = aggregation_pipeline
self.sink = output_sink
self.jobs: Dict[str, ReprocessingJob] = {}
async def trigger_reprocessing(
self,
start_time: datetime,
end_time: datetime,
reason: str
) -> ReprocessingJob:
"""
Trigger reprocessing for a time range.
"""
job_id = f"reprocess_{start_time.strftime('%Y%m%d%H')}_{end_time.strftime('%Y%m%d%H')}"
job = ReprocessingJob(
job_id=job_id,
start_time=start_time,
end_time=end_time,
reason=reason
)
self.jobs[job_id] = job
logger.info(
f"Triggered reprocessing job {job_id}: "
f"{start_time} to {end_time}, reason: {reason}"
)
# Execute reprocessing
await self._execute_reprocessing(job)
return job
async def _execute_reprocessing(self, job: ReprocessingJob):
"""Execute the reprocessing job."""
try:
job.status = "running"
# Step 1: Read raw events for time range
events = await self.source.read_events(
start_time=job.start_time,
end_time=job.end_time
)
logger.info(f"Read {len(events)} events for reprocessing")
# Step 2: Run aggregation pipeline
results = await self.pipeline.process_batch(events)
# Step 3: Write results (replace existing)
await self.sink.write_results(
results,
mode="overwrite", # Replace, not append
time_range=(job.start_time, job.end_time)
)
job.status = "completed"
job.completed_at = datetime.utcnow()
logger.info(
f"Reprocessing job {job.job_id} completed, "
f"processed {len(events)} events"
)
except Exception as e:
job.status = "failed"
logger.error(f"Reprocessing job {job.job_id} failed: {e}")
raise
async def schedule_daily_reprocessing(self, target_date: date):
"""
Schedule end-of-day reprocessing to finalize numbers.
Runs after allowed_lateness period to capture late data.
"""
start = datetime.combine(target_date, datetime.min.time())
end = datetime.combine(target_date + timedelta(days=1), datetime.min.time())
return await self.trigger_reprocessing(
start_time=start,
end_time=end,
reason="scheduled_daily_finalization"
)
class IncrementalPatcher:
"""
Applies incremental patches for late data.
More efficient than full reprocessing for small corrections.
"""
def __init__(self, result_store):
self.store = result_store
async def apply_late_events(
self,
late_events: List[Dict],
aggregations: Dict[str, str] # column -> aggregation type
) -> Dict[str, int]:
"""
Apply late events as incremental patches.
Only works for additive aggregations!
"""
patches_by_window = {}
for event in late_events:
window_key = self._get_window_key(event)
if window_key not in patches_by_window:
patches_by_window[window_key] = {
"count": 0,
"updates": {}
}
patches_by_window[window_key]["count"] += 1
for col, agg_type in aggregations.items():
value = event.get(col, 0)
if agg_type == "SUM":
patches_by_window[window_key]["updates"][col] = \
patches_by_window[window_key]["updates"].get(col, 0) + value
elif agg_type == "COUNT":
patches_by_window[window_key]["updates"][col] = \
patches_by_window[window_key]["updates"].get(col, 0) + 1
# Note: AVG, MEDIAN, DISTINCT require reprocessing!
# Apply patches to store
patched_count = 0
for window_key, patch in patches_by_window.items():
await self.store.apply_patch(window_key, patch["updates"])
patched_count += patch["count"]
return {
"windows_patched": len(patches_by_window),
"events_applied": patched_count
}
def _get_window_key(self, event: Dict) -> str:
"""Get window key from event."""
event_time = datetime.fromisoformat(event["event_time"])
return event_time.strftime("%Y-%m-%d-%H")
4.3 Versioned Results
# late_data/versioning.py
"""
Versioned analytics results.
Instead of updating results in place, maintain versions:
- Each version has a timestamp
- Queries can request specific version or latest
- Audit trail of how numbers changed
"""
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, Any, List, Optional
import json
@dataclass
class ResultVersion:
"""A version of an analytics result."""
version_id: int
window_key: str
result: Dict[str, Any]
computed_at: datetime
event_count: int
is_final: bool
reason: str # "initial", "late_data", "correction", "reprocessing"
class VersionedResultStore:
"""
Stores versioned analytics results.
Benefits:
1. Audit trail: See how numbers changed over time
2. Debugging: Compare versions to find issues
3. Consistency: Queries can pin to a version
4. Rollback: Revert to previous version if needed
"""
def __init__(self, storage_backend):
self.storage = storage_backend
async def store_result(
self,
window_key: str,
result: Dict[str, Any],
event_count: int,
is_final: bool,
reason: str
) -> ResultVersion:
"""
Store a new version of a result.
"""
# Get next version number
existing = await self.storage.get_versions(window_key)
next_version = len(existing) + 1
version = ResultVersion(
version_id=next_version,
window_key=window_key,
result=result,
computed_at=datetime.utcnow(),
event_count=event_count,
is_final=is_final,
reason=reason
)
await self.storage.save_version(version)
return version
async def get_latest(self, window_key: str) -> Optional[ResultVersion]:
"""Get the most recent version."""
versions = await self.storage.get_versions(window_key)
return versions[-1] if versions else None
async def get_final(self, window_key: str) -> Optional[ResultVersion]:
"""Get the final (official) version."""
versions = await self.storage.get_versions(window_key)
for v in reversed(versions):
if v.is_final:
return v
return versions[-1] if versions else None
async def get_at_time(
self,
window_key: str,
as_of: datetime
) -> Optional[ResultVersion]:
"""Get the version that was current at a specific time."""
versions = await self.storage.get_versions(window_key)
# Find latest version computed before as_of time
valid_versions = [
v for v in versions if v.computed_at <= as_of
]
return valid_versions[-1] if valid_versions else None
async def get_version_history(
self,
window_key: str
) -> List[Dict]:
"""Get full version history for debugging."""
versions = await self.storage.get_versions(window_key)
history = []
for i, v in enumerate(versions):
entry = {
"version": v.version_id,
"computed_at": v.computed_at.isoformat(),
"result": v.result,
"event_count": v.event_count,
"is_final": v.is_final,
"reason": v.reason
}
# Add delta from previous version
if i > 0:
prev = versions[i-1]
entry["delta"] = self._compute_delta(prev.result, v.result)
history.append(entry)
return history
def _compute_delta(
self,
old_result: Dict,
new_result: Dict
) -> Dict:
"""Compute difference between two result versions."""
delta = {}
for key in set(old_result.keys()) | set(new_result.keys()):
old_val = old_result.get(key, 0)
new_val = new_result.get(key, 0)
if isinstance(old_val, (int, float)) and isinstance(new_val, (int, float)):
diff = new_val - old_val
if diff != 0:
delta[key] = {
"old": old_val,
"new": new_val,
"diff": diff,
"pct_change": (diff / old_val * 100) if old_val != 0 else None
}
return delta
VERSION_HISTORY_EXAMPLE = """
VERSION HISTORY FOR WINDOW 2024-01-15-14
Version 1 (Initial):
computed_at: 2024-01-15T15:00:00Z
result: {revenue: 50000, orders: 500}
event_count: 500
is_final: false
reason: initial
Version 2 (Late data):
computed_at: 2024-01-15T16:30:00Z
result: {revenue: 52500, orders: 525}
event_count: 525
is_final: false
reason: late_data
delta: {
revenue: {old: 50000, new: 52500, diff: +2500, pct: +5%},
orders: {old: 500, new: 525, diff: +25, pct: +5%}
}
Version 3 (Final):
computed_at: 2024-01-16T01:00:00Z
result: {revenue: 53200, orders: 532}
event_count: 532
is_final: true
reason: daily_finalization
delta: {
revenue: {old: 52500, new: 53200, diff: +700, pct: +1.3%},
orders: {old: 525, new: 532, diff: +7, pct: +1.3%}
}
This shows:
- Initial result was 5% low (mobile offline events)
- Daily finalization added 1.3% more (payment confirmations)
- Total correction: +6.4% from initial to final
"""
Part II: Implementation
Chapter 5: Production Late Data Pipeline
5.1 End-to-End Architecture
LATE DATA HANDLING ARCHITECTURE
┌────────────────────────────────────────────────────────────────────────┐
│ │
│ EVENT SOURCES │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ Web │ │ Mobile │ │ API │ │ Partners│ │
│ │(on-time)│ │(late) │ │(on-time)│ │(batch) │ │
│ └────┬────┘ └────┬────┘ └────┬────┘ └────┬────┘ │
│ │ │ │ │ │
└───────┼────────────┼────────────┼────────────┼─────────────────────────┘
│ │ │ │
└────────────┴──────┬─────┴────────────┘
│
▼
┌────────────────────────────────────────────────────────────────────────┐
│ INGESTION LAYER │
│ │
│ ┌────────────────────────────────────────────────────────────────┐ │
│ │ Kafka (events topic) │ │
│ │ Partition by user_id for ordering │ │
│ │ 7-day retention for replay │ │
│ └────────────────────────────────────────────────────────────────┘ │
│ │ │
└────────────────────────────┼───────────────────────────────────────────┘
│
▼
┌───────────────────────────────────────────────────────────────────────┐
│ STREAM PROCESSING LAYER │
│ │
│ ┌───────────────────────────────────────────────────────────────┐ │
│ │ Flink Job │ │
│ │ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ │
│ │ │ Watermark │ │ Window │ │ Late │ │ │
│ │ │ Generator │──│ Aggregator │──│ Handler │ │ │
│ │ │ (5min late) │ │ (1hr tumble) │ │(side output) │ │ │
│ │ └──────────────┘ └──────────────┘ └──────────────┘ │ │
│ └───────────────────────────────────────────────────────────────┘ │
│ │ │ │
│ ▼ ▼ │
│ ┌────────────┐ ┌────────────┐ │
│ │ On-time │ │ Late │ │
│ │ Results │ │ Events │ │
│ └─────┬──────┘ └─────┬──────┘ │
│ │ │ │
└──────────────────┼──────────────────────────────┼─────────────────────┘
│ │
▼ ▼
┌───────────────────────────────────────────────────────────────────────┐
│ SERVING LAYER │
│ │
│ ┌──────────────────────┐ ┌──────────────────────┐ │
│ │ Druid (Real-time) │ │ Late Events Queue │ │
│ │ Versioned results │ │ (for patching) │ │
│ └──────────┬───────────┘ └──────────┬───────────┘ │
│ │ │ │
└─────────────┼───────────────────────────────┼─────────────────────────┘
│ │
│ ▼
│ ┌─────────────────────────────────┐
│ │ Batch Reprocessing │
│ │ (Daily finalization job) │
│ │ Spark: Read raw → Aggregate │
│ │ → Overwrite results │
│ └─────────────────────────────────┘
│ │
└───────────────┬───────────────┘
│
▼
┌─────────────────────┐
│ Dashboard API │
│ - Latest version │
│ - Final version │
│ - As-of version │
└─────────────────────┘
5.2 Complete Implementation
# late_data/production_pipeline.py
"""
Production late data handling pipeline.
Combines streaming for real-time with batch for accuracy.
"""
from dataclasses import dataclass, field
from datetime import datetime, timedelta, date
from typing import Dict, Any, List, Optional
import asyncio
import logging

# Building blocks defined earlier in this chapter (module paths from the
# file headers above)
from late_data.watermark_strategies import BoundedOutOfOrdernessWatermark
from late_data.handling_strategies import (
    LateDataAction, LateDataConfig, LateDataHandler
)
from late_data.reprocessing import IncrementalPatcher, ReprocessingManager
from late_data.versioning import VersionedResultStore
logger = logging.getLogger(__name__)
@dataclass
class PipelineConfig:
"""Configuration for the late data pipeline."""
# Watermark settings
allowed_lateness: timedelta = timedelta(minutes=5)
watermark_idle_timeout: timedelta = timedelta(minutes=1)
# Window settings
window_size: timedelta = timedelta(hours=1)
# Reprocessing settings
daily_reprocessing_hour: int = 2 # 2 AM
reprocessing_lookback: timedelta = timedelta(days=1)
# Output settings
realtime_output: str = "druid://metrics"
batch_output: str = "s3://warehouse/metrics"
late_events_output: str = "kafka://late-events"
class LateDataPipeline:
    """
    Complete pipeline for handling late data in analytics.
    """
    def __init__(
        self,
        config: PipelineConfig,
        kafka_source,
        flink_env,
        spark_session,
        result_store: VersionedResultStore
    ):
        self.config = config
        self.kafka = kafka_source
        self.flink = flink_env
        self.spark = spark_session
        self.results = result_store
        # Components
        self.watermark_strategy = BoundedOutOfOrdernessWatermark(
            config.allowed_lateness
        )
        self.late_handler = LateDataHandler(
            LateDataConfig(
                allowed_lateness=config.allowed_lateness,
                action=LateDataAction.SIDE_OUTPUT,
                side_output_topic=config.late_events_output
            )
        )
        self.patcher = IncrementalPatcher(result_store)
        self.reprocessor = ReprocessingManager(
            raw_data_source=kafka_source,
            aggregation_pipeline=self,
            output_sink=result_store
        )
    async def start_streaming(self):
        """
        Start the streaming pipeline.
        This runs continuously, processing events as they arrive.
        """
        logger.info("Starting streaming pipeline")
        # Read from Kafka
        events = await self.kafka.read_stream("events")
        # Process events with watermarks
        async for event in events:
            await self._process_streaming_event(event)

    async def _process_streaming_event(self, event: Dict):
        """Process a single event in streaming mode."""
        event_time = datetime.fromisoformat(event["event_time"])
        # Update watermark
        watermark = self.watermark_strategy.on_event(event_time)
        # Process through late handler
        result = self.late_handler.process_event(
            event=event,
            watermark=watermark,
            aggregate_fn=self._aggregate_event
        )
        if result:
            # Store result version
            await self.results.store_result(
                window_key=result["window_key"],
                result=result["result"],
                event_count=result.get("event_count", 1),
                is_final=False,
                reason="late_data" if result["is_late_update"] else "streaming"
            )
    def _aggregate_event(
        self,
        current: Optional[Dict],
        event: Dict
    ) -> Dict:
        """Aggregate function for events."""
        if current is None:
            current = {
                "revenue": 0,
                "orders": 0,
                "items": 0
            }
        if event.get("event_type") == "order.placed":
            current["revenue"] += event.get("payload", {}).get("total", 0)
            current["orders"] += 1
            current["items"] += event.get("payload", {}).get("item_count", 0)
        return current
    async def run_daily_finalization(self, target_date: date):
        """
        Run daily finalization batch job.
        This reprocesses the full day to produce final numbers.
        """
        from pyspark.sql import functions as F

        logger.info(f"Running daily finalization for {target_date}")
        # Read all events for the day from raw storage
        events_df = self.spark.read.parquet(
            f"s3://data-lake/events/date={target_date}"
        )
        # Aggregate by hour
        results = events_df.filter(
            F.col("event_type") == "order.placed"
        ).groupBy(
            F.date_trunc("hour", F.col("event_time")).alias("window_start")
        ).agg(
            F.sum("payload.total").alias("revenue"),
            F.count("*").alias("orders"),
            F.sum("payload.item_count").alias("items")
        ).collect()
        # Store final results
        for row in results:
            window_key = row["window_start"].strftime("%Y-%m-%d-%H")
            await self.results.store_result(
                window_key=window_key,
                result={
                    "revenue": row["revenue"],
                    "orders": row["orders"],
                    "items": row["items"]
                },
                event_count=row["orders"],
                is_final=True,
                reason="daily_finalization"
            )
        logger.info(
            f"Daily finalization complete for {target_date}, "
            f"finalized {len(results)} hourly windows"
        )
    async def apply_late_event_patches(self):
        """
        Process queued late events as patches.
        Runs periodically to update results with late data
        without full reprocessing.
        """
        late_events = self.late_handler.late_events
        if not late_events:
            return
        logger.info(f"Applying {len(late_events)} late event patches")
        # Apply as incremental patches
        result = await self.patcher.apply_late_events(
            late_events=[e["event"] for e in late_events],
            aggregations={
                "revenue": "SUM",
                "orders": "COUNT",
                "items": "SUM"
            }
        )
        # Clear processed events
        self.late_handler.late_events = []
        logger.info(
            f"Applied patches to {result['windows_patched']} windows, "
            f"{result['events_applied']} events"
        )
    async def get_metrics_for_dashboard(
        self,
        window_key: str,
        version_type: str = "latest"  # "latest", "final", or ISO timestamp
    ) -> Dict:
        """
        Get metrics for dashboard display.
        Supports different version types:
        - latest: Most recent computation (may change)
        - final: Official finalized number (stable)
        - timestamp: As-of a specific time (for auditing)
        """
        if version_type == "latest":
            version = await self.results.get_latest(window_key)
        elif version_type == "final":
            version = await self.results.get_final(window_key)
        else:
            # Assume it's a timestamp
            as_of = datetime.fromisoformat(version_type)
            version = await self.results.get_at_time(window_key, as_of)
        if not version:
            return {"error": "No data found"}
        return {
            "window_key": window_key,
            "result": version.result,
            "computed_at": version.computed_at.isoformat(),
            "is_final": version.is_final,
            "version": version.version_id,
            "event_count": version.event_count,
            # Include staleness indicator
            "staleness": (datetime.utcnow() - version.computed_at).total_seconds()
        }
5.3 Dashboard Integration
# late_data/dashboard_api.py
"""
API for dashboards to query late-data-aware metrics.
"""
from fastapi import FastAPI, Query
from datetime import datetime, date
from typing import Optional

app = FastAPI(title="Analytics API")


@app.get("/metrics/{metric_name}")
async def get_metric(
    metric_name: str,
    window: str = Query(..., description="Window key (e.g., 2024-01-15-14)"),
    version: str = Query(
        "latest",
        description="Version: 'latest', 'final', or ISO timestamp"
    ),
    include_history: bool = Query(
        False,
        description="Include version history"
    )
):
    """
    Get a metric value with late-data awareness.
    Returns:
    - result: The metric values
    - is_final: Whether this is the finalized number
    - staleness: Seconds since last computation
    - versions: (optional) Full version history
    """
    pipeline = get_pipeline()  # Dependency injection
    result = await pipeline.get_metrics_for_dashboard(
        window_key=window,
        version_type=version
    )
    if include_history:
        result["history"] = await pipeline.results.get_version_history(window)
    return result
@app.get("/metrics/{metric_name}/compare")
async def compare_versions(
    metric_name: str,
    window: str,
    version_a: str = "latest",
    version_b: str = "final"
):
    """
    Compare two versions of a metric.
    Useful for seeing real-time vs final discrepancy.
    """
    pipeline = get_pipeline()
    result_a = await pipeline.get_metrics_for_dashboard(window, version_a)
    result_b = await pipeline.get_metrics_for_dashboard(window, version_b)
    comparison = {
        "version_a": result_a,
        "version_b": result_b,
        "delta": {}
    }
    # Calculate delta
    for key in result_a.get("result", {}):
        val_a = result_a["result"].get(key, 0)
        val_b = result_b["result"].get(key, 0)
        if isinstance(val_a, (int, float)):
            comparison["delta"][key] = {
                "absolute": val_b - val_a,
                "percentage": ((val_b - val_a) / val_a * 100) if val_a else None
            }
    return comparison
@app.get("/data-quality/lateness")
async def get_lateness_metrics(
    start_date: date,
    end_date: date
):
    """
    Get data lateness metrics for monitoring.
    Shows how much data arrives late and how late.
    """
    pipeline = get_pipeline()
    handler = pipeline.late_handler
    metrics = handler.get_metrics()
    return {
        "period": {
            "start": start_date.isoformat(),
            "end": end_date.isoformat()
        },
        "on_time_rate": metrics["on_time_rate"],
        "late_rate": metrics["late_rate"],
        "late_accepted": metrics["late_accepted_count"],
        "late_dropped": metrics["late_dropped_count"],
        "recommendations": _get_recommendations(metrics)
    }


def _get_recommendations(metrics: dict) -> list:
    """Generate recommendations based on lateness metrics."""
    recommendations = []
    if metrics["late_rate"] > 0.10:  # >10% late
        recommendations.append({
            "severity": "high",
            "message": "High late data rate. Consider increasing allowed lateness.",
            "current_late_rate": f"{metrics['late_rate']*100:.1f}%"
        })
    if metrics["late_dropped_count"] > 0:
        recommendations.append({
            "severity": "medium",
            "message": f"{metrics['late_dropped_count']} events dropped as too late. "
                       "Review late event patterns."
        })
    return recommendations
Part III: Real-World Application
Chapter 6: Case Studies
6.1 LinkedIn's Lambda+ Architecture
LINKEDIN'S APPROACH TO LATE DATA
Challenge:
├── 500B+ events/day
├── Events from mobile can be hours late
└── Business needs both real-time and accurate daily numbers
Solution: Lambda+ Architecture
┌────────────────────────────────────────────────────────────────────────┐
│ LINKEDIN LAMBDA+ ARCHITECTURE │
│ │
│ Speed Layer (Real-time): │
│ ├── Samza for streaming aggregations │
│ ├── 5-minute allowed lateness │
│ ├── Results marked as "provisional" │
│ └── Updates dashboards immediately │
│ │
│ Batch Layer (Accuracy): │
│ ├── Spark jobs run T+1 day (24h after) │
│ ├── Processes ALL events including late │
│ ├── Results marked as "final" │
│ └── Used for reporting, ML training │
│ │
│ Reconciliation Layer (Trust): │
│ ├── Compares speed vs batch results │
│ ├── Alerts if discrepancy > threshold │
│ ├── Auto-adjusts allowed lateness based on patterns │
│ └── Generates data quality reports │
│ │
│ Key Innovation: AUTOMATED LATENESS TUNING │
│ ├── Analyze historical lateness distribution │
│ ├── Set allowed_lateness to capture 99% of events │
│ ├── Per-event-type tuning (mobile vs web) │
│ └── Continuous adjustment based on trends │
│ │
└────────────────────────────────────────────────────────────────────────┘
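The "automated lateness tuning" idea above can be sketched in a few lines: log each event's lateness (processing time minus event time), then set allowed lateness to the bound that captures the target share of events. This is an illustrative sketch with made-up sample data, not LinkedIn's actual implementation; `tune_allowed_lateness` is a hypothetical name.

```python
from datetime import timedelta
import math

def tune_allowed_lateness(lateness_samples_sec, coverage=0.99):
    """Return an allowed-lateness bound capturing `coverage` of the
    observed lateness samples (processing_time - event_time, seconds)."""
    if not lateness_samples_sec:
        return timedelta(minutes=5)  # fallback default when no data yet
    ordered = sorted(lateness_samples_sec)
    # Nearest-rank percentile index for the requested coverage
    idx = min(len(ordered) - 1, math.ceil(coverage * len(ordered)) - 1)
    return timedelta(seconds=ordered[idx])

# Per-event-type tuning: mobile lags far behind web (sample values)
web = [0.5, 1, 1, 2, 3, 4, 5, 8, 10, 60]                        # seconds
mobile = [30, 120, 600, 1800, 3600, 7200, 14400, 21600, 28800, 86400]
print(tune_allowed_lateness(web))     # a seconds-scale bound for web
print(tune_allowed_lateness(mobile))  # a day-scale bound for mobile
```

Rerunning this on a rolling sample of recent events gives the "continuous adjustment based on trends" behavior.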
6.2 Uber's Correction Pattern
UBER'S LATE DATA CORRECTION PATTERN
Challenge:
├── Trip events can be delayed (driver phone offline)
├── Payment events arrive later (processing time)
└── Financial reports must be accurate
Solution: T+1 and T+7 Reports
┌────────────────────────────────────────────────────────────────────────┐
│ │
│ REPORTING TIERS │
│ │
│ Real-time (T+0): │
│ ├── Shows: "Today so far" │
│ ├── Completeness: ~95% │
│ ├── Use case: Operations dashboard │
│ └── Label: "Live (updating)" │
│ │
│ Preliminary (T+1): │
│ ├── Shows: "Yesterday" │
│ ├── Completeness: ~99% │
│ ├── Use case: Daily standup, operations review │
│ └── Label: "Preliminary" │
│ │
│ Final (T+7): │
│ ├── Shows: "Last week" │
│ ├── Completeness: ~99.9% │
│ ├── Use case: Finance, compliance, ML training │
│ └── Label: "Final" │
│ │
│ CORRECTION WORKFLOW: │
│ │
│ Day 0: Real-time shows $10M │
│ Day 1: T+1 job runs, updates to $10.2M (+2%) │
│ Day 7: T+7 job runs, finalizes at $10.25M (+0.5%) │
│ Day 8+: Immutable (any changes are "adjustments" in new period) │
│ │
└────────────────────────────────────────────────────────────────────────┘
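The tier selection above reduces to labeling a window by its age. A minimal sketch, with thresholds and labels modeled on the T+0 / T+1 / T+7 tiers shown (these are assumptions, not Uber's code):

```python
from datetime import date

def report_tier(window_date: date, today: date) -> str:
    """Label a daily window by its reporting tier."""
    age_days = (today - window_date).days
    if age_days <= 0:
        return "Live (updating)"   # T+0: ~95% complete
    if age_days < 7:
        return "Preliminary"       # T+1 through T+6: ~99% complete
    return "Final"                 # T+7 and later: immutable

today = date(2024, 1, 15)
print(report_tier(date(2024, 1, 15), today))  # Live (updating)
print(report_tier(date(2024, 1, 14), today))  # Preliminary
print(report_tier(date(2024, 1, 8), today))   # Final
```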
Chapter 7: Common Mistakes
7.1 Late Data Anti-Patterns
LATE DATA ANTI-PATTERNS
❌ MISTAKE 1: Ignoring Late Data
Wrong:
# Just use processing time
df.groupBy(window(col("processing_time"), "1 hour")).sum()
Problem:
Mobile offline events land in wrong hour
Numbers shift unpredictably
Right:
# Use event time with watermarks
(df.withWatermark("event_time", "5 minutes")
   .groupBy(window(col("event_time"), "1 hour"))
   .sum())
❌ MISTAKE 2: Infinite Allowed Lateness
Wrong:
allowed_lateness = timedelta(days=365) # Wait forever!
Problem:
State grows unbounded → OOM
Windows never close → no results
Right:
# Match lateness to business reality
allowed_lateness = timedelta(hours=1) # For web
allowed_lateness = timedelta(hours=24) # For mobile
❌ MISTAKE 3: No Versioning
Wrong:
# Overwrite results in place
result_table.update(window_key, new_value)
Problem:
No audit trail
Can't explain changes
"Why did yesterday's number change?"
Right:
# Version all results
result_table.insert_version(
window_key, new_value,
version=next_version,
reason="late_data"
)
❌ MISTAKE 4: Same Lateness for Everything
Wrong:
# One size fits all
ALLOWED_LATENESS = "1 hour" # For all events
Problem:
Web events: 99.9% arrive in 1 minute (over-buffered)
Mobile events: 20% arrive after 1 hour (under-buffered)
Right:
# Per-source lateness configuration
WEB_LATENESS = "5 minutes"
MOBILE_LATENESS = "4 hours"
PARTNER_LATENESS = "24 hours"
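The per-source configuration above can be sketched as a simple lookup, so each event stream gets a lateness bound matching its measured behavior. Source names and bounds here are illustrative:

```python
from datetime import timedelta

# Per-source allowed-lateness bounds (illustrative values)
ALLOWED_LATENESS = {
    "web": timedelta(minutes=5),
    "mobile": timedelta(hours=4),
    "partner": timedelta(hours=24),
}
DEFAULT_LATENESS = timedelta(hours=1)

def lateness_for(event: dict) -> timedelta:
    """Pick the lateness bound from the event's source field."""
    return ALLOWED_LATENESS.get(event.get("source"), DEFAULT_LATENESS)

print(lateness_for({"source": "mobile"}))   # hours-scale bound
print(lateness_for({"source": "unknown"}))  # falls back to the default
```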
7.2 Dashboard Mistakes
DASHBOARD ANTI-PATTERNS
❌ MISTAKE 1: Showing "Real-time" Without Caveat
Wrong:
Dashboard title: "Revenue Today: $1,234,567"
Problem:
User thinks this is final
Makes decisions on incomplete data
Loses trust when it changes
Right:
Dashboard title: "Revenue Today: $1,234,567"
Subtitle: "Live • Updates every minute • Final at midnight"
❌ MISTAKE 2: No Comparison to Final
Wrong:
Show only latest number
No way to see what changed
Problem:
User can't assess data quality
No understanding of late data impact
Right:
"Today (live): $1,234,567"
"Yesterday (final): $1,456,789"
"Yesterday (at this time): $1,380,000" ← For comparison
❌ MISTAKE 3: Alerting on Non-Final Data
Wrong:
if revenue < threshold:
    send_alert("Revenue is low!")
Problem:
Alert fires, then number corrects
Alert fatigue
Team ignores real issues
Right:
if is_final and revenue < threshold:
    send_alert("Final revenue below threshold")
elif not is_final and revenue < threshold * 0.9:
    # Only alert if significantly below
    send_alert("Warning: Live revenue tracking low")
Part IV: Interview Preparation
Chapter 8: Interview Tips
8.1 When to Discuss Late Data
LATE DATA INTERVIEW TRIGGERS
"Design a real-time analytics system"
→ "How do we handle events that arrive late?"
"Design a dashboard for [X]"
→ "Should we show live or final numbers?"
"Why might the numbers not match?"
→ "Late data is a common cause..."
"How do you ensure data accuracy?"
→ "Versioning and finalization process..."
"Design event tracking for mobile app"
→ "Mobile events often arrive late due to offline usage..."
8.2 Key Phrases
TALKING ABOUT LATE DATA
"Late data is inevitable in distributed systems. Mobile apps
go offline, payment confirmations are async, partner feeds
are batched. The key is designing for it from the start."
"I'd use event time with bounded out-of-orderness watermarks.
Based on the lateness distribution — which we'd measure —
we'd set allowed lateness to capture 99% of events."
"For dashboards, I'd version all results and clearly label
whether data is 'live' or 'final'. Users need to know what
they're looking at. Real-time is valuable, but so is accuracy."
"Late events beyond our tolerance go to a side output for
later processing. Nothing is dropped silently — we either
process it or track it for analysis."
"We'd run a daily finalization job that reprocesses the full
day's data. This produces the 'official' numbers that finance
uses. The streaming numbers are provisional."
Chapter 9: Practice Problems
Problem 1: Mobile Gaming Analytics
Setup: Design analytics for a mobile game where:
- Users play offline (airplane mode, subway)
- Events buffer on device for up to 24 hours
- Need real-time leaderboards AND accurate daily reports
Questions:
- What watermark strategy would you use?
- How do you handle a user who goes offline for 2 days?
- How do you reconcile real-time vs daily leaderboards?
Hints:
- Separate allowed lateness for leaderboard vs reports
- Per-user watermarks with idle timeout
- Leaderboard: Show provisional, update on sync
- Daily: T+1 finalization job
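The "per-user watermarks with idle timeout" hint can be sketched as follows: each user advances their own watermark, and an idle user's watermark is forced forward so that one offline player can't hold windows open forever. All names here are hypothetical.

```python
from datetime import datetime, timedelta

class PerUserWatermarks:
    def __init__(self, max_out_of_order: timedelta, idle_timeout: timedelta):
        self.max_out_of_order = max_out_of_order
        self.idle_timeout = idle_timeout
        self._max_event_time = {}  # user_id -> max event time seen
        self._last_seen = {}       # user_id -> processing time of last event

    def on_event(self, user_id, event_time, now):
        prev = self._max_event_time.get(user_id, event_time)
        self._max_event_time[user_id] = max(prev, event_time)
        self._last_seen[user_id] = now

    def watermark(self, now):
        """Global watermark = min over users; idle users are advanced
        to (now - idle_timeout) so they can't hold the watermark back."""
        if not self._max_event_time:
            return None
        per_user = []
        for uid, t in self._max_event_time.items():
            if now - self._last_seen[uid] > self.idle_timeout:
                t = max(t, now - self.idle_timeout)
            per_user.append(t - self.max_out_of_order)
        return min(per_user)

wm = PerUserWatermarks(timedelta(minutes=5), timedelta(minutes=30))
noon = datetime(2024, 1, 15, 12, 0)
wm.on_event("rider_a", noon, now=noon)
wm.on_event("rider_b", noon + timedelta(hours=2), now=noon + timedelta(hours=2))
# rider_a has been idle for 2h, so it no longer drags the watermark back
print(wm.watermark(now=noon + timedelta(hours=2)))
```

A user offline for 2 days sees their synced events land behind the watermark; those go through the late-event path (side output plus finalization) rather than the live leaderboard.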
Problem 2: Ad Attribution
Setup: Attribute ad conversions where:
- Ad impression → (up to 30 days) → Conversion
- Conversions may arrive days after they happen
- Revenue attribution must be accurate for billing
Questions:
- How long do you keep attribution state?
- How do you handle attribution windows reopening?
- When do you finalize attribution for billing?
Hints:
- Keep impression state for 30+ days
- Attribution is mutable until billing close
- Monthly billing close is "final" cutoff
- Corrections after billing are adjustments
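A minimal sketch of the state-keeping hint: hold impressions for the full attribution window, attribute conversions whenever they arrive (even days late), and expire old state to keep it bounded. Class and field names are hypothetical.

```python
from datetime import datetime, timedelta

ATTRIBUTION_WINDOW = timedelta(days=30)

class AttributionStore:
    def __init__(self):
        self._impressions = {}  # user_id -> (ad_id, impression_time)

    def record_impression(self, user_id, ad_id, ts):
        self._impressions[user_id] = (ad_id, ts)

    def attribute(self, user_id, conversion_time):
        """Return the ad_id if an impression exists within the
        30-day window before the conversion, else None."""
        entry = self._impressions.get(user_id)
        if entry is None:
            return None
        ad_id, imp_time = entry
        if conversion_time - imp_time <= ATTRIBUTION_WINDOW:
            return ad_id
        return None

    def expire(self, now):
        """Drop impressions past the window to bound state size."""
        cutoff = now - ATTRIBUTION_WINDOW
        self._impressions = {
            u: (a, t) for u, (a, t) in self._impressions.items()
            if t >= cutoff
        }
```

Results stay mutable until the monthly billing close; anything arriving after that is booked as an adjustment in the next period rather than rewriting closed numbers.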
Chapter 10: Sample Interview Dialogue
Interviewer: "We're seeing our dashboard numbers change after the fact. Users are losing trust. How would you fix this?"
You: "This is a classic late data problem. Let me understand first — what kinds of numbers are changing, and by how much?"
Interviewer: "Daily revenue. Sometimes it increases by 5-10% the next day."
You: "That's significant. The 5-10% is likely from mobile app events arriving late, payment confirmations, or batch data feeds. First, I'd measure the actual lateness distribution — what percentage arrives within 1 hour, 4 hours, 24 hours.
Based on that data, I'd implement a multi-pronged approach:
First, watermarks in streaming with appropriate allowed lateness. If 99% of events arrive within 4 hours, set that as the watermark lag. Events arriving after that go to a side output for later processing.
Second, versioned results. Instead of updating in place, store each computation as a new version. This creates an audit trail and lets users understand how numbers evolve.
Third, clear labeling on dashboards. Show 'Live (updating)' for streaming results and 'Final' for batch-processed results. The T+1 batch job reprocesses the full day to produce the official number.
Fourth, educate users. Provide a comparison view showing 'Today at this time yesterday' alongside 'Yesterday final'. This sets expectations that live numbers will increase."
Interviewer: "What if some users really need real-time and others need final?"
You: "Great point. I'd expose both through the API with a version parameter. Dashboards for operations show 'latest' — they need speed. Finance reports request 'final' — they need accuracy.
We can also add a 'completeness estimate' based on historical patterns. If we typically see 95% of events within the first hour, we can show 'Today: $1M (estimated 95% complete)'. This gives users both the number and the confidence level."
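The completeness estimate described above can be sketched from a historical arrival curve: what fraction of a window's events have typically arrived N hours after the window starts. The curve values and function names here are illustrative assumptions.

```python
# Historical fraction of events arrived N hours after window start
# (illustrative values; in practice, measured from past windows)
ARRIVAL_CURVE = {0: 0.70, 1: 0.95, 4: 0.99, 24: 0.999}

def estimated_completeness(hours_since_window: float) -> float:
    """Return the curve value at the largest point <= elapsed time."""
    eligible = [h for h in ARRIVAL_CURVE if h <= hours_since_window]
    if not eligible:
        return 0.0
    return ARRIVAL_CURVE[max(eligible)]

def label(value: float, hours_since_window: float) -> str:
    """Format a metric with its completeness estimate for display."""
    pct = estimated_completeness(hours_since_window) * 100
    return f"${value:,.0f} (estimated {pct:.0f}% complete)"

print(label(1_000_000, 1.5))  # $1,000,000 (estimated 95% complete)
```

Dividing a live number by its estimated completeness also gives a rough projection of the final figure, which helps set user expectations.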
Summary
DAY 4 SUMMARY: LATE-ARRIVING DATA
TWO TYPES OF TIME
├── Event time: When it happened (business truth)
├── Processing time: When we saw it
└── Gap can be seconds to days
WATERMARKS
├── Track event time progress
├── "All events <= W have (probably) arrived"
├── Bounded out-of-orderness most common
└── Set based on measured lateness distribution
HANDLING STRATEGIES
├── DROP: Ignore late events (simple, loses data)
├── SIDE OUTPUT: Route for separate processing
├── UPDATE: Modify window result (complex, accurate)
└── REPROCESS: Full recomputation (expensive, exact)
VERSIONING
├── Never update in place
├── Store each computation as version
├── Support "latest", "final", "as-of" queries
└── Audit trail for debugging
PRODUCTION PATTERNS
├── Streaming for real-time (provisional)
├── Daily batch for finalization (official)
├── T+1 reports vs T+7 reports
└── Clear labeling on dashboards
Key Takeaways
LATE DATA KEY TAKEAWAYS
1. LATE DATA IS INEVITABLE
Design for it from day one
2. MEASURE YOUR LATENESS
Can't set watermarks without data
3. VERSION EVERYTHING
Audit trail saves debugging time
4. LABEL YOUR DATA
Users must know live vs final
5. FINALIZE EXPLICITLY
Batch job produces official numbers
GOLDEN RULES:
├── Event time, not processing time
├── Watermarks matched to reality
├── Nothing dropped silently
├── Versions, not overwrites
└── "Live" and "Final" are both valid
What's Next
Tomorrow, we'll tackle Query Layer and Optimization — making queries over billions of rows return in milliseconds.
TOMORROW'S PREVIEW: QUERY OPTIMIZATION
Questions we'll answer:
├── Which OLAP database should I use?
├── How do materialized views help?
├── When to pre-aggregate vs compute on-the-fly?
├── How to handle multi-tenant query isolation?
└── How to manage query costs at scale?
We have the data modeled and late data handled.
Now we make it FAST.
End of Week 8, Day 4
Next: Day 5 — Query Layer and Optimization