Week 8 — Day 1: Event Ingestion at Scale
System Design Mastery Series — Analytics Pipeline Week
Preface
Last week, we built search systems that help users find information. But how do we know what users are actually searching for? How do we track which results they click? How do we measure if our search is actually helping?
THE VISIBILITY PROBLEM
Your e-commerce platform is live:
├── 50,000 users browsing right now
├── Searches happening every second
├── Products being viewed, added to cart, purchased
├── Errors occurring somewhere
└── Performance varying across regions
But you have no idea:
├── What's the conversion rate right now?
├── Which products are trending?
├── Where are users dropping off?
├── Is the new feature helping or hurting?
└── Why did revenue drop 20% this morning?
Without events, you're flying blind.
This is why event ingestion matters.
Today, we'll learn to capture every meaningful action in your system — reliably, at scale, without losing data or blocking your application.
Part I: Foundations
Chapter 1: What Is Event Ingestion?
1.1 The Simple Definition
Event ingestion is the process of capturing, validating, and storing events from various sources into a system that can process and analyze them.
EVENTS ARE FACTS ABOUT WHAT HAPPENED
An event is an immutable record that something occurred:
{
"event_id": "evt_abc123",
"event_type": "product.viewed",
"timestamp": "2024-01-15T10:30:00Z",
"user_id": "user_456",
"data": {
"product_id": "prod_789",
"source": "search_results",
"position": 3
}
}
Key properties:
├── Immutable: Once recorded, never changed
├── Timestamped: When did it happen?
├── Typed: What kind of event?
├── Contextual: Who, what, where, how?
└── Unique: Can be deduplicated
1.2 Why Events, Not Logs?
LOGS VS EVENTS
LOGS (unstructured):
"2024-01-15 10:30:00 INFO User 456 viewed product 789 from search"
Problems:
├── Parsing is fragile (regex hell)
├── Schema changes break parsers
├── Missing context (what search query?)
└── Hard to aggregate
EVENTS (structured):
{
"event_type": "product.viewed",
"user_id": "user_456",
"product_id": "prod_789",
"source": "search",
"search_query": "running shoes",
"position": 3
}
Benefits:
├── Schema-aware (validation possible)
├── Self-describing
├── Easy to query and aggregate
└── Evolves safely with versioning
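To make "easy to query and aggregate" concrete, here is a minimal sketch that counts views per product over a handful of already-parsed events (plain Python, no pipeline required):

from collections import Counter

# A few structured events, as they might arrive from the pipeline.
events = [
    {"event_type": "product.viewed", "user_id": "user_456", "product_id": "prod_789"},
    {"event_type": "product.viewed", "user_id": "user_457", "product_id": "prod_789"},
    {"event_type": "product.viewed", "user_id": "user_456", "product_id": "prod_123"},
]

# Every field is addressable by name, so aggregation is a one-liner.
views_per_product = Counter(
    e["product_id"] for e in events if e["event_type"] == "product.viewed"
)
print(views_per_product)  # Counter({'prod_789': 2, 'prod_123': 1})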
1.3 The Ingestion Challenge
SCALE OF EVENT INGESTION
Small startup:
├── 10,000 events/day
├── Simple logging to database
└── Query with SQL
Growing company:
├── 10,000,000 events/day
├── Need streaming pipeline
└── Batch processing for analytics
Large platform:
├── 1,000,000,000+ events/day
├── Distributed ingestion
├── Real-time + batch processing
├── Multi-region replication
└── Complex event routing
The challenge isn't just volume — it's:
├── Never losing events (durability)
├── Never blocking producers (availability)
├── Handling spikes (elasticity)
├── Maintaining order (where needed)
└── Enabling replay (reprocessing)
Chapter 2: Event Schema Design
2.1 Anatomy of a Good Event
# schemas/event_schema.py
"""
Event schema design principles.
A well-designed event is:
- Self-describing (contains its own context)
- Immutable (never modified after creation)
- Uniquely identifiable (for deduplication)
- Timestamped (event time vs processing time)
- Versioned (for schema evolution)
"""
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional, Dict, Any
from enum import Enum
import uuid
class EventType(Enum):
"""Categorized event types."""
# User actions
PAGE_VIEWED = "page.viewed"
PRODUCT_VIEWED = "product.viewed"
PRODUCT_SEARCHED = "product.searched"
CART_UPDATED = "cart.updated"
ORDER_PLACED = "order.placed"
# System events
ERROR_OCCURRED = "error.occurred"
PERFORMANCE_MEASURED = "performance.measured"
# Business events
PAYMENT_PROCESSED = "payment.processed"
SHIPMENT_UPDATED = "shipment.updated"
@dataclass
class EventMetadata:
"""
Standard metadata for all events.
This is the "envelope" that wraps every event.
"""
# Identity
event_id: str = field(default_factory=lambda: str(uuid.uuid4()))
event_type: str = ""
event_version: str = "1.0"
# Timing
event_time: datetime = field(default_factory=datetime.utcnow) # When it happened
received_time: Optional[datetime] = None # When server received it
# Source
source_service: str = "" # Which service emitted this
source_instance: str = "" # Which instance
# Correlation
correlation_id: Optional[str] = None # For tracing across services
causation_id: Optional[str] = None # What event caused this
# User context
user_id: Optional[str] = None
session_id: Optional[str] = None
device_id: Optional[str] = None
@dataclass
class DeviceContext:
"""Device and client information."""
platform: str = "" # web, ios, android
app_version: str = ""
os_name: str = ""
os_version: str = ""
browser: str = ""
screen_resolution: str = ""
timezone: str = ""
language: str = ""
@dataclass
class LocationContext:
"""Geographic context (if available)."""
country: str = ""
region: str = ""
city: str = ""
ip_address: str = "" # Often hashed for privacy
@dataclass
class BaseEvent:
"""
Base class for all events.
Every event has:
1. Metadata (envelope)
2. Device context
3. Location context
4. Event-specific payload
"""
metadata: EventMetadata
device: DeviceContext
location: LocationContext
payload: Dict[str, Any] = field(default_factory=dict)
def to_dict(self) -> Dict[str, Any]:
"""Serialize event to dictionary."""
return {
"event_id": self.metadata.event_id,
"event_type": self.metadata.event_type,
"event_version": self.metadata.event_version,
"event_time": self.metadata.event_time.isoformat(),
"received_time": (
self.metadata.received_time.isoformat()
if self.metadata.received_time else None
),
"source": {
"service": self.metadata.source_service,
"instance": self.metadata.source_instance
},
"correlation": {
"correlation_id": self.metadata.correlation_id,
"causation_id": self.metadata.causation_id
},
"user": {
"user_id": self.metadata.user_id,
"session_id": self.metadata.session_id,
"device_id": self.metadata.device_id
},
"device": {
"platform": self.device.platform,
"app_version": self.device.app_version,
"os": f"{self.device.os_name} {self.device.os_version}",
"browser": self.device.browser
},
"location": {
"country": self.location.country,
"city": self.location.city
},
"payload": self.payload
}
# =============================================================================
# Specific Event Types
# =============================================================================
@dataclass
class ProductViewedEvent(BaseEvent):
"""User viewed a product page."""
def __post_init__(self):
self.metadata.event_type = EventType.PRODUCT_VIEWED.value
# Validate required payload fields
required = ["product_id"]
for field in required:
if field not in self.payload:
raise ValueError(f"Missing required field: {field}")
@dataclass
class OrderPlacedEvent(BaseEvent):
"""User placed an order."""
def __post_init__(self):
self.metadata.event_type = EventType.ORDER_PLACED.value
required = ["order_id", "total_amount", "currency", "items"]
for field in required:
if field not in self.payload:
raise ValueError(f"Missing required field: {field}")
# =============================================================================
# Example Usage
# =============================================================================
def create_product_viewed_event(
user_id: str,
product_id: str,
source: str,
    position: Optional[int] = None
) -> ProductViewedEvent:
"""Factory function to create product viewed event."""
return ProductViewedEvent(
metadata=EventMetadata(
user_id=user_id,
source_service="web-frontend"
),
device=DeviceContext(
platform="web",
browser="Chrome 120"
),
location=LocationContext(
country="US",
city="San Francisco"
),
payload={
"product_id": product_id,
"source": source,
"position": position,
"referrer": "search_results"
}
)
2.2 Schema Evolution
# schemas/schema_evolution.py
"""
Schema evolution strategies.
Events are immutable, but schemas evolve. We need to handle:
1. Adding new fields (safe)
2. Removing fields (dangerous)
3. Changing field types (dangerous)
4. Renaming fields (very dangerous)
"""
from dataclasses import dataclass
from typing import Dict, Any, Optional
from enum import Enum
class SchemaCompatibility(Enum):
"""Schema compatibility modes."""
BACKWARD = "backward" # New schema can read old data
FORWARD = "forward" # Old schema can read new data
FULL = "full" # Both directions
NONE = "none" # No compatibility guaranteed
@dataclass
class SchemaVersion:
"""Tracks schema versions."""
event_type: str
version: str
schema: Dict[str, Any]
compatibility: SchemaCompatibility
class SchemaIncompatibleError(Exception):
    """Raised when a new schema version violates its declared compatibility mode."""
    pass

class SchemaRegistry:
"""
Manages event schema versions.
In production, use:
- Confluent Schema Registry (with Kafka)
- AWS Glue Schema Registry
- Custom implementation
"""
def __init__(self):
self.schemas: Dict[str, Dict[str, SchemaVersion]] = {}
def register_schema(
self,
event_type: str,
version: str,
schema: Dict[str, Any],
compatibility: SchemaCompatibility = SchemaCompatibility.BACKWARD
) -> SchemaVersion:
"""Register a new schema version."""
if event_type not in self.schemas:
self.schemas[event_type] = {}
# Check compatibility with previous version
if self.schemas[event_type]:
latest = self._get_latest_version(event_type)
if not self._check_compatibility(latest.schema, schema, compatibility):
raise SchemaIncompatibleError(
f"Schema {version} is not {compatibility.value} compatible "
f"with {latest.version}"
)
schema_version = SchemaVersion(
event_type=event_type,
version=version,
schema=schema,
compatibility=compatibility
)
self.schemas[event_type][version] = schema_version
return schema_version
def _get_latest_version(self, event_type: str) -> SchemaVersion:
"""Get the latest registered schema version."""
versions = sorted(self.schemas[event_type].keys())
return self.schemas[event_type][versions[-1]]
def _check_compatibility(
self,
old_schema: Dict,
new_schema: Dict,
mode: SchemaCompatibility
) -> bool:
"""Check if schemas are compatible."""
old_fields = set(old_schema.get("required", []))
new_fields = set(new_schema.get("required", []))
if mode == SchemaCompatibility.BACKWARD:
# New schema can read old data
# New required fields are NOT allowed
new_required = new_fields - old_fields
return len(new_required) == 0
elif mode == SchemaCompatibility.FORWARD:
# Old schema can read new data
# Removing required fields is NOT allowed
removed_required = old_fields - new_fields
return len(removed_required) == 0
elif mode == SchemaCompatibility.FULL:
# Both directions
return old_fields == new_fields
return True # NONE compatibility
# =============================================================================
# Schema Evolution Examples
# =============================================================================
SCHEMA_V1 = {
"type": "object",
"required": ["event_id", "event_type", "user_id", "product_id"],
"properties": {
"event_id": {"type": "string"},
"event_type": {"type": "string"},
"user_id": {"type": "string"},
"product_id": {"type": "string"},
"timestamp": {"type": "string", "format": "date-time"}
}
}
# SAFE: Adding optional field
SCHEMA_V2 = {
"type": "object",
"required": ["event_id", "event_type", "user_id", "product_id"],
"properties": {
"event_id": {"type": "string"},
"event_type": {"type": "string"},
"user_id": {"type": "string"},
"product_id": {"type": "string"},
"timestamp": {"type": "string", "format": "date-time"},
"source": {"type": "string"} # NEW: optional field
}
}
# DANGEROUS: Adding required field (breaks backward compatibility)
SCHEMA_V3_BAD = {
"type": "object",
"required": ["event_id", "event_type", "user_id", "product_id", "session_id"],
"properties": {
# ... session_id is now required, old events don't have it
}
}
# SAFE WAY: Add with default
SCHEMA_V3_GOOD = {
"type": "object",
"required": ["event_id", "event_type", "user_id", "product_id"],
"properties": {
"event_id": {"type": "string"},
"event_type": {"type": "string"},
"user_id": {"type": "string"},
"product_id": {"type": "string"},
"session_id": {"type": "string", "default": "unknown"} # Optional with default
}
}
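A short usage sketch of the SchemaRegistry above: registering V2 succeeds under backward compatibility, while the V3 variant that adds a new required field is rejected.

registry = SchemaRegistry()
registry.register_schema("product.viewed", "1.0", SCHEMA_V1)
registry.register_schema("product.viewed", "2.0", SCHEMA_V2)  # safe: only an optional field added

try:
    registry.register_schema("product.viewed", "3.0", SCHEMA_V3_BAD)
except SchemaIncompatibleError as exc:
    print(f"Rejected: {exc}")  # new required field breaks backward compatibility

registry.register_schema("product.viewed", "3.0", SCHEMA_V3_GOOD)  # optional field with default is fine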
2.3 Event Naming Conventions
# schemas/naming_conventions.py
"""
Event naming conventions.
Good event names are:
- Hierarchical (domain.entity.action)
- Past tense (something happened)
- Specific but not too granular
"""
# Event naming patterns
EVENT_NAMING = """
NAMING CONVENTION: domain.entity.action
EXAMPLES:
User events:
├── user.signed_up
├── user.logged_in
├── user.logged_out
├── user.password_changed
├── user.profile_updated
Product events:
├── product.viewed
├── product.searched
├── product.added_to_cart
├── product.removed_from_cart
├── product.wishlisted
Order events:
├── order.placed
├── order.confirmed
├── order.shipped
├── order.delivered
├── order.cancelled
├── order.refunded
Payment events:
├── payment.initiated
├── payment.succeeded
├── payment.failed
├── payment.refunded
System events:
├── system.error_occurred
├── system.performance_measured
├── system.health_checked
ANTI-PATTERNS:
❌ Too generic:
"event", "action", "update"
❌ Future tense:
"user.will_signup", "order.placing"
❌ Imperative:
"create_user", "place_order"
❌ Too specific:
"user.clicked_blue_signup_button_on_homepage_v2"
❌ Inconsistent:
"userSignup", "user-login", "USER_LOGOUT"
"""
Chapter 3: Ingestion Architecture
3.1 The Ingestion Pipeline
EVENT INGESTION PIPELINE
┌─────────────────────────────────────────────────────────────────────────┐
│ DATA SOURCES │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ Web │ │ Mobile │ │ Backend │ │ IoT │ │ Third │ │
│ │ Browser │ │ Apps │ │Services │ │ Devices │ │ Party │ │
│ └────┬────┘ └────┬────┘ └────┬────┘ └────┬────┘ └────┬────┘ │
└───────┼────────────┼────────────┼────────────┼────────────┼─────────────┘
│ │ │ │ │
└────────────┴─────┬──────┴────────────┴────────────┘
│
▼
┌──────────────────────────────────────────────────────────────────────────┐
│ COLLECTION LAYER │
│ ┌────────────────────────────────────────────────────────────────────┐ │
│ │ Event Collection API │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │
│ │ │ /events │ │ /batch │ │ /health │ │ │
│ │ │ (single) │ │ (bulk) │ │ (status) │ │ │
│ │ └─────────────┘ └─────────────┘ └─────────────┘ │ │
│ └────────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌────────────────────────────────────────────────────────────────────┐ │
│ │ Validation Layer │ │
│ │ • Schema validation • Required fields check │ │
│ │ • Type coercion • Size limits │ │
│ │ • Timestamp normalization • Deduplication check │ │
│ └────────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ┌───────────┴───────────┐ │
│ ▼ ▼ │
│ ┌──────────┐ ┌──────────┐ │
│ │ Valid │ │ Invalid │ │
│ │ Events │ │ Events │ │
│ └────┬─────┘ └────┬─────┘ │
│ │ │ │
└──────────────────┼───────────────────────┼───────────────────────────────┘
│ │
▼ ▼
┌─────────────────────────────────────────────────────────────────────────┐
│ STREAMING LAYER (KAFKA) │
│ ┌────────────────┐ ┌────────────────┐ ┌────────────────┐ │
│ │ raw_events │ │ validated_ │ │ dead_letter_ │ │
│ │ (all events) │ │ events │ │ events │ │
│ │ │ │ (clean data) │ │ (failures) │ │
│ └────────────────┘ └────────────────┘ └────────────────┘ │
└─────────────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────────┐
│ PROCESSING LAYER │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Real-time │ │ Batch │ │ Raw Lake │ │
│ │ Processor │ │ Processor │ │ Storage │ │
│ │ (Flink) │ │ (Spark) │ │ (S3) │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
└─────────────────────────────────────────────────────────────────────────┘
3.2 Collection API Design
# ingestion/collection_api.py
"""
Event Collection API.
High-performance API for receiving events from clients.
"""
from fastapi import FastAPI, HTTPException, BackgroundTasks, Request
from fastapi.responses import JSONResponse
from pydantic import BaseModel, Field
from typing import List, Optional, Dict, Any
from datetime import datetime
import asyncio
import logging
logger = logging.getLogger(__name__)
app = FastAPI(title="Event Collection API")
# =============================================================================
# Request/Response Models
# =============================================================================
class EventRequest(BaseModel):
"""Single event submission."""
event_id: str = Field(..., description="Unique event identifier")
event_type: str = Field(..., description="Event type (e.g., product.viewed)")
event_time: datetime = Field(..., description="When the event occurred")
user_id: Optional[str] = None
session_id: Optional[str] = None
payload: Dict[str, Any] = Field(default_factory=dict)
class Config:
json_schema_extra = {
"example": {
"event_id": "evt_abc123",
"event_type": "product.viewed",
"event_time": "2024-01-15T10:30:00Z",
"user_id": "user_456",
"payload": {
"product_id": "prod_789",
"source": "search"
}
}
}
class BatchEventRequest(BaseModel):
"""Batch event submission."""
events: List[EventRequest] = Field(..., max_length=1000)
class EventResponse(BaseModel):
"""Response for event submission."""
status: str
event_id: str
received_at: datetime
class BatchEventResponse(BaseModel):
"""Response for batch submission."""
status: str
accepted: int
rejected: int
errors: List[Dict[str, str]] = []
# =============================================================================
# Collection Service
# =============================================================================
class EventCollector:
"""
Collects and forwards events to Kafka.
"""
def __init__(self, kafka_producer, validator, enricher):
self.producer = kafka_producer
self.validator = validator
self.enricher = enricher
# Metrics
self.events_received = 0
self.events_accepted = 0
self.events_rejected = 0
async def collect_event(
self,
event: EventRequest,
request_context: Dict[str, Any]
) -> EventResponse:
"""
Process a single event.
Steps:
1. Add server-side metadata
2. Validate against schema
3. Enrich with additional context
4. Send to Kafka
"""
self.events_received += 1
received_at = datetime.utcnow()
# Build full event
full_event = {
**event.dict(),
"received_time": received_at.isoformat(),
"server_context": {
"collector_instance": request_context.get("instance_id"),
"client_ip": request_context.get("client_ip"),
"user_agent": request_context.get("user_agent")
}
}
# Validate
validation_result = await self.validator.validate(full_event)
if not validation_result.is_valid:
self.events_rejected += 1
# Send to dead letter queue
await self._send_to_dlq(full_event, validation_result.errors)
raise HTTPException(
status_code=400,
detail={
"error": "Validation failed",
"details": validation_result.errors
}
)
# Enrich
enriched_event = await self.enricher.enrich(full_event)
# Send to Kafka
await self._send_to_kafka(enriched_event)
self.events_accepted += 1
return EventResponse(
status="accepted",
event_id=event.event_id,
received_at=received_at
)
async def collect_batch(
self,
batch: BatchEventRequest,
request_context: Dict[str, Any]
) -> BatchEventResponse:
"""
Process a batch of events.
For efficiency, we:
1. Validate all events first
2. Send valid events in a single Kafka batch
3. Send invalid events to DLQ
"""
received_at = datetime.utcnow()
accepted = []
rejected = []
errors = []
for event in batch.events:
full_event = {
**event.dict(),
"received_time": received_at.isoformat(),
"server_context": {
"collector_instance": request_context.get("instance_id"),
"client_ip": request_context.get("client_ip")
}
}
validation_result = await self.validator.validate(full_event)
if validation_result.is_valid:
enriched = await self.enricher.enrich(full_event)
accepted.append(enriched)
else:
rejected.append(full_event)
errors.append({
"event_id": event.event_id,
"error": validation_result.errors[0]
})
# Batch send to Kafka
if accepted:
await self._send_batch_to_kafka(accepted)
# Batch send to DLQ
if rejected:
await self._send_batch_to_dlq(rejected)
self.events_received += len(batch.events)
self.events_accepted += len(accepted)
self.events_rejected += len(rejected)
return BatchEventResponse(
status="processed",
accepted=len(accepted),
rejected=len(rejected),
errors=errors[:10] # Limit error details
)
async def _send_to_kafka(self, event: Dict):
"""Send single event to Kafka."""
topic = self._get_topic(event["event_type"])
key = event.get("user_id") or event["event_id"]
await self.producer.send(
topic=topic,
key=key,
value=event
)
async def _send_batch_to_kafka(self, events: List[Dict]):
"""Send batch of events to Kafka."""
# Group by topic
by_topic = {}
for event in events:
topic = self._get_topic(event["event_type"])
if topic not in by_topic:
by_topic[topic] = []
by_topic[topic].append(event)
# Send each topic batch
for topic, topic_events in by_topic.items():
await self.producer.send_batch(topic, topic_events)
async def _send_to_dlq(self, event: Dict, errors: List[str]):
"""Send failed event to dead letter queue."""
await self.producer.send(
topic="events.dead_letter",
value={
"original_event": event,
"errors": errors,
"failed_at": datetime.utcnow().isoformat()
}
)
async def _send_batch_to_dlq(self, events: List[Dict]):
"""Send batch of failed events to DLQ."""
for event in events:
await self._send_to_dlq(event, ["Batch validation failed"])
def _get_topic(self, event_type: str) -> str:
"""Route event to appropriate Kafka topic."""
# Route by event category
if event_type.startswith("user."):
return "events.user"
elif event_type.startswith("product."):
return "events.product"
elif event_type.startswith("order."):
return "events.order"
elif event_type.startswith("payment."):
return "events.payment"
else:
return "events.general"
# =============================================================================
# API Endpoints
# =============================================================================
# Global collector instance (initialized on startup)
collector: Optional[EventCollector] = None
@app.post("/v1/events", response_model=EventResponse)
async def collect_single_event(
event: EventRequest,
request: Request,
background_tasks: BackgroundTasks
):
"""
Collect a single event.
Use this for real-time event tracking where immediate
confirmation is needed.
"""
context = {
"instance_id": "collector-1",
"client_ip": request.client.host,
"user_agent": request.headers.get("user-agent")
}
return await collector.collect_event(event, context)
@app.post("/v1/events/batch", response_model=BatchEventResponse)
async def collect_batch_events(
batch: BatchEventRequest,
request: Request
):
"""
Collect a batch of events.
Use this for:
- Mobile apps sending buffered events
- Backend services with high event volume
- Periodic sync of offline events
Maximum 1000 events per batch.
"""
context = {
"instance_id": "collector-1",
"client_ip": request.client.host
}
return await collector.collect_batch(batch, context)
@app.get("/health")
async def health_check():
"""Health check endpoint."""
return {
"status": "healthy",
"metrics": {
"events_received": collector.events_received,
"events_accepted": collector.events_accepted,
"events_rejected": collector.events_rejected
}
}
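A hedged sketch of how a backend client might submit events to this API, assuming the collector is reachable at http://localhost:8000 and using the httpx library:

import uuid
from datetime import datetime, timezone

import httpx

COLLECTOR_URL = "http://localhost:8000"  # assumed deployment address

def build_event(event_type: str, user_id: str, payload: dict) -> dict:
    """Build an event matching the EventRequest model above."""
    return {
        "event_id": f"evt_{uuid.uuid4().hex[:12]}",
        "event_type": event_type,
        "event_time": datetime.now(timezone.utc).isoformat(),
        "user_id": user_id,
        "payload": payload,
    }

def send_batch(events: list) -> None:
    """POST a batch to /v1/events/batch and report how many were accepted."""
    response = httpx.post(
        f"{COLLECTOR_URL}/v1/events/batch",
        json={"events": events},
        timeout=5.0,
    )
    response.raise_for_status()
    body = response.json()
    print(f"accepted={body['accepted']} rejected={body['rejected']}")

send_batch([
    build_event("product.viewed", "user_456", {"product_id": "prod_789", "source": "search"}),
    build_event("cart.updated", "user_456", {"product_id": "prod_789", "quantity": 1}),
])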
Part II: Implementation
Chapter 4: Validation and Enrichment
4.1 Event Validation
# ingestion/validation.py
"""
Event validation layer.
Validates events against schemas and business rules.
"""
from dataclasses import dataclass
from typing import List, Dict, Any, Optional
from datetime import datetime, timedelta
import re
import json
@dataclass
class ValidationResult:
"""Result of event validation."""
is_valid: bool
errors: List[str]
warnings: List[str]
class EventValidator:
"""
Validates events before ingestion.
Validation layers:
1. Schema validation (structure)
2. Type validation (field types)
3. Business rules (logical constraints)
4. Security validation (injection, size)
"""
def __init__(self, schema_registry):
self.schema_registry = schema_registry
# Limits
self.max_event_size_bytes = 1024 * 100 # 100KB
self.max_string_length = 10000
self.max_array_length = 1000
self.max_payload_depth = 10
# Time bounds
self.max_future_seconds = 300 # 5 minutes
self.max_past_days = 30 # 30 days old
async def validate(self, event: Dict[str, Any]) -> ValidationResult:
"""
Validate an event through all validation layers.
"""
errors = []
warnings = []
# Layer 1: Size validation
size_errors = self._validate_size(event)
errors.extend(size_errors)
if errors:
return ValidationResult(False, errors, warnings)
# Layer 2: Required fields
required_errors = self._validate_required_fields(event)
errors.extend(required_errors)
if errors:
return ValidationResult(False, errors, warnings)
# Layer 3: Schema validation
schema_errors = self._validate_schema(event)
errors.extend(schema_errors)
# Layer 4: Timestamp validation
time_errors, time_warnings = self._validate_timestamp(event)
errors.extend(time_errors)
warnings.extend(time_warnings)
# Layer 5: Business rules
business_errors = self._validate_business_rules(event)
errors.extend(business_errors)
# Layer 6: Security validation
security_errors = self._validate_security(event)
errors.extend(security_errors)
return ValidationResult(
is_valid=len(errors) == 0,
errors=errors,
warnings=warnings
)
def _validate_size(self, event: Dict) -> List[str]:
"""Validate event size limits."""
errors = []
# Total size
event_json = json.dumps(event)
if len(event_json) > self.max_event_size_bytes:
errors.append(
f"Event size {len(event_json)} exceeds maximum "
f"{self.max_event_size_bytes} bytes"
)
return errors
def _validate_required_fields(self, event: Dict) -> List[str]:
"""Validate required fields are present."""
errors = []
required = ["event_id", "event_type", "event_time"]
for field in required:
if field not in event or event[field] is None:
errors.append(f"Missing required field: {field}")
return errors
def _validate_schema(self, event: Dict) -> List[str]:
"""Validate against registered schema."""
errors = []
event_type = event.get("event_type")
if not event_type:
return errors # Already caught in required fields
# Get schema for this event type
schema = self.schema_registry.get_schema(event_type)
if not schema:
# Unknown event type - warning but allow
return []
# Validate payload against schema
payload = event.get("payload", {})
for field, field_schema in schema.get("properties", {}).items():
if field in schema.get("required", []):
if field not in payload:
errors.append(f"Missing required payload field: {field}")
if field in payload:
field_errors = self._validate_field_type(
field, payload[field], field_schema
)
errors.extend(field_errors)
return errors
def _validate_field_type(
self,
field_name: str,
value: Any,
schema: Dict
) -> List[str]:
"""Validate a single field against its schema."""
errors = []
expected_type = schema.get("type")
type_mapping = {
"string": str,
"integer": int,
"number": (int, float),
"boolean": bool,
"array": list,
"object": dict
}
if expected_type in type_mapping:
expected = type_mapping[expected_type]
if not isinstance(value, expected):
errors.append(
f"Field '{field_name}' expected {expected_type}, "
f"got {type(value).__name__}"
)
return errors
def _validate_timestamp(self, event: Dict) -> tuple:
"""Validate event timestamp is reasonable."""
errors = []
warnings = []
event_time_str = event.get("event_time")
if not event_time_str:
return errors, warnings
try:
if isinstance(event_time_str, str):
event_time = datetime.fromisoformat(
event_time_str.replace("Z", "+00:00")
)
else:
event_time = event_time_str
now = datetime.utcnow()
# Check future timestamps
max_future = now + timedelta(seconds=self.max_future_seconds)
if event_time.replace(tzinfo=None) > max_future:
errors.append(
f"Event time {event_time} is too far in the future"
)
# Check old timestamps
max_past = now - timedelta(days=self.max_past_days)
if event_time.replace(tzinfo=None) < max_past:
warnings.append(
f"Event time {event_time} is more than "
f"{self.max_past_days} days old"
)
except Exception as e:
errors.append(f"Invalid event_time format: {e}")
return errors, warnings
def _validate_business_rules(self, event: Dict) -> List[str]:
"""Validate business-specific rules."""
errors = []
event_type = event.get("event_type")
payload = event.get("payload", {})
# Example: Order events must have positive amounts
if event_type == "order.placed":
amount = payload.get("total_amount", 0)
if amount <= 0:
errors.append("Order total_amount must be positive")
# Example: Product views must have product_id
if event_type == "product.viewed":
if not payload.get("product_id"):
errors.append("product.viewed requires product_id")
return errors
def _validate_security(self, event: Dict) -> List[str]:
"""Validate for security concerns."""
errors = []
# Check for potential injection in string fields
dangerous_patterns = [
r"<script",
r"javascript:",
r"data:text/html",
r"on\w+\s*=", # onclick, onerror, etc.
]
def check_strings(obj, path=""):
if isinstance(obj, str):
for pattern in dangerous_patterns:
if re.search(pattern, obj, re.IGNORECASE):
errors.append(
f"Potentially dangerous content in {path}"
)
elif isinstance(obj, dict):
for key, value in obj.items():
check_strings(value, f"{path}.{key}")
elif isinstance(obj, list):
for i, item in enumerate(obj):
check_strings(item, f"{path}[{i}]")
check_strings(event)
return errors
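A brief usage sketch of the validator, using a minimal in-memory stand-in for the schema registry (the registry is assumed to expose a get_schema method):

import asyncio
from datetime import datetime

class InMemorySchemaRegistry:
    """Minimal stand-in for a real schema registry, for illustration only."""
    def __init__(self, schemas: dict):
        self._schemas = schemas

    def get_schema(self, event_type: str):
        return self._schemas.get(event_type)

registry = InMemorySchemaRegistry({
    "product.viewed": {
        "required": ["product_id"],
        "properties": {
            "product_id": {"type": "string"},
            "position": {"type": "integer"},
        },
    }
})

validator = EventValidator(registry)
result = asyncio.run(validator.validate({
    "event_id": "evt_abc123",
    "event_type": "product.viewed",
    "event_time": datetime.utcnow().isoformat() + "Z",
    "payload": {"product_id": "prod_789", "position": 3},
}))
print(result.is_valid, result.errors, result.warnings)  # True [] []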
4.2 Event Enrichment
# ingestion/enrichment.py
"""
Event enrichment layer.
Adds additional context to events before storage.
"""
from dataclasses import dataclass
from typing import Dict, Any, Optional
from datetime import datetime
import hashlib
@dataclass
class GeoLocation:
"""Geographic location data."""
country: str
country_code: str
region: str
city: str
latitude: float
longitude: float
timezone: str
class EventEnricher:
"""
Enriches events with additional context.
Enrichment types:
1. IP geolocation
2. User agent parsing
3. User profile data
4. Product catalog data
5. Session reconstruction
6. Derived fields
"""
def __init__(
self,
geo_service,
user_service,
product_service,
session_service
):
self.geo = geo_service
self.users = user_service
self.products = product_service
self.sessions = session_service
async def enrich(self, event: Dict[str, Any]) -> Dict[str, Any]:
"""
Enrich an event with additional context.
"""
enriched = {**event}
enriched["enrichment"] = {}
# Enrich with geolocation
if ip := event.get("server_context", {}).get("client_ip"):
geo = await self._enrich_geo(ip)
if geo:
enriched["enrichment"]["geo"] = geo
# Enrich with user data
if user_id := event.get("user_id"):
user_data = await self._enrich_user(user_id)
if user_data:
enriched["enrichment"]["user"] = user_data
# Enrich with product data
if product_id := event.get("payload", {}).get("product_id"):
product_data = await self._enrich_product(product_id)
if product_data:
enriched["enrichment"]["product"] = product_data
# Enrich with session data
if session_id := event.get("session_id"):
session_data = await self._enrich_session(session_id)
if session_data:
enriched["enrichment"]["session"] = session_data
# Add derived fields
enriched["derived"] = self._derive_fields(enriched)
return enriched
async def _enrich_geo(self, ip: str) -> Optional[Dict]:
"""Look up geolocation from IP."""
try:
            # Skip private/loopback IPs (simplified: "172." matches more than the 172.16.0.0/12 private range)
if ip.startswith(("10.", "172.", "192.168.", "127.")):
return None
location = await self.geo.lookup(ip)
if location:
return {
"country": location.country,
"country_code": location.country_code,
"region": location.region,
"city": location.city,
"timezone": location.timezone
}
except Exception:
pass
return None
async def _enrich_user(self, user_id: str) -> Optional[Dict]:
"""Look up user profile data."""
try:
user = await self.users.get_profile(user_id)
if user:
return {
"segment": user.segment,
"tier": user.tier,
"signup_date": user.signup_date.isoformat(),
"lifetime_value": user.ltv,
"is_subscriber": user.is_subscriber
}
except Exception:
pass
return None
async def _enrich_product(self, product_id: str) -> Optional[Dict]:
"""Look up product catalog data."""
try:
product = await self.products.get_product(product_id)
if product:
return {
"category": product.category,
"subcategory": product.subcategory,
"brand": product.brand,
"price": product.price,
"is_on_sale": product.is_on_sale
}
except Exception:
pass
return None
async def _enrich_session(self, session_id: str) -> Optional[Dict]:
"""Look up session data."""
try:
session = await self.sessions.get_session(session_id)
if session:
return {
"session_start": session.start_time.isoformat(),
"page_views": session.page_view_count,
"is_new_session": session.is_first_session,
"entry_page": session.entry_page,
"traffic_source": session.traffic_source
}
except Exception:
pass
return None
def _derive_fields(self, event: Dict) -> Dict:
"""Derive additional fields from event data."""
derived = {}
# Derive time dimensions
event_time = event.get("event_time")
if event_time:
if isinstance(event_time, str):
dt = datetime.fromisoformat(event_time.replace("Z", "+00:00"))
else:
dt = event_time
derived["time_dimensions"] = {
"year": dt.year,
"month": dt.month,
"day": dt.day,
"hour": dt.hour,
"day_of_week": dt.weekday(),
"is_weekend": dt.weekday() >= 5,
"quarter": (dt.month - 1) // 3 + 1
}
# Derive event category
event_type = event.get("event_type", "")
derived["event_category"] = event_type.split(".")[0] if "." in event_type else "unknown"
# Hash sensitive fields for analytics
if user_id := event.get("user_id"):
derived["user_id_hash"] = hashlib.sha256(
user_id.encode()
).hexdigest()[:16]
return derived
Chapter 5: Kafka as the Event Backbone
5.1 Kafka Topic Design
# ingestion/kafka_config.py
"""
Kafka configuration for event ingestion.
Topic design principles:
1. Partition by user_id for user-centric ordering
2. Separate topics by event category for independent scaling
3. Retention based on reprocessing needs
4. Compaction for lookup topics
"""
from dataclasses import dataclass
from typing import Dict, List
from enum import Enum
class CleanupPolicy(Enum):
DELETE = "delete"
COMPACT = "compact"
COMPACT_DELETE = "compact,delete"
@dataclass
class TopicConfig:
"""Kafka topic configuration."""
name: str
partitions: int
replication_factor: int
retention_ms: int # -1 for infinite
cleanup_policy: CleanupPolicy
min_insync_replicas: int = 2
def to_dict(self) -> Dict:
return {
"num.partitions": self.partitions,
"replication.factor": self.replication_factor,
"retention.ms": self.retention_ms,
"cleanup.policy": self.cleanup_policy.value,
"min.insync.replicas": self.min_insync_replicas
}
# =============================================================================
# Topic Definitions
# =============================================================================
RETENTION_7_DAYS = 7 * 24 * 60 * 60 * 1000
RETENTION_30_DAYS = 30 * 24 * 60 * 60 * 1000
RETENTION_90_DAYS = 90 * 24 * 60 * 60 * 1000
RETENTION_INFINITE = -1
TOPICS = {
# Raw events (all events, short retention)
"events.raw": TopicConfig(
name="events.raw",
partitions=64,
replication_factor=3,
retention_ms=RETENTION_7_DAYS,
cleanup_policy=CleanupPolicy.DELETE
),
# User events (user actions, longer retention)
"events.user": TopicConfig(
name="events.user",
partitions=32,
replication_factor=3,
retention_ms=RETENTION_30_DAYS,
cleanup_policy=CleanupPolicy.DELETE
),
# Product events (views, searches, carts)
"events.product": TopicConfig(
name="events.product",
partitions=32,
replication_factor=3,
retention_ms=RETENTION_30_DAYS,
cleanup_policy=CleanupPolicy.DELETE
),
# Order events (transactions, keep longer)
"events.order": TopicConfig(
name="events.order",
partitions=16,
replication_factor=3,
retention_ms=RETENTION_90_DAYS,
cleanup_policy=CleanupPolicy.DELETE
),
# Payment events (financial, keep indefinitely)
"events.payment": TopicConfig(
name="events.payment",
partitions=16,
replication_factor=3,
retention_ms=RETENTION_INFINITE,
cleanup_policy=CleanupPolicy.DELETE
),
# Dead letter queue (failed events)
"events.dead_letter": TopicConfig(
name="events.dead_letter",
partitions=8,
replication_factor=3,
retention_ms=RETENTION_30_DAYS,
cleanup_policy=CleanupPolicy.DELETE
),
# Aggregated metrics (compacted for current state)
"metrics.realtime": TopicConfig(
name="metrics.realtime",
partitions=16,
replication_factor=3,
retention_ms=RETENTION_INFINITE,
cleanup_policy=CleanupPolicy.COMPACT
)
}
# =============================================================================
# Partitioning Strategy
# =============================================================================
class PartitionStrategy:
"""
Determines which partition an event goes to.
Goals:
1. Events for same user go to same partition (ordering)
2. Even distribution across partitions (load balancing)
3. Consistent assignment (same key always same partition)
"""
@staticmethod
def get_partition_key(event: Dict) -> str:
"""
Get the partition key for an event.
Strategy:
- User events: partition by user_id
- Anonymous events: partition by session_id or device_id
- System events: partition by source service
"""
event_type = event.get("event_type", "")
# User-centric events
if event_type.startswith(("user.", "order.", "payment.")):
return event.get("user_id") or event.get("session_id") or "anonymous"
# Product events - partition by product for aggregation
if event_type.startswith("product."):
# Use user_id if available for user journey analysis
if user_id := event.get("user_id"):
return user_id
# Fall back to product_id for product analytics
return event.get("payload", {}).get("product_id", "unknown")
# System events - partition by service
if event_type.startswith("system."):
return event.get("server_context", {}).get("collector_instance", "default")
# Default: use event_id for even distribution
return event.get("event_id", "unknown")
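For completeness, a sketch of how a partition key maps to a partition index. Kafka's default partitioner hashes the key bytes with murmur2; the MD5-based version below only approximates the idea:

import hashlib

def partition_for(key: str, num_partitions: int) -> int:
    """Map a partition key to a partition index (illustrative; Kafka itself uses murmur2)."""
    digest = int(hashlib.md5(key.encode("utf-8")).hexdigest(), 16)
    return digest % num_partitions

event = {"event_type": "order.placed", "user_id": "user_456", "event_id": "evt_abc123"}
key = PartitionStrategy.get_partition_key(event)
print(key, "->", partition_for(key, TOPICS["events.order"].partitions))
# The same user_id always maps to the same partition, preserving per-user ordering.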
5.2 Kafka Producer with Exactly-Once
# ingestion/kafka_producer.py
"""
High-performance Kafka producer for event ingestion.
Features:
- Batching for throughput
- Exactly-once semantics
- Async sending with callbacks
- Compression
- Retries with backoff
"""
from typing import Dict, Any, List, Optional, Callable
from dataclasses import dataclass
import asyncio
import json
import logging
from datetime import datetime
logger = logging.getLogger(__name__)
@dataclass
class ProducerConfig:
"""Kafka producer configuration."""
bootstrap_servers: List[str]
client_id: str = "event-collector"
# Batching
batch_size: int = 16384 # 16KB
linger_ms: int = 5 # Wait up to 5ms for batch
buffer_memory: int = 33554432 # 32MB
# Reliability
acks: str = "all" # Wait for all replicas
retries: int = 5
retry_backoff_ms: int = 100
enable_idempotence: bool = True # Exactly-once
# Compression
compression_type: str = "lz4"
# Timeouts
request_timeout_ms: int = 30000
delivery_timeout_ms: int = 120000
class EventProducer:
"""
Async Kafka producer for events.
"""
def __init__(self, config: ProducerConfig):
self.config = config
self.producer = None
# Metrics
self.messages_sent = 0
self.messages_failed = 0
self.bytes_sent = 0
async def start(self):
"""Initialize the Kafka producer."""
from aiokafka import AIOKafkaProducer
self.producer = AIOKafkaProducer(
bootstrap_servers=self.config.bootstrap_servers,
client_id=self.config.client_id,
acks=self.config.acks,
enable_idempotence=self.config.enable_idempotence,
compression_type=self.config.compression_type,
linger_ms=self.config.linger_ms,
max_batch_size=self.config.batch_size,
value_serializer=lambda v: json.dumps(v).encode('utf-8'),
key_serializer=lambda k: k.encode('utf-8') if k else None
)
await self.producer.start()
logger.info("Kafka producer started")
async def stop(self):
"""Shutdown the producer gracefully."""
if self.producer:
await self.producer.stop()
logger.info("Kafka producer stopped")
async def send(
self,
topic: str,
value: Dict[str, Any],
key: str = None,
headers: Dict[str, str] = None
) -> bool:
"""
Send a single event to Kafka.
Args:
topic: Target topic
value: Event data
key: Partition key
headers: Optional headers
Returns:
True if sent successfully
"""
try:
# Convert headers to Kafka format
kafka_headers = None
if headers:
kafka_headers = [
(k, v.encode('utf-8')) for k, v in headers.items()
]
# Send and wait for acknowledgment
result = await self.producer.send_and_wait(
topic=topic,
value=value,
key=key,
headers=kafka_headers
)
self.messages_sent += 1
self.bytes_sent += len(json.dumps(value))
logger.debug(
f"Event sent to {topic}:{result.partition}@{result.offset}"
)
return True
except Exception as e:
self.messages_failed += 1
logger.error(f"Failed to send event to {topic}: {e}")
raise
async def send_batch(
self,
topic: str,
events: List[Dict[str, Any]],
key_extractor: Callable[[Dict], str] = None
) -> int:
"""
Send a batch of events efficiently.
Args:
topic: Target topic
events: List of events
key_extractor: Function to extract key from event
Returns:
Number of successfully sent events
"""
if not events:
return 0
# Create batch of futures
futures = []
for event in events:
key = None
if key_extractor:
key = key_extractor(event)
elif "user_id" in event:
key = event["user_id"]
future = await self.producer.send(
topic=topic,
value=event,
key=key
)
futures.append(future)
# Wait for all to complete
success_count = 0
for i, future in enumerate(futures):
try:
await future
success_count += 1
except Exception as e:
logger.error(f"Failed to send event {i}: {e}")
self.messages_failed += 1
self.messages_sent += success_count
return success_count
async def flush(self):
"""Flush any pending messages."""
if self.producer:
await self.producer.flush()
def get_metrics(self) -> Dict[str, int]:
"""Get producer metrics."""
return {
"messages_sent": self.messages_sent,
"messages_failed": self.messages_failed,
"bytes_sent": self.bytes_sent
}
# =============================================================================
# Transaction Support (Exactly-Once)
# =============================================================================
class TransactionalEventProducer(EventProducer):
"""
Kafka producer with transaction support for exactly-once semantics.
Use when you need to:
1. Send events atomically (all or nothing)
2. Coordinate with other Kafka operations
3. Ensure exactly-once delivery
"""
def __init__(self, config: ProducerConfig, transactional_id: str):
super().__init__(config)
self.transactional_id = transactional_id
async def start(self):
"""Initialize transactional producer."""
from aiokafka import AIOKafkaProducer
        self.producer = AIOKafkaProducer(
            bootstrap_servers=self.config.bootstrap_servers,
            client_id=self.config.client_id,
            transactional_id=self.transactional_id,
            acks=self.config.acks,
            enable_idempotence=True,  # Required for transactions
            compression_type=self.config.compression_type,
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            key_serializer=lambda k: k.encode('utf-8') if k else None
        )
        await self.producer.start()
        # Transactions are begun and committed per batch in send_transactional(),
        # so no transaction is started here.
        logger.info(f"Transactional producer started: {self.transactional_id}")
async def send_transactional(
self,
events: List[tuple] # List of (topic, event) tuples
) -> bool:
"""
Send multiple events in a single transaction.
Either all events are delivered or none are.
"""
try:
# Begin transaction
await self.producer.begin_transaction()
# Send all events
for topic, event in events:
await self.producer.send(topic, value=event)
# Commit transaction
await self.producer.commit_transaction()
self.messages_sent += len(events)
return True
except Exception as e:
logger.error(f"Transaction failed: {e}")
# Abort transaction
await self.producer.abort_transaction()
self.messages_failed += len(events)
return False
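A brief usage sketch of the transactional producer, assuming a broker at localhost:9092; the order and payment events are committed together or not at all:

import asyncio

async def main():
    config = ProducerConfig(bootstrap_servers=["localhost:9092"])  # assumed broker address
    producer = TransactionalEventProducer(config, transactional_id="collector-1")
    await producer.start()
    try:
        ok = await producer.send_transactional([
            ("events.order", {"event_type": "order.placed", "order_id": "ord_1"}),
            ("events.payment", {"event_type": "payment.processed", "order_id": "ord_1"}),
        ])
        print("committed" if ok else "aborted")
    finally:
        await producer.stop()

asyncio.run(main())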
Chapter 6: Handling Ingestion Spikes
6.1 Backpressure and Rate Limiting
# ingestion/backpressure.py
"""
Backpressure handling for event ingestion.
When ingestion rate exceeds processing capacity:
1. Buffer events temporarily
2. Apply rate limiting
3. Shed load gracefully
4. Alert operators
"""
from dataclasses import dataclass
from typing import Dict, Any, Optional
from datetime import datetime, timedelta
import asyncio
from collections import deque
import logging
logger = logging.getLogger(__name__)
@dataclass
class BackpressureConfig:
"""Backpressure configuration."""
# Queue limits
max_queue_size: int = 100000
warning_queue_size: int = 50000
# Rate limits
max_events_per_second: int = 50000
max_events_per_client: int = 1000 # Per second
# Timeouts
queue_timeout_seconds: int = 30
# Load shedding
enable_load_shedding: bool = True
shed_threshold: float = 0.9 # Shed when queue > 90% full
class BackpressureManager:
"""
Manages backpressure for event ingestion.
"""
def __init__(self, config: BackpressureConfig):
self.config = config
# Event queue
self.queue = asyncio.Queue(maxsize=config.max_queue_size)
# Rate tracking
        self.event_count_window = deque(maxlen=config.max_events_per_second)  # Recent event timestamps; sized so the global limit can actually be detected
self.client_event_counts: Dict[str, int] = {}
self.last_reset = datetime.utcnow()
# Status
self.is_shedding = False
self.events_shed = 0
async def accept_event(
self,
event: Dict[str, Any],
client_id: str
) -> bool:
"""
Try to accept an event for processing.
Returns True if accepted, False if rejected.
"""
# Check client rate limit
if not self._check_client_rate(client_id):
logger.warning(f"Client {client_id} exceeded rate limit")
return False
# Check global rate limit
if not self._check_global_rate():
logger.warning("Global rate limit exceeded")
return False
# Check queue capacity
queue_usage = self.queue.qsize() / self.config.max_queue_size
# Load shedding
if self.config.enable_load_shedding and queue_usage > self.config.shed_threshold:
if not self.is_shedding:
self.is_shedding = True
logger.warning(f"Entering load shedding mode. Queue: {queue_usage:.1%}")
# Shed non-critical events
if self._should_shed(event):
self.events_shed += 1
return False
else:
if self.is_shedding:
self.is_shedding = False
logger.info("Exiting load shedding mode")
# Try to queue event
try:
self.queue.put_nowait(event)
self._record_event(client_id)
return True
except asyncio.QueueFull:
logger.error("Event queue full, rejecting event")
return False
async def get_event(self, timeout: float = None) -> Optional[Dict]:
"""Get next event from queue."""
try:
if timeout:
return await asyncio.wait_for(
self.queue.get(),
timeout=timeout
)
return await self.queue.get()
except asyncio.TimeoutError:
return None
def _check_client_rate(self, client_id: str) -> bool:
"""Check if client is within rate limit."""
self._reset_if_needed()
count = self.client_event_counts.get(client_id, 0)
return count < self.config.max_events_per_client
def _check_global_rate(self) -> bool:
"""Check if global rate is within limit."""
# Count events in last second
now = datetime.utcnow()
one_second_ago = now - timedelta(seconds=1)
recent_count = sum(
1 for ts in self.event_count_window
if ts > one_second_ago
)
return recent_count < self.config.max_events_per_second
def _record_event(self, client_id: str):
"""Record event for rate tracking."""
now = datetime.utcnow()
self.event_count_window.append(now)
if client_id not in self.client_event_counts:
self.client_event_counts[client_id] = 0
self.client_event_counts[client_id] += 1
def _reset_if_needed(self):
"""Reset per-second counters if needed."""
now = datetime.utcnow()
if (now - self.last_reset).total_seconds() >= 1:
self.client_event_counts.clear()
self.last_reset = now
def _should_shed(self, event: Dict) -> bool:
"""Determine if event should be shed during overload."""
event_type = event.get("event_type", "")
# Never shed critical events
critical_types = [
"order.placed",
"payment.processed",
"user.signed_up"
]
if any(event_type.startswith(t) for t in critical_types):
return False
# Shed less important events
shedable_types = [
"page.viewed",
"product.viewed",
"system.performance"
]
return any(event_type.startswith(t) for t in shedable_types)
def get_status(self) -> Dict:
"""Get backpressure status."""
return {
"queue_size": self.queue.qsize(),
"queue_capacity": self.config.max_queue_size,
"queue_usage": self.queue.qsize() / self.config.max_queue_size,
"is_shedding": self.is_shedding,
"events_shed": self.events_shed,
"recent_rate": len(self.event_count_window)
}
# =============================================================================
# Adaptive Rate Limiting
# =============================================================================
class AdaptiveRateLimiter:
"""
Adaptive rate limiter that adjusts based on system health.
When downstream is healthy: allow more traffic
When downstream is struggling: reduce limits
"""
def __init__(
self,
initial_rate: int = 10000,
min_rate: int = 1000,
max_rate: int = 100000
):
self.current_rate = initial_rate
self.min_rate = min_rate
self.max_rate = max_rate
# Sliding window for health tracking
self.success_count = 0
self.failure_count = 0
self.last_adjustment = datetime.utcnow()
def record_success(self):
"""Record successful event processing."""
self.success_count += 1
self._maybe_adjust()
def record_failure(self):
"""Record failed event processing."""
self.failure_count += 1
self._maybe_adjust()
def _maybe_adjust(self):
"""Adjust rate limit based on health."""
now = datetime.utcnow()
if (now - self.last_adjustment).total_seconds() < 10:
return # Adjust every 10 seconds
total = self.success_count + self.failure_count
if total < 100:
return # Need enough samples
failure_rate = self.failure_count / total
if failure_rate < 0.01: # <1% failures, increase limit
self.current_rate = min(
self.max_rate,
int(self.current_rate * 1.1)
)
logger.info(f"Increased rate limit to {self.current_rate}")
elif failure_rate > 0.05: # >5% failures, decrease limit
self.current_rate = max(
self.min_rate,
int(self.current_rate * 0.8)
)
logger.warning(f"Decreased rate limit to {self.current_rate}")
# Reset counters
self.success_count = 0
self.failure_count = 0
self.last_adjustment = now
def get_current_limit(self) -> int:
"""Get current rate limit."""
return self.current_rate
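A short sketch of how the adaptive limiter might be wired into a consumer loop; process_event is a hypothetical downstream handler:

limiter = AdaptiveRateLimiter(initial_rate=10_000)

async def consume_loop(backpressure: BackpressureManager, process_event) -> None:
    """
    Drain the queue and feed success/failure back into the limiter.
    The collection layer would poll limiter.get_current_limit() to decide
    how many events per second to admit.
    """
    while True:
        event = await backpressure.get_event(timeout=1.0)
        if event is None:
            continue
        try:
            await process_event(event)  # hypothetical downstream call
            limiter.record_success()
        except Exception:
            limiter.record_failure()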
6.2 Multi-Region Ingestion
# ingestion/multi_region.py
"""
Multi-region event ingestion architecture.
For global applications:
1. Collect events in each region
2. Aggregate to central location
3. Handle cross-region replication
"""
from dataclasses import dataclass
from typing import Dict, List
from enum import Enum
class Region(Enum):
US_EAST = "us-east-1"
US_WEST = "us-west-2"
EU_WEST = "eu-west-1"
AP_SOUTH = "ap-south-1"
AP_NORTHEAST = "ap-northeast-1"
@dataclass
class RegionalConfig:
"""Configuration for a regional collector."""
region: Region
kafka_brokers: List[str]
is_primary: bool = False
MULTI_REGION_ARCHITECTURE = """
MULTI-REGION EVENT INGESTION
GLOBAL USERS
│
┌────────────────────┼────────────────────┐
│ │ │
▼ ▼ ▼
┌─────────┐ ┌─────────┐ ┌─────────┐
│US East │ │EU West │ │AP South │
│Collector│ │Collector│ │Collector│
└────┬────┘ └────┬────┘ └────┬────┘
│ │ │
▼ ▼ ▼
┌─────────┐ ┌─────────┐ ┌─────────┐
│Regional │ │Regional │ │Regional │
│ Kafka │ │ Kafka │ │ Kafka │
└────┬────┘ └────┬────┘ └────┬────┘
│ │ │
└───────────────────┼───────────────────┘
│
┌───────▼───────┐
│ MirrorMaker │
│ (Replicator) │
└───────┬───────┘
│
┌───────▼───────┐
│Central Kafka │
│ (US East) │
└───────┬───────┘
│
┌─────────────┼─────────────┐
▼ ▼ ▼
┌────────┐ ┌────────┐ ┌────────┐
│ Flink │ │ Spark │ │ S3 │
│Stream │ │ Batch │ │ Lake │
└────────┘ └────────┘ └────────┘
BENEFITS:
├── Low latency for users (collect locally)
├── Fault tolerance (region can fail)
├── Compliance (data stays in region)
└── Central analytics (aggregate view)
CHALLENGES:
├── Cross-region replication lag
├── Ordering across regions
├── Cost of data transfer
└── Schema consistency
"""
class MultiRegionRouter:
"""
Routes events to appropriate regional collector.
"""
def __init__(self, regional_configs: Dict[Region, RegionalConfig]):
self.configs = regional_configs
def get_collector_for_user(self, user_region: str) -> RegionalConfig:
"""Get the nearest collector for a user."""
# Map user regions to our regions
region_mapping = {
"US": Region.US_EAST,
"CA": Region.US_EAST,
"MX": Region.US_EAST,
"BR": Region.US_EAST,
"GB": Region.EU_WEST,
"DE": Region.EU_WEST,
"FR": Region.EU_WEST,
"IN": Region.AP_SOUTH,
"SG": Region.AP_SOUTH,
"JP": Region.AP_NORTHEAST,
"KR": Region.AP_NORTHEAST,
"AU": Region.AP_NORTHEAST,
}
region = region_mapping.get(user_region, Region.US_EAST)
return self.configs.get(region, self.configs[Region.US_EAST])
Part III: Real-World Application
Chapter 7: Case Studies
7.1 Case Study: Segment
SEGMENT'S EVENT INGESTION
Scale:
├── 200 billion events/month
├── 25,000+ customers
└── Sub-second ingestion latency
Architecture:
├── Edge collection: CDN-based collectors globally
├── Validation: Schema enforcement per source
├── Routing: Events routed to 300+ integrations
└── Storage: Raw events stored in S3
Key decisions:
├── Schema registry for each source
├── At-least-once delivery (idempotent destinations)
├── Customer-specific rate limits
└── Real-time + batch destinations
Lessons:
├── Schema validation prevents downstream issues
├── Rate limiting per customer is essential
└── Idempotency keys enable replay
7.2 Case Study: Uber
UBER'S EVENT INGESTION
Scale:
├── Trillions of events/day
├── 100+ event types
└── Multi-region deployment
Architecture:
├── Apache Kafka as backbone
├── Schema registry (Apache Avro)
├── Flink for real-time processing
└── HDFS/S3 for raw storage
Key decisions:
├── Avro for schema evolution
├── Kafka clusters per region
├── Tiered storage (hot/warm/cold)
└── Sampling for high-volume events
Lessons:
├── Schema evolution is critical at scale
├── Regional clusters reduce latency
└── Not all events need same retention
Chapter 8: Common Mistakes
8.1 Schema Design Mistakes
❌ MISTAKE 1: No Event ID
Wrong:
{"type": "viewed", "product": "123"}
Problem:
- Can't deduplicate
- Can't track event through pipeline
- Can't correlate with other systems
Right:
{
"event_id": "evt_abc123",
"event_type": "product.viewed",
"product_id": "123"
}
❌ MISTAKE 2: Mutable Events
Wrong:
# Updating existing event
event["status"] = "processed"
save(event)
Problem:
- Breaks immutability
- Can't replay from any point
- Audit trail lost
Right:
# Emit new event
emit({
"event_type": "event.processed",
"original_event_id": event["event_id"]
})
❌ MISTAKE 3: Inconsistent Timestamps
Wrong:
{"timestamp": "Jan 15, 2024"}
{"timestamp": 1705312200}
{"timestamp": "2024-01-15T10:30:00"}
Problem:
- Parsing nightmare
- Timezone confusion
- Ordering issues
Right:
{"event_time": "2024-01-15T10:30:00Z"} # Always ISO 8601 UTC
8.2 Infrastructure Mistakes
❌ MISTAKE 4: Synchronous Ingestion
Wrong:
def handle_event(event):
validate(event)
enrich(event)
save_to_database(event) # Blocks!
return "OK"
Problem:
- Database slow = API slow
- No buffering for spikes
- Single point of failure
Right:
async def handle_event(event):
validate(event)
await kafka.send(event) # Async, buffered
return "OK"
❌ MISTAKE 5: No Dead Letter Queue
Wrong:
try:
process(event)
except:
log.error("Failed")
# Event lost forever!
Problem:
- Data loss
- No retry capability
- No debugging
Right:
try:
process(event)
except Exception as e:
await dlq.send(event, error=str(e))
# Can investigate and replay later
Part IV: Interview Preparation
Chapter 9: Interview Tips
9.1 When to Discuss Event Ingestion
BRING UP EVENT INGESTION WHEN:
1. "Design an analytics system"
→ Start with how events are captured
2. "How would you track user behavior?"
→ Event schema and ingestion pipeline
3. "Design a recommendation system"
→ Needs event data for training
4. "Build a real-time dashboard"
→ Starts with event streaming
5. "Design a fraud detection system"
→ Events are the input
9.2 Key Phrases to Use
TALKING ABOUT EVENT INGESTION:
"For event ingestion, I'd use Kafka as the backbone because
it provides durability, replay capability, and handles
backpressure naturally through consumer lag."
"Events should be immutable and self-describing. Each event
needs a unique ID for deduplication, a timestamp for ordering,
and enough context to be useful without joining other data."
"Schema evolution is critical. I'd use a schema registry to
enforce backward compatibility so we can add fields without
breaking existing consumers."
"For handling spikes, I'd implement backpressure at the
collection layer and use Kafka's buffering. We can shed
low-priority events if needed while always accepting
critical events like orders and payments."
Chapter 10: Practice Problems
Problem 1: Mobile App Event Collection
Setup: Design an event collection system for a mobile app with:
- 10M daily active users
- Average 50 events per user per day
- Offline support (events buffered on device)
- Poor network conditions
Questions:
- How do you handle offline events that arrive days later?
- How do you prevent duplicate events from retries?
- How do you handle clock skew on mobile devices?
Approach (see the sketch below):
- Use client-generated event IDs for deduplication
- Include both client timestamp and server timestamp
- Batch events in mobile SDK before sending
- Accept late events with warnings
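A minimal sketch of the client-side idea, with a hypothetical on-device buffer file and a send_batch transport supplied by the app; the server deduplicates on the client-generated event_id:

import json
import time
import uuid
from pathlib import Path

BUFFER_FILE = Path("events_buffer.jsonl")  # hypothetical on-device buffer

def record_event(event_type: str, payload: dict) -> None:
    """Append an event to the local buffer with a client-generated ID and timestamp."""
    event = {
        "event_id": str(uuid.uuid4()),  # stable across retries, so the server can dedup
        "event_type": event_type,
        "client_time": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
        "payload": payload,
    }
    with BUFFER_FILE.open("a") as f:
        f.write(json.dumps(event) + "\n")

def flush(send_batch) -> None:
    """Send buffered events; clear the buffer only after a confirmed success."""
    if not BUFFER_FILE.exists():
        return
    events = [json.loads(line) for line in BUFFER_FILE.read_text().splitlines()]
    if events and send_batch(events):  # send_batch is whatever transport the app uses
        BUFFER_FILE.unlink()           # safe to retry: duplicates share event_id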
Problem 2: High-Volume Gaming Events
Setup: Design event ingestion for a multiplayer game:
- 1M concurrent players
- 100 events per second per player during gameplay
- Need real-time leaderboards
- Some events are critical (purchases), some are telemetry
Questions:
- How do you handle 100M events/second?
- How do you prioritize critical vs telemetry events?
- How do you aggregate for real-time leaderboards?
Approach (see the sketch below):
- Sample telemetry events (1 in 100)
- Separate Kafka topics by priority
- Edge aggregation before central ingestion
- Pre-aggregate scores in streaming layer
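A minimal sketch of the prioritization idea; the topic names and sample rate below are assumptions for illustration:

import hashlib

CRITICAL_PREFIXES = ("order.", "payment.")
TELEMETRY_SAMPLE_RATE = 100  # keep 1 in 100 telemetry events

def should_keep(event: dict) -> bool:
    """Always keep critical events; deterministically sample the rest."""
    if event.get("event_type", "").startswith(CRITICAL_PREFIXES):
        return True
    # Hash the event_id so the sampling decision is stable across retries.
    bucket = int(hashlib.md5(event["event_id"].encode()).hexdigest(), 16) % TELEMETRY_SAMPLE_RATE
    return bucket == 0

def route(event: dict) -> str:
    """Send critical events and sampled telemetry to separate topics."""
    if event.get("event_type", "").startswith(CRITICAL_PREFIXES):
        return "game.events.critical"   # assumed topic name
    return "game.events.telemetry"      # assumed topic name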
Chapter 11: Sample Interview Dialogue
Interviewer: "Design an event tracking system for our e-commerce platform."
You: "I'd start by understanding the event volume and use cases. How many events per day are we expecting, and what's the primary use case — real-time analytics, historical analysis, or both?"
Interviewer: "About 100 million events per day. We need both real-time dashboards and historical analysis."
You: "Got it. Let me design the ingestion layer first, then we can discuss processing.
For ingestion, I'd use a three-tier architecture:
First, a collection layer — lightweight API servers that receive events from web and mobile clients. These validate the schema, add server-side metadata, and immediately write to Kafka. They don't do any heavy processing.
Second, Kafka as the event backbone. I'd create topics by event category — user events, product events, order events. This lets us scale and process them independently. Kafka gives us durability and replay capability.
Third, a validation and enrichment layer — consumers that validate events against our schema registry, enrich them with additional context like geo-location, and route them to the appropriate downstream systems.
For handling spikes, Kafka naturally buffers events. If producers are faster than consumers, the lag increases but nothing is lost. We can also scale consumers horizontally."
Interviewer: "What about events from mobile devices that might be offline?"
You: "Good point. Mobile SDK should buffer events locally and send in batches when connectivity is available. Each event needs a client-generated UUID so we can deduplicate if the same batch is sent twice due to network retries.
For late-arriving events, I'd include both the client timestamp (when it happened) and server timestamp (when we received it). The processing layer can handle late events appropriately — for real-time dashboards, we might show them with a delay indicator, while for historical analysis, they're processed normally.
We should also validate that client timestamps are reasonable — not too far in the future, and not older than our retention policy allows."
Summary
DAY 1 SUMMARY: EVENT INGESTION AT SCALE
EVENT DESIGN
├── Immutable, timestamped facts
├── Unique ID for deduplication
├── Self-describing (schema + context)
├── Versioned for evolution
└── ISO 8601 UTC timestamps
INGESTION ARCHITECTURE
├── Collection layer (lightweight APIs)
├── Validation layer (schema enforcement)
├── Kafka backbone (durability + buffering)
├── Enrichment layer (add context)
└── Routing to downstream systems
SCHEMA EVOLUTION
├── Backward compatible changes only
├── Add optional fields (safe)
├── Never remove required fields
├── Use schema registry
└── Version all events
HANDLING SPIKES
├── Kafka buffers naturally
├── Rate limiting per client
├── Load shedding for non-critical
├── Adaptive rate limits
└── Multi-region for global scale
RELIABILITY
├── At-least-once delivery default
├── Exactly-once with transactions
├── Dead letter queue for failures
├── Idempotency keys for dedup
└── Replay capability always
Key Takeaways
EVENT INGESTION KEY TAKEAWAYS
1. EVENTS ARE IMMUTABLE FACTS
Never update an event — emit a new one
2. SCHEMA IS CONTRACT
Schema registry prevents downstream breakage
3. KAFKA IS THE BACKBONE
Durability, buffering, replay all built-in
4. VALIDATE EARLY
Catch bad data at ingestion, not in analytics
5. DESIGN FOR FAILURE
DLQ, retries, idempotency from day one
GOLDEN RULES:
├── Every event has unique ID
├── Always use UTC timestamps
├── Never drop events silently
├── Schema evolution = backward compatible
└── Client timestamp ≠ Server timestamp
What's Next
Tomorrow, we'll tackle Streaming vs Batch Processing — understanding when to use real-time stream processing versus batch processing, and how to architect systems that use both.
TOMORROW'S PREVIEW: STREAMING VS BATCH
Questions we'll answer:
├── When do you need real-time vs batch?
├── What is Lambda Architecture?
├── What is Kappa Architecture?
├── How do you handle state in streaming?
└── When is Flink vs Spark the right choice?
We'll take the events we're now capturing and process
them into useful metrics and insights.
End of Week 8, Day 1
Next: Day 2 — Streaming vs Batch Processing