Himanshu Kukreja

Week 3 — Day 5: Audit Log System

System Design Mastery Series


Preface

It's Friday. You've made it through one of the most intense weeks of this program.

Let's recap what you've learned:

WEEK 3 JOURNEY: Messaging and Async Processing

Day 1: Queue vs Stream
  └── When to use RabbitMQ vs Kafka
  └── Consumer groups, ordering, retention
  └── "Choose based on consumption pattern, not popularity"

Day 2: Transactional Outbox
  └── The dual-write problem
  └── Atomic DB + message publishing
  └── "Never write to two systems without coordination"

Day 3: Backpressure and Flow Control
  └── Detection: lag, depth, latency
  └── Response: shed, limit, scale, spill
  └── "Systems must be able to say 'slow down'"

Day 4: Dead Letters and Poison Pills
  └── Error classification and routing
  └── Quarantine before crash loops
  └── "Failed messages are data, not garbage"

Today, Day 5, we bring it ALL together.

Your final challenge: design a complete audit log system for a financial services company. This system must:

  • Capture every user action (compliance requirement)
  • Never lose an event (legal liability)
  • Handle 50,000 events per second at peak
  • Be queryable for investigations
  • Retain data for 7 years

This isn't a toy example. It's the kind of system that, if it fails, results in regulatory fines, lawsuits, and people losing their jobs.

Let's build it right.


Part I: Foundations

Chapter 1: What Is an Audit Log?

1.1 The Simple Definition

An audit log (or audit trail) is a chronological record of activities that have affected a system. It answers: who did what to which resource, when, and from where.

EVERYDAY ANALOGY: The Security Camera

Think of a building's security system:

  Every entrance has a camera recording 24/7.
  Every badge swipe is logged.
  Every door access is timestamped.
  
  If something goes wrong:
    "Show me who entered Room 401 last Tuesday at 3pm"
    → Camera footage + badge logs = answer
  
  An audit log is the security camera for your software.
  Every action is recorded.
  Nothing is deleted.
  Everything is queryable.

1.2 Why Audit Logs Matter

REGULATORY REQUIREMENTS

Financial Services (SOX, PCI-DSS):
  - All access to financial data must be logged
  - Logs must be retained for 7+ years
  - Logs must be tamper-evident
  
Healthcare (HIPAA):
  - All access to patient records must be logged
  - Must track who viewed what, when
  - Breach notification requires audit trail
  
General (GDPR, CCPA):
  - Must track data access and modifications
  - Right to access: "Show me everything you know about me"
  - Right to deletion: "Prove you deleted my data"

THE COST OF FAILURE

No audit log:
  - "We don't know who accessed the data"
  - "We can't prove we were compliant"
  - "We have no evidence for the investigation"
  
Result:
  - Regulatory fines (millions of dollars)
  - Legal liability (lawsuits)
  - Reputation damage (customer trust)
  - Criminal charges (in extreme cases)

1.3 Audit Log vs Application Log

These are different things:

Aspect       Application Log         Audit Log
──────────   ─────────────────────   ─────────────────────────
Purpose      Debugging, monitoring   Compliance, investigation
Audience     Engineers               Auditors, legal, security
Retention    Days to weeks           Years
Mutability   Can be deleted          Must be immutable
Content      Technical details       Business actions
Schema       Loose, varies           Strict, standardized

APPLICATION LOG:
  2024-01-15 10:30:45 INFO  Processing request abc123
  2024-01-15 10:30:46 DEBUG SQL: SELECT * FROM users WHERE id=456
  2024-01-15 10:30:46 INFO  Request completed in 142ms

AUDIT LOG:
  {
    "event_id": "evt_789xyz",
    "timestamp": "2024-01-15T10:30:45.123Z",
    "actor": {
      "user_id": "usr_456",
      "email": "alice@company.com",
      "ip_address": "192.168.1.100",
      "user_agent": "Mozilla/5.0..."
    },
    "action": "user.profile.update",
    "resource": {
      "type": "user_profile",
      "id": "usr_789"
    },
    "changes": {
      "email": {"old": "bob@old.com", "new": "bob@new.com"}
    },
    "result": "success",
    "metadata": {
      "request_id": "req_abc123",
      "session_id": "sess_def456"
    }
  }

1.4 Key Terminology

Term               Definition
────────────────   ─────────────────────────────────────────────
Actor              The user or system that performed the action
Action             What was done (create, read, update, delete)
Resource           What was acted upon (user, account, document)
Immutability       Cannot be modified after creation
Tamper-evidence    Any modification is detectable
Retention          How long logs are kept
Chain of custody   Tracking who accessed the logs themselves

Chapter 2: Audit Log Architecture

2.1 High-Level Design

Here's where everything from this week comes together:

COMPLETE AUDIT LOG ARCHITECTURE

                         ┌─────────────────────────────────────────────────┐
                         │              Application Layer                  │
                         │                                                 │
   User Action ─────────▶│  API Server  ──▶  Audit Client  ──▶  Outbox     │
                         │                       │              (Day 2)    │
                         │                       │                         │
                         └───────────────────────┼─────────────────────────┘
                                                 │
                                                 │ Transactional write
                                                 │ (same DB transaction as business logic)
                                                 ▼
                         ┌─────────────────────────────────────────────────┐
                         │              Outbox Publisher                   │
                         │                                                 │
                         │  Polls outbox table ──▶ Publishes to Kafka      │
                         │  (or CDC with Debezium)           (Day 1)       │
                         │                                                 │
                         └───────────────────────┬─────────────────────────┘
                                                 │
                                                 ▼
                         ┌─────────────────────────────────────────────────┐
                         │              Kafka: audit-events                │
                         │                                                 │
                         │  Partitioned by actor_id for ordering           │
                         │  Retention: 7 days (before archive)             │
                         │  Replication: 3 (durability)                    │
                         │                                                 │
                         └──────────┬──────────────────────┬───────────────┘
                                    │                      │
                         ┌──────────▼──────────┐ ┌────────▼────────┐
                         │   Audit Consumer    │ │  Archive Writer │
                         │                     │ │                 │
                         │  Writes to DB       │ │  Writes to S3   │
                         │  (hot storage)      │ │  (cold storage) │
                         │                     │ │                 │
                         │  Backpressure ◀─────┤ │  (Day 3)        │
                         │  handling (Day 3)   │ │                 │
                         │                     │ │                 │
                         │  DLQ if fails ──────┤ │                 │
                         │  (Day 4)            │ │                 │
                         │                     │ │                 │
                         └──────────┬──────────┘ └─────────────────┘
                                    │
                                    ▼
                         ┌─────────────────────────────────────────────────┐
                         │              Storage Layer                      │
                         │                                                 │
                         │  Hot: PostgreSQL/TimescaleDB (90 days)          │
                         │  Warm: ClickHouse (1 year)                      │
                         │  Cold: S3 + Parquet (7 years)                   │
                         │                                                 │
                         └───────────────────────┬─────────────────────────┘
                                                 │
                                                 ▼
                         ┌─────────────────────────────────────────────────┐
                         │              Query Layer                        │
                         │                                                 │
                         │  API for investigators                          │
                         │  Dashboard for compliance                       │
                         │  Export for legal                               │
                         │                                                 │
                         └─────────────────────────────────────────────────┘

2.2 Why This Architecture?

Let's trace how each day's learning applies:

DAY 1 — QUEUE VS STREAM: Why Kafka?

We need:
  ✓ Ordering per actor (see actions in sequence)
  ✓ Multiple consumers (DB writer, archiver, analytics)
  ✓ Replay capability (reprocess if bug found)
  ✓ High throughput (50K events/sec)
  
Kafka wins over RabbitMQ because:
  - Log-based: events persist for replay
  - Consumer groups: multiple independent consumers
  - Partitioning: scale writes horizontally
  - Ordering: per-partition ordering by actor_id
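
A minimal producer sketch (assuming the aiokafka client; any Kafka library works the same way). Keying messages by actor_id is what gives per-actor ordering:

# Sketch only: topic name matches the architecture above; the broker address is a placeholder.
import json
from aiokafka import AIOKafkaProducer

async def publish_audit_event(event: dict) -> None:
    producer = AIOKafkaProducer(bootstrap_servers="localhost:9092", acks="all")
    await producer.start()
    try:
        await producer.send_and_wait(
            "audit-events",
            key=event["actor"]["id"].encode("utf-8"),   # same actor → same partition → ordered
            value=json.dumps(event).encode("utf-8"),
        )
    finally:
        await producer.stop()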


DAY 2 — TRANSACTIONAL OUTBOX: Guaranteed Capture

The nightmare scenario:
  User deletes sensitive data
  Delete succeeds in database
  Audit event fails to publish
  No record of the deletion!
  
Outbox pattern guarantees:
  Business action + audit event in same transaction
  Either both happen or neither happens
  Publisher sends to Kafka asynchronously


DAY 3 — BACKPRESSURE: Handling Peak Load

Black Friday scenario:
  Normal: 5,000 events/sec
  Peak: 50,000 events/sec (10x!)
  
Without backpressure:
  Consumers fall behind
  Lag grows to hours
  Memory exhaustion
  Crash, data loss
  
With backpressure:
  Detect lag growth
  Batch aggressively
  Shed non-critical (metrics, not audit!)
  Scale consumers
  Alert before crisis


DAY 4 — DEAD LETTERS: No Event Left Behind

Even with all precautions:
  Some events will fail to process
  Schema mismatch (old event, new consumer)
  Storage temporarily unavailable
  Unexpected data format
  
DLQ ensures:
  Failed events captured, not lost
  Investigation possible
  Replay after fix
  Compliance maintained

2.3 Design Principles

PRINCIPLE 1: WRITE FIRST, PROCESS LATER

Never block the user action on audit processing.

WRONG:
  User clicks "delete" 
    → Delete from DB
    → Write audit to Kafka (sync)
    → Wait for consumer to process
    → Return to user
  
  If Kafka is slow, user waits. Bad UX.

RIGHT:
  User clicks "delete"
    → Delete from DB + Write to outbox (same transaction)
    → Return to user immediately
    → Background: outbox → Kafka → consumer → storage
  
  User sees instant response. Audit happens async.


PRINCIPLE 2: IMMUTABILITY IS NON-NEGOTIABLE

Once written, audit events must NEVER change.

Implementation:
  - No UPDATE or DELETE on audit tables
  - Append-only storage
  - Cryptographic chaining (each event references previous hash)
  - Write-once object storage (S3 Object Lock)
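
The chaining idea fits in one function. A sketch (the production consumer in Part II applies the same idea when it writes to storage):

# Sketch: each event's hash covers its content plus the previous event's hash,
# so modifying or removing any event in the middle is detectable.
import hashlib
import json
from typing import Optional

def chain_hash(event: dict, previous_hash: Optional[str]) -> str:
    linked = dict(event, previous_hash=previous_hash)
    canonical = json.dumps(linked, sort_keys=True, separators=(",", ":"), default=str)
    return hashlib.sha256(canonical.encode()).hexdigest()

# Append-only usage: each new event extends the chain.
last_hash = None
for e in [{"action": "user.login"}, {"action": "document.delete"}]:
    last_hash = chain_hash(e, last_hash)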


PRINCIPLE 3: DEFENSE IN DEPTH

Multiple layers of protection:
  - Outbox: survives app crash
  - Kafka replication: survives broker failure
  - Consumer DLQ: survives processing errors
  - Multi-storage: survives single storage failure
  - Cross-region backup: survives datacenter failure


PRINCIPLE 4: QUERY PERFORMANCE MATTERS

"Find all actions by user X on resource Y in January 2023"

Must be fast, even with billions of events.
  - Proper indexing on hot storage
  - Partitioning by time and actor
  - Pre-aggregated views for common queries
  - Full-text search for investigation

Chapter 3: Trade-offs and Considerations

3.1 Storage Trade-offs

HOT STORAGE (0-90 days)
───────────────────────
Options: PostgreSQL, TimescaleDB, Cassandra

PostgreSQL/TimescaleDB:
  ✓ Rich querying (SQL)
  ✓ Familiar operations
  ✓ Good indexing
  ✗ Scaling writes is hard
  ✗ Expensive at scale
  
Cassandra:
  ✓ Massive write throughput
  ✓ Linear scalability
  ✗ Query limitations
  ✗ Operational complexity

Recommendation: TimescaleDB for most cases
  - Time-series optimized
  - Automatic partitioning
  - Compression
  - Still PostgreSQL compatible


WARM STORAGE (90 days - 1 year)
───────────────────────────────
Options: ClickHouse, BigQuery, Snowflake

ClickHouse:
  ✓ Extremely fast analytical queries
  ✓ Column-oriented compression
  ✓ Self-hosted option
  ✗ Operational burden
  
BigQuery/Snowflake:
  ✓ Managed service
  ✓ Scales automatically
  ✗ Vendor lock-in
  ✗ Cost at high volume

Recommendation: ClickHouse if you have ops capacity,
                BigQuery/Snowflake if you want managed


COLD STORAGE (1-7 years)
────────────────────────
Options: S3 + Parquet, Glacier

S3 + Parquet:
  ✓ Extremely cheap
  ✓ Columnar format for analytics
  ✓ Query with Athena/Presto
  ✓ Object Lock for immutability
  
Glacier:
  ✓ Even cheaper
  ✗ Retrieval takes hours
  ✗ Only for true archive

Recommendation: S3 Intelligent-Tiering + Parquet
  - Auto-moves to cheaper tiers
  - Still queryable via Athena
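
Archived Parquet stays queryable without rehydrating it. A sketch with boto3 and Athena (the table, database, and output bucket names are placeholders for whatever the archive job creates):

import boto3

athena = boto3.client("athena")

def query_archive(actor_id: str, year: int) -> str:
    """Start an Athena query over the Parquet archive; returns the query execution id."""
    response = athena.start_query_execution(
        QueryString=(
            "SELECT * FROM audit_archive.events "
            f"WHERE actor.id = '{actor_id}' AND year = {year}"   # parameterize in real tooling
        ),
        QueryExecutionContext={"Database": "audit_archive"},
        ResultConfiguration={"OutputLocation": "s3://audit-archive-query-results/"},
    )
    return response["QueryExecutionId"]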

3.2 Consistency vs Latency

THE AUDIT LATENCY QUESTION

How soon must an event be visible in the audit log?

Option A: Synchronous (immediate)
  Write to audit DB in the request path
  User waits for audit write
  
  Latency: +50-200ms per request
  Consistency: Immediate
  Risk: Audit DB down = app down
  
Option B: Asynchronous (eventual)
  Write to outbox, process async
  User doesn't wait
  
  Latency: +0ms to user, 1-60s visibility
  Consistency: Eventual
  Risk: Audit may lag during issues

Option C: Hybrid
  Critical actions: sync
  Regular actions: async

Most systems choose Option B: asynchronous.

Rationale:
  - Most investigations happen hours or days later
  - A minute of visibility delay is acceptable
  - User experience matters
  - Audit DB issues shouldn't break the app
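
If a handful of actions do justify Option C, the routing can be a simple set lookup. A sketch (the action list is illustrative; `db` is the same asyncpg-style handle and AuditEvent the same dataclass defined in Part II; it assumes hot storage is reachable from the request path):

# Illustrative hybrid routing: everything goes through the outbox, critical actions
# are also written to hot storage before the request returns.
CRITICAL_ACTIONS = {"account.transfer", "user.permission.grant", "data.export"}

async def log_event(db, event) -> None:
    # Every event goes through the outbox, inside the business transaction (Option B).
    await db.execute(
        "INSERT INTO audit_outbox (event_id, event_type, payload, created_at) "
        "VALUES ($1, $2, $3, $4)",
        event.event_id, event.action, event.to_json(), event.timestamp,
    )
    # Critical actions are additionally written straight to hot storage, so they are
    # queryable immediately even if the async pipeline is lagging.
    if event.action in CRITICAL_ACTIONS:
        await db.execute(
            "INSERT INTO audit_events (event_id, timestamp, actor_id, actor_type, action, "
            "result, resource_type, resource_id, raw_event) "
            "VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) ON CONFLICT DO NOTHING",
            event.event_id, event.timestamp, event.actor.id, event.actor.type.value,
            event.action, event.result.value, event.resource.type, event.resource.id,
            event.to_json(),
        )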

3.3 Schema Design Trade-offs

STRUCTURED VS FLEXIBLE SCHEMA

Structured (strict schema):
  {
    "actor_id": "required string",
    "action": "required enum",
    "resource_type": "required enum",
    "resource_id": "required string",
    "timestamp": "required datetime",
    ...
  }
  
  ✓ Consistent, easy to query
  ✓ Schema validation catches errors
  ✗ Hard to evolve
  ✗ Different actions need different fields

Flexible (schemaless):
  {
    "event_type": "string",
    "data": { /* anything */ }
  }
  
  ✓ Easy to add new event types
  ✓ Each event type has custom fields
  ✗ Hard to query consistently
  ✗ No validation, garbage in


RECOMMENDATION: Structured core + flexible extension

{
  // Required core fields (strict)
  "event_id": "evt_abc123",
  "timestamp": "2024-01-15T10:30:00Z",
  "actor": { "id": "usr_456", "type": "user" },
  "action": "resource.update",
  "resource": { "type": "document", "id": "doc_789" },
  "result": "success",
  
  // Flexible extension (varies by action)
  "details": {
    "changes": { "title": { "old": "Draft", "new": "Final" } },
    "reason": "User requested update"
  },
  
  // Standard metadata (always present)
  "context": {
    "ip_address": "192.168.1.100",
    "request_id": "req_xyz",
    "session_id": "sess_abc"
  }
}
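
One way to enforce the strict core while leaving the extension open is JSON Schema validation at the audit client boundary. A sketch (assumes the jsonschema package; only a few core fields are shown):

from jsonschema import validate, ValidationError

CORE_SCHEMA = {
    "type": "object",
    "required": ["event_id", "timestamp", "actor", "action", "resource", "result"],
    "properties": {
        "event_id": {"type": "string"},
        "timestamp": {"type": "string"},
        "actor": {
            "type": "object",
            "required": ["id", "type"],
            "properties": {"id": {"type": "string"}, "type": {"type": "string"}},
        },
        "action": {"type": "string"},
        "resource": {"type": "object", "required": ["type", "id"]},
        "result": {"enum": ["success", "failure", "denied"]},
        "details": {"type": "object"},   # flexible extension: any shape allowed
    },
}

def validate_core(event: dict) -> None:
    """Reject events that violate the strict core; 'details' stays free-form."""
    try:
        validate(instance=event, schema=CORE_SCHEMA)
    except ValidationError as e:
        raise ValueError(f"Invalid audit event: {e.message}") from e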

Part II: Implementation

Chapter 4: Basic Implementation

4.1 Audit Event Schema

# Audit Event Schema
# The foundation of the entire system

from dataclasses import dataclass, field, asdict
from datetime import datetime
from typing import Optional, Dict, Any, List
from enum import Enum
import uuid
import json


class ActionResult(Enum):
    SUCCESS = "success"
    FAILURE = "failure"
    DENIED = "denied"   # Authorization failure


class ActorType(Enum):
    USER = "user"
    SERVICE = "service"
    SYSTEM = "system"   # Automated processes


@dataclass
class Actor:
    """Who performed the action."""
    id: str
    type: ActorType
    email: Optional[str] = None
    name: Optional[str] = None
    ip_address: Optional[str] = None
    user_agent: Optional[str] = None
    
    def to_dict(self) -> Dict:
        return {
            "id": self.id,
            "type": self.type.value,
            "email": self.email,
            "name": self.name,
            "ip_address": self.ip_address,
            "user_agent": self.user_agent
        }


@dataclass
class Resource:
    """What was acted upon."""
    type: str           # e.g., "user", "document", "account"
    id: str             # Resource identifier
    name: Optional[str] = None  # Human-readable name
    
    def to_dict(self) -> Dict:
        return {
            "type": self.type,
            "id": self.id,
            "name": self.name
        }


@dataclass
class AuditEvent:
    """
    A single audit log entry.
    
    Captures: WHO did WHAT to WHICH RESOURCE, WHEN, and HOW.
    """
    # Core required fields
    actor: Actor
    action: str              # e.g., "document.create", "user.login"
    resource: Resource
    result: ActionResult
    
    # Auto-generated
    event_id: str = field(default_factory=lambda: f"evt_{uuid.uuid4().hex[:12]}")
    timestamp: datetime = field(default_factory=datetime.utcnow)
    
    # Optional details
    changes: Optional[Dict[str, Any]] = None    # For updates: old/new values
    reason: Optional[str] = None                 # Why action was taken
    error_message: Optional[str] = None          # If result is failure
    
    # Context
    request_id: Optional[str] = None
    session_id: Optional[str] = None
    correlation_id: Optional[str] = None        # For tracing across services
    
    # For tamper evidence
    previous_hash: Optional[str] = None         # Hash of previous event
    
    def to_dict(self) -> Dict:
        return {
            "event_id": self.event_id,
            "timestamp": self.timestamp.isoformat() + "Z",
            "actor": self.actor.to_dict(),
            "action": self.action,
            "resource": self.resource.to_dict(),
            "result": self.result.value,
            "changes": self.changes,
            "reason": self.reason,
            "error_message": self.error_message,
            "context": {
                "request_id": self.request_id,
                "session_id": self.session_id,
                "correlation_id": self.correlation_id
            },
            "integrity": {
                "previous_hash": self.previous_hash
            }
        }
    
    def to_json(self) -> str:
        return json.dumps(self.to_dict(), default=str)


# Example usage
def example():
    event = AuditEvent(
        actor=Actor(
            id="usr_456",
            type=ActorType.USER,
            email="alice@company.com",
            ip_address="192.168.1.100"
        ),
        action="document.update",
        resource=Resource(
            type="document",
            id="doc_789",
            name="Q4 Financial Report"
        ),
        result=ActionResult.SUCCESS,
        changes={
            "title": {"old": "Draft Report", "new": "Q4 Financial Report"},
            "status": {"old": "draft", "new": "published"}
        },
        request_id="req_abc123"
    )
    
    print(event.to_json())

4.2 Simple Audit Client

# Simple Audit Client
# WARNING: Not production-ready - for learning only

from typing import Optional
from contextlib import contextmanager


class SimpleAuditClient:
    """
    Basic audit client that writes to outbox.
    Application code uses this to log audit events.
    """
    
    def __init__(self, db_session):
        self.db = db_session
    
    def log(self, event: AuditEvent) -> None:
        """
        Log an audit event.
        
        Writes to outbox table in the current transaction.
        """
        self.db.execute(
            """
            INSERT INTO audit_outbox (
                event_id, 
                event_type,
                payload, 
                created_at
            ) VALUES (%s, %s, %s, %s)
            """,
            (
                event.event_id,
                event.action,
                event.to_json(),
                event.timestamp
            )
        )
    
    @contextmanager
    def capture(
        self,
        actor: Actor,
        action: str,
        resource: Resource,
        request_id: Optional[str] = None
    ):
        """
        Context manager that automatically logs success or failure.
        
        Usage:
            with audit.capture(actor, "document.delete", resource) as event:
                delete_document(doc_id)
                event.changes = {"deleted": True}
        """
        event = AuditEvent(
            actor=actor,
            action=action,
            resource=resource,
            result=ActionResult.SUCCESS,  # Optimistic
            request_id=request_id
        )
        
        try:
            yield event
            # If we get here, action succeeded
            event.result = ActionResult.SUCCESS
        except PermissionError as e:
            event.result = ActionResult.DENIED
            event.error_message = str(e)
            raise
        except Exception as e:
            event.result = ActionResult.FAILURE
            event.error_message = str(e)
            raise
        finally:
            # Always log, regardless of outcome
            self.log(event)


# Usage in application code
async def delete_document(doc_id: str, user: User, audit: SimpleAuditClient):
    """Example: deleting a document with audit logging."""
    
    # Get document for audit trail
    doc = await db.get_document(doc_id)
    
    actor = Actor(
        id=user.id,
        type=ActorType.USER,
        email=user.email
    )
    
    resource = Resource(
        type="document",
        id=doc_id,
        name=doc.title
    )
    
    with audit.capture(actor, "document.delete", resource) as event:
        # The actual business logic
        await db.delete_document(doc_id)
        
        # Capture what was deleted
        event.changes = {
            "title": doc.title,
            "owner": doc.owner_id,
            "size_bytes": doc.size
        }
    
    # If we get here, both delete and audit succeeded
    return {"status": "deleted"}

4.3 Database Schema

-- Audit Outbox Table (for transactional outbox pattern)
CREATE TABLE audit_outbox (
    id BIGSERIAL PRIMARY KEY,
    event_id VARCHAR(64) NOT NULL,
    event_type VARCHAR(128) NOT NULL,
    payload JSONB NOT NULL,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    published_at TIMESTAMPTZ,
    
    -- For ordering and deduplication
    CONSTRAINT audit_outbox_event_unique UNIQUE (event_id)
);

CREATE INDEX idx_audit_outbox_unpublished 
    ON audit_outbox (created_at) 
    WHERE published_at IS NULL;


-- Hot Storage: Recent audit events (90 days)
-- Using TimescaleDB for time-series optimization
CREATE TABLE audit_events (
    event_id VARCHAR(64) NOT NULL,
    timestamp TIMESTAMPTZ NOT NULL,
    
    -- Actor
    actor_id VARCHAR(64) NOT NULL,
    actor_type VARCHAR(32) NOT NULL,
    actor_email VARCHAR(256),
    actor_ip VARCHAR(45),
    
    -- Action
    action VARCHAR(128) NOT NULL,
    result VARCHAR(32) NOT NULL,
    
    -- Resource
    resource_type VARCHAR(64) NOT NULL,
    resource_id VARCHAR(64) NOT NULL,
    resource_name VARCHAR(256),
    
    -- Details (flexible)
    changes JSONB,
    error_message TEXT,
    
    -- Context
    request_id VARCHAR(64),
    session_id VARCHAR(64),
    correlation_id VARCHAR(64),
    
    -- Integrity
    previous_hash VARCHAR(64),
    event_hash VARCHAR(64),
    
    -- Full event for archive
    raw_event JSONB NOT NULL,

    -- TimescaleDB requires the time column in any unique constraint,
    -- so the primary key is composite
    PRIMARY KEY (event_id, timestamp)
);

-- Convert to TimescaleDB hypertable
SELECT create_hypertable('audit_events', 'timestamp');

-- Indexes for common queries
CREATE INDEX idx_audit_events_actor ON audit_events (actor_id, timestamp DESC);
CREATE INDEX idx_audit_events_resource ON audit_events (resource_type, resource_id, timestamp DESC);
CREATE INDEX idx_audit_events_action ON audit_events (action, timestamp DESC);

-- Composite index for investigation queries
CREATE INDEX idx_audit_events_investigation 
    ON audit_events (actor_id, resource_type, action, timestamp DESC);


-- DLQ for failed audit events
CREATE TABLE audit_dlq (
    id BIGSERIAL PRIMARY KEY,
    event_id VARCHAR(64) NOT NULL,
    original_payload JSONB NOT NULL,
    error_type VARCHAR(128) NOT NULL,
    error_message TEXT,
    stack_trace TEXT,
    retry_count INT NOT NULL DEFAULT 0,
    first_failed_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    last_failed_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    resolved_at TIMESTAMPTZ,
    resolution VARCHAR(32),  -- 'replayed', 'discarded', 'manual'
    
    CONSTRAINT audit_dlq_event_unique UNIQUE (event_id)
);

CREATE INDEX idx_audit_dlq_unresolved 
    ON audit_dlq (first_failed_at) 
    WHERE resolved_at IS NULL;

Chapter 5: Production-Ready Implementation

5.1 Requirements Recap

  1. Zero event loss — Every action must be logged
  2. High throughput — 50,000 events/sec at peak
  3. Low latency — Don't slow down user actions
  4. Immutability — Events cannot be modified
  5. Queryability — Fast investigation queries
  6. Long retention — 7 years of data
  7. Tamper evidence — Detect any modifications

5.2 Complete Production Implementation

# Production Audit Log System
# Combines all Week 3 patterns

import asyncio
import hashlib
import json
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Optional, Dict, Any, List
from enum import Enum
import logging

logger = logging.getLogger(__name__)


# =============================================================================
# Configuration
# =============================================================================

@dataclass
class AuditConfig:
    """Configuration for the audit system."""
    
    # Kafka
    kafka_topic: str = "audit-events"
    kafka_partitions: int = 32
    kafka_replication: int = 3
    
    # Outbox
    outbox_poll_interval: float = 0.1  # 100ms
    outbox_batch_size: int = 1000
    
    # Consumer
    consumer_batch_size: int = 500
    consumer_batch_timeout: float = 1.0
    
    # Backpressure
    lag_warning_threshold: int = 10_000
    lag_critical_threshold: int = 100_000
    
    # DLQ
    max_retries: int = 3
    
    # Integrity
    enable_hash_chain: bool = True


# =============================================================================
# Audit Client (Application-Side)
# =============================================================================

class AuditClient:
    """
    Production audit client with transactional outbox.
    
    Usage in application code:
        async with db.transaction():
            await business_logic()
            await audit.log(event)  # Same transaction!
    """
    
    def __init__(self, db_pool, config: AuditConfig):
        self.db = db_pool
        self.config = config
        self._actor_context: Dict[str, Actor] = {}  # Per-request context
    
    def set_context(self, request_id: str, actor: Actor):
        """Set actor context for current request (call in middleware)."""
        self._actor_context[request_id] = actor
    
    def clear_context(self, request_id: str):
        """Clear context after request completes."""
        self._actor_context.pop(request_id, None)
    
    def get_actor(self, request_id: str) -> Optional[Actor]:
        """Get actor for current request."""
        return self._actor_context.get(request_id)
    
    async def log(
        self,
        action: str,
        resource: Resource,
        result: ActionResult = ActionResult.SUCCESS,
        changes: Optional[Dict] = None,
        reason: Optional[str] = None,
        error_message: Optional[str] = None,
        request_id: Optional[str] = None,
        actor: Optional[Actor] = None  # Override context
    ) -> str:
        """
        Log an audit event to the outbox.
        
        MUST be called within a database transaction to ensure
        atomicity with the business operation.
        
        Returns: event_id
        """
        # Get actor from context if not provided
        if actor is None and request_id:
            actor = self.get_actor(request_id)
        
        if actor is None:
            raise ValueError("Actor must be provided or set in context")
        
        event = AuditEvent(
            actor=actor,
            action=action,
            resource=resource,
            result=result,
            changes=changes,
            reason=reason,
            error_message=error_message,
            request_id=request_id
        )
        
        # Write to outbox
        await self.db.execute(
            """
            INSERT INTO audit_outbox (event_id, event_type, payload, created_at)
            VALUES ($1, $2, $3, $4)
            """,
            event.event_id,
            event.action,
            event.to_json(),
            event.timestamp
        )
        
        logger.debug(f"Audit event logged to outbox: {event.event_id}")
        return event.event_id
    
    async def log_success(
        self,
        action: str,
        resource: Resource,
        changes: Optional[Dict] = None,
        **kwargs
    ) -> str:
        """Convenience method for successful actions."""
        return await self.log(
            action=action,
            resource=resource,
            result=ActionResult.SUCCESS,
            changes=changes,
            **kwargs
        )
    
    async def log_failure(
        self,
        action: str,
        resource: Resource,
        error_message: str,
        **kwargs
    ) -> str:
        """Convenience method for failed actions."""
        return await self.log(
            action=action,
            resource=resource,
            result=ActionResult.FAILURE,
            error_message=error_message,
            **kwargs
        )


# =============================================================================
# Outbox Publisher (Background Process)
# =============================================================================

class OutboxPublisher:
    """
    Publishes audit events from outbox to Kafka.
    
    Day 2 pattern: Transactional Outbox
    """
    
    def __init__(
        self,
        db_pool,
        kafka_producer,
        config: AuditConfig
    ):
        self.db = db_pool
        self.kafka = kafka_producer
        self.config = config
        self._running = False
        
        # Metrics
        self.events_published = 0
        self.publish_errors = 0
    
    async def start(self):
        """Start the publisher loop."""
        self._running = True
        logger.info("Outbox publisher started")
        
        while self._running:
            try:
                count = await self._publish_batch()
                if count == 0:
                    await asyncio.sleep(self.config.outbox_poll_interval)
            except Exception as e:
                logger.error(f"Publisher error: {e}")
                self.publish_errors += 1
                await asyncio.sleep(1)  # Back off on error
    
    async def stop(self):
        """Stop the publisher."""
        self._running = False
        logger.info(f"Outbox publisher stopped. Published: {self.events_published}")
    
    async def _publish_batch(self) -> int:
        """Publish a batch of events from outbox."""
        # Fetch unpublished events.
        # NOTE: FOR UPDATE SKIP LOCKED only guards against concurrent publishers if
        # this fetch and the UPDATE below share a transaction; the consumer's
        # ON CONFLICT ... DO NOTHING makes duplicate publishes harmless either way.
        rows = await self.db.fetch(
            """
            SELECT id, event_id, event_type, payload
            FROM audit_outbox
            WHERE published_at IS NULL
            ORDER BY created_at
            LIMIT $1
            FOR UPDATE SKIP LOCKED
            """,
            self.config.outbox_batch_size
        )
        
        if not rows:
            return 0
        
        # Publish to Kafka
        published_ids = []
        for row in rows:
            try:
                # Partition by actor_id for ordering
                payload = json.loads(row['payload'])
                actor_id = payload.get('actor', {}).get('id', '')
                partition_key = actor_id.encode('utf-8')
                
                await self.kafka.send_and_wait(
                    self.config.kafka_topic,
                    key=partition_key,
                    value=row['payload'].encode('utf-8')
                )
                
                published_ids.append(row['id'])
                self.events_published += 1
                
            except Exception as e:
                logger.error(f"Failed to publish event {row['event_id']}: {e}")
                self.publish_errors += 1
        
        # Mark as published
        if published_ids:
            await self.db.execute(
                """
                UPDATE audit_outbox
                SET published_at = NOW()
                WHERE id = ANY($1)
                """,
                published_ids
            )
        
        return len(published_ids)


# =============================================================================
# Audit Consumer (Writes to Storage)
# =============================================================================

class AuditConsumer:
    """
    Consumes audit events from Kafka and writes to storage.
    
    Day 3 pattern: Backpressure handling
    Day 4 pattern: Dead letter queue
    """
    
    def __init__(
        self,
        kafka_consumer,
        db_pool,
        dlq_handler,
        config: AuditConfig,
        metrics_client
    ):
        self.kafka = kafka_consumer
        self.db = db_pool
        self.dlq = dlq_handler
        self.config = config
        self.metrics = metrics_client
        
        self._running = False
        self._last_hash: Optional[str] = None
        
        # Metrics
        self.events_processed = 0
        self.events_failed = 0
    
    async def start(self):
        """Start consuming."""
        self._running = True
        logger.info("Audit consumer started")
        
        # Load last hash for chain integrity
        if self.config.enable_hash_chain:
            self._last_hash = await self._get_last_hash()
        
        batch = []
        batch_start = datetime.utcnow()
        
        async for msg in self.kafka:
            if not self._running:
                break
            
            batch.append(msg)
            
            # Process batch if full or timeout
            should_flush = (
                len(batch) >= self.config.consumer_batch_size or
                (datetime.utcnow() - batch_start).total_seconds() > self.config.consumer_batch_timeout
            )
            
            if should_flush and batch:
                await self._process_batch(batch)
                batch = []
                batch_start = datetime.utcnow()
    
    async def _process_batch(self, messages: List) -> None:
        """Process a batch of messages."""
        events_to_insert = []
        
        for msg in messages:
            try:
                event_dict = json.loads(msg.value.decode('utf-8'))
                
                # Add hash chain
                if self.config.enable_hash_chain:
                    event_dict['integrity']['previous_hash'] = self._last_hash
                    event_hash = self._compute_hash(event_dict)
                    event_dict['integrity']['event_hash'] = event_hash
                    self._last_hash = event_hash
                
                events_to_insert.append(event_dict)
                
            except Exception as e:
                logger.error(f"Failed to process event: {e}")
                await self.dlq.handle_failure(msg, e)
                self.events_failed += 1
        
        # Batch insert
        if events_to_insert:
            try:
                await self._batch_insert(events_to_insert)
                self.events_processed += len(events_to_insert)
                
                # Commit offsets
                await self.kafka.commit()
                
            except Exception as e:
                logger.error(f"Batch insert failed: {e}")
                # Send all to DLQ
                for msg in messages:
                    await self.dlq.handle_failure(msg, e)
                self.events_failed += len(messages)
    
    async def _batch_insert(self, events: List[Dict]) -> None:
        """Insert batch of events to database."""
        # Build batch insert
        values = []
        for e in events:
            values.append((
                e['event_id'],
                datetime.fromisoformat(e['timestamp'].rstrip('Z')),
                e['actor']['id'],
                e['actor']['type'],
                e['actor'].get('email'),
                e['actor'].get('ip_address'),
                e['action'],
                e['result'],
                e['resource']['type'],
                e['resource']['id'],
                e['resource'].get('name'),
                json.dumps(e.get('changes')) if e.get('changes') else None,
                e.get('error_message'),
                e['context'].get('request_id'),
                e['context'].get('session_id'),
                e['context'].get('correlation_id'),
                e['integrity'].get('previous_hash'),
                e['integrity'].get('event_hash'),
                json.dumps(e)
            ))
        
        await self.db.executemany(
            """
            INSERT INTO audit_events (
                event_id, timestamp, actor_id, actor_type, actor_email, actor_ip,
                action, result, resource_type, resource_id, resource_name,
                changes, error_message, request_id, session_id, correlation_id,
                previous_hash, event_hash, raw_event
            ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19)
            ON CONFLICT (event_id, timestamp) DO NOTHING
            """,
            values
        )
    
    def _compute_hash(self, event: Dict) -> str:
        """Compute hash for tamper evidence."""
        # Canonical JSON for consistent hashing
        canonical = json.dumps(event, sort_keys=True, separators=(',', ':'))
        return hashlib.sha256(canonical.encode()).hexdigest()
    
    async def _get_last_hash(self) -> Optional[str]:
        """Get hash of last event for chain integrity."""
        row = await self.db.fetchrow(
            """
            SELECT event_hash
            FROM audit_events
            ORDER BY timestamp DESC
            LIMIT 1
            """
        )
        return row['event_hash'] if row else None
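

# =============================================================================
# DLQ Handler (Day 4 Pattern)
# =============================================================================

class AuditDLQHandler:
    """
    Minimal sketch of the dlq_handler injected into AuditConsumer above.

    Illustrative only: a production handler would also emit metrics and alerts.
    Writes failed events into the audit_dlq table from Chapter 4.
    """

    def __init__(self, db_pool):
        self.db = db_pool

    async def handle_failure(self, msg, error: Exception) -> None:
        """Record a failed Kafka message so it can be investigated and replayed."""
        payload = msg.value.decode("utf-8", errors="replace")
        try:
            event_id = json.loads(payload).get(
                "event_id", f"unknown_{msg.partition}_{msg.offset}"
            )
            stored_payload = payload
        except json.JSONDecodeError:
            # Keep unparseable payloads too; wrap them so the JSONB column accepts them
            event_id = f"unparseable_{msg.partition}_{msg.offset}"
            stored_payload = json.dumps({"raw": payload})

        await self.db.execute(
            """
            INSERT INTO audit_dlq (event_id, original_payload, error_type, error_message)
            VALUES ($1, $2, $3, $4)
            ON CONFLICT (event_id) DO UPDATE
            SET retry_count = audit_dlq.retry_count + 1,
                last_failed_at = NOW()
            """,
            event_id,
            stored_payload,
            type(error).__name__,
            str(error),
        )
        logger.warning(f"Audit event routed to DLQ: {event_id} ({type(error).__name__})")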


# =============================================================================
# Archive Writer (Cold Storage)
# =============================================================================

class ArchiveWriter:
    """
    Archives old audit events to S3 in Parquet format.
    Runs as a daily job.
    """
    
    def __init__(self, db_pool, s3_client, config: AuditConfig):
        self.db = db_pool
        self.s3 = s3_client
        self.config = config
    
    async def archive_day(self, date: datetime) -> Dict[str, Any]:
        """Archive all events from a specific day."""
        import pyarrow as pa
        import pyarrow.parquet as pq
        
        # Fetch events for the day
        start = date.replace(hour=0, minute=0, second=0, microsecond=0)
        end = start + timedelta(days=1)
        
        rows = await self.db.fetch(
            """
            SELECT raw_event
            FROM audit_events
            WHERE timestamp >= $1 AND timestamp < $2
            ORDER BY timestamp
            """,
            start, end
        )
        
        if not rows:
            return {"date": date.isoformat(), "events": 0, "status": "empty"}
        
        # Convert to Parquet
        events = [json.loads(row['raw_event']) for row in rows]
        table = pa.Table.from_pylist(events)
        
        # Write to S3
        key = f"audit-archive/{date.year}/{date.month:02d}/{date.day:02d}/events.parquet"
        
        buffer = pa.BufferOutputStream()
        pq.write_table(table, buffer, compression='snappy')
        
        await self.s3.put_object(
            Bucket='audit-archive',
            Key=key,
            Body=buffer.getvalue().to_pybytes(),
            # Object Lock for immutability
            ObjectLockMode='GOVERNANCE',
            ObjectLockRetainUntilDate=datetime.utcnow() + timedelta(days=2555)  # 7 years
        )
        
        logger.info(f"Archived {len(events)} events for {date.date()}")
        
        return {
            "date": date.isoformat(),
            "events": len(events),
            "s3_key": key,
            "status": "archived"
        }


# =============================================================================
# Query Service (Investigation)
# =============================================================================

class AuditQueryService:
    """
    Query interface for audit log investigation.
    """
    
    def __init__(self, db_pool, s3_client):
        self.db = db_pool
        self.s3 = s3_client
    
    async def search(
        self,
        actor_id: Optional[str] = None,
        resource_type: Optional[str] = None,
        resource_id: Optional[str] = None,
        action: Optional[str] = None,
        start_time: Optional[datetime] = None,
        end_time: Optional[datetime] = None,
        limit: int = 100,
        offset: int = 0
    ) -> List[Dict]:
        """
        Search audit events with filters.
        """
        conditions = ["1=1"]
        params = []
        param_idx = 1
        
        if actor_id:
            conditions.append(f"actor_id = ${param_idx}")
            params.append(actor_id)
            param_idx += 1
        
        if resource_type:
            conditions.append(f"resource_type = ${param_idx}")
            params.append(resource_type)
            param_idx += 1
        
        if resource_id:
            conditions.append(f"resource_id = ${param_idx}")
            params.append(resource_id)
            param_idx += 1
        
        if action:
            conditions.append(f"action LIKE ${param_idx}")
            params.append(f"{action}%")
            param_idx += 1
        
        if start_time:
            conditions.append(f"timestamp >= ${param_idx}")
            params.append(start_time)
            param_idx += 1
        
        if end_time:
            conditions.append(f"timestamp <= ${param_idx}")
            params.append(end_time)
            param_idx += 1
        
        params.extend([limit, offset])
        
        query = f"""
            SELECT raw_event
            FROM audit_events
            WHERE {' AND '.join(conditions)}
            ORDER BY timestamp DESC
            LIMIT ${param_idx} OFFSET ${param_idx + 1}
        """
        
        rows = await self.db.fetch(query, *params)
        return [json.loads(row['raw_event']) for row in rows]
    
    async def get_actor_activity(
        self,
        actor_id: str,
        days: int = 30
    ) -> List[Dict]:
        """Get all activity for a specific actor."""
        start_time = datetime.utcnow() - timedelta(days=days)
        return await self.search(actor_id=actor_id, start_time=start_time, limit=1000)
    
    async def get_resource_history(
        self,
        resource_type: str,
        resource_id: str
    ) -> List[Dict]:
        """Get complete history of a resource."""
        return await self.search(
            resource_type=resource_type,
            resource_id=resource_id,
            limit=1000
        )
    
    async def verify_integrity(
        self,
        start_time: datetime,
        end_time: datetime
    ) -> Dict[str, Any]:
        """
        Verify hash chain integrity for a time range.
        
        Returns any gaps or inconsistencies found.
        """
        rows = await self.db.fetch(
            """
            SELECT event_id, timestamp, previous_hash, event_hash, raw_event
            FROM audit_events
            WHERE timestamp >= $1 AND timestamp <= $2
            ORDER BY timestamp
            """,
            start_time, end_time
        )
        
        issues = []
        prev_hash = None
        
        for row in rows:
            # Check chain continuity
            if prev_hash and row['previous_hash'] != prev_hash:
                issues.append({
                    "type": "chain_break",
                    "event_id": row['event_id'],
                    "expected_prev": prev_hash,
                    "actual_prev": row['previous_hash']
                })
            
            # Verify hash
            event = json.loads(row['raw_event'])
            event['integrity']['previous_hash'] = row['previous_hash']
            event['integrity'].pop('event_hash', None)  # Exclude from hash (it was added after hashing)
            
            canonical = json.dumps(event, sort_keys=True, separators=(',', ':'))
            computed_hash = hashlib.sha256(canonical.encode()).hexdigest()
            
            if computed_hash != row['event_hash']:
                issues.append({
                    "type": "hash_mismatch",
                    "event_id": row['event_id'],
                    "expected_hash": computed_hash,
                    "actual_hash": row['event_hash']
                })
            
            prev_hash = row['event_hash']
        
        return {
            "verified_events": len(rows),
            "issues_found": len(issues),
            "issues": issues,
            "integrity": "valid" if not issues else "compromised"
        }

Chapter 6: Edge Cases and Error Handling

6.1 Edge Case 1: Outbox Table Growing Too Large

SCENARIO: Kafka is down for 2 hours, outbox fills up

Timeline:
  10:00 - Kafka goes down
  10:00 - Outbox publisher can't publish
  10:00 - Outbox starts growing
  12:00 - Outbox has 10M rows
  12:00 - Queries on outbox slow down
  12:00 - Application performance degrades


SOLUTION: Outbox management

1. PARTITION BY TIME
   CREATE TABLE audit_outbox (...)
   PARTITION BY RANGE (created_at);
   
   -- Daily partitions
   CREATE TABLE audit_outbox_2024_01_15 
   PARTITION OF audit_outbox
   FOR VALUES FROM ('2024-01-15') TO ('2024-01-16');

2. CLEANUP PUBLISHED EVENTS
   DELETE FROM audit_outbox
   WHERE published_at < NOW() - INTERVAL '1 hour';
   
   -- Or move to archive
   INSERT INTO audit_outbox_archive
   SELECT * FROM audit_outbox
   WHERE published_at < NOW() - INTERVAL '1 hour';

3. ALERT ON GROWTH
   alert:
     name: "Outbox Growing"
     condition: count(audit_outbox WHERE published_at IS NULL) > 100000
     action: page_oncall

4. CIRCUIT BREAKER
   If outbox > 1M unpublished:
     - Alert critical
     - Consider pausing non-critical auditing
     - Never pause critical (auth, payments)
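
A sketch of items 3 and 4 as a small background task (the thresholds and the alert callable are illustrative):

import asyncio

OUTBOX_WARNING_DEPTH = 100_000
OUTBOX_CRITICAL_DEPTH = 1_000_000

async def monitor_outbox_depth(db, alert, interval_seconds: int = 30) -> None:
    """Periodically check unpublished outbox depth and alert before it becomes a crisis."""
    while True:
        depth = await db.fetchval(
            "SELECT COUNT(*) FROM audit_outbox WHERE published_at IS NULL"
        )
        if depth > OUTBOX_CRITICAL_DEPTH:
            alert("critical", f"Audit outbox backlog: {depth} unpublished events")
            # A circuit breaker could pause non-critical audit producers here,
            # but critical actions (auth, payments) must always keep being audited.
        elif depth > OUTBOX_WARNING_DEPTH:
            alert("warning", f"Audit outbox backlog growing: {depth} unpublished events")
        await asyncio.sleep(interval_seconds)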

6.2 Edge Case 2: Consumer Falls Far Behind

SCENARIO: Consumer lag grows to hours

This is Day 3 backpressure in action!

Detection:
  - Consumer lag > 100,000 events
  - Lag growing > 1,000 events/sec
  
Response:

1. SCALE CONSUMERS
   # Kafka allows consumers <= partitions
   # If 32 partitions, can have up to 32 consumers
   kubectl scale deployment audit-consumer --replicas=16

2. INCREASE BATCH SIZE
   # Process more per batch
   config.consumer_batch_size = 1000  # up from 500

3. PARALLEL INSERTS
   # Multiple connections to database
   await asyncio.gather(
       self._batch_insert(batch1),
       self._batch_insert(batch2),
       self._batch_insert(batch3)
   )

4. ALERT BUT DON'T SHED
   # Unlike other systems, audit events can't be dropped
   # Accept temporary latency, never lose data
   
   if lag > critical_threshold:
       alert("Audit lag critical - needs attention")
       # But keep processing, don't shed!
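
To make the detection step concrete, here is a sketch of measuring lag from inside an aiokafka consumer (assumed client; the thresholds come from AuditConfig):

async def total_consumer_lag(consumer) -> int:
    """Sum of (broker high-water mark - consumer position) across assigned partitions."""
    lag = 0
    for tp in consumer.assignment():
        highwater = consumer.highwater(tp)      # latest offset the broker has
        if highwater is None:
            continue                            # nothing fetched from this partition yet
        position = await consumer.position(tp)  # next offset this consumer will read
        lag += max(0, highwater - position)
    return lag

# Alert (never shed) when the total crosses config.lag_warning_threshold
# or config.lag_critical_threshold.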

6.3 Edge Case 3: Hash Chain Broken

SCENARIO: Integrity verification finds chain break

Detection:
  Event #1000: hash = "abc123"
  Event #1001: previous_hash = "xyz789" ← Should be "abc123"!
  
Possible causes:
  1. Events processed out of order
  2. Consumer crash between events
  3. Tampering (rare but serious)
  4. Bug in hash computation


INVESTIGATION STEPS:

1. CHECK FOR MISSING EVENTS
   SELECT event_id FROM audit_events
   WHERE timestamp BETWEEN '2024-01-15 10:00' AND '2024-01-15 10:05'
   ORDER BY timestamp;
   
   -- Look for gaps in sequence

2. CHECK DLQ
   SELECT * FROM audit_dlq
   WHERE first_failed_at BETWEEN '2024-01-15 10:00' AND '2024-01-15 10:05';
   
   -- Missing event might be in DLQ

3. CHECK KAFKA
   kafka-console-consumer --topic audit-events \
     --partition 0 --offset 1000 --max-messages 10
   
   -- See what was actually in Kafka

4. IF TAMPERING SUSPECTED
   - Preserve evidence
   - Check database access logs
   - Involve security team
   - Do not "fix" the data


PREVENTION:

1. SINGLE WRITER PER PARTITION
   Only one consumer per partition = guaranteed ordering
   
2. TRANSACTIONAL COMMIT
   Insert + offset commit in same transaction
   
3. CHAIN RECOVERY PROCEDURE
   If chain broken, add "chain_recovery" event
   Documents the break, restarts chain
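
A sketch of what such a recovery event could look like, reusing the classes from Chapter 4 (the action name and field values are a convention chosen here, not a standard):

recovery_event = AuditEvent(
    actor=Actor(id="audit-consumer-1", type=ActorType.SYSTEM),
    action="audit.chain_recovery",
    resource=Resource(type="audit_chain", id="audit-events-partition-0"),
    result=ActionResult.SUCCESS,
    reason="Hash chain break detected after evt_1000; chain restarted after investigation",
)
# Point at the last verified event, so the break is documented in the log itself
# rather than silently repaired.
recovery_event.previous_hash = "abc123"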

6.4 Edge Case 4: Clock Skew Issues

SCENARIO: Events arrive with out-of-order timestamps

Server A clock: 10:00:00
Server B clock: 10:00:05 (5 seconds ahead)

Event from Server A: timestamp 10:00:02 (real time 10:00:02)
Event from Server B: timestamp 10:00:07 (real time 10:00:02 — clock is 5s ahead)
Event from Server A: timestamp 10:00:03 (real time 10:00:03)

Stored order (sorted by timestamp): A@10:00:02, A@10:00:03, B@10:00:07
Actual order (real time):           A@10:00:02, B@10:00:02, A@10:00:03

Server B's event appears last in the log even though it happened second.


SOLUTIONS:

1. USE INGESTION TIME, NOT EVENT TIME
   Consumer adds received_at timestamp
   Sort by received_at for investigation
   Keep original timestamp for reference

2. NTP SYNC ALL SERVERS
   Ensure clock drift < 100ms
   Monitor clock sync as infrastructure metric

3. INCLUDE BOTH TIMESTAMPS
   {
     "event_time": "2024-01-15T10:00:07Z",  // From producer
     "ingested_at": "2024-01-15T10:00:02Z", // From consumer
     "time_drift_ms": 5000                   // Calculated
   }

4. ALERT ON DRIFT
   if abs(event_time - ingested_at) > 1000ms:
       alert("Clock drift detected on {server}")

6.5 Error Handling Matrix

Error                Impact                   Response                      Prevention
──────────────────   ──────────────────────   ───────────────────────────   ──────────────────
Outbox write fails   Event lost!              Rollback entire transaction   None (by design)
Kafka down           Events queue in outbox   Wait and retry                Multi-broker setup
Consumer crash       Temporary lag            Auto-restart, resume          Health checks
DB insert fails      Event goes to DLQ        DLQ + alert                   Retry with backoff
Hash mismatch        Integrity concern        Alert, investigate            Single writer
Clock skew           Order confusion          Use ingestion time            NTP monitoring
Storage full         Can't write              Alert, add capacity           Capacity planning

Part III: Real-World Application

Chapter 7: How Big Tech Does It

7.1 Case Study: Stripe — Payment Audit Trail

STRIPE AUDIT REQUIREMENTS

Regulatory:
  - PCI-DSS: Log all access to cardholder data
  - SOX: Track all financial transactions
  - GDPR: Record data access and modifications

Scale:
  - Millions of API calls per day
  - Every call potentially audited
  - 7+ years retention

Architecture:

  API Request ──▶ API Server
                      │
                 ┌────┴────┐
                 │         │
            Business    Audit
            Logic       Client
                 │         │
                 ▼         ▼
              Primary   Outbox
                DB        │
                          │ Same transaction
                          ▼
                    Outbox Publisher
                          │
                          ▼
                       Kafka
                          │
              ┌───────────┼───────────┐
              │           │           │
           Primary    Analytics    Compliance
           Storage     Team         Export
              │           
              ▼           
         PostgreSQL (hot)
              │
              ▼ (90 days)
         BigQuery (warm)
              │
              ▼ (1 year)
         GCS Parquet (cold)


Key Decisions:

1. AUDIT HAPPENS IN REQUEST PATH
   Not background - too important to lose
   But: outbox pattern, so async to Kafka
   
2. FIELD-LEVEL TRACKING
   Every field change recorded:
   {
     "changes": {
       "amount": {"old": 1000, "new": 1500},
       "currency": {"old": "USD", "new": "USD"}
     }
   }

3. REQUEST CORRELATION
   Every audit event linked to API request
   Can reconstruct entire request flow
   
4. PII HANDLING
   Sensitive fields tokenized or encrypted
   Separate access controls for PII audit data

Reference: Stripe Engineering Blog

7.2 Case Study: Netflix — Content Access Audit

NETFLIX CONTENT AUDIT

Requirements:
  - Track who accessed which content
  - License compliance (who watched what, where)
  - Internal access to content systems

Scale:
  - Billions of streaming events per day
  - Not all need full audit (sampling for analytics)
  - But: Content system access = full audit

Architecture:

  Internal User ──▶ Content Admin
                         │
                    Audit captured
                    (every action)
                         │
                         ▼
  Streaming ──────▶  Kafka  ◀──── Other Services
  (sampled)            │
                       ▼
                   Consumer
                       │
           ┌───────────┼───────────┐
           │           │           │
      Elasticsearch   S3       License
      (search)     (archive)   Reporting


Key Decisions:

1. TIERED AUDITING
   - Internal admin access: 100% captured
   - Streaming views: Sampled for analytics
   - API calls: Based on sensitivity level
   
2. SEARCH-FIRST DESIGN
   Elasticsearch primary for investigation
   "Who accessed title X in region Y last week?"
   
3. SAMPLING FOR SCALE
   Can't audit every stream start event (billions/day)
   Sample 1% for analytics, 100% for sensitive content

4. GEO-PARTITIONING
   Audit data stays in region (GDPR, licensing)
   EU data in EU storage, etc.

7.3 Case Study: Healthcare — HIPAA Audit

HEALTHCARE AUDIT (HIPAA)

Requirements:
  - Every access to patient records logged
  - "Break the glass" tracking
  - Access by role and relationship
  - 6-year minimum retention

Special Challenges:

1. RELATIONSHIP-BASED ACCESS
   Dr. Smith can see patients assigned to them
   Nurse Jones can see patients on their floor
   Must audit: Who accessed, their role, their relationship
   
2. BREAK THE GLASS
   Emergency override of normal access controls
   Must be logged with extra detail:
   {
     "action": "patient.record.view",
     "access_type": "emergency_override",
     "justification": "Patient in ER, unresponsive",
     "witness": "charge_nurse_456"
   }

3. MINIMUM NECESSARY
   Auditors check: Did user access more than needed?
   Must log exactly which fields viewed, not just "viewed record"

4. PATIENT ACCESS RIGHTS
   Patients can request audit log of who viewed their data
   Must be able to generate per-patient report


Architecture:

  EHR System ──▶ Audit Service ──▶ WORM Storage
                      │                  │
                      │             Write-Once
                      │             Read-Many
                      │             (Tamper-proof)
                      │
                      ▼
                 Compliance
                 Reporting
                      │
                      ├── Daily access reports
                      ├── Break-glass review
                      └── Patient data requests

7.4 Summary: Industry Patterns

Company      Focus      Key Pattern            Unique Challenge
──────────   ────────   ────────────────────   ─────────────────────────
Stripe       Payments   Field-level tracking   PCI-DSS compliance
Netflix      Content    Tiered auditing        Sampling at scale
Healthcare   HIPAA      Break-the-glass        Relationship-based access
Banks        SOX        Dual-control logging   Every transaction
Government   FISMA      Chain of custody       Tamper evidence

Chapter 8: Common Mistakes to Avoid

8.1 Mistake 1: Auditing After Business Logic

❌ WRONG: Separate transactions

async def transfer_money(from_acct, to_acct, amount):
    # Business logic
    async with db.transaction():
        await db.debit(from_acct, amount)
        await db.credit(to_acct, amount)
    
    # Audit separately (WRONG!)
    await audit.log(...)  # What if this fails?

Problem:
  Money moved, but no audit trail
  Compliance violation
  Investigation impossible


✅ CORRECT: Same transaction

async def transfer_money(from_acct, to_acct, amount):
    transfer_id = new_transfer_id()  # e.g. a UUID minted for this transfer
    async with db.transaction():
        # Business logic
        await db.debit(from_acct, amount)
        await db.credit(to_acct, amount)
        
        # Audit in same transaction
        await audit.log(
            action="account.transfer",
            resource=Resource(type="transfer", id=transfer_id),
            changes={
                "from_account": from_acct,
                "to_account": to_acct,
                "amount": amount
            }
        )

If audit fails, entire transfer rolls back.
Atomicity guaranteed.
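For this to hold, the audit client must write through the same connection and transaction as the business logic rather than opening its own. A minimal sketch of one way to wire that up, assuming an asyncpg-style connection and an outbox table named audit_outbox (the class, table, and column names are illustrative):

import json
import uuid
from datetime import datetime, timezone

class AuditClient:
    """Writes audit events into the outbox table using the caller's transaction."""

    def __init__(self, conn):
        self.conn = conn  # the same DB connection the business logic uses

    async def log(self, action: str, resource: dict, changes: dict, actor_id: str):
        # This INSERT runs on the caller's connection, inside the caller's
        # transaction: if the business operation rolls back, so does this row.
        await self.conn.execute(
            """
            INSERT INTO audit_outbox (event_id, occurred_at, actor_id, action, payload)
            VALUES ($1, $2, $3, $4, $5)
            """,
            str(uuid.uuid4()),
            datetime.now(timezone.utc),
            actor_id,
            action,
            json.dumps({"resource": resource, "changes": changes}),
        )

Because the client never commits on its own, atomicity is inherited from whoever opened the transaction.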

8.2 Mistake 2: Mutable Audit Logs

❌ WRONG: UPDATE and DELETE allowed

-- "Fixing" an audit record
UPDATE audit_events
SET actor_email = 'correct@email.com'
WHERE event_id = 'evt_123';

-- "Cleaning up" old events
DELETE FROM audit_events
WHERE timestamp < '2020-01-01';

Problems:
  - Tampering possible
  - Chain of custody broken
  - Legal evidence inadmissible


✅ CORRECT: Append-only with corrections

-- Never UPDATE original record
-- Instead, add correction event:

INSERT INTO audit_events (...) VALUES (
    'evt_456',  -- New event ID
    NOW(),
    'system',   -- Actor: system or admin
    'audit.correction',  -- Action type
    'audit_event',       -- Resource type
    'evt_123',           -- Reference to original
    '{"field": "actor_email", "old": "wrong@email.com", "new": "correct@email.com"}'
);

-- Original preserved, correction linked
-- Both visible in audit trail
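At read time, the original stays untouched and corrections are overlaid for display only. A minimal sketch, assuming correction payloads shaped like the JSON above (function and field names are illustrative):

def effective_view(original: dict, corrections: list[dict]) -> dict:
    """Overlay correction events on an immutable original, for display only.

    Each correction's payload has the shape {"field": ..., "old": ..., "new": ...}
    and references the original event's ID.
    """
    view = dict(original)                  # never mutate the stored record
    for correction in corrections:         # apply in chronological order
        payload = correction["payload"]
        view[payload["field"]] = payload["new"]
    view["corrected"] = bool(corrections)  # flag so readers know corrections exist
    return view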

8.3 Mistake 3: Insufficient Context

❌ WRONG: Minimal information

{
    "action": "delete",
    "resource_id": "doc_123",
    "timestamp": "2024-01-15T10:00:00Z"
}

Problems:
  - Who deleted it?
  - What was deleted?
  - Why was it deleted?
  - From where?


✅ CORRECT: Full context

{
    "event_id": "evt_789",
    "timestamp": "2024-01-15T10:00:00.123Z",
    "actor": {
        "id": "usr_456",
        "email": "alice@company.com",
        "role": "admin",
        "ip_address": "192.168.1.100",
        "user_agent": "Mozilla/5.0..."
    },
    "action": "document.delete",
    "resource": {
        "type": "document",
        "id": "doc_123",
        "name": "Q4 Financial Report.pdf"
    },
    "changes": {
        "title": "Q4 Financial Report.pdf",
        "size_bytes": 1048576,
        "owner": "usr_789",
        "created_at": "2024-01-01T08:00:00Z"
    },
    "context": {
        "request_id": "req_abc",
        "session_id": "sess_def",
        "justification": "Document superseded by updated version"
    }
}

Every question answerable from the event itself.

8.4 Mistake 4: No Query Optimization

❌ WRONG: Querying without indexes

-- "Find all actions by user X"
SELECT * FROM audit_events
WHERE raw_event->'actor'->>'id' = 'usr_456';

-- Full table scan on billions of rows
-- Query takes hours, times out


✅ CORRECT: Denormalized columns with indexes

-- Extract commonly queried fields to columns
CREATE TABLE audit_events (
    ...
    actor_id VARCHAR(64) NOT NULL,  -- Extracted
    resource_type VARCHAR(64) NOT NULL,  -- Extracted
    ...
);

CREATE INDEX idx_audit_actor ON audit_events (actor_id, timestamp DESC);
CREATE INDEX idx_audit_resource ON audit_events (resource_type, resource_id);

-- Query uses index
SELECT * FROM audit_events
WHERE actor_id = 'usr_456'
ORDER BY timestamp DESC
LIMIT 100;

-- Returns in milliseconds

8.5 Mistake Checklist

Before deploying audit system, verify:

  • Same transaction — Audit in same transaction as business logic
  • Immutable storage — No UPDATE or DELETE on audit tables
  • Full context — Who, what, when, where, why captured
  • Query performance — Indexes for common access patterns
  • Hash chain — Tamper evidence for compliance
  • DLQ handling — Failed events captured, not lost
  • Retention policy — Compliant with regulations
  • Backup strategy — Survives datacenter failure
  • Access controls — Audit log access is itself audited

Part IV: Interview Preparation

Chapter 9: Interview Tips and Phrases

9.1 When to Bring Up Audit Logging

Bring up audit logging when:

  • Designing systems with compliance requirements (finance, health)
  • User data is being accessed or modified
  • Actions need to be traceable for investigation
  • System handles sensitive operations
  • Interviewer asks about security or compliance

9.2 Key Phrases to Use

INTRODUCING AUDIT REQUIREMENTS:

"Before diving into the design, I want to clarify the audit 
requirements. For a financial system like this, we'll need
to track every action for compliance—who did what, when, 
and the system must be able to prove it wasn't tampered with."


EXPLAINING THE ARCHITECTURE:

"For audit logging, I'd use a transactional outbox pattern.
The audit event is written to an outbox table in the same
transaction as the business operation. This guarantees we
never have an action without its corresponding audit trail."


DISCUSSING IMMUTABILITY:

"Audit logs must be immutable—no updates, no deletes. Once
written, an event is permanent. If we need to correct 
something, we add a correction event that references the
original. This preserves the complete history and maintains
evidence integrity."


ADDRESSING SCALE:

"At 50,000 events per second, we need tiered storage. Hot
storage in TimescaleDB for recent data—90 days—with good
query performance. Then data moves to ClickHouse for the 
next year, and finally to S3 Parquet for long-term retention.
Each tier optimized for its access pattern."
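To back this talking point with something concrete, here is a minimal sketch of how the tiers could be expressed as configuration (the class and backend names are illustrative, not a specific product's API):

from dataclasses import dataclass

@dataclass(frozen=True)
class StorageTier:
    name: str
    backend: str
    max_age_days: int   # events older than this move to the next tier

# Hot -> warm -> cold, matching the retention numbers above
TIERS = [
    StorageTier(name="hot",  backend="timescaledb", max_age_days=90),
    StorageTier(name="warm", backend="clickhouse",  max_age_days=365),
    StorageTier(name="cold", backend="s3_parquet",  max_age_days=7 * 365),
]

def tier_for_age(age_days: int) -> StorageTier:
    """Pick the tier responsible for an event of the given age."""
    for tier in TIERS:
        if age_days <= tier.max_age_days:
            return tier
    return TIERS[-1]  # beyond 7 years, the retention policy decides deletion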


HANDLING FAILURES:

"Even with best efforts, some events will fail to process.
We use a dead letter queue to capture these—they're not lost,
just quarantined. We alert on DLQ growth and have procedures
to investigate and replay. For audit logs, we never drop data."

9.3 Questions to Ask Interviewer

  • "What are the regulatory requirements? SOX, HIPAA, GDPR?"
  • "How long must audit data be retained?"
  • "What's the query pattern? Real-time investigation or periodic reports?"
  • "Is there a legal or compliance team that will access this data?"

9.4 Common Follow-up Questions

| Question | Good Answer |
|---|---|
| "How do you ensure no audit events are lost?" | "Transactional outbox—audit write is in the same DB transaction as the business logic. Either both succeed or both fail. Then async publishing to Kafka with DLQ for any processing failures." |
| "How do you prove logs weren't tampered with?" | "Hash chaining—each event includes the hash of the previous event. To tamper with one, you'd need to recompute all subsequent hashes, which is detectable. Plus write-once storage like S3 Object Lock." |
| "How do you handle sensitive data in logs?" | "Tokenization or encryption for PII. Access controls so only authorized personnel can see full data. The audit log itself has an audit trail—we log who accessed the logs." |
| "What if the audit system itself goes down?" | "Outbox pattern buffers events in the primary database. When the audit pipeline recovers, it catches up. We'd alert immediately on outage, but no data is lost because it's in the transactional outbox." |

Chapter 10: Practice Problems

Problem 1: Healthcare Patient Access Audit

Setup: You're designing an audit system for a hospital's electronic health records. Every access to patient data must be logged for HIPAA compliance.

Requirements:

  • 10,000 healthcare workers
  • 500,000 patients
  • Every view, edit, print of patient records audited
  • "Break the glass" emergency access tracking
  • 6-year retention
  • Patients can request report of who accessed their records

Questions:

  1. How do you capture the relationship between accessor and patient?
  2. How do you handle break-the-glass scenarios?
  3. How do you generate per-patient access reports efficiently?

Hints:

  • Consider role-based access patterns
  • Break-the-glass needs extra justification
  • Pre-aggregate for patient-centric queries

Schema for Relationship Tracking:

{
  "actor": {
    "id": "dr_smith",
    "role": "physician",
    "department": "cardiology"
  },
  "access_context": {
    "relationship": "treating_physician",
    "assignment_id": "assign_123",
    "access_level": "normal"  // or "emergency_override"
  },
  "resource": {
    "type": "patient_record",
    "patient_id": "patient_456",
    "record_section": "medications"
  }
}

Break-the-Glass Handling:

if access_type == "emergency_override":
    event.access_context = {
        "access_level": "emergency_override",
        "justification": request.justification,  # Required
        "witness": request.witness_id,           # Required
        "auto_review": True                      # Flag for compliance
    }
    
    # Send immediate alert
    await alert_compliance_team(event)

Patient-Centric Reports:

-- Materialized view for patient reports
CREATE MATERIALIZED VIEW patient_access_summary AS
SELECT 
    resource_id as patient_id,
    actor_id,
    actor_role,
    COUNT(*) as access_count,
    MIN(timestamp) as first_access,
    MAX(timestamp) as last_access
FROM audit_events
WHERE resource_type = 'patient_record'
GROUP BY resource_id, actor_id, actor_role;

-- Refresh daily
REFRESH MATERIALIZED VIEW patient_access_summary;

Problem 2: Financial Trading Audit

Setup: You're designing audit for a stock trading platform. Every order, modification, and cancellation must be logged with millisecond precision.

Requirements:

  • 100,000 orders per second during market hours
  • Sub-millisecond timestamp accuracy
  • Must reconstruct order book state at any point in time
  • 7-year retention (SEC requirement)
  • Real-time compliance monitoring

Questions:

  1. How do you handle the throughput requirement?
  2. How do you ensure timestamp accuracy?
  3. How do you enable point-in-time reconstruction?

Hints:

  • Kafka partitioning for throughput
  • Hardware timestamping for accuracy
  • Event sourcing enables reconstruction

High Throughput:

# Kafka configuration
Partitions: 64 (parallel consumers)
Replication: 3
Batch size: Large for throughput

# Partition strategy (assumes numeric order IDs)
partition_key = f"{symbol}:{order_id % 100}"
# Same order always maps to the same key, so per-order ordering is preserved,
# while a hot symbol's load is spread across up to 100 keys instead of one partition

Timestamp Accuracy:

# Use high-precision timestamps
import time

event = AuditEvent(
    timestamp_ns=time.time_ns(),   # Nanosecond-resolution wall clock
    clock_source="ptp",            # PTP/hardware time sync; plain NTP is typically only millisecond-accurate
    sequence_id=get_sequence(),    # Tie-breaker for ordering within the same nanosecond
)

Point-in-Time Reconstruction:

# Event sourcing model
events = [
    {"type": "order.placed", "order_id": "123", "qty": 100, ...},
    {"type": "order.partial_fill", "order_id": "123", "filled_qty": 50, ...},
    {"type": "order.modified", "order_id": "123", "new_qty": 75, ...},
    {"type": "order.cancelled", "order_id": "123", ...}
]

def reconstruct_order_state(order_id, as_of_timestamp):
    events = get_events(order_id, before=as_of_timestamp)
    state = {}
    for e in events:
        state = apply_event(state, e)
    return state
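The reconstruction above relies on an apply_event reducer. A minimal version for the event types shown, assuming incremental fill quantities and a plain dict for order state (field names are illustrative):

def apply_event(state: dict, event: dict) -> dict:
    """Fold one audit event into the reconstructed order state."""
    state = dict(state)  # never mutate the input; each step yields a new snapshot
    etype = event["type"]
    if etype == "order.placed":
        state.update(order_id=event["order_id"], qty=event["qty"],
                     filled_qty=0, status="open")
    elif etype == "order.partial_fill":
        # assumed: filled_qty on the event is the incremental fill amount
        state["filled_qty"] = state.get("filled_qty", 0) + event["filled_qty"]
    elif etype == "order.modified":
        state["qty"] = event["new_qty"]
    elif etype == "order.cancelled":
        state["status"] = "cancelled"
    return state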

Problem 3: Multi-Tenant SaaS Audit

Setup: You're designing audit for a multi-tenant SaaS platform where each customer (tenant) has isolated audit data.

Requirements:

  • 1,000 tenants, varying sizes (1-10,000 users each)
  • Tenants can query their own audit logs
  • Some tenants have regulatory requirements, others don't
  • Tenant data isolation is critical

Questions:

  1. How do you ensure tenant isolation?
  2. How do you handle tenants with different retention requirements?
  3. How do you optimize costs for inactive tenants?

Hints:

  • Partition by tenant for isolation
  • Configuration-driven retention
  • Cold storage for inactive tenants

Tenant Isolation:

-- Row-level security: policies only take effect once RLS is enabled on the table
ALTER TABLE audit_events ENABLE ROW LEVEL SECURITY;

CREATE POLICY tenant_isolation ON audit_events
    USING (tenant_id = current_setting('app.tenant_id'));

-- Every query automatically filtered
SET app.tenant_id = 'tenant_123';
SELECT * FROM audit_events;  -- Only sees tenant_123 data
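On the application side, the tenant setting must be applied on each connection before any query runs. A minimal sketch using PostgreSQL's set_config, assuming an asyncpg-style pool (the db handle and call pattern are illustrative):

async def run_tenant_query(db, tenant_id: str, query: str, *args):
    """Run a query with row-level security scoped to one tenant."""
    async with db.acquire() as conn:            # one connection per request
        async with conn.transaction():
            # is_local=true scopes the setting to this transaction, so a
            # pooled connection can't leak another tenant's id afterwards
            await conn.execute(
                "SELECT set_config('app.tenant_id', $1, true)", tenant_id
            )
            return await conn.fetch(query, *args)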

Configurable Retention:

from dataclasses import dataclass
from typing import Optional

@dataclass
class TenantAuditConfig:
    tenant_id: str
    retention_days: int = 90                # Default
    require_hash_chain: bool = False
    compliance_mode: Optional[str] = None   # "hipaa", "sox", etc.

# Load per-tenant config
config = await get_tenant_config(tenant_id)

if config.compliance_mode == "hipaa":
    config.retention_days = 2190  # 6 years
    config.require_hash_chain = True

Cost Optimization:

async def manage_tenant_storage(tenant_id):
    activity = await get_tenant_activity(tenant_id, days=30)
    
    if activity.event_count == 0:
        # Inactive: move to cold storage
        await archive_tenant_to_s3(tenant_id)
        await delete_from_hot_storage(tenant_id)
    elif activity.event_count < 100:
        # Low activity: compress aggressively
        await compress_tenant_data(tenant_id)

Chapter 11: Mock Interview Dialogue

Scenario: Design Audit System for Banking App

Interviewer: "We're building a mobile banking app. Design the audit logging system."

You: "Great, let me start by understanding the requirements. For banking, we're definitely looking at regulatory compliance—SOX, possibly state banking regulations. Can you confirm the retention requirements?"

Interviewer: "Yes, SOX compliance. 7-year retention. And we need to track every financial transaction and every access to customer data."

You: "Perfect. So we need to audit two categories: financial transactions like transfers and payments, and data access like viewing account details or statements. Let me design a system that handles both.

First, the core principle: audit events must be captured atomically with the business operation. If a transfer succeeds, its audit record must exist. We can't have orphaned transactions.

For the architecture, I'll use the transactional outbox pattern we discussed earlier this week:

Mobile App ──▶ API Gateway ──▶ Banking Service
                                     │
                              ┌──────┴──────┐
                              │             │
                         Transaction    Audit Event
                              │             │
                              └──────┬──────┘
                                     │
                              Same DB Transaction
                                     │
                                     ▼
                              Outbox Table
                                     │
                               (async)
                                     ▼
                                  Kafka
                                     │
                                     ▼
                              Audit Consumer
                                     │
                              ┌──────┴──────┐
                              │             │
                         Hot Store    Cold Store
                        (TimescaleDB)  (S3 Parquet)

The key here is that the audit event goes into an outbox table in the same database transaction as the banking operation. If the transaction fails, the audit is rolled back too. If it succeeds, the audit is guaranteed to be published."

Interviewer: "How do you handle the scale? We expect millions of transactions per day."

You: "Good question. Let me break down the write path:

At peak, banking apps typically see most activity between 9 AM and 5 PM. Let's assume 10 million transactions per day with 80% in an 8-hour window. That's roughly 280 transactions per second average, with peaks maybe 3-5x that, so let's design for 1,500 TPS.

For the outbox publisher, I'd use CDC with Debezium rather than polling. It reads from the PostgreSQL WAL with very low latency—events reach Kafka within 100ms of commit.

Kafka would have 32 partitions, partitioned by customer ID to maintain per-customer ordering. This lets us scale consumers horizontally.

For storage, I'd use TimescaleDB for the hot tier—it's optimized for time-series data with automatic partitioning. Events older than 90 days get archived to S3 in Parquet format, queryable via Athena for investigations."

Interviewer: "What about data that shouldn't be in logs, like account numbers?"

You: "Excellent security concern. I'd implement tokenization for sensitive fields:

audit_event = {
    'actor': {
        'id': 'usr_123',
        'email': tokenize('alice@bank.com')  # Reversible token
    },
    'action': 'transfer.create',
    'resource': {
        'type': 'transfer',
        'id': 'txn_456'
    },
    'details': {
        'from_account': mask(from_account),  # logs only the last 4 digits, e.g. '****5678'
        'to_account': mask(to_account),      # e.g. '****9012'
        'amount': 1000.00,  # Amount visible (needed for compliance)
        'currency': 'USD'
    }
}

The full account numbers are stored separately in a secure vault with additional access controls. The audit log contains enough to investigate but not enough to compromise customer accounts if the logs were exposed.

Access to the detokenization service is itself audited—so we'd know who looked up the real account numbers."
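The mask and tokenize helpers above aren't standard-library functions; a minimal sketch of what they might look like, assuming a key provisioned via the environment and a vault that stores the token-to-value mapping (names are illustrative):

import hashlib
import hmac
import os

# Assumed: in production the key comes from a KMS/secret manager, never hard-coded
SECRET_KEY = os.environ.get("TOKENIZATION_KEY", "dev-only-key")

def mask(account_number: str) -> str:
    """Irreversible display form: keep only the last 4 digits."""
    return "****" + account_number[-4:]

def tokenize(value: str) -> str:
    """Deterministic token for the audit log; the raw value lives only in the vault."""
    digest = hmac.new(SECRET_KEY.encode(), value.encode(), hashlib.sha256).hexdigest()
    token = f"tok_{digest[:24]}"
    # vault.store(token, value)  # hypothetical call: audited, access-controlled lookup
    return token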

Interviewer: "How do you prove the logs haven't been tampered with?"

You: "This is critical for SOX compliance. I'd implement hash chaining:

Each event includes the hash of the previous event. To tamper with event #1000, you'd have to recompute the hashes of events #1001 through present—which we can detect by verifying the chain.

Additionally, once events move to cold storage in S3, I'd enable S3 Object Lock with governance mode. This makes the files write-once-read-many (WORM) for the retention period. Even administrators can't delete or modify them without special override permissions that are themselves logged.

For extra assurance, we can periodically checkpoint the latest hash to a public blockchain or trusted timestamping service—though that may be overkill for most banking apps."
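A minimal sketch of the chaining and verification described here, assuming canonical JSON serialization and illustrative field names:

import hashlib
import json

def chain_hash(prev_hash: str, event: dict) -> str:
    """Hash of this event, bound to the previous event's hash."""
    # Canonical serialization so the same event always hashes identically
    body = json.dumps(event, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256((prev_hash + body).encode("utf-8")).hexdigest()

def verify_chain(events: list[dict]) -> bool:
    """Recompute every link; tampering with one event breaks all later hashes."""
    prev_hash = "0" * 64  # genesis value for the first event
    for event in events:
        stored = event["hash"]
        payload = {k: v for k, v in event.items() if k != "hash"}
        if chain_hash(prev_hash, payload) != stored:
            return False
        prev_hash = stored
    return True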

Interviewer: "What if the audit system itself goes down?"

You: "The transactional outbox is the safety net. If Kafka is down, events accumulate in the outbox table. When Kafka recovers, the publisher catches up. We might have some latency in the audit pipeline, but no data loss.

If the database itself is down... well, the banking operations can't happen either, so there's nothing to audit. The audit system's availability is tied to the primary database's availability, which should be very high with proper replication.

For the consumer side, any processing failures go to a dead letter queue. We monitor DLQ depth and alert if it grows. Failed events are never dropped—they wait for investigation and replay.

The DLQ also protects against poison pills—malformed events that crash the consumer. We detect these via processing timeouts and quarantine them immediately."


Week 3 Summary: Messaging and Async Processing

Congratulations! You've completed Week 3. Let's consolidate what you've learned:

WEEK 3 COMPLETE PICTURE

Day 1: Queue vs Stream
━━━━━━━━━━━━━━━━━━━━━━
  Queues (RabbitMQ): Task distribution, once-consumed
  Streams (Kafka): Event log, multi-consumer, replay
  Choose based on consumption pattern, not popularity
  
Day 2: Transactional Outbox  
━━━━━━━━━━━━━━━━━━━━━━━━━━━
  Problem: Dual-write between DB and queue
  Solution: Write to outbox in same transaction
  Publisher reads outbox, sends to Kafka
  Never lose data, never have orphans
  
Day 3: Backpressure and Flow Control
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
  Detection: Lag, depth, latency metrics
  Response: Shed, limit, scale, spill
  Critical: Systems must be able to say "slow down"
  Prevents: Memory exhaustion, cascading failures
  
Day 4: Dead Letters and Poison Pills
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
  Poison pills crash consumers repeatedly
  DLQ quarantines failed messages
  Classification: Validation vs transient vs poison
  Never lose data, even on failure
  
Day 5: Audit Log System (Today)
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
  Combined ALL patterns into complete system
  Outbox → Kafka → Consumer → Tiered Storage
  Immutable, tamper-evident, queryable
  Handles compliance requirements


HOW THEY CONNECT:

  User Action
       │
       ▼
  Business Logic + Audit Client
       │
       ▼ (Day 2: Transactional Outbox)
  Outbox Table
       │
       ▼ (Day 1: Kafka Stream)
  Kafka Topic
       │
       ▼ (Day 3: Backpressure Handling)
  Audit Consumer
       │
       ├──▶ Success: Storage
       │
       └──▶ Failure: (Day 4: DLQ)
                     Dead Letter Queue
                           │
                           ▼
                     Investigation + Replay


YOU CAN NOW:

✓ Design async systems with guaranteed delivery
✓ Handle failures gracefully without data loss  
✓ Scale to high throughput with backpressure
✓ Build compliant audit trails
✓ Operate production messaging systems

Summary

DAY 5 KEY TAKEAWAYS

CORE CONCEPT:
• Audit log: Who did what to which resource, when, from where
• Immutable, tamper-evident, queryable
• Combines all Week 3 patterns into production system

ARCHITECTURE:
• Transactional outbox for guaranteed capture
• Kafka for reliable transport
• Tiered storage: hot (days) → warm (months) → cold (years)
• Hash chaining for tamper evidence

IMPLEMENTATION:
• Audit client writes to outbox in same transaction
• CDC or polling publishes to Kafka
• Consumer writes to storage with backpressure handling
• DLQ for failed events

COMPLIANCE:
• SOX, HIPAA, GDPR have specific requirements
• Retention periods: years, not days
• Immutability is non-negotiable
• Access to logs is itself audited

INTERVIEW TIPS:
• Start with "What are the regulatory requirements?"
• Emphasize transactional outbox for guaranteed capture
• Discuss hash chaining for tamper evidence
• Show tiered storage for cost optimization

DEFAULT CHOICE:
• Transactional outbox + Kafka + TimescaleDB + S3
• Hash chaining enabled
• 90-day hot, 1-year warm, 7-year cold

📚 Further Reading

Books

  • "Designing Data-Intensive Applications" by Martin Kleppmann — Chapter 11
  • "Building Event-Driven Microservices" by Adam Bellemare
  • "Streaming Systems" by Tyler Akidau

🎉 Week 3 Complete!

You've finished the most intensive week so far. Take a moment to appreciate what you've built:

  • A mental model for async system design
  • Patterns for guaranteed message delivery
  • Strategies for handling system overload
  • A complete audit log system

Next Week Preview:

Week 4 — Caching: Beyond "Just Add Redis"

We'll dive into:

  • Cache invalidation (the hardest problem)
  • Thundering herd prevention
  • Multi-tier caching strategies
  • When NOT to cache

The patterns from this week will come up again—particularly around consistency and failure handling. Caching adds another layer of complexity that builds on everything you've learned.

Rest up. You've earned it.


End of Week 3, Day 5: Audit Log System