Week 3 — Day 5: Audit Log System
System Design Mastery Series
Preface
It's Friday. You've made it through one of the most intense weeks of this program.
Let's recap what you've learned:
WEEK 3 JOURNEY: Messaging and Async Processing
Day 1: Queue vs Stream
└── When to use RabbitMQ vs Kafka
└── Consumer groups, ordering, retention
└── "Choose based on consumption pattern, not popularity"
Day 2: Transactional Outbox
└── The dual-write problem
└── Atomic DB + message publishing
└── "Never write to two systems without coordination"
Day 3: Backpressure and Flow Control
└── Detection: lag, depth, latency
└── Response: shed, limit, scale, spill
└── "Systems must be able to say 'slow down'"
Day 4: Dead Letters and Poison Pills
└── Error classification and routing
└── Quarantine before crash loops
└── "Failed messages are data, not garbage"
Today, Day 5, we bring it ALL together.
Your final challenge: design a complete audit log system for a financial services company. This system must:
- Capture every user action (compliance requirement)
- Never lose an event (legal liability)
- Handle 50,000 events per second at peak
- Be queryable for investigations
- Retain data for 7 years
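To get a feel for the scale, here is a rough back-of-envelope calculation. The ~5,000 events/sec sustained average and ~1 KB average serialized event size are assumptions for illustration, not numbers from the requirements:

```python
# Back-of-envelope sizing (assumed: ~5,000 events/sec sustained average,
# ~1 KB per serialized event; peak of 50,000/sec is handled by buffering).
AVG_EVENTS_PER_SEC = 5_000
AVG_EVENT_BYTES = 1_024
SECONDS_PER_DAY = 86_400

events_per_day = AVG_EVENTS_PER_SEC * SECONDS_PER_DAY    # 432 million events/day
bytes_per_day = events_per_day * AVG_EVENT_BYTES         # ~442 GB/day uncompressed
bytes_per_7_years = bytes_per_day * 365 * 7              # ~1.1 PB uncompressed

print(f"{events_per_day:,} events/day")
print(f"{bytes_per_day / 1e12:.2f} TB/day uncompressed")
print(f"{bytes_per_7_years / 1e15:.2f} PB over 7 years (before compression)")
```

Roughly a petabyte over the retention window before compression, which is why tiered storage (and a columnar format for the cold tier) shows up later in the design.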
This isn't a toy example. It's the kind of system that, if it fails, results in regulatory fines, lawsuits, and people losing their jobs.
Let's build it right.
Part I: Foundations
Chapter 1: What Is an Audit Log?
1.1 The Simple Definition
An audit log (or audit trail) is a chronological record of activities that have affected a system. It answers: who did what to which resource, when, and from where.
EVERYDAY ANALOGY: The Security Camera
Think of a building's security system:
Every entrance has a camera recording 24/7.
Every badge swipe is logged.
Every door access is timestamped.
If something goes wrong:
"Show me who entered Room 401 last Tuesday at 3pm"
→ Camera footage + badge logs = answer
An audit log is the security camera for your software.
Every action is recorded.
Nothing is deleted.
Everything is queryable.
1.2 Why Audit Logs Matter
REGULATORY REQUIREMENTS
Financial Services (SOX, PCI-DSS):
- All access to financial data must be logged
- Logs must be retained for 7+ years
- Logs must be tamper-evident
Healthcare (HIPAA):
- All access to patient records must be logged
- Must track who viewed what, when
- Breach notification requires audit trail
General (GDPR, CCPA):
- Must track data access and modifications
- Right to access: "Show me everything you know about me"
- Right to deletion: "Prove you deleted my data"
THE COST OF FAILURE
No audit log:
- "We don't know who accessed the data"
- "We can't prove we were compliant"
- "We have no evidence for the investigation"
Result:
- Regulatory fines (millions of dollars)
- Legal liability (lawsuits)
- Reputation damage (customer trust)
- Criminal charges (in extreme cases)
1.3 Audit Log vs Application Log
These are different things:
| Aspect | Application Log | Audit Log |
|---|---|---|
| Purpose | Debugging, monitoring | Compliance, investigation |
| Audience | Engineers | Auditors, legal, security |
| Retention | Days to weeks | Years |
| Mutability | Can be deleted | Must be immutable |
| Content | Technical details | Business actions |
| Schema | Loose, varies | Strict, standardized |
APPLICATION LOG:
2024-01-15 10:30:45 INFO Processing request abc123
2024-01-15 10:30:46 DEBUG SQL: SELECT * FROM users WHERE id=456
2024-01-15 10:30:46 INFO Request completed in 142ms
AUDIT LOG:
{
"event_id": "evt_789xyz",
"timestamp": "2024-01-15T10:30:45.123Z",
"actor": {
"user_id": "usr_456",
"email": "alice@company.com",
"ip_address": "192.168.1.100",
"user_agent": "Mozilla/5.0..."
},
"action": "user.profile.update",
"resource": {
"type": "user_profile",
"id": "usr_789"
},
"changes": {
"email": {"old": "bob@old.com", "new": "bob@new.com"}
},
"result": "success",
"metadata": {
"request_id": "req_abc123",
"session_id": "sess_def456"
}
}
1.4 Key Terminology
| Term | Definition |
|---|---|
| Actor | The user or system that performed the action |
| Action | What was done (create, read, update, delete) |
| Resource | What was acted upon (user, account, document) |
| Immutability | Cannot be modified after creation |
| Tamper-evidence | Any modification is detectable |
| Retention | How long logs are kept |
| Chain of custody | Tracking who accessed the logs themselves |
Chapter 2: Audit Log Architecture
2.1 High-Level Design
Here's where everything from this week comes together:
COMPLETE AUDIT LOG ARCHITECTURE
┌─────────────────────────────────────────────────┐
│ Application Layer │
│ │
User Action ─────────▶│ API Server ──▶ Audit Client ──▶ Outbox │
│ │ (Day 2) │
│ │ │
└───────────────────────┼─────────────────────────┘
│
│ Transactional write
│ (same DB transaction as business logic)
▼
┌─────────────────────────────────────────────────┐
│ Outbox Publisher │
│ │
│ Polls outbox table ──▶ Publishes to Kafka │
│ (or CDC with Debezium) (Day 1) │
│ │
└───────────────────────┬─────────────────────────┘
│
▼
┌─────────────────────────────────────────────────┐
│ Kafka: audit-events │
│ │
│ Partitioned by actor_id for ordering │
│ Retention: 7 days (before archive) │
│ Replication: 3 (durability) │
│ │
└──────────┬──────────────────────┬───────────────┘
│ │
┌──────────▼──────────┐ ┌────────▼────────┐
│ Audit Consumer │ │ Archive Writer │
│ │ │ │
│ Writes to DB │ │ Writes to S3 │
│ (hot storage) │ │ (cold storage) │
│ │ │ │
│ Backpressure ◀─────┤ │ (Day 3) │
│ handling (Day 3) │ │ │
│ │ │ │
│ DLQ if fails ──────┤ │ │
│ (Day 4) │ │ │
│ │ │ │
└──────────┬──────────┘ └─────────────────┘
│
▼
┌─────────────────────────────────────────────────┐
│ Storage Layer │
│ │
│ Hot: PostgreSQL/TimescaleDB (90 days) │
│ Warm: ClickHouse (1 year) │
│ Cold: S3 + Parquet (7 years) │
│ │
└───────────────────────┬─────────────────────────┘
│
▼
┌─────────────────────────────────────────────────┐
│ Query Layer │
│ │
│ API for investigators │
│ Dashboard for compliance │
│ Export for legal │
│ │
└─────────────────────────────────────────────────┘
2.2 Why This Architecture?
Let's trace how each day's learning applies:
DAY 1 — QUEUE VS STREAM: Why Kafka?
We need:
✓ Ordering per actor (see actions in sequence)
✓ Multiple consumers (DB writer, archiver, analytics)
✓ Replay capability (reprocess if bug found)
✓ High throughput (50K events/sec)
Kafka wins over RabbitMQ because:
- Log-based: events persist for replay
- Consumer groups: multiple independent consumers
- Partitioning: scale writes horizontally
- Ordering: per-partition ordering by actor_id
DAY 2 — TRANSACTIONAL OUTBOX: Guaranteed Capture
The nightmare scenario:
User deletes sensitive data
Delete succeeds in database
Audit event fails to publish
No record of the deletion!
Outbox pattern guarantees:
Business action + audit event in same transaction
Either both happen or neither happens
Publisher sends to Kafka asynchronously
DAY 3 — BACKPRESSURE: Handling Peak Load
Black Friday scenario:
Normal: 5,000 events/sec
Peak: 50,000 events/sec (10x!)
Without backpressure:
Consumers fall behind
Lag grows to hours
Memory exhaustion
Crash, data loss
With backpressure:
Detect lag growth
Batch aggressively
Shed non-critical (metrics, not audit!)
Scale consumers
Alert before crisis
DAY 4 — DEAD LETTERS: No Event Left Behind
Even with all precautions:
Some events will fail to process
Schema mismatch (old event, new consumer)
Storage temporarily unavailable
Unexpected data format
DLQ ensures:
Failed events captured, not lost
Investigation possible
Replay after fix
Compliance maintained
2.3 Design Principles
PRINCIPLE 1: WRITE FIRST, PROCESS LATER
Never block the user action on audit processing.
WRONG:
User clicks "delete"
→ Delete from DB
→ Write audit to Kafka (sync)
→ Wait for consumer to process
→ Return to user
If Kafka is slow, user waits. Bad UX.
RIGHT:
User clicks "delete"
→ Delete from DB + Write to outbox (same transaction)
→ Return to user immediately
→ Background: outbox → Kafka → consumer → storage
User sees instant response. Audit happens async.
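The "right" flow above can be sketched with any transactional store. Here is a minimal illustration using SQLite; the table and function names are placeholders, and a real system would use the fuller outbox machinery shown later:

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE documents (id TEXT PRIMARY KEY, title TEXT)")
conn.execute("CREATE TABLE audit_outbox (event_id TEXT PRIMARY KEY, payload TEXT)")
conn.execute("INSERT INTO documents VALUES ('doc_1', 'Q4 Report')")
conn.commit()

def delete_with_audit(doc_id: str, actor_id: str) -> None:
    """Business write and outbox write commit (or roll back) together."""
    with conn:  # one transaction: both statements happen, or neither does
        conn.execute("DELETE FROM documents WHERE id = ?", (doc_id,))
        event = {"action": "document.delete", "actor": actor_id, "resource": doc_id}
        conn.execute(
            "INSERT INTO audit_outbox VALUES (?, ?)",
            (f"evt_{doc_id}", json.dumps(event)),
        )
    # Returns immediately; a background publisher drains the outbox to Kafka.

delete_with_audit("doc_1", "usr_456")
```

The key property: there is no window where the delete committed but the audit record was lost, and the user never waits on Kafka.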
PRINCIPLE 2: IMMUTABILITY IS NON-NEGOTIABLE
Once written, audit events must NEVER change.
Implementation:
- No UPDATE or DELETE on audit tables
- Append-only storage
- Cryptographic chaining (each event references previous hash)
- Write-once object storage (S3 Object Lock)
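The cryptographic chaining mentioned above can be sketched in a few lines: each event's hash covers its payload plus the previous event's hash, so altering any historical event invalidates every hash after it. This is a simplified illustration of the idea; the production consumer in Chapter 5 applies the same technique per batch:

```python
import hashlib
import json
from typing import Optional

def chain_hash(event: dict, previous_hash: Optional[str]) -> str:
    """Hash the event payload together with the previous event's hash."""
    payload = {"event": event, "previous_hash": previous_hash}
    canonical = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode()).hexdigest()

events = [
    {"action": "user.login", "actor": "usr_1"},
    {"action": "document.delete", "actor": "usr_1"},
    {"action": "user.logout", "actor": "usr_1"},
]

# Build the chain: each hash incorporates the one before it
hashes, prev = [], None
for e in events:
    prev = chain_hash(e, prev)
    hashes.append(prev)

# Tamper with the middle event: its hash no longer matches, and every
# later hash would also have to be recomputed -- that is what makes
# modifications evident.
events[1]["actor"] = "usr_999"
assert chain_hash(events[1], hashes[0]) != hashes[1]
```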
PRINCIPLE 3: DEFENSE IN DEPTH
Multiple layers of protection:
- Outbox: survives app crash
- Kafka replication: survives broker failure
- Consumer DLQ: survives processing errors
- Multi-storage: survives single storage failure
- Cross-region backup: survives datacenter failure
PRINCIPLE 4: QUERY PERFORMANCE MATTERS
"Find all actions by user X on resource Y in January 2023"
Must be fast, even with billions of events.
- Proper indexing on hot storage
- Partitioning by time and actor
- Pre-aggregated views for common queries
- Full-text search for investigation
Chapter 3: Trade-offs and Considerations
3.1 Storage Trade-offs
HOT STORAGE (0-90 days)
───────────────────────
Options: PostgreSQL, TimescaleDB, Cassandra
PostgreSQL/TimescaleDB:
✓ Rich querying (SQL)
✓ Familiar operations
✓ Good indexing
✗ Scaling writes is hard
✗ Expensive at scale
Cassandra:
✓ Massive write throughput
✓ Linear scalability
✗ Query limitations
✗ Operational complexity
Recommendation: TimescaleDB for most cases
- Time-series optimized
- Automatic partitioning
- Compression
- Still PostgreSQL compatible
WARM STORAGE (90 days - 1 year)
───────────────────────────────
Options: ClickHouse, BigQuery, Snowflake
ClickHouse:
✓ Extremely fast analytical queries
✓ Column-oriented compression
✓ Self-hosted option
✗ Operational burden
BigQuery/Snowflake:
✓ Managed service
✓ Scales automatically
✗ Vendor lock-in
✗ Cost at high volume
Recommendation: ClickHouse if you have the ops capacity,
BigQuery/Snowflake if you want a managed service
COLD STORAGE (1-7 years)
────────────────────────
Options: S3 + Parquet, Glacier
S3 + Parquet:
✓ Extremely cheap
✓ Columnar format for analytics
✓ Query with Athena/Presto
✓ Object Lock for immutability
Glacier:
✓ Even cheaper
✗ Retrieval takes hours
✗ Only for true archive
Recommendation: S3 Intelligent-Tiering + Parquet
- Auto-moves to cheaper tiers
- Still queryable via Athena
3.2 Consistency vs Latency
THE AUDIT LATENCY QUESTION
How soon must an event be visible in the audit log?
Option A: Synchronous (immediate)
Write to audit DB in the request path
User waits for audit write
Latency: +50-200ms per request
Consistency: Immediate
Risk: Audit DB down = app down
Option B: Asynchronous (eventual)
Write to outbox, process async
User doesn't wait
Latency: +0ms to user, 1-60s visibility
Consistency: Eventual
Risk: Audit may lag during issues
Option C: Hybrid
Critical actions: sync
Regular actions: async
Most systems choose: Asynchronous
Rationale:
- Most investigations happen hours/days later
- 1 minute delay is acceptable
- User experience matters
- Audit DB issues shouldn't break app
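Option C can be as simple as a routing predicate in the audit client. A sketch, with an invented critical-action list (what counts as "critical" is a policy decision, not something this design prescribes):

```python
# Hypothetical set of actions considered risky enough for synchronous auditing.
CRITICAL_ACTIONS = {"user.delete", "account.close", "permission.grant"}

def audit_mode(action: str) -> str:
    """Route critical actions to the synchronous path, everything else async."""
    return "sync" if action in CRITICAL_ACTIONS else "async"

assert audit_mode("account.close") == "sync"
assert audit_mode("document.view") == "async"
```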
3.3 Schema Design Trade-offs
STRUCTURED VS FLEXIBLE SCHEMA
Structured (strict schema):
{
"actor_id": "required string",
"action": "required enum",
"resource_type": "required enum",
"resource_id": "required string",
"timestamp": "required datetime",
...
}
✓ Consistent, easy to query
✓ Schema validation catches errors
✗ Hard to evolve
✗ Different actions need different fields
Flexible (schemaless):
{
"event_type": "string",
"data": { /* anything */ }
}
✓ Easy to add new event types
✓ Each event type has custom fields
✗ Hard to query consistently
✗ No validation, garbage in
RECOMMENDATION: Structured core + flexible extension
{
// Required core fields (strict)
"event_id": "evt_abc123",
"timestamp": "2024-01-15T10:30:00Z",
"actor": { "id": "usr_456", "type": "user" },
"action": "resource.update",
"resource": { "type": "document", "id": "doc_789" },
"result": "success",
// Flexible extension (varies by action)
"details": {
"changes": { "title": { "old": "Draft", "new": "Final" } },
"reason": "User requested update"
},
// Standard metadata (always present)
"context": {
"ip_address": "192.168.1.100",
"request_id": "req_xyz",
"session_id": "sess_abc"
}
}
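Enforcing the hybrid schema means validating the strict core while passing the flexible `details` through untouched. A minimal sketch (field names follow the example above; a real system would more likely use JSON Schema or Pydantic):

```python
from typing import Dict, List

# Core fields every event must carry (the structured core above)
REQUIRED_CORE = {"event_id", "timestamp", "actor", "action", "resource", "result"}
ALLOWED_RESULTS = {"success", "failure", "denied"}

def validate_event(event: Dict) -> List[str]:
    """Return a list of validation errors; an empty list means the event passes."""
    errors = [f"missing required field: {f}" for f in sorted(REQUIRED_CORE - event.keys())]
    if event.get("result") not in ALLOWED_RESULTS:
        errors.append(f"invalid result: {event.get('result')!r}")
    for nested in ("actor", "resource"):
        value = event.get(nested)
        if isinstance(value, dict) and "id" not in value:
            errors.append(f"{nested} must contain an 'id'")
    # "details" is deliberately left unvalidated: it is the flexible extension
    return errors

good = {
    "event_id": "evt_abc123",
    "timestamp": "2024-01-15T10:30:00Z",
    "actor": {"id": "usr_456", "type": "user"},
    "action": "resource.update",
    "resource": {"type": "document", "id": "doc_789"},
    "result": "success",
    "details": {"reason": "User requested update"},
}
assert validate_event(good) == []
```

Rejected events should go to the DLQ rather than being dropped: garbage in the audit pipeline is still evidence.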
Part II: Implementation
Chapter 4: Basic Implementation
4.1 Audit Event Schema
# Audit Event Schema
# The foundation of the entire system
from dataclasses import dataclass, field, asdict
from datetime import datetime
from typing import Optional, Dict, Any, List
from enum import Enum
import uuid
import json
class ActionResult(Enum):
SUCCESS = "success"
FAILURE = "failure"
DENIED = "denied" # Authorization failure
class ActorType(Enum):
USER = "user"
SERVICE = "service"
SYSTEM = "system" # Automated processes
@dataclass
class Actor:
"""Who performed the action."""
id: str
type: ActorType
email: Optional[str] = None
name: Optional[str] = None
ip_address: Optional[str] = None
user_agent: Optional[str] = None
def to_dict(self) -> Dict:
return {
"id": self.id,
"type": self.type.value,
"email": self.email,
"name": self.name,
"ip_address": self.ip_address,
"user_agent": self.user_agent
}
@dataclass
class Resource:
"""What was acted upon."""
type: str # e.g., "user", "document", "account"
id: str # Resource identifier
name: Optional[str] = None # Human-readable name
def to_dict(self) -> Dict:
return {
"type": self.type,
"id": self.id,
"name": self.name
}
@dataclass
class AuditEvent:
"""
A single audit log entry.
Captures: WHO did WHAT to WHICH RESOURCE, WHEN, and HOW.
"""
# Core required fields
actor: Actor
action: str # e.g., "document.create", "user.login"
resource: Resource
result: ActionResult
# Auto-generated
event_id: str = field(default_factory=lambda: f"evt_{uuid.uuid4().hex[:12]}")
timestamp: datetime = field(default_factory=datetime.utcnow)
# Optional details
changes: Optional[Dict[str, Any]] = None # For updates: old/new values
reason: Optional[str] = None # Why action was taken
error_message: Optional[str] = None # If result is failure
# Context
request_id: Optional[str] = None
session_id: Optional[str] = None
correlation_id: Optional[str] = None # For tracing across services
# For tamper evidence
previous_hash: Optional[str] = None # Hash of previous event
def to_dict(self) -> Dict:
return {
"event_id": self.event_id,
"timestamp": self.timestamp.isoformat() + "Z",
"actor": self.actor.to_dict(),
"action": self.action,
"resource": self.resource.to_dict(),
"result": self.result.value,
"changes": self.changes,
"reason": self.reason,
"error_message": self.error_message,
"context": {
"request_id": self.request_id,
"session_id": self.session_id,
"correlation_id": self.correlation_id
},
"integrity": {
"previous_hash": self.previous_hash
}
}
def to_json(self) -> str:
return json.dumps(self.to_dict(), default=str)
# Example usage
def example():
event = AuditEvent(
actor=Actor(
id="usr_456",
type=ActorType.USER,
email="alice@company.com",
ip_address="192.168.1.100"
),
action="document.update",
resource=Resource(
type="document",
id="doc_789",
name="Q4 Financial Report"
),
result=ActionResult.SUCCESS,
changes={
"title": {"old": "Draft Report", "new": "Q4 Financial Report"},
"status": {"old": "draft", "new": "published"}
},
request_id="req_abc123"
)
print(event.to_json())
4.2 Simple Audit Client
# Simple Audit Client
# WARNING: Not production-ready - for learning only
from typing import Optional
from contextlib import contextmanager
class SimpleAuditClient:
"""
Basic audit client that writes to outbox.
Application code uses this to log audit events.
"""
def __init__(self, db_session):
self.db = db_session
def log(self, event: AuditEvent) -> None:
"""
Log an audit event.
Writes to outbox table in the current transaction.
"""
self.db.execute(
"""
INSERT INTO audit_outbox (
event_id,
event_type,
payload,
created_at
) VALUES (%s, %s, %s, %s)
""",
(
event.event_id,
event.action,
event.to_json(),
event.timestamp
)
)
@contextmanager
def capture(
self,
actor: Actor,
action: str,
resource: Resource,
request_id: Optional[str] = None
):
"""
Context manager that automatically logs success or failure.
Usage:
with audit.capture(actor, "document.delete", resource) as event:
delete_document(doc_id)
event.changes = {"deleted": True}
"""
event = AuditEvent(
actor=actor,
action=action,
resource=resource,
result=ActionResult.SUCCESS, # Optimistic
request_id=request_id
)
try:
yield event
# If we get here, action succeeded
event.result = ActionResult.SUCCESS
except PermissionError as e:
event.result = ActionResult.DENIED
event.error_message = str(e)
raise
except Exception as e:
event.result = ActionResult.FAILURE
event.error_message = str(e)
raise
finally:
# Always log, regardless of outcome
self.log(event)
# Usage in application code
async def delete_document(doc_id: str, user: User, audit: SimpleAuditClient):
"""Example: deleting a document with audit logging."""
# Get document for audit trail
doc = await db.get_document(doc_id)
actor = Actor(
id=user.id,
type=ActorType.USER,
email=user.email
)
resource = Resource(
type="document",
id=doc_id,
name=doc.title
)
with audit.capture(actor, "document.delete", resource) as event:
# The actual business logic
await db.delete_document(doc_id)
# Capture what was deleted
event.changes = {
"title": doc.title,
"owner": doc.owner_id,
"size_bytes": doc.size
}
# If we get here, both delete and audit succeeded
return {"status": "deleted"}
4.3 Database Schema
-- Audit Outbox Table (for transactional outbox pattern)
CREATE TABLE audit_outbox (
id BIGSERIAL PRIMARY KEY,
event_id VARCHAR(64) NOT NULL UNIQUE,
event_type VARCHAR(128) NOT NULL,
payload JSONB NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
published_at TIMESTAMPTZ,
-- For ordering and deduplication
CONSTRAINT audit_outbox_event_unique UNIQUE (event_id)
);
CREATE INDEX idx_audit_outbox_unpublished
ON audit_outbox (created_at)
WHERE published_at IS NULL;
-- Hot Storage: Recent audit events (90 days)
-- Using TimescaleDB for time-series optimization
-- Note: TimescaleDB requires the partitioning column (timestamp) to be part
-- of every unique constraint, so the primary key is composite.
CREATE TABLE audit_events (
    event_id VARCHAR(64) NOT NULL,
    timestamp TIMESTAMPTZ NOT NULL,
    -- Actor
    actor_id VARCHAR(64) NOT NULL,
    actor_type VARCHAR(32) NOT NULL,
    actor_email VARCHAR(256),
    actor_ip VARCHAR(45),
    -- Action
    action VARCHAR(128) NOT NULL,
    result VARCHAR(32) NOT NULL,
    -- Resource
    resource_type VARCHAR(64) NOT NULL,
    resource_id VARCHAR(64) NOT NULL,
    resource_name VARCHAR(256),
    -- Details (flexible)
    changes JSONB,
    error_message TEXT,
    -- Context
    request_id VARCHAR(64),
    session_id VARCHAR(64),
    correlation_id VARCHAR(64),
    -- Integrity
    previous_hash VARCHAR(64),
    event_hash VARCHAR(64),
    -- Full event for archive
    raw_event JSONB NOT NULL,
    PRIMARY KEY (event_id, timestamp)
);
-- Convert to TimescaleDB hypertable
SELECT create_hypertable('audit_events', 'timestamp');
-- Indexes for common queries
CREATE INDEX idx_audit_events_actor ON audit_events (actor_id, timestamp DESC);
CREATE INDEX idx_audit_events_resource ON audit_events (resource_type, resource_id, timestamp DESC);
CREATE INDEX idx_audit_events_action ON audit_events (action, timestamp DESC);
-- Composite index for investigation queries
CREATE INDEX idx_audit_events_investigation
ON audit_events (actor_id, resource_type, action, timestamp DESC);
-- DLQ for failed audit events
CREATE TABLE audit_dlq (
id BIGSERIAL PRIMARY KEY,
event_id VARCHAR(64) NOT NULL,
original_payload JSONB NOT NULL,
error_type VARCHAR(128) NOT NULL,
error_message TEXT,
stack_trace TEXT,
retry_count INT NOT NULL DEFAULT 0,
first_failed_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
last_failed_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
resolved_at TIMESTAMPTZ,
resolution VARCHAR(32), -- 'replayed', 'discarded', 'manual'
CONSTRAINT audit_dlq_event_unique UNIQUE (event_id)
);
CREATE INDEX idx_audit_dlq_unresolved
ON audit_dlq (first_failed_at)
WHERE resolved_at IS NULL;
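The consumer in Chapter 5 depends on a `dlq_handler` exposing `handle_failure(msg, error)`, which the text never defines. A minimal sketch writing to the `audit_dlq` table above might look like this; the asyncpg-style `$n` placeholders follow the document's conventions, and the whole class is an assumption to make the wiring concrete, not a reference implementation:

```python
import json
import logging
import traceback

logger = logging.getLogger(__name__)

class AuditDLQHandler:
    """Writes failed audit events to the audit_dlq table (sketch)."""

    def __init__(self, db_pool):
        self.db = db_pool

    async def handle_failure(self, msg, error: Exception) -> None:
        """Record a failed Kafka message; upsert so repeat failures bump the counter."""
        payload = msg.value.decode("utf-8")
        try:
            event_id = json.loads(payload).get("event_id", "unknown")
        except json.JSONDecodeError:
            event_id = "unparseable"
        await self.db.execute(
            """
            INSERT INTO audit_dlq (
                event_id, original_payload, error_type, error_message, stack_trace
            ) VALUES ($1, $2, $3, $4, $5)
            ON CONFLICT (event_id) DO UPDATE SET
                retry_count = audit_dlq.retry_count + 1,
                last_failed_at = NOW(),
                error_message = EXCLUDED.error_message
            """,
            event_id, payload, type(error).__name__, str(error),
            "".join(traceback.format_exception(type(error), error, error.__traceback__)),
        )
        logger.warning(f"Audit event {event_id} routed to DLQ: {error}")
```

Note the upsert: a poison pill that fails repeatedly updates its existing DLQ row instead of inserting duplicates, which keeps the unresolved queue readable for investigators.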
Chapter 5: Production-Ready Implementation
5.1 Requirements Recap
- Zero event loss — Every action must be logged
- High throughput — 50,000 events/sec at peak
- Low latency — Don't slow down user actions
- Immutability — Events cannot be modified
- Queryability — Fast investigation queries
- Long retention — 7 years of data
- Tamper evidence — Detect any modifications
5.2 Complete Production Implementation
# Production Audit Log System
# Combines all Week 3 patterns
import asyncio
import hashlib
import json
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Optional, Dict, Any, List
from enum import Enum
import logging
logger = logging.getLogger(__name__)
# =============================================================================
# Configuration
# =============================================================================
@dataclass
class AuditConfig:
"""Configuration for the audit system."""
# Kafka
kafka_topic: str = "audit-events"
kafka_partitions: int = 32
kafka_replication: int = 3
# Outbox
outbox_poll_interval: float = 0.1 # 100ms
outbox_batch_size: int = 1000
# Consumer
consumer_batch_size: int = 500
consumer_batch_timeout: float = 1.0
# Backpressure
lag_warning_threshold: int = 10_000
lag_critical_threshold: int = 100_000
# DLQ
max_retries: int = 3
# Integrity
enable_hash_chain: bool = True
# =============================================================================
# Audit Client (Application-Side)
# =============================================================================
class AuditClient:
"""
Production audit client with transactional outbox.
Usage in application code:
async with db.transaction():
await business_logic()
await audit.log(event) # Same transaction!
"""
def __init__(self, db_pool, config: AuditConfig):
self.db = db_pool
self.config = config
self._actor_context: Dict[str, Actor] = {} # Per-request context
def set_context(self, request_id: str, actor: Actor):
"""Set actor context for current request (call in middleware)."""
self._actor_context[request_id] = actor
def clear_context(self, request_id: str):
"""Clear context after request completes."""
self._actor_context.pop(request_id, None)
def get_actor(self, request_id: str) -> Optional[Actor]:
"""Get actor for current request."""
return self._actor_context.get(request_id)
async def log(
self,
action: str,
resource: Resource,
result: ActionResult = ActionResult.SUCCESS,
changes: Optional[Dict] = None,
reason: Optional[str] = None,
error_message: Optional[str] = None,
request_id: Optional[str] = None,
actor: Optional[Actor] = None # Override context
) -> str:
"""
Log an audit event to the outbox.
MUST be called within a database transaction to ensure
atomicity with the business operation.
Returns: event_id
"""
# Get actor from context if not provided
if actor is None and request_id:
actor = self.get_actor(request_id)
if actor is None:
raise ValueError("Actor must be provided or set in context")
event = AuditEvent(
actor=actor,
action=action,
resource=resource,
result=result,
changes=changes,
reason=reason,
error_message=error_message,
request_id=request_id
)
# Write to outbox
await self.db.execute(
"""
INSERT INTO audit_outbox (event_id, event_type, payload, created_at)
VALUES ($1, $2, $3, $4)
""",
event.event_id,
event.action,
event.to_json(),
event.timestamp
)
logger.debug(f"Audit event logged to outbox: {event.event_id}")
return event.event_id
async def log_success(
self,
action: str,
resource: Resource,
changes: Optional[Dict] = None,
**kwargs
) -> str:
"""Convenience method for successful actions."""
return await self.log(
action=action,
resource=resource,
result=ActionResult.SUCCESS,
changes=changes,
**kwargs
)
async def log_failure(
self,
action: str,
resource: Resource,
error_message: str,
**kwargs
) -> str:
"""Convenience method for failed actions."""
return await self.log(
action=action,
resource=resource,
result=ActionResult.FAILURE,
error_message=error_message,
**kwargs
)
# =============================================================================
# Outbox Publisher (Background Process)
# =============================================================================
class OutboxPublisher:
"""
Publishes audit events from outbox to Kafka.
Day 2 pattern: Transactional Outbox
"""
def __init__(
self,
db_pool,
kafka_producer,
config: AuditConfig
):
self.db = db_pool
self.kafka = kafka_producer
self.config = config
self._running = False
# Metrics
self.events_published = 0
self.publish_errors = 0
async def start(self):
"""Start the publisher loop."""
self._running = True
logger.info("Outbox publisher started")
while self._running:
try:
count = await self._publish_batch()
if count == 0:
await asyncio.sleep(self.config.outbox_poll_interval)
except Exception as e:
logger.error(f"Publisher error: {e}")
self.publish_errors += 1
await asyncio.sleep(1) # Back off on error
async def stop(self):
"""Stop the publisher."""
self._running = False
logger.info(f"Outbox publisher stopped. Published: {self.events_published}")
async def _publish_batch(self) -> int:
"""Publish a batch of events from outbox."""
# Fetch unpublished events
rows = await self.db.fetch(
"""
SELECT id, event_id, event_type, payload
FROM audit_outbox
WHERE published_at IS NULL
ORDER BY created_at
LIMIT $1
FOR UPDATE SKIP LOCKED
""",
self.config.outbox_batch_size
)
if not rows:
return 0
# Publish to Kafka
published_ids = []
for row in rows:
try:
# Partition by actor_id for ordering
payload = json.loads(row['payload'])
actor_id = payload.get('actor', {}).get('id', '')
partition_key = actor_id.encode('utf-8')
await self.kafka.send_and_wait(
self.config.kafka_topic,
key=partition_key,
value=row['payload'].encode('utf-8')
)
published_ids.append(row['id'])
self.events_published += 1
except Exception as e:
logger.error(f"Failed to publish event {row['event_id']}: {e}")
self.publish_errors += 1
# Mark as published
if published_ids:
await self.db.execute(
"""
UPDATE audit_outbox
SET published_at = NOW()
WHERE id = ANY($1)
""",
published_ids
)
return len(published_ids)
# =============================================================================
# Audit Consumer (Writes to Storage)
# =============================================================================
class AuditConsumer:
"""
Consumes audit events from Kafka and writes to storage.
Day 3 pattern: Backpressure handling
Day 4 pattern: Dead letter queue
"""
def __init__(
self,
kafka_consumer,
db_pool,
dlq_handler,
config: AuditConfig,
metrics_client
):
self.kafka = kafka_consumer
self.db = db_pool
self.dlq = dlq_handler
self.config = config
self.metrics = metrics_client
self._running = False
self._last_hash: Optional[str] = None
# Metrics
self.events_processed = 0
self.events_failed = 0
async def start(self):
"""Start consuming."""
self._running = True
logger.info("Audit consumer started")
# Load last hash for chain integrity
if self.config.enable_hash_chain:
self._last_hash = await self._get_last_hash()
        batch = []
        batch_start = datetime.utcnow()
        async for msg in self.kafka:
            if not self._running:
                break
            batch.append(msg)
            # Process batch if full or timed out. Note: this check only runs
            # when a new message arrives, so an idle consumer holds a partial
            # batch until the next message shows up.
            should_flush = (
                len(batch) >= self.config.consumer_batch_size or
                (datetime.utcnow() - batch_start).total_seconds() > self.config.consumer_batch_timeout
            )
            if should_flush and batch:
                await self._process_batch(batch)
                batch = []
                batch_start = datetime.utcnow()
        # Flush any messages still buffered when the loop exits
        if batch:
            await self._process_batch(batch)
async def _process_batch(self, messages: List) -> None:
"""Process a batch of messages."""
events_to_insert = []
for msg in messages:
try:
event_dict = json.loads(msg.value.decode('utf-8'))
# Add hash chain
if self.config.enable_hash_chain:
event_dict['integrity']['previous_hash'] = self._last_hash
event_hash = self._compute_hash(event_dict)
event_dict['integrity']['event_hash'] = event_hash
self._last_hash = event_hash
events_to_insert.append(event_dict)
except Exception as e:
logger.error(f"Failed to process event: {e}")
await self.dlq.handle_failure(msg, e)
self.events_failed += 1
# Batch insert
if events_to_insert:
try:
await self._batch_insert(events_to_insert)
self.events_processed += len(events_to_insert)
# Commit offsets
await self.kafka.commit()
except Exception as e:
logger.error(f"Batch insert failed: {e}")
# Send all to DLQ
for msg in messages:
await self.dlq.handle_failure(msg, e)
self.events_failed += len(messages)
async def _batch_insert(self, events: List[Dict]) -> None:
"""Insert batch of events to database."""
# Build batch insert
values = []
for e in events:
values.append((
e['event_id'],
datetime.fromisoformat(e['timestamp'].rstrip('Z')),
e['actor']['id'],
e['actor']['type'],
e['actor'].get('email'),
e['actor'].get('ip_address'),
e['action'],
e['result'],
e['resource']['type'],
e['resource']['id'],
e['resource'].get('name'),
json.dumps(e.get('changes')) if e.get('changes') else None,
e.get('error_message'),
e['context'].get('request_id'),
e['context'].get('session_id'),
e['context'].get('correlation_id'),
e['integrity'].get('previous_hash'),
e['integrity'].get('event_hash'),
json.dumps(e)
))
await self.db.executemany(
"""
INSERT INTO audit_events (
event_id, timestamp, actor_id, actor_type, actor_email, actor_ip,
action, result, resource_type, resource_id, resource_name,
changes, error_message, request_id, session_id, correlation_id,
previous_hash, event_hash, raw_event
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19)
            ON CONFLICT DO NOTHING  -- no target: idempotent under any unique constraint
""",
values
)
def _compute_hash(self, event: Dict) -> str:
"""Compute hash for tamper evidence."""
# Canonical JSON for consistent hashing
canonical = json.dumps(event, sort_keys=True, separators=(',', ':'))
return hashlib.sha256(canonical.encode()).hexdigest()
async def _get_last_hash(self) -> Optional[str]:
"""Get hash of last event for chain integrity."""
row = await self.db.fetchrow(
"""
SELECT event_hash
FROM audit_events
ORDER BY timestamp DESC
LIMIT 1
"""
)
return row['event_hash'] if row else None
# =============================================================================
# Archive Writer (Cold Storage)
# =============================================================================
class ArchiveWriter:
"""
Archives old audit events to S3 in Parquet format.
Runs as a daily job.
"""
def __init__(self, db_pool, s3_client, config: AuditConfig):
self.db = db_pool
self.s3 = s3_client
self.config = config
async def archive_day(self, date: datetime) -> Dict[str, Any]:
"""Archive all events from a specific day."""
import pyarrow as pa
import pyarrow.parquet as pq
# Fetch events for the day
start = date.replace(hour=0, minute=0, second=0, microsecond=0)
end = start + timedelta(days=1)
rows = await self.db.fetch(
"""
SELECT raw_event
FROM audit_events
WHERE timestamp >= $1 AND timestamp < $2
ORDER BY timestamp
""",
start, end
)
if not rows:
return {"date": date.isoformat(), "events": 0, "status": "empty"}
# Convert to Parquet
events = [json.loads(row['raw_event']) for row in rows]
table = pa.Table.from_pylist(events)
# Write to S3
key = f"audit-archive/{date.year}/{date.month:02d}/{date.day:02d}/events.parquet"
buffer = pa.BufferOutputStream()
pq.write_table(table, buffer, compression='snappy')
await self.s3.put_object(
Bucket='audit-archive',
Key=key,
Body=buffer.getvalue().to_pybytes(),
# Object Lock for immutability
ObjectLockMode='GOVERNANCE',
ObjectLockRetainUntilDate=datetime.utcnow() + timedelta(days=2555) # 7 years
)
logger.info(f"Archived {len(events)} events for {date.date()}")
return {
"date": date.isoformat(),
"events": len(events),
"s3_key": key,
"status": "archived"
}
# =============================================================================
# Query Service (Investigation)
# =============================================================================
class AuditQueryService:
"""
Query interface for audit log investigation.
"""
def __init__(self, db_pool, s3_client):
self.db = db_pool
self.s3 = s3_client
async def search(
self,
actor_id: Optional[str] = None,
resource_type: Optional[str] = None,
resource_id: Optional[str] = None,
action: Optional[str] = None,
start_time: Optional[datetime] = None,
end_time: Optional[datetime] = None,
limit: int = 100,
offset: int = 0
) -> List[Dict]:
"""
Search audit events with filters.
"""
conditions = ["1=1"]
params = []
param_idx = 1
if actor_id:
conditions.append(f"actor_id = ${param_idx}")
params.append(actor_id)
param_idx += 1
if resource_type:
conditions.append(f"resource_type = ${param_idx}")
params.append(resource_type)
param_idx += 1
if resource_id:
conditions.append(f"resource_id = ${param_idx}")
params.append(resource_id)
param_idx += 1
if action:
conditions.append(f"action LIKE ${param_idx}")
params.append(f"{action}%")
param_idx += 1
if start_time:
conditions.append(f"timestamp >= ${param_idx}")
params.append(start_time)
param_idx += 1
if end_time:
conditions.append(f"timestamp <= ${param_idx}")
params.append(end_time)
param_idx += 1
params.extend([limit, offset])
query = f"""
SELECT raw_event
FROM audit_events
WHERE {' AND '.join(conditions)}
ORDER BY timestamp DESC
LIMIT ${param_idx} OFFSET ${param_idx + 1}
"""
rows = await self.db.fetch(query, *params)
return [json.loads(row['raw_event']) for row in rows]
async def get_actor_activity(
self,
actor_id: str,
days: int = 30
) -> List[Dict]:
"""Get all activity for a specific actor."""
start_time = datetime.utcnow() - timedelta(days=days)
return await self.search(actor_id=actor_id, start_time=start_time, limit=1000)
async def get_resource_history(
self,
resource_type: str,
resource_id: str
) -> List[Dict]:
"""Get complete history of a resource."""
return await self.search(
resource_type=resource_type,
resource_id=resource_id,
limit=1000
)
async def verify_integrity(
self,
start_time: datetime,
end_time: datetime
) -> Dict[str, Any]:
"""
Verify hash chain integrity for a time range.
Returns any gaps or inconsistencies found.
"""
rows = await self.db.fetch(
"""
SELECT event_id, timestamp, previous_hash, event_hash, raw_event
FROM audit_events
WHERE timestamp >= $1 AND timestamp <= $2
ORDER BY timestamp
""",
start_time, end_time
)
issues = []
prev_hash = None
for row in rows:
# Check chain continuity
if prev_hash and row['previous_hash'] != prev_hash:
issues.append({
"type": "chain_break",
"event_id": row['event_id'],
"expected_prev": prev_hash,
"actual_prev": row['previous_hash']
})
# Verify hash
event = json.loads(row['raw_event'])
event['integrity']['previous_hash'] = row['previous_hash']
event['integrity']['event_hash'] = None # Exclude from hash
canonical = json.dumps(event, sort_keys=True, separators=(',', ':'))
computed_hash = hashlib.sha256(canonical.encode()).hexdigest()
if computed_hash != row['event_hash']:
issues.append({
"type": "hash_mismatch",
"event_id": row['event_id'],
"expected_hash": computed_hash,
"actual_hash": row['event_hash']
})
prev_hash = row['event_hash']
return {
"verified_events": len(rows),
"issues_found": len(issues),
"issues": issues,
"integrity": "valid" if not issues else "compromised"
}
Chapter 6: Edge Cases and Error Handling
6.1 Edge Case 1: Outbox Table Growing Too Large
SCENARIO: Kafka is down for 2 hours, outbox fills up
Timeline:
10:00 - Kafka goes down
10:00 - Outbox publisher can't publish
10:00 - Outbox starts growing
12:00 - Outbox has 10M rows
12:00 - Queries on outbox slow down
12:00 - Application performance degrades
SOLUTION: Outbox management
1. PARTITION BY TIME
CREATE TABLE audit_outbox (...)
PARTITION BY RANGE (created_at);
-- Daily partitions
CREATE TABLE audit_outbox_2024_01_15
PARTITION OF audit_outbox
FOR VALUES FROM ('2024-01-15') TO ('2024-01-16');
2. CLEANUP PUBLISHED EVENTS
DELETE FROM audit_outbox
WHERE published_at < NOW() - INTERVAL '1 hour';
-- Or move to archive
INSERT INTO audit_outbox_archive
SELECT * FROM audit_outbox
WHERE published_at < NOW() - INTERVAL '1 hour';
3. ALERT ON GROWTH
alert:
name: "Outbox Growing"
condition: count(audit_outbox WHERE published_at IS NULL) > 100000
action: page_oncall
4. CIRCUIT BREAKER
If outbox > 1M unpublished:
- Alert critical
- Consider pausing non-critical auditing
- Never pause critical (auth, payments)
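The thresholds above can be sketched as a small policy function. The 100k and 1M cutoffs are the ones suggested here, not universal constants — tune them to your own outbox write rate:

```python
def outbox_health(unpublished_count: int) -> dict:
    """Map outbox backlog size to an operational response.
    Mirrors the playbook above: alert at 100k, circuit-break at 1M,
    but critical audit categories (auth, payments) are never paused."""
    if unpublished_count > 1_000_000:
        return {
            "level": "critical",
            "actions": ["page_oncall", "pause_non_critical_auditing"],
            "never_pause": ["auth", "payments"],
        }
    if unpublished_count > 100_000:
        return {"level": "warning", "actions": ["page_oncall"]}
    return {"level": "ok", "actions": []}
```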
6.2 Edge Case 2: Consumer Falls Far Behind
SCENARIO: Consumer lag grows to hours
This is Day 3 backpressure in action!
Detection:
- Consumer lag > 100,000 events
- Lag growing > 1,000 events/sec
Response:
1. SCALE CONSUMERS
# Kafka allows consumers <= partitions
# If 32 partitions, can have up to 32 consumers
kubectl scale deployment audit-consumer --replicas=16
2. INCREASE BATCH SIZE
# Process more per batch
config.consumer_batch_size = 1000 # up from 500
3. PARALLEL INSERTS
# Multiple connections to database
await asyncio.gather(
self._batch_insert(batch1),
self._batch_insert(batch2),
self._batch_insert(batch3)
)
4. ALERT BUT DON'T SHED
# Unlike other systems, audit events can't be dropped
# Accept temporary latency, never lose data
if lag > critical_threshold:
alert("Audit lag critical - needs attention")
# But keep processing, don't shed!
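The parallel-insert step above glosses over how a consumed batch gets split. A sketch, assuming an asyncio consumer and an insert coroutine like the `_batch_insert` referenced above:

```python
import asyncio
from typing import Any, Awaitable, Callable, List

def split_batch(events: List[Any], n_workers: int) -> List[List[Any]]:
    """Split one consumed batch into n_workers roughly equal chunks,
    preserving the original order within each chunk."""
    if not events:
        return []
    n_workers = max(1, n_workers)
    size = -(-len(events) // n_workers)  # ceiling division
    return [events[i:i + size] for i in range(0, len(events), size)]

async def parallel_insert(
    events: List[Any],
    insert: Callable[[List[Any]], Awaitable[None]],
    n_workers: int = 3,
) -> None:
    """Fan the chunks out over multiple DB connections via asyncio.gather."""
    await asyncio.gather(*(insert(chunk) for chunk in split_batch(events, n_workers)))
```

Within a chunk order is preserved; across chunks it isn't, which is acceptable here because events are re-sorted by timestamp at query time.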
6.3 Edge Case 3: Hash Chain Broken
SCENARIO: Integrity verification finds chain break
Detection:
Event #1000: hash = "abc123"
Event #1001: previous_hash = "xyz789" ← Should be "abc123"!
Possible causes:
1. Events processed out of order
2. Consumer crash between events
3. Tampering (rare but serious)
4. Bug in hash computation
INVESTIGATION STEPS:
1. CHECK FOR MISSING EVENTS
SELECT event_id FROM audit_events
WHERE timestamp BETWEEN '2024-01-15 10:00' AND '2024-01-15 10:05'
ORDER BY timestamp;
-- Look for gaps in sequence
2. CHECK DLQ
SELECT * FROM audit_dlq
WHERE first_failed_at BETWEEN '2024-01-15 10:00' AND '2024-01-15 10:05';
-- Missing event might be in DLQ
3. CHECK KAFKA
kafka-console-consumer --topic audit-events \
--partition 0 --offset 1000 --max-messages 10
-- See what was actually in Kafka
4. IF TAMPERING SUSPECTED
- Preserve evidence
- Check database access logs
- Involve security team
- Do not "fix" the data
PREVENTION:
1. SINGLE WRITER PER PARTITION
Only one consumer per partition = guaranteed ordering
2. TRANSACTIONAL COMMIT
Insert + offset commit in same transaction
3. CHAIN RECOVERY PROCEDURE
If chain broken, add "chain_recovery" event
Documents the break, restarts chain
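A chain-recovery event might look like the sketch below. The field names are illustrative, not a standard — the point is that the break is recorded as data and the chain restarts from this event's own hash:

```python
import hashlib
import json
from datetime import datetime, timezone

def make_chain_recovery_event(break_event_id: str,
                              expected_prev: str,
                              actual_prev: str) -> dict:
    """Build an event that documents a detected chain break and restarts
    the chain. previous_hash is None: this event anchors a new chain."""
    event = {
        "action": "audit.chain_recovery",
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "details": {
            "break_at_event": break_event_id,
            "expected_previous_hash": expected_prev,
            "actual_previous_hash": actual_prev,
        },
        "integrity": {"previous_hash": None, "event_hash": None},
    }
    canonical = json.dumps(event, sort_keys=True, separators=(',', ':'))
    event["integrity"]["event_hash"] = hashlib.sha256(canonical.encode()).hexdigest()
    return event
```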
6.4 Edge Case 4: Clock Skew Issues
SCENARIO: Events arrive with out-of-order timestamps
Server A clock: 10:00:00
Server B clock: 10:00:05 (5 seconds ahead)
Event from Server A: timestamp 10:00:02
Event from Server B: timestamp 10:00:07
Event from Server A: timestamp 10:00:03
Stored order (sorted by event time): 10:00:02, 10:00:03, 10:00:07
Actual order: 10:00:02 (A), ~10:00:02 (B, stamped 10:00:07), 10:00:03 (A)
Server B's event happened second but sorts last because its clock runs fast.
SOLUTIONS:
1. USE INGESTION TIME, NOT EVENT TIME
Consumer adds received_at timestamp
Sort by received_at for investigation
Keep original timestamp for reference
2. NTP SYNC ALL SERVERS
Ensure clock drift < 100ms
Monitor clock sync as infrastructure metric
3. INCLUDE BOTH TIMESTAMPS
{
"event_time": "2024-01-15T10:00:07Z", // From producer
"ingested_at": "2024-01-15T10:00:02Z", // From consumer
"time_drift_ms": 5000 // Calculated
}
4. ALERT ON DRIFT
if abs(event_time - ingested_at) > 1000ms:
alert("Clock drift detected on {server}")
6.5 Error Handling Matrix
| Error | Impact | Response | Prevention |
|---|---|---|---|
| Outbox write fails | Business operation blocked (no unaudited action) | Rollback entire transaction | None needed (rollback is the design) |
| Kafka down | Events queue in outbox | Wait and retry | Multi-broker setup |
| Consumer crash | Temporary lag | Auto-restart, resume | Health checks |
| DB insert fails | Event goes to DLQ | DLQ + alert | Retry with backoff |
| Hash mismatch | Integrity concern | Alert, investigate | Single writer |
| Clock skew | Order confusion | Use ingestion time | NTP monitoring |
| Storage full | Can't write | Alert, add capacity | Capacity planning |
Part III: Real-World Application
Chapter 7: How Big Tech Does It
7.1 Case Study: Stripe — Payment Audit Trail
STRIPE AUDIT REQUIREMENTS
Regulatory:
- PCI-DSS: Log all access to cardholder data
- SOX: Track all financial transactions
- GDPR: Record data access and modifications
Scale:
- Millions of API calls per day
- Every call potentially audited
- 7+ years retention
Architecture:
API Request ──▶ API Server
│
┌────┴────┐
│ │
Business Audit
Logic Client
│ │
▼ ▼
Primary Outbox
DB │
│ Same transaction
▼
Outbox Publisher
│
▼
Kafka
│
┌───────────┼───────────┐
│ │ │
Primary Analytics Compliance
Storage Team Export
│
▼
PostgreSQL (hot)
│
▼ (90 days)
BigQuery (warm)
│
▼ (1 year)
GCS Parquet (cold)
Key Decisions:
1. AUDIT HAPPENS IN REQUEST PATH
Not background - too important to lose
But: outbox pattern, so async to Kafka
2. FIELD-LEVEL TRACKING
Every field change recorded:
{
"changes": {
"amount": {"old": 1000, "new": 1500},
"currency": {"old": "USD", "new": "USD"}
}
}
3. REQUEST CORRELATION
Every audit event linked to API request
Can reconstruct entire request flow
4. PII HANDLING
Sensitive fields tokenized or encrypted
Separate access controls for PII audit data
Reference: Stripe Engineering Blog
7.2 Case Study: Netflix — Content Access Audit
NETFLIX CONTENT AUDIT
Requirements:
- Track who accessed which content
- License compliance (who watched what, where)
- Internal access to content systems
Scale:
- Billions of streaming events per day
- Not all need full audit (sampling for analytics)
- But: Content system access = full audit
Architecture:
Internal User ──▶ Content Admin
│
Audit captured
(every action)
│
▼
Streaming ──────▶ Kafka ◀──── Other Services
(sampled) │
▼
Consumer
│
┌───────────┼───────────┐
│ │ │
Elasticsearch S3 License
(search) (archive) Reporting
Key Decisions:
1. TIERED AUDITING
- Internal admin access: 100% captured
- Streaming views: Sampled for analytics
- API calls: Based on sensitivity level
2. SEARCH-FIRST DESIGN
Elasticsearch primary for investigation
"Who accessed title X in region Y last week?"
3. SAMPLING FOR SCALE
Can't audit every stream start event (billions/day)
Sample 1% for analytics, 100% for sensitive content
4. GEO-PARTITIONING
Audit data stays in region (GDPR, licensing)
EU data in EU storage, etc.
7.3 Case Study: Healthcare — HIPAA Audit
HEALTHCARE AUDIT (HIPAA)
Requirements:
- Every access to patient records logged
- "Break the glass" tracking
- Access by role and relationship
- 6-year minimum retention
Special Challenges:
1. RELATIONSHIP-BASED ACCESS
Dr. Smith can see patients assigned to them
Nurse Jones can see patients on their floor
Must audit: Who accessed, their role, their relationship
2. BREAK THE GLASS
Emergency override of normal access controls
Must be logged with extra detail:
{
"action": "patient.record.view",
"access_type": "emergency_override",
"justification": "Patient in ER, unresponsive",
"witness": "charge_nurse_456"
}
3. MINIMUM NECESSARY
Auditors check: Did user access more than needed?
Must log exactly which fields viewed, not just "viewed record"
4. PATIENT ACCESS RIGHTS
Patients can request audit log of who viewed their data
Must be able to generate per-patient report
Architecture:
EHR System ──▶ Audit Service ──▶ WORM Storage
│ │
│ Write-Once
│ Read-Many
│ (Tamper-proof)
│
▼
Compliance
Reporting
│
├── Daily access reports
├── Break-glass review
└── Patient data requests
7.4 Summary: Industry Patterns
| Company | Focus | Key Pattern | Unique Challenge |
|---|---|---|---|
| Stripe | Payments | Field-level tracking | PCI-DSS compliance |
| Netflix | Content | Tiered auditing | Sampling at scale |
| Healthcare | HIPAA | Break-the-glass | Relationship-based access |
| Banks | SOX | Dual-control logging | Every transaction |
| Government | FISMA | Chain of custody | Tamper evidence |
Chapter 8: Common Mistakes to Avoid
8.1 Mistake 1: Auditing After Business Logic
❌ WRONG: Separate transactions
async def transfer_money(from_acct, to_acct, amount):
# Business logic
async with db.transaction():
await db.debit(from_acct, amount)
await db.credit(to_acct, amount)
# Audit separately (WRONG!)
await audit.log(...) # What if this fails?
Problem:
Money moved, but no audit trail
Compliance violation
Investigation impossible
✅ CORRECT: Same transaction
async def transfer_money(from_acct, to_acct, amount):
async with db.transaction():
# Business logic
await db.debit(from_acct, amount)
await db.credit(to_acct, amount)
# Audit in same transaction
await audit.log(
action="account.transfer",
resource=Resource(type="transfer", id=transfer_id),
changes={
"from_account": from_acct,
"to_account": to_acct,
"amount": amount
}
)
If audit fails, entire transfer rolls back.
Atomicity guaranteed.
8.2 Mistake 2: Mutable Audit Logs
❌ WRONG: UPDATE and DELETE allowed
-- "Fixing" an audit record
UPDATE audit_events
SET actor_email = 'correct@email.com'
WHERE event_id = 'evt_123';
-- "Cleaning up" old events
DELETE FROM audit_events
WHERE timestamp < '2020-01-01';
Problems:
- Tampering possible
- Chain of custody broken
- Legal evidence inadmissible
✅ CORRECT: Append-only with corrections
-- Never UPDATE original record
-- Instead, add correction event:
INSERT INTO audit_events (...) VALUES (
'evt_456', -- New event ID
NOW(),
'system', -- Actor: system or admin
'audit.correction', -- Action type
'audit_event', -- Resource type
'evt_123', -- Reference to original
'{"field": "actor_email", "old": "wrong@email.com", "new": "correct@email.com"}'
);
-- Original preserved, correction linked
-- Both visible in audit trail
8.3 Mistake 3: Insufficient Context
❌ WRONG: Minimal information
{
"action": "delete",
"resource_id": "doc_123",
"timestamp": "2024-01-15T10:00:00Z"
}
Problems:
- Who deleted it?
- What was deleted?
- Why was it deleted?
- From where?
✅ CORRECT: Full context
{
"event_id": "evt_789",
"timestamp": "2024-01-15T10:00:00.123Z",
"actor": {
"id": "usr_456",
"email": "alice@company.com",
"role": "admin",
"ip_address": "192.168.1.100",
"user_agent": "Mozilla/5.0..."
},
"action": "document.delete",
"resource": {
"type": "document",
"id": "doc_123",
"name": "Q4 Financial Report.pdf"
},
"changes": {
"title": "Q4 Financial Report.pdf",
"size_bytes": 1048576,
"owner": "usr_789",
"created_at": "2024-01-01T08:00:00Z"
},
"context": {
"request_id": "req_abc",
"session_id": "sess_def",
"justification": "Document superseded by updated version"
}
}
Every question answerable from the event itself.
8.4 Mistake 4: No Query Optimization
❌ WRONG: Querying without indexes
-- "Find all actions by user X"
SELECT * FROM audit_events
WHERE raw_event->'actor'->>'id' = 'usr_456';
-- Full table scan on billions of rows
-- Query takes hours, times out
✅ CORRECT: Denormalized columns with indexes
-- Extract commonly queried fields to columns
CREATE TABLE audit_events (
...
actor_id VARCHAR(64) NOT NULL, -- Extracted
resource_type VARCHAR(64) NOT NULL, -- Extracted
...
);
CREATE INDEX idx_audit_actor ON audit_events (actor_id, timestamp DESC);
CREATE INDEX idx_audit_resource ON audit_events (resource_type, resource_id);
-- Query uses index
SELECT * FROM audit_events
WHERE actor_id = 'usr_456'
ORDER BY timestamp DESC
LIMIT 100;
-- Returns in milliseconds
8.5 Mistake Checklist
Before deploying audit system, verify:
- Same transaction — Audit in same transaction as business logic
- Immutable storage — No UPDATE or DELETE on audit tables
- Full context — Who, what, when, where, why captured
- Query performance — Indexes for common access patterns
- Hash chain — Tamper evidence for compliance
- DLQ handling — Failed events captured, not lost
- Retention policy — Compliant with regulations
- Backup strategy — Survives datacenter failure
- Access controls — Audit log access is itself audited
Part IV: Interview Preparation
Chapter 9: Interview Tips and Phrases
9.1 When to Bring Up Audit Logging
Bring up audit logging when:
- Designing systems with compliance requirements (finance, health)
- User data is being accessed or modified
- Actions need to be traceable for investigation
- System handles sensitive operations
- Interviewer asks about security or compliance
9.2 Key Phrases to Use
INTRODUCING AUDIT REQUIREMENTS:
"Before diving into the design, I want to clarify the audit
requirements. For a financial system like this, we'll need
to track every action for compliance—who did what, when,
and the system must be able to prove it wasn't tampered with."
EXPLAINING THE ARCHITECTURE:
"For audit logging, I'd use a transactional outbox pattern.
The audit event is written to an outbox table in the same
transaction as the business operation. This guarantees we
never have an action without its corresponding audit trail."
DISCUSSING IMMUTABILITY:
"Audit logs must be immutable—no updates, no deletes. Once
written, an event is permanent. If we need to correct
something, we add a correction event that references the
original. This preserves the complete history and maintains
evidence integrity."
ADDRESSING SCALE:
"At 50,000 events per second, we need tiered storage. Hot
storage in TimescaleDB for recent data—90 days—with good
query performance. Then data moves to ClickHouse for the
next year, and finally to S3 Parquet for long-term retention.
Each tier optimized for its access pattern."
HANDLING FAILURES:
"Even with best efforts, some events will fail to process.
We use a dead letter queue to capture these—they're not lost,
just quarantined. We alert on DLQ growth and have procedures
to investigate and replay. For audit logs, we never drop data."
9.3 Questions to Ask Interviewer
- "What are the regulatory requirements? SOX, HIPAA, GDPR?"
- "How long must audit data be retained?"
- "What's the query pattern? Real-time investigation or periodic reports?"
- "Is there a legal or compliance team that will access this data?"
9.4 Common Follow-up Questions
| Question | Good Answer |
|---|---|
| "How do you ensure no audit events are lost?" | "Transactional outbox—audit write is in the same DB transaction as the business logic. Either both succeed or both fail. Then async publishing to Kafka with DLQ for any processing failures." |
| "How do you prove logs weren't tampered with?" | "Hash chaining—each event includes the hash of the previous event. To tamper with one, you'd need to recompute all subsequent hashes, which is detectable. Plus write-once storage like S3 Object Lock." |
| "How do you handle sensitive data in logs?" | "Tokenization or encryption for PII. Access controls so only authorized personnel can see full data. The audit log itself has an audit trail—we log who accessed the logs." |
| "What if the audit system itself goes down?" | "Outbox pattern buffers events in the primary database. When the audit pipeline recovers, it catches up. We'd alert immediately on outage, but no data is lost because it's in the transactional outbox." |
Chapter 10: Practice Problems
Problem 1: Healthcare Patient Access Audit
Setup: You're designing an audit system for a hospital's electronic health records. Every access to patient data must be logged for HIPAA compliance.
Requirements:
- 10,000 healthcare workers
- 500,000 patients
- Every view, edit, print of patient records audited
- "Break the glass" emergency access tracking
- 6-year retention
- Patients can request report of who accessed their records
Questions:
- How do you capture the relationship between accessor and patient?
- How do you handle break-the-glass scenarios?
- How do you generate per-patient access reports efficiently?
- Consider role-based access patterns
- Break-the-glass needs extra justification
- Pre-aggregate for patient-centric queries
Schema for Relationship Tracking:
{
"actor": {
"id": "dr_smith",
"role": "physician",
"department": "cardiology"
},
"access_context": {
"relationship": "treating_physician",
"assignment_id": "assign_123",
"access_level": "normal" // or "emergency_override"
},
"resource": {
"type": "patient_record",
"patient_id": "patient_456",
"record_section": "medications"
}
}
Break-the-Glass Handling:
if access_type == "emergency_override":
event.access_context = {
"access_level": "emergency_override",
"justification": request.justification, # Required
"witness": request.witness_id, # Required
"auto_review": True # Flag for compliance
}
# Send immediate alert
await alert_compliance_team(event)
Patient-Centric Reports:
-- Materialized view for patient reports
CREATE MATERIALIZED VIEW patient_access_summary AS
SELECT
resource_id as patient_id,
actor_id,
actor_role,
COUNT(*) as access_count,
MIN(timestamp) as first_access,
MAX(timestamp) as last_access
FROM audit_events
WHERE resource_type = 'patient_record'
GROUP BY resource_id, actor_id, actor_role;
-- Refresh daily
REFRESH MATERIALIZED VIEW patient_access_summary;
Problem 2: Financial Trading Audit
Setup: You're designing audit for a stock trading platform. Every order, modification, and cancellation must be logged with millisecond precision.
Requirements:
- 100,000 orders per second during market hours
- Sub-millisecond timestamp accuracy
- Must reconstruct order book state at any point in time
- 7-year retention (SEC requirement)
- Real-time compliance monitoring
Questions:
- How do you handle the throughput requirement?
- How do you ensure timestamp accuracy?
- How do you enable point-in-time reconstruction?
- Kafka partitioning for throughput
- Hardware timestamping for accuracy
- Event sourcing enables reconstruction
High Throughput:
# Kafka configuration
Partitions: 64 (parallel consumers)
Replication: 3
Batch size: Large for throughput
# Partition strategy
partition_key = f"{symbol}:{order_id % 100}"
# Same order always maps to one key (ordering preserved);
# a hot symbol spreads across up to 100 buckets
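If order IDs aren't numeric, you need a process-stable hash — Python's built-in `hash()` is salted per process, so it can't serve as a partition key across services. A sketch:

```python
import hashlib

def partition_key(symbol: str, order_id: str, buckets: int = 100) -> str:
    """Stable partition key: all events for one order map to the same
    key (ordering preserved), while a hot symbol spreads over `buckets`.
    md5 is fine here; this is bucketing, not cryptography."""
    digest = hashlib.md5(order_id.encode()).hexdigest()
    return f"{symbol}:{int(digest, 16) % buckets}"
```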
Timestamp Accuracy:
# Use high-precision timestamps
import time
event = AuditEvent(
timestamp_ns=time.time_ns(), # Nanosecond precision
clock_source="ntp_sync", # Document source
sequence_id=get_sequence(), # Ordering within same ns
)
Point-in-Time Reconstruction:
# Event sourcing model
events = [
{"type": "order.placed", "order_id": "123", "qty": 100, ...},
{"type": "order.partial_fill", "order_id": "123", "filled_qty": 50, ...},
{"type": "order.modified", "order_id": "123", "new_qty": 75, ...},
{"type": "order.cancelled", "order_id": "123", ...}
]
def reconstruct_order_state(order_id, as_of_timestamp):
events = get_events(order_id, before=as_of_timestamp)
state = {}
for e in events:
state = apply_event(state, e)
return state
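`apply_event` is left undefined above; one possible reducer for the order events shown (the event shapes are assumed from the example stream):

```python
def apply_event(state: dict, event: dict) -> dict:
    """Fold one order event into the current order state.
    Event types match the example stream above."""
    state = dict(state)  # keep the reducer pure
    etype = event["type"]
    if etype == "order.placed":
        state.update(order_id=event["order_id"], qty=event["qty"],
                     filled_qty=0, status="open")
    elif etype == "order.partial_fill":
        state["filled_qty"] = state.get("filled_qty", 0) + event["filled_qty"]
    elif etype == "order.modified":
        state["qty"] = event["new_qty"]
    elif etype == "order.cancelled":
        state["status"] = "cancelled"
    return state
```

Replaying the events in timestamp order through this reducer yields the order's state as of any past instant.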
Problem 3: Multi-Tenant SaaS Audit
Setup: You're designing audit for a multi-tenant SaaS platform where each customer (tenant) has isolated audit data.
Requirements:
- 1,000 tenants, varying sizes (1-10,000 users each)
- Tenants can query their own audit logs
- Some tenants have regulatory requirements, others don't
- Tenant data isolation is critical
Questions:
- How do you ensure tenant isolation?
- How do you handle tenants with different retention requirements?
- How do you optimize costs for inactive tenants?
- Partition by tenant for isolation
- Configuration-driven retention
- Cold storage for inactive tenants
Tenant Isolation:
-- Row-level security
CREATE POLICY tenant_isolation ON audit_events
USING (tenant_id = current_setting('app.tenant_id'));
-- Every query automatically filtered
SET app.tenant_id = 'tenant_123';
SELECT * FROM audit_events; -- Only sees tenant_123 data
Configurable Retention:
@dataclass
class TenantAuditConfig:
tenant_id: str
retention_days: int = 90 # Default
require_hash_chain: bool = False
compliance_mode: Optional[str] = None # "hipaa", "sox", etc.
# Load per-tenant config
config = await get_tenant_config(tenant_id)
if config.compliance_mode == "hipaa":
config.retention_days = 2190 # 6 years
config.require_hash_chain = True
Cost Optimization:
async def manage_tenant_storage(tenant_id):
activity = await get_tenant_activity(tenant_id, days=30)
if activity.event_count == 0:
# Inactive: move to cold storage
await archive_tenant_to_s3(tenant_id)
await delete_from_hot_storage(tenant_id)
elif activity.event_count < 100:
# Low activity: compress aggressively
await compress_tenant_data(tenant_id)
Chapter 11: Mock Interview Dialogue
Scenario: Design Audit System for Banking App
Interviewer: "We're building a mobile banking app. Design the audit logging system."
You: "Great, let me start by understanding the requirements. For banking, we're definitely looking at regulatory compliance—SOX, possibly state banking regulations. Can you confirm the retention requirements?"
Interviewer: "Yes, SOX compliance. 7-year retention. And we need to track every financial transaction and every access to customer data."
You: "Perfect. So we need to audit two categories: financial transactions like transfers and payments, and data access like viewing account details or statements. Let me design a system that handles both.
First, the core principle: audit events must be captured atomically with the business operation. If a transfer succeeds, its audit record must exist. We can't have orphaned transactions.
For the architecture, I'll use the transactional outbox pattern we discussed earlier this week:
Mobile App ──▶ API Gateway ──▶ Banking Service
│
┌──────┴──────┐
│ │
Transaction Audit Event
│ │
└──────┬──────┘
│
Same DB Transaction
│
▼
Outbox Table
│
(async)
▼
Kafka
│
▼
Audit Consumer
│
┌──────┴──────┐
│ │
Hot Store Cold Store
(TimescaleDB) (S3 Parquet)
The key here is that the audit event goes into an outbox table in the same database transaction as the banking operation. If the transaction fails, the audit is rolled back too. If it succeeds, the audit is guaranteed to be published."
Interviewer: "How do you handle the scale? We expect millions of transactions per day."
You: "Good question. Let me break down the write path:
At peak, banking apps typically see most activity between 9 AM and 5 PM. Let's assume 10 million transactions per day with 80% in an 8-hour window. That's roughly 280 transactions per second average, with peaks maybe 3-5x that, so let's design for 1,500 TPS.
For the outbox publisher, I'd use CDC with Debezium rather than polling. It reads from the PostgreSQL WAL with very low latency—events reach Kafka within 100ms of commit.
Kafka would have 32 partitions, partitioned by customer ID to maintain per-customer ordering. This lets us scale consumers horizontally.
For storage, I'd use TimescaleDB for the hot tier—it's optimized for time-series data with automatic partitioning. Events older than 90 days get archived to S3 in Parquet format, queryable via Athena for investigations."
Interviewer: "What about data that shouldn't be in logs, like account numbers?"
You: "Excellent security concern. I'd implement tokenization for sensitive fields:
audit_event = {
'actor': {
'id': 'usr_123',
'email': tokenize('alice@bank.com') # Reversible token
},
'action': 'transfer.create',
'resource': {
'type': 'transfer',
'id': 'txn_456'
},
'details': {
'from_account': mask(from_account), # stored as '****5678' (last 4 only)
'to_account': mask(to_account), # stored as '****9012'
'amount': 1000.00, # Amount visible (needed for compliance)
'currency': 'USD'
}
}
The full account numbers are stored separately in a secure vault with additional access controls. The audit log contains enough to investigate but not enough to compromise customer accounts if the logs were exposed.
Access to the detokenization service is itself audited—so we'd know who looked up the real account numbers."
Interviewer: "How do you prove the logs haven't been tampered with?"
You: "This is critical for SOX compliance. I'd implement hash chaining:
Each event includes the hash of the previous event. To tamper with event #1000, you'd have to recompute the hashes of every event from #1001 onward; any partial edit shows up immediately when we verify the chain.
Additionally, once events move to cold storage in S3, I'd enable S3 Object Lock with governance mode. This makes the files write-once-read-many (WORM) for the retention period. Even administrators can't delete or modify them without special override permissions that are themselves logged.
For extra assurance, we can periodically checkpoint the latest hash to a public blockchain or trusted timestamping service—though that may be overkill for most banking apps."
Interviewer: "What if the audit system itself goes down?"
You: "The transactional outbox is the safety net. If Kafka is down, events accumulate in the outbox table. When Kafka recovers, the publisher catches up. We might have some latency in the audit pipeline, but no data loss.
If the database itself is down... well, the banking operations can't happen either, so there's nothing to audit. The audit system's availability is tied to the primary database's availability, which should be very high with proper replication.
For the consumer side, any processing failures go to a dead letter queue. We monitor DLQ depth and alert if it grows. Failed events are never dropped—they wait for investigation and replay.
The DLQ also protects against poison pills—malformed events that crash the consumer. We detect these via processing timeouts and quarantine them immediately."
Week 3 Summary: Messaging and Async Processing
Congratulations! You've completed Week 3. Let's consolidate what you've learned:
WEEK 3 COMPLETE PICTURE
Day 1: Queue vs Stream
━━━━━━━━━━━━━━━━━━━━━━
Queues (RabbitMQ): Task distribution, once-consumed
Streams (Kafka): Event log, multi-consumer, replay
Choose based on consumption pattern, not popularity
Day 2: Transactional Outbox
━━━━━━━━━━━━━━━━━━━━━━━━━━━
Problem: Dual-write between DB and queue
Solution: Write to outbox in same transaction
Publisher reads outbox, sends to Kafka
Never lose data, never have orphans
Day 3: Backpressure and Flow Control
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
Detection: Lag, depth, latency metrics
Response: Shed, limit, scale, spill
Critical: Systems must be able to say "slow down"
Prevents: Memory exhaustion, cascading failures
Day 4: Dead Letters and Poison Pills
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
Poison pills crash consumers repeatedly
DLQ quarantines failed messages
Classification: Validation vs transient vs poison
Never lose data, even on failure
Day 5: Audit Log System (Today)
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
Combined ALL patterns into complete system
Outbox → Kafka → Consumer → Tiered Storage
Immutable, tamper-evident, queryable
Handles compliance requirements
HOW THEY CONNECT:
User Action
│
▼
Business Logic + Audit Client
│
▼ (Day 2: Transactional Outbox)
Outbox Table
│
▼ (Day 1: Kafka Stream)
Kafka Topic
│
▼ (Day 3: Backpressure Handling)
Audit Consumer
│
├──▶ Success: Storage
│
└──▶ Failure: (Day 4: DLQ)
Dead Letter Queue
│
▼
Investigation + Replay
YOU CAN NOW:
✓ Design async systems with guaranteed delivery
✓ Handle failures gracefully without data loss
✓ Scale to high throughput with backpressure
✓ Build compliant audit trails
✓ Operate production messaging systems
Summary
DAY 5 KEY TAKEAWAYS
CORE CONCEPT:
• Audit log: Who did what to which resource, when, from where
• Immutable, tamper-evident, queryable
• Combines all Week 3 patterns into production system
ARCHITECTURE:
• Transactional outbox for guaranteed capture
• Kafka for reliable transport
• Tiered storage: hot (days) → warm (months) → cold (years)
• Hash chaining for tamper evidence
IMPLEMENTATION:
• Audit client writes to outbox in same transaction
• CDC or polling publishes to Kafka
• Consumer writes to storage with backpressure handling
• DLQ for failed events
COMPLIANCE:
• SOX, HIPAA, GDPR have specific requirements
• Retention periods: years, not days
• Immutability is non-negotiable
• Access to logs is itself audited
INTERVIEW TIPS:
• Start with "What are the regulatory requirements?"
• Emphasize transactional outbox for guaranteed capture
• Discuss hash chaining for tamper evidence
• Show tiered storage for cost optimization
DEFAULT CHOICE:
• Transactional outbox + Kafka + TimescaleDB + S3
• Hash chaining enabled
• 90-day hot, 1-year warm, 7-year cold
📚 Further Reading
Official Documentation
- TimescaleDB: https://docs.timescale.com/
- ClickHouse: https://clickhouse.com/docs/
- S3 Object Lock: https://docs.aws.amazon.com/AmazonS3/latest/userguide/object-lock.html
Engineering Blogs
- Stripe Engineering: "Designing Audit Logs" — https://stripe.com/blog/engineering
- Segment Engineering: "Building an Audit Log" — https://segment.com/blog/
- Cloudflare: "The Audit Problem" — https://blog.cloudflare.com/
Compliance Resources
- SOX Compliance: https://www.sarbanes-oxley-101.com/
- HIPAA Audit Requirements: https://www.hhs.gov/hipaa/
- GDPR Article 30: Records of processing activities
Books
- "Designing Data-Intensive Applications" by Martin Kleppmann — Chapter 11
- "Building Event-Driven Microservices" by Adam Bellemare
- "Streaming Systems" by Tyler Akidau
🎉 Week 3 Complete!
You've finished the most intensive week so far. Take a moment to appreciate what you've built:
- A mental model for async system design
- Patterns for guaranteed message delivery
- Strategies for handling system overload
- A complete audit log system
Next Week Preview:
Week 4 — Caching: Beyond "Just Add Redis"
We'll dive into:
- Cache invalidation (the hardest problem)
- Thundering herd prevention
- Multi-tier caching strategies
- When NOT to cache
The patterns from this week will come up again—particularly around consistency and failure handling. Caching adds another layer of complexity that builds on everything you've learned.
Rest up. You've earned it.
End of Week 3, Day 5: Audit Log System