Himanshu Kukreja

Week 8 — Day 1: Event Ingestion at Scale

System Design Mastery Series — Analytics Pipeline Week


Preface

Last week, we built search systems that help users find information. But how do we know what users are actually searching for? How do we track which results they click? How do we measure if our search is actually helping?

THE VISIBILITY PROBLEM

Your e-commerce platform is live:
├── 50,000 users browsing right now
├── Searches happening every second
├── Products being viewed, added to cart, purchased
├── Errors occurring somewhere
└── Performance varying across regions

But you have no idea:
├── What's the conversion rate right now?
├── Which products are trending?
├── Where are users dropping off?
├── Is the new feature helping or hurting?
└── Why did revenue drop 20% this morning?

Without events, you're flying blind.

This is why event ingestion matters.

Today, we'll learn to capture every meaningful action in your system — reliably, at scale, without losing data or blocking your application.


Part I: Foundations

Chapter 1: What Is Event Ingestion?

1.1 The Simple Definition

Event ingestion is the process of capturing, validating, and storing events from various sources into a system that can process and analyze them.

EVENTS ARE FACTS ABOUT WHAT HAPPENED

An event is an immutable record that something occurred:

    {
      "event_id": "evt_abc123",
      "event_type": "product.viewed",
      "timestamp": "2024-01-15T10:30:00Z",
      "user_id": "user_456",
      "data": {
        "product_id": "prod_789",
        "source": "search_results",
        "position": 3
      }
    }

Key properties:
├── Immutable: Once recorded, never changed
├── Timestamped: When did it happen?
├── Typed: What kind of event?
├── Contextual: Who, what, where, how?
└── Unique: Can be deduplicated

1.2 Why Events, Not Logs?

LOGS VS EVENTS

LOGS (unstructured):
  "2024-01-15 10:30:00 INFO User 456 viewed product 789 from search"
  
  Problems:
  ├── Parsing is fragile (regex hell)
  ├── Schema changes break parsers
  ├── Missing context (what search query?)
  └── Hard to aggregate

EVENTS (structured):
  {
    "event_type": "product.viewed",
    "user_id": "user_456",
    "product_id": "prod_789",
    "source": "search",
    "search_query": "running shoes",
    "position": 3
  }
  
  Benefits:
  ├── Schema-aware (validation possible)
  ├── Self-describing
  ├── Easy to query and aggregate
  └── Evolves safely with versioning

1.3 The Ingestion Challenge

SCALE OF EVENT INGESTION

Small startup:
├── 10,000 events/day
├── Simple logging to database
└── Query with SQL

Growing company:
├── 10,000,000 events/day
├── Need streaming pipeline
└── Batch processing for analytics

Large platform:
├── 1,000,000,000+ events/day
├── Distributed ingestion
├── Real-time + batch processing
├── Multi-region replication
└── Complex event routing

The challenge isn't just volume — it's:
├── Never losing events (durability)
├── Never blocking producers (availability)
├── Handling spikes (elasticity)
├── Maintaining order (where needed)
└── Enabling replay (reprocessing)

Chapter 2: Event Schema Design

2.1 Anatomy of a Good Event

# schemas/event_schema.py

"""
Event schema design principles.

A well-designed event is:
- Self-describing (contains its own context)
- Immutable (never modified after creation)
- Uniquely identifiable (for deduplication)
- Timestamped (event time vs processing time)
- Versioned (for schema evolution)
"""

from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional, Dict, Any
from enum import Enum
import uuid


class EventType(Enum):
    """Categorized event types."""
    # User actions
    PAGE_VIEWED = "page.viewed"
    PRODUCT_VIEWED = "product.viewed"
    PRODUCT_SEARCHED = "product.searched"
    CART_UPDATED = "cart.updated"
    ORDER_PLACED = "order.placed"
    
    # System events
    ERROR_OCCURRED = "error.occurred"
    PERFORMANCE_MEASURED = "performance.measured"
    
    # Business events
    PAYMENT_PROCESSED = "payment.processed"
    SHIPMENT_UPDATED = "shipment.updated"


@dataclass
class EventMetadata:
    """
    Standard metadata for all events.
    
    This is the "envelope" that wraps every event.
    """
    # Identity
    event_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    event_type: str = ""
    event_version: str = "1.0"
    
    # Timing
    event_time: datetime = field(default_factory=datetime.utcnow)  # When it happened
    received_time: Optional[datetime] = None  # When server received it
    
    # Source
    source_service: str = ""  # Which service emitted this
    source_instance: str = ""  # Which instance
    
    # Correlation
    correlation_id: Optional[str] = None  # For tracing across services
    causation_id: Optional[str] = None  # What event caused this
    
    # User context
    user_id: Optional[str] = None
    session_id: Optional[str] = None
    device_id: Optional[str] = None


@dataclass
class DeviceContext:
    """Device and client information."""
    platform: str = ""  # web, ios, android
    app_version: str = ""
    os_name: str = ""
    os_version: str = ""
    browser: str = ""
    screen_resolution: str = ""
    timezone: str = ""
    language: str = ""


@dataclass
class LocationContext:
    """Geographic context (if available)."""
    country: str = ""
    region: str = ""
    city: str = ""
    ip_address: str = ""  # Often hashed for privacy


@dataclass
class BaseEvent:
    """
    Base class for all events.
    
    Every event has:
    1. Metadata (envelope)
    2. Device context
    3. Location context
    4. Event-specific payload
    """
    metadata: EventMetadata
    device: DeviceContext
    location: LocationContext
    payload: Dict[str, Any] = field(default_factory=dict)
    
    def to_dict(self) -> Dict[str, Any]:
        """Serialize event to dictionary."""
        return {
            "event_id": self.metadata.event_id,
            "event_type": self.metadata.event_type,
            "event_version": self.metadata.event_version,
            "event_time": self.metadata.event_time.isoformat(),
            "received_time": (
                self.metadata.received_time.isoformat()
                if self.metadata.received_time else None
            ),
            "source": {
                "service": self.metadata.source_service,
                "instance": self.metadata.source_instance
            },
            "correlation": {
                "correlation_id": self.metadata.correlation_id,
                "causation_id": self.metadata.causation_id
            },
            "user": {
                "user_id": self.metadata.user_id,
                "session_id": self.metadata.session_id,
                "device_id": self.metadata.device_id
            },
            "device": {
                "platform": self.device.platform,
                "app_version": self.device.app_version,
                "os": f"{self.device.os_name} {self.device.os_version}",
                "browser": self.device.browser
            },
            "location": {
                "country": self.location.country,
                "city": self.location.city
            },
            "payload": self.payload
        }


# =============================================================================
# Specific Event Types
# =============================================================================

@dataclass
class ProductViewedEvent(BaseEvent):
    """User viewed a product page."""
    
    def __post_init__(self):
        self.metadata.event_type = EventType.PRODUCT_VIEWED.value
        # Validate required payload fields
        # Use a name other than `field` to avoid shadowing dataclasses.field
        required = ["product_id"]
        for required_field in required:
            if required_field not in self.payload:
                raise ValueError(f"Missing required field: {required_field}")


@dataclass  
class OrderPlacedEvent(BaseEvent):
    """User placed an order."""
    
    def __post_init__(self):
        self.metadata.event_type = EventType.ORDER_PLACED.value
        required = ["order_id", "total_amount", "currency", "items"]
        for required_field in required:
            if required_field not in self.payload:
                raise ValueError(f"Missing required field: {required_field}")


# =============================================================================
# Example Usage
# =============================================================================

def create_product_viewed_event(
    user_id: str,
    product_id: str,
    source: str,
    position: Optional[int] = None
) -> ProductViewedEvent:
    """Factory function to create product viewed event."""
    
    return ProductViewedEvent(
        metadata=EventMetadata(
            user_id=user_id,
            source_service="web-frontend"
        ),
        device=DeviceContext(
            platform="web",
            browser="Chrome 120"
        ),
        location=LocationContext(
            country="US",
            city="San Francisco"
        ),
        payload={
            "product_id": product_id,
            "source": source,
            "position": position,
            "referrer": "search_results"
        }
    )
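
Putting the factory to work: the snippet below is a usage sketch that relies only on the classes defined above. It creates an event and prints its serialized form, which is the shape the collection API will later receive.

import json

event = create_product_viewed_event(
    user_id="user_456",
    product_id="prod_789",
    source="search_results",
    position=3
)

# metadata gets a fresh event_id and event_time automatically;
# the payload carries product_id, source, position and referrer
print(json.dumps(event.to_dict(), indent=2))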

2.2 Schema Evolution

# schemas/schema_evolution.py

"""
Schema evolution strategies.

Events are immutable, but schemas evolve. We need to handle:
1. Adding new fields (safe)
2. Removing fields (dangerous)
3. Changing field types (dangerous)
4. Renaming fields (very dangerous)
"""

from dataclasses import dataclass
from typing import Dict, Any, Optional
from enum import Enum


class SchemaCompatibility(Enum):
    """Schema compatibility modes."""
    BACKWARD = "backward"  # New schema can read old data
    FORWARD = "forward"    # Old schema can read new data
    FULL = "full"          # Both directions
    NONE = "none"          # No compatibility guaranteed


class SchemaIncompatibleError(Exception):
    """Raised when a new schema version violates the configured compatibility mode."""
    pass


@dataclass
class SchemaVersion:
    """Tracks schema versions."""
    event_type: str
    version: str
    schema: Dict[str, Any]
    compatibility: SchemaCompatibility


class SchemaRegistry:
    """
    Manages event schema versions.
    
    In production, use:
    - Confluent Schema Registry (with Kafka)
    - AWS Glue Schema Registry
    - Custom implementation
    """
    
    def __init__(self):
        self.schemas: Dict[str, Dict[str, SchemaVersion]] = {}
    
    def register_schema(
        self,
        event_type: str,
        version: str,
        schema: Dict[str, Any],
        compatibility: SchemaCompatibility = SchemaCompatibility.BACKWARD
    ) -> SchemaVersion:
        """Register a new schema version."""
        
        if event_type not in self.schemas:
            self.schemas[event_type] = {}
        
        # Check compatibility with previous version
        if self.schemas[event_type]:
            latest = self._get_latest_version(event_type)
            if not self._check_compatibility(latest.schema, schema, compatibility):
                raise SchemaIncompatibleError(
                    f"Schema {version} is not {compatibility.value} compatible "
                    f"with {latest.version}"
                )
        
        schema_version = SchemaVersion(
            event_type=event_type,
            version=version,
            schema=schema,
            compatibility=compatibility
        )
        
        self.schemas[event_type][version] = schema_version
        return schema_version
    
    def _get_latest_version(self, event_type: str) -> SchemaVersion:
        """Get the latest registered schema version."""
        # Note: lexicographic sort; fine for "1.0" < "2.0", but use a real
        # version parser if versions can reach "10.0".
        versions = sorted(self.schemas[event_type].keys())
        return self.schemas[event_type][versions[-1]]
    
    def _check_compatibility(
        self,
        old_schema: Dict,
        new_schema: Dict,
        mode: SchemaCompatibility
    ) -> bool:
        """Check if schemas are compatible."""
        
        old_fields = set(old_schema.get("required", []))
        new_fields = set(new_schema.get("required", []))
        
        if mode == SchemaCompatibility.BACKWARD:
            # New schema can read old data
            # New required fields are NOT allowed
            new_required = new_fields - old_fields
            return len(new_required) == 0
        
        elif mode == SchemaCompatibility.FORWARD:
            # Old schema can read new data
            # Removing required fields is NOT allowed
            removed_required = old_fields - new_fields
            return len(removed_required) == 0
        
        elif mode == SchemaCompatibility.FULL:
            # Both directions
            return old_fields == new_fields
        
        return True  # NONE compatibility


# =============================================================================
# Schema Evolution Examples
# =============================================================================

SCHEMA_V1 = {
    "type": "object",
    "required": ["event_id", "event_type", "user_id", "product_id"],
    "properties": {
        "event_id": {"type": "string"},
        "event_type": {"type": "string"},
        "user_id": {"type": "string"},
        "product_id": {"type": "string"},
        "timestamp": {"type": "string", "format": "date-time"}
    }
}

# SAFE: Adding optional field
SCHEMA_V2 = {
    "type": "object",
    "required": ["event_id", "event_type", "user_id", "product_id"],
    "properties": {
        "event_id": {"type": "string"},
        "event_type": {"type": "string"},
        "user_id": {"type": "string"},
        "product_id": {"type": "string"},
        "timestamp": {"type": "string", "format": "date-time"},
        "source": {"type": "string"}  # NEW: optional field
    }
}

# DANGEROUS: Adding required field (breaks backward compatibility)
SCHEMA_V3_BAD = {
    "type": "object",
    "required": ["event_id", "event_type", "user_id", "product_id", "session_id"],
    "properties": {
        # ... session_id is now required, old events don't have it
    }
}

# SAFE WAY: Add with default
SCHEMA_V3_GOOD = {
    "type": "object",
    "required": ["event_id", "event_type", "user_id", "product_id"],
    "properties": {
        "event_id": {"type": "string"},
        "event_type": {"type": "string"},
        "user_id": {"type": "string"},
        "product_id": {"type": "string"},
        "session_id": {"type": "string", "default": "unknown"}  # Optional with default
    }
}
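
To see the compatibility checks in action, here is a short sketch that runs the schemas above through the SchemaRegistry from section 2.2 (default BACKWARD mode); only names already defined in this chapter are used.

registry = SchemaRegistry()

# First version: nothing to compare against, registers cleanly
registry.register_schema("product.viewed", "1.0", SCHEMA_V1)

# V2 only adds an optional field, so the BACKWARD check passes
registry.register_schema("product.viewed", "2.0", SCHEMA_V2)

# V3_BAD introduces a new required field, so BACKWARD compatibility rejects it
try:
    registry.register_schema("product.viewed", "3.0", SCHEMA_V3_BAD)
except SchemaIncompatibleError as exc:
    print(f"Rejected: {exc}")

# V3_GOOD keeps the required set unchanged and is accepted
registry.register_schema("product.viewed", "3.0", SCHEMA_V3_GOOD)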

2.3 Event Naming Conventions

# schemas/naming_conventions.py

"""
Event naming conventions.

Good event names are:
- Hierarchical (domain.entity.action)
- Past tense (something happened)
- Specific but not too granular
"""

# Event naming patterns

EVENT_NAMING = """
NAMING CONVENTION: domain.entity.action

EXAMPLES:

User events:
├── user.signed_up
├── user.logged_in
├── user.logged_out
├── user.password_changed
├── user.profile_updated

Product events:
├── product.viewed
├── product.searched
├── product.added_to_cart
├── product.removed_from_cart
├── product.wishlisted

Order events:
├── order.placed
├── order.confirmed
├── order.shipped
├── order.delivered
├── order.cancelled
├── order.refunded

Payment events:
├── payment.initiated
├── payment.succeeded
├── payment.failed
├── payment.refunded

System events:
├── system.error_occurred
├── system.performance_measured
├── system.health_checked

ANTI-PATTERNS:

❌ Too generic:
   "event", "action", "update"

❌ Future tense:
   "user.will_signup", "order.placing"

❌ Imperative:
   "create_user", "place_order"

❌ Too specific:
   "user.clicked_blue_signup_button_on_homepage_v2"

❌ Inconsistent:
   "userSignup", "user-login", "USER_LOGOUT"
"""

Chapter 3: Ingestion Architecture

3.1 The Ingestion Pipeline

EVENT INGESTION PIPELINE

┌─────────────────────────────────────────────────────────────────────────┐
│                         DATA SOURCES                                    │
│  ┌─────────┐  ┌─────────┐  ┌─────────┐  ┌─────────┐  ┌─────────┐        │
│  │  Web    │  │ Mobile  │  │ Backend │  │   IoT   │  │  Third  │        │
│  │ Browser │  │  Apps   │  │Services │  │ Devices │  │  Party  │        │
│  └────┬────┘  └────┬────┘  └────┬────┘  └────┬────┘  └────┬────┘        │
└───────┼────────────┼────────────┼────────────┼────────────┼─────────────┘
        │            │            │            │            │
        └────────────┴─────┬──────┴────────────┴────────────┘
                           │
                           ▼
┌──────────────────────────────────────────────────────────────────────────┐
│                      COLLECTION LAYER                                    │
│  ┌────────────────────────────────────────────────────────────────────┐  │
│  │                    Event Collection API                            │  │
│  │  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐                 │  │
│  │  │   /events   │  │  /batch     │  │  /health    │                 │  │
│  │  │   (single)  │  │  (bulk)     │  │  (status)   │                 │  │
│  │  └─────────────┘  └─────────────┘  └─────────────┘                 │  │
│  └────────────────────────────────────────────────────────────────────┘  │
│                                │                                         │
│                                ▼                                         │
│  ┌────────────────────────────────────────────────────────────────────┐  │
│  │                      Validation Layer                              │  │
│  │  • Schema validation        • Required fields check                │  │
│  │  • Type coercion            • Size limits                          │  │
│  │  • Timestamp normalization  • Deduplication check                  │  │
│  └────────────────────────────────────────────────────────────────────┘  │
│                                │                                         │
│                    ┌───────────┴───────────┐                             │
│                    ▼                       ▼                             │
│             ┌──────────┐            ┌──────────┐                         │
│             │  Valid   │            │ Invalid  │                         │
│             │  Events  │            │  Events  │                         │
│             └────┬─────┘            └────┬─────┘                         │
│                  │                       │                               │
└──────────────────┼───────────────────────┼───────────────────────────────┘
                   │                       │
                   ▼                       ▼
┌─────────────────────────────────────────────────────────────────────────┐
│                      STREAMING LAYER (KAFKA)                            │
│  ┌────────────────┐  ┌────────────────┐  ┌────────────────┐             │
│  │  raw_events    │  │ validated_     │  │ dead_letter_   │             │
│  │  (all events)  │  │ events         │  │ events         │             │
│  │                │  │ (clean data)   │  │ (failures)     │             │
│  └────────────────┘  └────────────────┘  └────────────────┘             │
└─────────────────────────────────────────────────────────────────────────┘
                   │
                   ▼
┌─────────────────────────────────────────────────────────────────────────┐
│                      PROCESSING LAYER                                   │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐                   │
│  │   Real-time  │  │    Batch     │  │   Raw Lake   │                   │
│  │   Processor  │  │   Processor  │  │   Storage    │                   │
│  │   (Flink)    │  │   (Spark)    │  │   (S3)       │                   │
│  └──────────────┘  └──────────────┘  └──────────────┘                   │
└─────────────────────────────────────────────────────────────────────────┘

3.2 Collection API Design

# ingestion/collection_api.py

"""
Event Collection API.

High-performance API for receiving events from clients.
"""

from fastapi import FastAPI, HTTPException, BackgroundTasks, Request
from fastapi.responses import JSONResponse
from pydantic import BaseModel, Field
from typing import List, Optional, Dict, Any
from datetime import datetime
import asyncio
import logging

logger = logging.getLogger(__name__)

app = FastAPI(title="Event Collection API")


# =============================================================================
# Request/Response Models
# =============================================================================

class EventRequest(BaseModel):
    """Single event submission."""
    event_id: str = Field(..., description="Unique event identifier")
    event_type: str = Field(..., description="Event type (e.g., product.viewed)")
    event_time: datetime = Field(..., description="When the event occurred")
    user_id: Optional[str] = None
    session_id: Optional[str] = None
    payload: Dict[str, Any] = Field(default_factory=dict)
    
    model_config = {
        "json_schema_extra": {
            "example": {
                "event_id": "evt_abc123",
                "event_type": "product.viewed",
                "event_time": "2024-01-15T10:30:00Z",
                "user_id": "user_456",
                "payload": {
                    "product_id": "prod_789",
                    "source": "search"
                }
            }
        }
    }


class BatchEventRequest(BaseModel):
    """Batch event submission."""
    events: List[EventRequest] = Field(..., max_length=1000)


class EventResponse(BaseModel):
    """Response for event submission."""
    status: str
    event_id: str
    received_at: datetime


class BatchEventResponse(BaseModel):
    """Response for batch submission."""
    status: str
    accepted: int
    rejected: int
    errors: List[Dict[str, str]] = []


# =============================================================================
# Collection Service
# =============================================================================

class EventCollector:
    """
    Collects and forwards events to Kafka.
    """
    
    def __init__(self, kafka_producer, validator, enricher):
        self.producer = kafka_producer
        self.validator = validator
        self.enricher = enricher
        
        # Metrics
        self.events_received = 0
        self.events_accepted = 0
        self.events_rejected = 0
    
    async def collect_event(
        self,
        event: EventRequest,
        request_context: Dict[str, Any]
    ) -> EventResponse:
        """
        Process a single event.
        
        Steps:
        1. Add server-side metadata
        2. Validate against schema
        3. Enrich with additional context
        4. Send to Kafka
        """
        
        self.events_received += 1
        received_at = datetime.utcnow()
        
        # Build full event
        full_event = {
            **event.model_dump(),
            "received_time": received_at.isoformat(),
            "server_context": {
                "collector_instance": request_context.get("instance_id"),
                "client_ip": request_context.get("client_ip"),
                "user_agent": request_context.get("user_agent")
            }
        }
        
        # Validate
        validation_result = await self.validator.validate(full_event)
        if not validation_result.is_valid:
            self.events_rejected += 1
            # Send to dead letter queue
            await self._send_to_dlq(full_event, validation_result.errors)
            raise HTTPException(
                status_code=400,
                detail={
                    "error": "Validation failed",
                    "details": validation_result.errors
                }
            )
        
        # Enrich
        enriched_event = await self.enricher.enrich(full_event)
        
        # Send to Kafka
        await self._send_to_kafka(enriched_event)
        
        self.events_accepted += 1
        
        return EventResponse(
            status="accepted",
            event_id=event.event_id,
            received_at=received_at
        )
    
    async def collect_batch(
        self,
        batch: BatchEventRequest,
        request_context: Dict[str, Any]
    ) -> BatchEventResponse:
        """
        Process a batch of events.
        
        For efficiency, we:
        1. Validate all events first
        2. Send valid events in a single Kafka batch
        3. Send invalid events to DLQ
        """
        
        received_at = datetime.utcnow()
        accepted = []
        rejected = []
        errors = []
        
        for event in batch.events:
            full_event = {
                **event.model_dump(),
                "received_time": received_at.isoformat(),
                "server_context": {
                    "collector_instance": request_context.get("instance_id"),
                    "client_ip": request_context.get("client_ip")
                }
            }
            
            validation_result = await self.validator.validate(full_event)
            
            if validation_result.is_valid:
                enriched = await self.enricher.enrich(full_event)
                accepted.append(enriched)
            else:
                rejected.append(full_event)
                errors.append({
                    "event_id": event.event_id,
                    "error": validation_result.errors[0]
                })
        
        # Batch send to Kafka
        if accepted:
            await self._send_batch_to_kafka(accepted)
        
        # Batch send to DLQ
        if rejected:
            await self._send_batch_to_dlq(rejected)
        
        self.events_received += len(batch.events)
        self.events_accepted += len(accepted)
        self.events_rejected += len(rejected)
        
        return BatchEventResponse(
            status="processed",
            accepted=len(accepted),
            rejected=len(rejected),
            errors=errors[:10]  # Limit error details
        )
    
    async def _send_to_kafka(self, event: Dict):
        """Send single event to Kafka."""
        topic = self._get_topic(event["event_type"])
        key = event.get("user_id") or event["event_id"]
        
        await self.producer.send(
            topic=topic,
            key=key,
            value=event
        )
    
    async def _send_batch_to_kafka(self, events: List[Dict]):
        """Send batch of events to Kafka."""
        
        # Group by topic
        by_topic = {}
        for event in events:
            topic = self._get_topic(event["event_type"])
            if topic not in by_topic:
                by_topic[topic] = []
            by_topic[topic].append(event)
        
        # Send each topic batch
        for topic, topic_events in by_topic.items():
            await self.producer.send_batch(topic, topic_events)
    
    async def _send_to_dlq(self, event: Dict, errors: List[str]):
        """Send failed event to dead letter queue."""
        await self.producer.send(
            topic="events.dead_letter",
            value={
                "original_event": event,
                "errors": errors,
                "failed_at": datetime.utcnow().isoformat()
            }
        )
    
    async def _send_batch_to_dlq(self, events: List[Dict]):
        """Send batch of failed events to DLQ."""
        for event in events:
            await self._send_to_dlq(event, ["Batch validation failed"])
    
    def _get_topic(self, event_type: str) -> str:
        """Route event to appropriate Kafka topic."""
        
        # Route by event category
        if event_type.startswith("user."):
            return "events.user"
        elif event_type.startswith("product."):
            return "events.product"
        elif event_type.startswith("order."):
            return "events.order"
        elif event_type.startswith("payment."):
            return "events.payment"
        else:
            return "events.general"


# =============================================================================
# API Endpoints
# =============================================================================

# Global collector instance (initialized on startup)
collector: Optional[EventCollector] = None


@app.post("/v1/events", response_model=EventResponse)
async def collect_single_event(
    event: EventRequest,
    request: Request,
    background_tasks: BackgroundTasks
):
    """
    Collect a single event.
    
    Use this for real-time event tracking where immediate
    confirmation is needed.
    """
    
    context = {
        "instance_id": "collector-1",
        "client_ip": request.client.host,
        "user_agent": request.headers.get("user-agent")
    }
    
    return await collector.collect_event(event, context)


@app.post("/v1/events/batch", response_model=BatchEventResponse)
async def collect_batch_events(
    batch: BatchEventRequest,
    request: Request
):
    """
    Collect a batch of events.
    
    Use this for:
    - Mobile apps sending buffered events
    - Backend services with high event volume
    - Periodic sync of offline events
    
    Maximum 1000 events per batch.
    """
    
    context = {
        "instance_id": "collector-1",
        "client_ip": request.client.host
    }
    
    return await collector.collect_batch(batch, context)


@app.get("/health")
async def health_check():
    """Health check endpoint."""
    return {
        "status": "healthy",
        "metrics": {
            "events_received": collector.events_received,
            "events_accepted": collector.events_accepted,
            "events_rejected": collector.events_rejected
        }
    }
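
From the client side, sending an event is a plain HTTP POST. The snippet below is a usage sketch that assumes the API above is running locally on port 8000 and that the httpx package is installed.

# client_example.py

import httpx

event = {
    "event_id": "evt_abc123",
    "event_type": "product.viewed",
    "event_time": "2024-01-15T10:30:00Z",
    "user_id": "user_456",
    "payload": {"product_id": "prod_789", "source": "search"}
}

response = httpx.post("http://localhost:8000/v1/events", json=event, timeout=5.0)
print(response.status_code, response.json())  # status code and EventResponse body on success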

Part II: Implementation

Chapter 4: Validation and Enrichment

4.1 Event Validation

# ingestion/validation.py

"""
Event validation layer.

Validates events against schemas and business rules.
"""

from dataclasses import dataclass
from typing import List, Dict, Any, Optional
from datetime import datetime, timedelta
import re
import json


@dataclass
class ValidationResult:
    """Result of event validation."""
    is_valid: bool
    errors: List[str]
    warnings: List[str]


class EventValidator:
    """
    Validates events before ingestion.
    
    Validation layers:
    1. Schema validation (structure)
    2. Type validation (field types)
    3. Business rules (logical constraints)
    4. Security validation (injection, size)
    """
    
    def __init__(self, schema_registry):
        self.schema_registry = schema_registry
        
        # Limits
        self.max_event_size_bytes = 1024 * 100  # 100KB
        self.max_string_length = 10000
        self.max_array_length = 1000
        self.max_payload_depth = 10
        
        # Time bounds
        self.max_future_seconds = 300  # 5 minutes
        self.max_past_days = 30  # 30 days old
    
    async def validate(self, event: Dict[str, Any]) -> ValidationResult:
        """
        Validate an event through all validation layers.
        """
        
        errors = []
        warnings = []
        
        # Layer 1: Size validation
        size_errors = self._validate_size(event)
        errors.extend(size_errors)
        
        if errors:
            return ValidationResult(False, errors, warnings)
        
        # Layer 2: Required fields
        required_errors = self._validate_required_fields(event)
        errors.extend(required_errors)
        
        if errors:
            return ValidationResult(False, errors, warnings)
        
        # Layer 3: Schema validation
        schema_errors = self._validate_schema(event)
        errors.extend(schema_errors)
        
        # Layer 4: Timestamp validation
        time_errors, time_warnings = self._validate_timestamp(event)
        errors.extend(time_errors)
        warnings.extend(time_warnings)
        
        # Layer 5: Business rules
        business_errors = self._validate_business_rules(event)
        errors.extend(business_errors)
        
        # Layer 6: Security validation
        security_errors = self._validate_security(event)
        errors.extend(security_errors)
        
        return ValidationResult(
            is_valid=len(errors) == 0,
            errors=errors,
            warnings=warnings
        )
    
    def _validate_size(self, event: Dict) -> List[str]:
        """Validate event size limits."""
        
        errors = []
        
        # Total size
        event_json = json.dumps(event)
        if len(event_json) > self.max_event_size_bytes:
            errors.append(
                f"Event size {len(event_json)} exceeds maximum "
                f"{self.max_event_size_bytes} bytes"
            )
        
        return errors
    
    def _validate_required_fields(self, event: Dict) -> List[str]:
        """Validate required fields are present."""
        
        errors = []
        required = ["event_id", "event_type", "event_time"]
        
        for field in required:
            if field not in event or event[field] is None:
                errors.append(f"Missing required field: {field}")
        
        return errors
    
    def _validate_schema(self, event: Dict) -> List[str]:
        """Validate against registered schema."""
        
        errors = []
        event_type = event.get("event_type")
        
        if not event_type:
            return errors  # Already caught in required fields
        
        # Get schema for this event type
        schema = self.schema_registry.get_schema(event_type)
        
        if not schema:
            # Unknown event type - warning but allow
            return []
        
        # Validate payload against schema
        payload = event.get("payload", {})
        
        for field, field_schema in schema.get("properties", {}).items():
            if field in schema.get("required", []):
                if field not in payload:
                    errors.append(f"Missing required payload field: {field}")
            
            if field in payload:
                field_errors = self._validate_field_type(
                    field, payload[field], field_schema
                )
                errors.extend(field_errors)
        
        return errors
    
    def _validate_field_type(
        self,
        field_name: str,
        value: Any,
        schema: Dict
    ) -> List[str]:
        """Validate a single field against its schema."""
        
        errors = []
        expected_type = schema.get("type")
        
        type_mapping = {
            "string": str,
            "integer": int,
            "number": (int, float),
            "boolean": bool,
            "array": list,
            "object": dict
        }
        
        if expected_type in type_mapping:
            expected = type_mapping[expected_type]
            if not isinstance(value, expected):
                errors.append(
                    f"Field '{field_name}' expected {expected_type}, "
                    f"got {type(value).__name__}"
                )
        
        return errors
    
    def _validate_timestamp(self, event: Dict) -> tuple:
        """Validate event timestamp is reasonable."""
        
        errors = []
        warnings = []
        
        event_time_str = event.get("event_time")
        if not event_time_str:
            return errors, warnings
        
        try:
            if isinstance(event_time_str, str):
                event_time = datetime.fromisoformat(
                    event_time_str.replace("Z", "+00:00")
                )
            else:
                event_time = event_time_str
            
            now = datetime.utcnow()
            
            # Check future timestamps
            max_future = now + timedelta(seconds=self.max_future_seconds)
            if event_time.replace(tzinfo=None) > max_future:
                errors.append(
                    f"Event time {event_time} is too far in the future"
                )
            
            # Check old timestamps
            max_past = now - timedelta(days=self.max_past_days)
            if event_time.replace(tzinfo=None) < max_past:
                warnings.append(
                    f"Event time {event_time} is more than "
                    f"{self.max_past_days} days old"
                )
        
        except Exception as e:
            errors.append(f"Invalid event_time format: {e}")
        
        return errors, warnings
    
    def _validate_business_rules(self, event: Dict) -> List[str]:
        """Validate business-specific rules."""
        
        errors = []
        event_type = event.get("event_type")
        payload = event.get("payload", {})
        
        # Example: Order events must have positive amounts
        if event_type == "order.placed":
            amount = payload.get("total_amount", 0)
            if amount <= 0:
                errors.append("Order total_amount must be positive")
        
        # Example: Product views must have product_id
        if event_type == "product.viewed":
            if not payload.get("product_id"):
                errors.append("product.viewed requires product_id")
        
        return errors
    
    def _validate_security(self, event: Dict) -> List[str]:
        """Validate for security concerns."""
        
        errors = []
        
        # Check for potential injection in string fields
        dangerous_patterns = [
            r"<script",
            r"javascript:",
            r"data:text/html",
            r"on\w+\s*=",  # onclick, onerror, etc.
        ]
        
        def check_strings(obj, path=""):
            if isinstance(obj, str):
                for pattern in dangerous_patterns:
                    if re.search(pattern, obj, re.IGNORECASE):
                        errors.append(
                            f"Potentially dangerous content in {path}"
                        )
            elif isinstance(obj, dict):
                for key, value in obj.items():
                    check_strings(value, f"{path}.{key}")
            elif isinstance(obj, list):
                for i, item in enumerate(obj):
                    check_strings(item, f"{path}[{i}]")
        
        check_strings(event)
        
        return errors
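
A quick way to sanity-check the validator is to run it against a sample event with a minimal in-memory registry; the tiny registry class below is a stand-in for whatever schema store you use (the validator only needs a get_schema method).

import asyncio


class InMemorySchemaRegistry:
    """Stand-in registry: the validator only calls get_schema(event_type)."""
    
    def __init__(self, schemas: Dict[str, Dict]):
        self.schemas = schemas
    
    def get_schema(self, event_type: str) -> Optional[Dict]:
        return self.schemas.get(event_type)


registry = InMemorySchemaRegistry({
    "product.viewed": {
        "required": ["product_id"],
        "properties": {
            "product_id": {"type": "string"},
            "position": {"type": "integer"}
        }
    }
})

validator = EventValidator(registry)

sample_event = {
    "event_id": "evt_1",
    "event_type": "product.viewed",
    "event_time": "2024-01-15T10:30:00Z",
    "payload": {"product_id": "prod_789", "position": 3}
}

result = asyncio.run(validator.validate(sample_event))
print(result.is_valid, result.errors)  # True, [] (plus a staleness warning if the date is old)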

4.2 Event Enrichment

# ingestion/enrichment.py

"""
Event enrichment layer.

Adds additional context to events before storage.
"""

from dataclasses import dataclass
from typing import Dict, Any, Optional
from datetime import datetime
import hashlib


@dataclass
class GeoLocation:
    """Geographic location data."""
    country: str
    country_code: str
    region: str
    city: str
    latitude: float
    longitude: float
    timezone: str


class EventEnricher:
    """
    Enriches events with additional context.
    
    Enrichment types:
    1. IP geolocation
    2. User agent parsing
    3. User profile data
    4. Product catalog data
    5. Session reconstruction
    6. Derived fields
    """
    
    def __init__(
        self,
        geo_service,
        user_service,
        product_service,
        session_service
    ):
        self.geo = geo_service
        self.users = user_service
        self.products = product_service
        self.sessions = session_service
    
    async def enrich(self, event: Dict[str, Any]) -> Dict[str, Any]:
        """
        Enrich an event with additional context.
        """
        
        enriched = {**event}
        enriched["enrichment"] = {}
        
        # Enrich with geolocation
        if ip := event.get("server_context", {}).get("client_ip"):
            geo = await self._enrich_geo(ip)
            if geo:
                enriched["enrichment"]["geo"] = geo
        
        # Enrich with user data
        if user_id := event.get("user_id"):
            user_data = await self._enrich_user(user_id)
            if user_data:
                enriched["enrichment"]["user"] = user_data
        
        # Enrich with product data
        if product_id := event.get("payload", {}).get("product_id"):
            product_data = await self._enrich_product(product_id)
            if product_data:
                enriched["enrichment"]["product"] = product_data
        
        # Enrich with session data
        if session_id := event.get("session_id"):
            session_data = await self._enrich_session(session_id)
            if session_data:
                enriched["enrichment"]["session"] = session_data
        
        # Add derived fields
        enriched["derived"] = self._derive_fields(enriched)
        
        return enriched
    
    async def _enrich_geo(self, ip: str) -> Optional[Dict]:
        """Look up geolocation from IP."""
        
        try:
            # Skip private/loopback ranges (simplified prefix check; use the
            # ipaddress module if you need an exact test)
            if ip.startswith(("10.", "172.", "192.168.", "127.")):
                return None
            
            location = await self.geo.lookup(ip)
            
            if location:
                return {
                    "country": location.country,
                    "country_code": location.country_code,
                    "region": location.region,
                    "city": location.city,
                    "timezone": location.timezone
                }
        except Exception:
            pass
        
        return None
    
    async def _enrich_user(self, user_id: str) -> Optional[Dict]:
        """Look up user profile data."""
        
        try:
            user = await self.users.get_profile(user_id)
            
            if user:
                return {
                    "segment": user.segment,
                    "tier": user.tier,
                    "signup_date": user.signup_date.isoformat(),
                    "lifetime_value": user.ltv,
                    "is_subscriber": user.is_subscriber
                }
        except Exception:
            pass
        
        return None
    
    async def _enrich_product(self, product_id: str) -> Optional[Dict]:
        """Look up product catalog data."""
        
        try:
            product = await self.products.get_product(product_id)
            
            if product:
                return {
                    "category": product.category,
                    "subcategory": product.subcategory,
                    "brand": product.brand,
                    "price": product.price,
                    "is_on_sale": product.is_on_sale
                }
        except Exception:
            pass
        
        return None
    
    async def _enrich_session(self, session_id: str) -> Optional[Dict]:
        """Look up session data."""
        
        try:
            session = await self.sessions.get_session(session_id)
            
            if session:
                return {
                    "session_start": session.start_time.isoformat(),
                    "page_views": session.page_view_count,
                    "is_new_session": session.is_first_session,
                    "entry_page": session.entry_page,
                    "traffic_source": session.traffic_source
                }
        except Exception:
            pass
        
        return None
    
    def _derive_fields(self, event: Dict) -> Dict:
        """Derive additional fields from event data."""
        
        derived = {}
        
        # Derive time dimensions
        event_time = event.get("event_time")
        if event_time:
            if isinstance(event_time, str):
                dt = datetime.fromisoformat(event_time.replace("Z", "+00:00"))
            else:
                dt = event_time
            
            derived["time_dimensions"] = {
                "year": dt.year,
                "month": dt.month,
                "day": dt.day,
                "hour": dt.hour,
                "day_of_week": dt.weekday(),
                "is_weekend": dt.weekday() >= 5,
                "quarter": (dt.month - 1) // 3 + 1
            }
        
        # Derive event category
        event_type = event.get("event_type", "")
        derived["event_category"] = event_type.split(".")[0] if "." in event_type else "unknown"
        
        # Hash sensitive fields for analytics
        if user_id := event.get("user_id"):
            derived["user_id_hash"] = hashlib.sha256(
                user_id.encode()
            ).hexdigest()[:16]
        
        return derived
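
Because _derive_fields does not touch any external service, the derived fields are easy to inspect in isolation; the None placeholders below exist only to satisfy the constructor.

enricher = EventEnricher(None, None, None, None)

print(enricher._derive_fields({
    "event_type": "product.viewed",
    "event_time": "2024-01-15T10:30:00Z",
    "user_id": "user_456"
}))
# -> time_dimensions (year, month, hour, is_weekend, quarter, ...),
#    event_category "product", and a truncated user_id_hash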

Chapter 5: Kafka as the Event Backbone

5.1 Kafka Topic Design

# ingestion/kafka_config.py

"""
Kafka configuration for event ingestion.

Topic design principles:
1. Partition by user_id for user-centric ordering
2. Separate topics by event category for independent scaling
3. Retention based on reprocessing needs
4. Compaction for lookup topics
"""

from dataclasses import dataclass
from typing import Dict, List
from enum import Enum


class CleanupPolicy(Enum):
    DELETE = "delete"
    COMPACT = "compact"
    COMPACT_DELETE = "compact,delete"


@dataclass
class TopicConfig:
    """Kafka topic configuration."""
    name: str
    partitions: int
    replication_factor: int
    retention_ms: int  # -1 for infinite
    cleanup_policy: CleanupPolicy
    min_insync_replicas: int = 2
    
    def to_dict(self) -> Dict:
        return {
            "num.partitions": self.partitions,
            "replication.factor": self.replication_factor,
            "retention.ms": self.retention_ms,
            "cleanup.policy": self.cleanup_policy.value,
            "min.insync.replicas": self.min_insync_replicas
        }


# =============================================================================
# Topic Definitions
# =============================================================================

RETENTION_7_DAYS = 7 * 24 * 60 * 60 * 1000
RETENTION_30_DAYS = 30 * 24 * 60 * 60 * 1000
RETENTION_90_DAYS = 90 * 24 * 60 * 60 * 1000
RETENTION_INFINITE = -1

TOPICS = {
    # Raw events (all events, short retention)
    "events.raw": TopicConfig(
        name="events.raw",
        partitions=64,
        replication_factor=3,
        retention_ms=RETENTION_7_DAYS,
        cleanup_policy=CleanupPolicy.DELETE
    ),
    
    # User events (user actions, longer retention)
    "events.user": TopicConfig(
        name="events.user",
        partitions=32,
        replication_factor=3,
        retention_ms=RETENTION_30_DAYS,
        cleanup_policy=CleanupPolicy.DELETE
    ),
    
    # Product events (views, searches, carts)
    "events.product": TopicConfig(
        name="events.product",
        partitions=32,
        replication_factor=3,
        retention_ms=RETENTION_30_DAYS,
        cleanup_policy=CleanupPolicy.DELETE
    ),
    
    # Order events (transactions, keep longer)
    "events.order": TopicConfig(
        name="events.order",
        partitions=16,
        replication_factor=3,
        retention_ms=RETENTION_90_DAYS,
        cleanup_policy=CleanupPolicy.DELETE
    ),
    
    # Payment events (financial, keep indefinitely)
    "events.payment": TopicConfig(
        name="events.payment",
        partitions=16,
        replication_factor=3,
        retention_ms=RETENTION_INFINITE,
        cleanup_policy=CleanupPolicy.DELETE
    ),
    
    # Dead letter queue (failed events)
    "events.dead_letter": TopicConfig(
        name="events.dead_letter",
        partitions=8,
        replication_factor=3,
        retention_ms=RETENTION_30_DAYS,
        cleanup_policy=CleanupPolicy.DELETE
    ),
    
    # Aggregated metrics (compacted for current state)
    "metrics.realtime": TopicConfig(
        name="metrics.realtime",
        partitions=16,
        replication_factor=3,
        retention_ms=RETENTION_INFINITE,
        cleanup_policy=CleanupPolicy.COMPACT
    )
}
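
One way to actually create these topics is with the kafka-python admin client; the sketch below assumes that package is installed and a broker is reachable at localhost:9092. Partitions and replication factor are creation parameters, so only the remaining settings go into topic_configs.

from kafka.admin import KafkaAdminClient, NewTopic


def create_topics(bootstrap_servers: str = "localhost:9092") -> None:
    """Create all topics defined in TOPICS (idempotency/error handling omitted)."""
    admin = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
    
    new_topics = [
        NewTopic(
            name=cfg.name,
            num_partitions=cfg.partitions,
            replication_factor=cfg.replication_factor,
            topic_configs={
                "retention.ms": str(cfg.retention_ms),
                "cleanup.policy": cfg.cleanup_policy.value,
                "min.insync.replicas": str(cfg.min_insync_replicas),
            },
        )
        for cfg in TOPICS.values()
    ]
    
    admin.create_topics(new_topics=new_topics)
    admin.close()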


# =============================================================================
# Partitioning Strategy
# =============================================================================

class PartitionStrategy:
    """
    Determines which partition an event goes to.
    
    Goals:
    1. Events for same user go to same partition (ordering)
    2. Even distribution across partitions (load balancing)
    3. Consistent assignment (same key always same partition)
    """
    
    @staticmethod
    def get_partition_key(event: Dict) -> str:
        """
        Get the partition key for an event.
        
        Strategy:
        - User events: partition by user_id
        - Anonymous events: partition by session_id or device_id
        - System events: partition by source service
        """
        
        event_type = event.get("event_type", "")
        
        # User-centric events
        if event_type.startswith(("user.", "order.", "payment.")):
            return event.get("user_id") or event.get("session_id") or "anonymous"
        
        # Product events - partition by product for aggregation
        if event_type.startswith("product."):
            # Use user_id if available for user journey analysis
            if user_id := event.get("user_id"):
                return user_id
            # Fall back to product_id for product analytics
            return event.get("payload", {}).get("product_id", "unknown")
        
        # System events - partition by service
        if event_type.startswith("system."):
            return event.get("server_context", {}).get("collector_instance", "default")
        
        # Default: use event_id for even distribution
        return event.get("event_id", "unknown")
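
The mapping from key to partition is deterministic: Kafka's default partitioner hashes the key bytes (murmur2) modulo the partition count. The helper below is illustrative only and uses MD5 as a stand-in hash to show the "same key, same partition" property.

import hashlib


def partition_for(event: Dict, num_partitions: int) -> int:
    """Illustrative key-to-partition mapping (not Kafka's actual murmur2 partitioner)."""
    key = PartitionStrategy.get_partition_key(event)
    digest = hashlib.md5(key.encode("utf-8")).digest()
    return int.from_bytes(digest[:4], "big") % num_partitions


# Every event for user_456 lands on the same partition of events.user
evt = {"event_type": "user.logged_in", "user_id": "user_456"}
print(partition_for(evt, TOPICS["events.user"].partitions))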

5.2 Kafka Producer with Exactly-Once

# ingestion/kafka_producer.py

"""
High-performance Kafka producer for event ingestion.

Features:
- Batching for throughput
- Exactly-once semantics
- Async sending with callbacks
- Compression
- Retries with backoff
"""

from typing import Dict, Any, List, Optional, Callable
from dataclasses import dataclass
import asyncio
import json
import logging
from datetime import datetime

logger = logging.getLogger(__name__)


@dataclass
class ProducerConfig:
    """Kafka producer configuration."""
    bootstrap_servers: List[str]
    client_id: str = "event-collector"
    
    # Batching
    batch_size: int = 16384  # 16KB
    linger_ms: int = 5  # Wait up to 5ms for batch
    buffer_memory: int = 33554432  # 32MB
    
    # Reliability
    acks: str = "all"  # Wait for all replicas
    retries: int = 5
    retry_backoff_ms: int = 100
    enable_idempotence: bool = True  # Exactly-once
    
    # Compression
    compression_type: str = "lz4"
    
    # Timeouts
    request_timeout_ms: int = 30000
    delivery_timeout_ms: int = 120000


class EventProducer:
    """
    Async Kafka producer for events.
    """
    
    def __init__(self, config: ProducerConfig):
        self.config = config
        self.producer = None
        
        # Metrics
        self.messages_sent = 0
        self.messages_failed = 0
        self.bytes_sent = 0
    
    async def start(self):
        """Initialize the Kafka producer."""
        
        from aiokafka import AIOKafkaProducer
        
        self.producer = AIOKafkaProducer(
            bootstrap_servers=self.config.bootstrap_servers,
            client_id=self.config.client_id,
            acks=self.config.acks,
            enable_idempotence=self.config.enable_idempotence,
            compression_type=self.config.compression_type,
            linger_ms=self.config.linger_ms,
            max_batch_size=self.config.batch_size,
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            key_serializer=lambda k: k.encode('utf-8') if k else None
        )
        
        await self.producer.start()
        logger.info("Kafka producer started")
    
    async def stop(self):
        """Shutdown the producer gracefully."""
        if self.producer:
            await self.producer.stop()
            logger.info("Kafka producer stopped")
    
    async def send(
        self,
        topic: str,
        value: Dict[str, Any],
        key: str = None,
        headers: Dict[str, str] = None
    ) -> bool:
        """
        Send a single event to Kafka.
        
        Args:
            topic: Target topic
            value: Event data
            key: Partition key
            headers: Optional headers
        
        Returns:
            True if sent successfully
        """
        
        try:
            # Convert headers to Kafka format
            kafka_headers = None
            if headers:
                kafka_headers = [
                    (k, v.encode('utf-8')) for k, v in headers.items()
                ]
            
            # Send and wait for acknowledgment
            result = await self.producer.send_and_wait(
                topic=topic,
                value=value,
                key=key,
                headers=kafka_headers
            )
            
            self.messages_sent += 1
            self.bytes_sent += len(json.dumps(value).encode("utf-8"))
            
            logger.debug(
                f"Event sent to {topic}:{result.partition}@{result.offset}"
            )
            
            return True
            
        except Exception as e:
            self.messages_failed += 1
            logger.error(f"Failed to send event to {topic}: {e}")
            raise
    
    async def send_batch(
        self,
        topic: str,
        events: List[Dict[str, Any]],
        key_extractor: Callable[[Dict], str] = None
    ) -> int:
        """
        Send a batch of events efficiently.
        
        Args:
            topic: Target topic
            events: List of events
            key_extractor: Function to extract key from event
        
        Returns:
            Number of successfully sent events
        """
        
        if not events:
            return 0
        
        # Create batch of futures
        futures = []
        
        for event in events:
            key = None
            if key_extractor:
                key = key_extractor(event)
            elif "user_id" in event:
                key = event["user_id"]
            
            future = await self.producer.send(
                topic=topic,
                value=event,
                key=key
            )
            futures.append(future)
        
        # Wait for all to complete
        success_count = 0
        for i, future in enumerate(futures):
            try:
                await future
                success_count += 1
            except Exception as e:
                logger.error(f"Failed to send event {i}: {e}")
                self.messages_failed += 1
        
        self.messages_sent += success_count
        
        return success_count
    
    async def flush(self):
        """Flush any pending messages."""
        if self.producer:
            await self.producer.flush()
    
    def get_metrics(self) -> Dict[str, int]:
        """Get producer metrics."""
        return {
            "messages_sent": self.messages_sent,
            "messages_failed": self.messages_failed,
            "bytes_sent": self.bytes_sent
        }


# =============================================================================
# Transaction Support (Exactly-Once)
# =============================================================================

class TransactionalEventProducer(EventProducer):
    """
    Kafka producer with transaction support for exactly-once semantics.
    
    Use when you need to:
    1. Send events atomically (all or nothing)
    2. Coordinate with other Kafka operations
    3. Ensure exactly-once delivery
    """
    
    def __init__(self, config: ProducerConfig, transactional_id: str):
        super().__init__(config)
        self.transactional_id = transactional_id
    
    async def start(self):
        """Initialize transactional producer."""
        
        from aiokafka import AIOKafkaProducer
        
        self.producer = AIOKafkaProducer(
            bootstrap_servers=self.config.bootstrap_servers,
            client_id=self.config.client_id,
            transactional_id=self.transactional_id,
            acks=self.config.acks,
            enable_idempotence=True,  # Required for transactions
            compression_type=self.config.compression_type
        )
        
        await self.producer.start()
        
        # Transactions are initialized by the broker once a transactional_id is set;
        # begin_transaction() is called per batch in send_transactional() below.
        logger.info(f"Transactional producer started: {self.transactional_id}")
    
    async def send_transactional(
        self,
        events: List[tuple]  # List of (topic, event) tuples
    ) -> bool:
        """
        Send multiple events in a single transaction.
        
        Either all events are delivered or none are.
        """
        
        try:
            # Begin transaction
            await self.producer.begin_transaction()
            
            # Send all events
            for topic, event in events:
                await self.producer.send(topic, value=event)
            
            # Commit transaction
            await self.producer.commit_transaction()
            
            self.messages_sent += len(events)
            return True
            
        except Exception as e:
            logger.error(f"Transaction failed: {e}")
            
            # Abort transaction
            await self.producer.abort_transaction()
            
            self.messages_failed += len(events)
            return False
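
A minimal usage sketch for the transactional producer, assuming the classes above; the topic names and event shapes are illustrative.

# Example (illustrative): atomically publish an order event and its audit record

async def publish_order(producer: TransactionalEventProducer, order: Dict) -> bool:
    events = [
        ("events.orders", {"event_type": "order.placed", "data": order}),
        ("events.audit", {"event_type": "audit.order_recorded",
                          "order_id": order.get("order_id")}),
    ]
    # Either both records land in Kafka, or neither does
    return await producer.send_transactional(events)

Note that end-to-end exactly-once also requires downstream consumers to read with isolation.level=read_committed, so records from aborted transactions stay invisible.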

Chapter 6: Handling Ingestion Spikes

6.1 Backpressure and Rate Limiting

# ingestion/backpressure.py

"""
Backpressure handling for event ingestion.

When ingestion rate exceeds processing capacity:
1. Buffer events temporarily
2. Apply rate limiting
3. Shed load gracefully
4. Alert operators
"""

from dataclasses import dataclass
from typing import Dict, Any, Optional
from datetime import datetime, timedelta
import asyncio
from collections import deque
import logging

logger = logging.getLogger(__name__)


@dataclass
class BackpressureConfig:
    """Backpressure configuration."""
    
    # Queue limits
    max_queue_size: int = 100000
    warning_queue_size: int = 50000
    
    # Rate limits
    max_events_per_second: int = 50000
    max_events_per_client: int = 1000  # Per second
    
    # Timeouts
    queue_timeout_seconds: int = 30
    
    # Load shedding
    enable_load_shedding: bool = True
    shed_threshold: float = 0.9  # Shed when queue > 90% full


class BackpressureManager:
    """
    Manages backpressure for event ingestion.
    """
    
    def __init__(self, config: BackpressureConfig):
        self.config = config
        
        # Event queue
        self.queue = asyncio.Queue(maxsize=config.max_queue_size)
        
        # Rate tracking
        # Timestamps of recently accepted events; sized to the global
        # per-second limit so that limit can actually be observed
        self.event_count_window = deque(maxlen=config.max_events_per_second)
        self.client_event_counts: Dict[str, int] = {}
        self.last_reset = datetime.utcnow()
        
        # Status
        self.is_shedding = False
        self.events_shed = 0
    
    async def accept_event(
        self,
        event: Dict[str, Any],
        client_id: str
    ) -> bool:
        """
        Try to accept an event for processing.
        
        Returns True if accepted, False if rejected.
        """
        
        # Check client rate limit
        if not self._check_client_rate(client_id):
            logger.warning(f"Client {client_id} exceeded rate limit")
            return False
        
        # Check global rate limit
        if not self._check_global_rate():
            logger.warning("Global rate limit exceeded")
            return False
        
        # Check queue capacity
        queue_usage = self.queue.qsize() / self.config.max_queue_size
        
        # Load shedding
        if self.config.enable_load_shedding and queue_usage > self.config.shed_threshold:
            if not self.is_shedding:
                self.is_shedding = True
                logger.warning(f"Entering load shedding mode. Queue: {queue_usage:.1%}")
            
            # Shed non-critical events
            if self._should_shed(event):
                self.events_shed += 1
                return False
        else:
            if self.is_shedding:
                self.is_shedding = False
                logger.info("Exiting load shedding mode")
        
        # Try to queue event
        try:
            self.queue.put_nowait(event)
            self._record_event(client_id)
            return True
        except asyncio.QueueFull:
            logger.error("Event queue full, rejecting event")
            return False
    
    async def get_event(self, timeout: Optional[float] = None) -> Optional[Dict]:
        """Get next event from queue."""
        
        try:
            if timeout:
                return await asyncio.wait_for(
                    self.queue.get(),
                    timeout=timeout
                )
            return await self.queue.get()
        except asyncio.TimeoutError:
            return None
    
    def _check_client_rate(self, client_id: str) -> bool:
        """Check if client is within rate limit."""
        
        self._reset_if_needed()
        
        count = self.client_event_counts.get(client_id, 0)
        return count < self.config.max_events_per_client
    
    def _check_global_rate(self) -> bool:
        """Check if global rate is within limit."""
        
        # Count events in last second
        now = datetime.utcnow()
        one_second_ago = now - timedelta(seconds=1)
        
        recent_count = sum(
            1 for ts in self.event_count_window
            if ts > one_second_ago
        )
        
        return recent_count < self.config.max_events_per_second
    
    def _record_event(self, client_id: str):
        """Record event for rate tracking."""
        
        now = datetime.utcnow()
        self.event_count_window.append(now)
        
        if client_id not in self.client_event_counts:
            self.client_event_counts[client_id] = 0
        self.client_event_counts[client_id] += 1
    
    def _reset_if_needed(self):
        """Reset per-second counters if needed."""
        
        now = datetime.utcnow()
        if (now - self.last_reset).total_seconds() >= 1:
            self.client_event_counts.clear()
            self.last_reset = now
    
    def _should_shed(self, event: Dict) -> bool:
        """Determine if event should be shed during overload."""
        
        event_type = event.get("event_type", "")
        
        # Never shed critical events
        critical_types = [
            "order.placed",
            "payment.processed",
            "user.signed_up"
        ]
        
        if any(event_type.startswith(t) for t in critical_types):
            return False
        
        # Shed less important events
        shedable_types = [
            "page.viewed",
            "product.viewed",
            "system.performance"
        ]
        
        return any(event_type.startswith(t) for t in shedable_types)
    
    def get_status(self) -> Dict:
        """Get backpressure status."""
        
        one_second_ago = datetime.utcnow() - timedelta(seconds=1)
        
        return {
            "queue_size": self.queue.qsize(),
            "queue_capacity": self.config.max_queue_size,
            "queue_usage": self.queue.qsize() / self.config.max_queue_size,
            "is_shedding": self.is_shedding,
            "events_shed": self.events_shed,
            "recent_rate": sum(
                1 for ts in self.event_count_window if ts > one_second_ago
            )
        }
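
Here's a rough sketch of how the manager might be wired into an ingestion path: the collection API calls accept_event once per incoming event, and a background worker drains the queue into Kafka at its own pace. The handler shape and the producer (the batch EventProducer shown earlier) are illustrative assumptions.

# Example (illustrative): wiring BackpressureManager into an ingestion path

async def handle_incoming(manager: BackpressureManager, event: Dict, client_id: str) -> bool:
    # Called once per event by the collection API; a False return should
    # surface to the client as HTTP 429 so well-behaved SDKs back off
    return await manager.accept_event(event, client_id)


async def drain_to_kafka(manager: BackpressureManager, producer) -> None:
    # Background worker: forward accepted events to Kafka
    while True:
        event = await manager.get_event(timeout=1.0)
        if event is not None:
            await producer.send_batch("events.raw", [event])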


# =============================================================================
# Adaptive Rate Limiting
# =============================================================================

class AdaptiveRateLimiter:
    """
    Adaptive rate limiter that adjusts based on system health.
    
    When downstream is healthy: allow more traffic
    When downstream is struggling: reduce limits
    """
    
    def __init__(
        self,
        initial_rate: int = 10000,
        min_rate: int = 1000,
        max_rate: int = 100000
    ):
        self.current_rate = initial_rate
        self.min_rate = min_rate
        self.max_rate = max_rate
        
        # Health counters, reset after each adjustment window
        self.success_count = 0
        self.failure_count = 0
        self.last_adjustment = datetime.utcnow()
    
    def record_success(self):
        """Record successful event processing."""
        self.success_count += 1
        self._maybe_adjust()
    
    def record_failure(self):
        """Record failed event processing."""
        self.failure_count += 1
        self._maybe_adjust()
    
    def _maybe_adjust(self):
        """Adjust rate limit based on health."""
        
        now = datetime.utcnow()
        if (now - self.last_adjustment).total_seconds() < 10:
            return  # Adjust every 10 seconds
        
        total = self.success_count + self.failure_count
        if total < 100:
            return  # Need enough samples
        
        failure_rate = self.failure_count / total
        
        if failure_rate < 0.01:  # <1% failures, increase limit
            self.current_rate = min(
                self.max_rate,
                int(self.current_rate * 1.1)
            )
            logger.info(f"Increased rate limit to {self.current_rate}")
        
        elif failure_rate > 0.05:  # >5% failures, decrease limit
            self.current_rate = max(
                self.min_rate,
                int(self.current_rate * 0.8)
            )
            logger.warning(f"Decreased rate limit to {self.current_rate}")
        
        # Reset counters
        self.success_count = 0
        self.failure_count = 0
        self.last_adjustment = now
    
    def get_current_limit(self) -> int:
        """Get current rate limit."""
        return self.current_rate
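
A sketch of how the adaptive limiter could be fed from producer outcomes; the producer (the batch EventProducer shown earlier), topic, and the way the limit is applied are assumptions.

# Example (illustrative): feeding producer outcomes into the limiter

async def send_with_feedback(limiter: AdaptiveRateLimiter, producer, topic: str, event: Dict) -> None:
    try:
        sent = await producer.send_batch(topic, [event])
        if sent == 1:
            limiter.record_success()
        else:
            limiter.record_failure()
    except Exception:
        limiter.record_failure()
        raise

# The adjusted limit can then be pushed into BackpressureConfig periodically:
#   backpressure_config.max_events_per_second = limiter.get_current_limit()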

6.2 Multi-Region Ingestion

# ingestion/multi_region.py

"""
Multi-region event ingestion architecture.

For global applications:
1. Collect events in each region
2. Aggregate to central location
3. Handle cross-region replication
"""

from dataclasses import dataclass
from typing import Dict, List
from enum import Enum


class Region(Enum):
    US_EAST = "us-east-1"
    US_WEST = "us-west-2"
    EU_WEST = "eu-west-1"
    AP_SOUTH = "ap-south-1"
    AP_NORTHEAST = "ap-northeast-1"


@dataclass
class RegionalConfig:
    """Configuration for a regional collector."""
    region: Region
    kafka_brokers: List[str]
    is_primary: bool = False


MULTI_REGION_ARCHITECTURE = """
MULTI-REGION EVENT INGESTION

                        GLOBAL USERS
                             │
        ┌────────────────────┼────────────────────┐
        │                    │                    │
        ▼                    ▼                    ▼
   ┌─────────┐         ┌─────────┐         ┌─────────┐
   │US East  │         │EU West  │         │AP South │
   │Collector│         │Collector│         │Collector│
   └────┬────┘         └────┬────┘         └────┬────┘
        │                   │                   │
        ▼                   ▼                   ▼
   ┌─────────┐         ┌─────────┐         ┌─────────┐
   │Regional │         │Regional │         │Regional │
   │ Kafka   │         │ Kafka   │         │ Kafka   │
   └────┬────┘         └────┬────┘         └────┬────┘
        │                   │                   │
        └───────────────────┼───────────────────┘
                            │
                    ┌───────▼───────┐
                    │  MirrorMaker  │
                    │  (Replicator) │
                    └───────┬───────┘
                            │
                    ┌───────▼───────┐
                    │Central Kafka  │
                    │  (US East)    │
                    └───────┬───────┘
                            │
              ┌─────────────┼─────────────┐
              ▼             ▼             ▼
         ┌────────┐   ┌────────┐   ┌────────┐
         │ Flink  │   │ Spark  │   │  S3    │
         │Stream  │   │ Batch  │   │ Lake   │
         └────────┘   └────────┘   └────────┘

BENEFITS:
├── Low latency for users (collect locally)
├── Fault tolerance (region can fail)
├── Compliance (data stays in region)
└── Central analytics (aggregate view)

CHALLENGES:
├── Cross-region replication lag
├── Ordering across regions
├── Cost of data transfer
└── Schema consistency
"""


class MultiRegionRouter:
    """
    Routes events to appropriate regional collector.
    """
    
    def __init__(self, regional_configs: Dict[Region, RegionalConfig]):
        self.configs = regional_configs
    
    def get_collector_for_user(self, user_region: str) -> RegionalConfig:
        """Get the nearest collector for a user."""
        
        # Map user regions to our regions
        region_mapping = {
            "US": Region.US_EAST,
            "CA": Region.US_EAST,
            "MX": Region.US_EAST,
            "BR": Region.US_EAST,
            "GB": Region.EU_WEST,
            "DE": Region.EU_WEST,
            "FR": Region.EU_WEST,
            "IN": Region.AP_SOUTH,
            "SG": Region.AP_SOUTH,
            "JP": Region.AP_NORTHEAST,
            "KR": Region.AP_NORTHEAST,
            "AU": Region.AP_NORTHEAST,
        }
        
        region = region_mapping.get(user_region, Region.US_EAST)
        return self.configs.get(region, self.configs[Region.US_EAST])
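
A quick usage sketch for the router above; the broker addresses are placeholders.

# Example (illustrative): resolving the nearest collector for a request

configs = {
    Region.US_EAST: RegionalConfig(Region.US_EAST, ["kafka-use1.internal:9092"], is_primary=True),
    Region.EU_WEST: RegionalConfig(Region.EU_WEST, ["kafka-euw1.internal:9092"]),
    Region.AP_SOUTH: RegionalConfig(Region.AP_SOUTH, ["kafka-aps1.internal:9092"]),
    Region.AP_NORTHEAST: RegionalConfig(Region.AP_NORTHEAST, ["kafka-apne1.internal:9092"]),
}

router = MultiRegionRouter(configs)
collector = router.get_collector_for_user("DE")   # -> EU_WEST config
print(collector.region, collector.kafka_brokers)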

Part III: Real-World Application

Chapter 7: Case Studies

7.1 Case Study: Segment

SEGMENT'S EVENT INGESTION

Scale:
├── 200 billion events/month
├── 25,000+ customers
├── Sub-second ingestion latency

Architecture:
├── Edge collection: CDN-based collectors globally
├── Validation: Schema enforcement per source
├── Routing: Events routed to 300+ integrations
├── Storage: Raw events stored in S3

Key decisions:
├── Schema registry for each source
├── At-least-once delivery (idempotent destinations)
├── Customer-specific rate limits
└── Real-time + batch destinations

Lessons:
├── Schema validation prevents downstream issues
├── Rate limiting per customer is essential
└── Idempotency keys enable replay

7.2 Case Study: Uber

UBER'S EVENT INGESTION

Scale:
├── Trillions of events/day
├── 100+ event types
├── Multi-region deployment

Architecture:
├── Apache Kafka as backbone
├── Schema registry (Apache Avro)
├── Flink for real-time processing
├── HDFS/S3 for raw storage

Key decisions:
├── Avro for schema evolution
├── Kafka clusters per region
├── Tiered storage (hot/warm/cold)
└── Sampling for high-volume events

Lessons:
├── Schema evolution is critical at scale
├── Regional clusters reduce latency
└── Not all events need same retention

Chapter 8: Common Mistakes

8.1 Schema Design Mistakes

❌ MISTAKE 1: No Event ID

Wrong:
  {"type": "viewed", "product": "123"}

Problem:
  - Can't deduplicate
  - Can't track event through pipeline
  - Can't correlate with other systems

Right:
  {
    "event_id": "evt_abc123",
    "event_type": "product.viewed",
    "product_id": "123"
  }
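
As a small illustration, the ID can be minted wherever the event originates; the helper name here is hypothetical.

import uuid


def new_event_id() -> str:
    # Generated at the source, so retried sends can be deduplicated end-to-end
    return f"evt_{uuid.uuid4().hex}"


event = {
    "event_id": new_event_id(),
    "event_type": "product.viewed",
    "product_id": "123"
}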


❌ MISTAKE 2: Mutable Events

Wrong:
  # Updating existing event
  event["status"] = "processed"
  save(event)

Problem:
  - Breaks immutability
  - Can't replay from any point
  - Audit trail lost

Right:
  # Emit new event
  emit({
    "event_type": "event.processed",
    "original_event_id": event["event_id"]
  })


❌ MISTAKE 3: Inconsistent Timestamps

Wrong:
  {"timestamp": "Jan 15, 2024"}
  {"timestamp": 1705312200}
  {"timestamp": "2024-01-15T10:30:00"}

Problem:
  - Parsing nightmare
  - Timezone confusion
  - Ordering issues

Right:
  {"event_time": "2024-01-15T10:30:00Z"}  # Always ISO 8601 UTC

8.2 Infrastructure Mistakes

❌ MISTAKE 4: Synchronous Ingestion

Wrong:
  def handle_event(event):
      validate(event)
      enrich(event)
      save_to_database(event)  # Blocks!
      return "OK"

Problem:
  - Database slow = API slow
  - No buffering for spikes
  - Single point of failure

Right:
  async def handle_event(event):
      validate(event)
      await kafka.send(event)  # Async, buffered
      return "OK"


❌ MISTAKE 5: No Dead Letter Queue

Wrong:
  try:
      process(event)
  except:
      log.error("Failed")
      # Event lost forever!

Problem:
  - Data loss
  - No retry capability
  - No debugging

Right:
  try:
      process(event)
  except Exception as e:
      await dlq.send(event, error=str(e))
      # Can investigate and replay later

Part IV: Interview Preparation

Chapter 9: Interview Tips

9.1 When to Discuss Event Ingestion

BRING UP EVENT INGESTION WHEN:

1. "Design an analytics system"
   → Start with how events are captured

2. "How would you track user behavior?"
   → Event schema and ingestion pipeline

3. "Design a recommendation system"
   → Needs event data for training

4. "Build a real-time dashboard"
   → Starts with event streaming

5. "Design a fraud detection system"
   → Events are the input

9.2 Key Phrases to Use

TALKING ABOUT EVENT INGESTION:

"For event ingestion, I'd use Kafka as the backbone because
it provides durability, replay capability, and handles
backpressure naturally through consumer lag."

"Events should be immutable and self-describing. Each event
needs a unique ID for deduplication, a timestamp for ordering,
and enough context to be useful without joining other data."

"Schema evolution is critical. I'd use a schema registry to
enforce backward compatibility so we can add fields without
breaking existing consumers."

"For handling spikes, I'd implement backpressure at the
collection layer and use Kafka's buffering. We can shed
low-priority events if needed while always accepting
critical events like orders and payments."

Chapter 10: Practice Problems

Problem 1: Mobile App Event Collection

Setup: Design an event collection system for a mobile app with:

  • 10M daily active users
  • Average 50 events per user per day
  • Offline support (events buffered on device)
  • Poor network conditions

Questions:

  1. How do you handle offline events that arrive days later?
  2. How do you prevent duplicate events from retries?
  3. How do you handle clock skew on mobile devices?

Approach hints (see the sketch after this list):

  • Use client-generated event IDs for deduplication
  • Include both client timestamp and server timestamp
  • Batch events in mobile SDK before sending
  • Accept late events with warnings
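
A rough sketch of the client-side batching and dual-timestamp hints above; the class, field, and parameter names are illustrative.

# Example (illustrative): offline-friendly mobile event buffering

import uuid
from datetime import datetime, timezone
from typing import Dict, List


class OfflineEventBuffer:
    """Buffers events on-device and releases them in batches when connectivity returns."""

    def __init__(self, max_batch: int = 50):
        self.pending: List[Dict] = []
        self.max_batch = max_batch

    def track(self, event_type: str, data: Dict) -> None:
        self.pending.append({
            "event_id": f"evt_{uuid.uuid4().hex}",                   # client-generated: enables dedup
            "event_type": event_type,
            "client_time": datetime.now(timezone.utc).isoformat(),   # may be skewed
            "data": data,
        })

    def next_batch(self) -> List[Dict]:
        batch, self.pending = self.pending[:self.max_batch], self.pending[self.max_batch:]
        return batch


# Server side (on receipt): stamp authoritative time and dedupe on event_id, e.g.
#   event["server_time"] = datetime.now(timezone.utc).isoformat()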

Problem 2: High-Volume Gaming Events

Setup: Design event ingestion for a multiplayer game:

  • 1M concurrent players
  • 100 events per second per player during gameplay
  • Need real-time leaderboards
  • Some events are critical (purchases), some are telemetry

Questions:

  1. How do you handle 100M events/second?
  2. How do you prioritize critical vs telemetry events?
  3. How do you aggregate for real-time leaderboards?

Approach hints (see the sketch after this list):

  • Sample telemetry events (1 in 100)
  • Separate Kafka topics by priority
  • Edge aggregation before central ingestion
  • Pre-aggregate scores in streaming layer
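
A sketch of the priority-routing and sampling hints above; the topic names and event-type prefixes are hypothetical.

# Example (illustrative): route by priority, sample high-volume telemetry

import random
from typing import Dict, Optional

CRITICAL_PREFIXES = ("purchase.", "match.result")
TELEMETRY_SAMPLE_RATE = 0.01  # keep roughly 1 in 100 telemetry events


def route(event: Dict) -> Optional[str]:
    """Return the target topic for an event, or None if it is sampled out."""
    event_type = event.get("event_type", "")

    if event_type.startswith(CRITICAL_PREFIXES):
        return "game.events.critical"      # never sampled, never shed

    if random.random() < TELEMETRY_SAMPLE_RATE:
        return "game.events.telemetry"

    return None                            # dropped by sampling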

Chapter 11: Sample Interview Dialogue

Interviewer: "Design an event tracking system for our e-commerce platform."

You: "I'd start by understanding the event volume and use cases. How many events per day are we expecting, and what's the primary use case — real-time analytics, historical analysis, or both?"

Interviewer: "About 100 million events per day. We need both real-time dashboards and historical analysis."

You: "Got it. Let me design the ingestion layer first, then we can discuss processing.

For ingestion, I'd use a three-tier architecture:

First, a collection layer — lightweight API servers that receive events from web and mobile clients. These validate the schema, add server-side metadata, and immediately write to Kafka. They don't do any heavy processing.

Second, Kafka as the event backbone. I'd create topics by event category — user events, product events, order events. This lets us scale and process them independently. Kafka gives us durability and replay capability.

Third, a validation and enrichment layer — consumers that validate events against our schema registry, enrich them with additional context like geo-location, and route them to the appropriate downstream systems.

For handling spikes, Kafka naturally buffers events. If producers are faster than consumers, the lag increases but nothing is lost. We can also scale consumers horizontally."

Interviewer: "What about events from mobile devices that might be offline?"

You: "Good point. Mobile SDK should buffer events locally and send in batches when connectivity is available. Each event needs a client-generated UUID so we can deduplicate if the same batch is sent twice due to network retries.

For late-arriving events, I'd include both the client timestamp (when it happened) and server timestamp (when we received it). The processing layer can handle late events appropriately — for real-time dashboards, we might show them with a delay indicator, while for historical analysis, they're processed normally.

We should also validate that client timestamps are reasonable — not too far in the future, and not older than our retention policy allows."


Summary

DAY 1 SUMMARY: EVENT INGESTION AT SCALE

EVENT DESIGN
├── Immutable, timestamped facts
├── Unique ID for deduplication
├── Self-describing (schema + context)
├── Versioned for evolution
└── ISO 8601 UTC timestamps

INGESTION ARCHITECTURE
├── Collection layer (lightweight APIs)
├── Validation layer (schema enforcement)
├── Kafka backbone (durability + buffering)
├── Enrichment layer (add context)
└── Routing to downstream systems

SCHEMA EVOLUTION
├── Backward compatible changes only
├── Add optional fields (safe)
├── Never remove required fields
├── Use schema registry
└── Version all events

HANDLING SPIKES
├── Kafka buffers naturally
├── Rate limiting per client
├── Load shedding for non-critical
├── Adaptive rate limits
└── Multi-region for global scale

RELIABILITY
├── At-least-once delivery default
├── Exactly-once with transactions
├── Dead letter queue for failures
├── Idempotency keys for dedup
└── Replay capability always

Key Takeaways

EVENT INGESTION KEY TAKEAWAYS

1. EVENTS ARE IMMUTABLE FACTS
   Never update an event — emit a new one

2. SCHEMA IS CONTRACT
   Schema registry prevents downstream breakage

3. KAFKA IS THE BACKBONE
   Durability, buffering, replay all built-in

4. VALIDATE EARLY
   Catch bad data at ingestion, not in analytics

5. DESIGN FOR FAILURE
   DLQ, retries, idempotency from day one

GOLDEN RULES:
├── Every event has unique ID
├── Always use UTC timestamps
├── Never drop events silently
├── Schema evolution = backward compatible
└── Client timestamp ≠ Server timestamp

What's Next

Tomorrow, we'll tackle Streaming vs Batch Processing — understanding when to use real-time stream processing versus batch processing, and how to architect systems that use both.

TOMORROW'S PREVIEW: STREAMING VS BATCH

Questions we'll answer:
├── When do you need real-time vs batch?
├── What is Lambda Architecture?
├── What is Kappa Architecture?
├── How do you handle state in streaming?
└── When is Flink vs Spark the right choice?

We'll take the events we're now capturing and process
them into useful metrics and insights.

End of Week 8, Day 1

Next: Day 2 — Streaming vs Batch Processing