Himanshu Kukreja

Week 3 — Day 1: Queue vs Stream

System Design Mastery Series


Preface

It's 3 AM, and your phone buzzes with a PagerDuty alert. Your e-commerce platform's order processing system is down. Orders are being lost. Customer payments are going through, but confirmations aren't being sent. The support queue is exploding.

You SSH into the server and find the culprit: your synchronous order processing chain has collapsed. The email service is slow, causing timeouts that cascade back through payment processing, inventory updates, and order creation. One slow service brought down everything.

Last week, we learned about building resilient systems—timeouts, retries, circuit breakers, idempotency. But we left one dangerous gap unaddressed:

┌────────────────────────────────────────────────────────────────────────┐
│                    THE SYNCHRONOUS TRAP                                │
│                                                                        │
│   User Request                                                         │
│        │                                                               │
│        ▼                                                               │
│   ┌─────────┐    ┌─────────┐    ┌─────────┐    ┌─────────┐             │
│   │ Create  │───▶│ Reserve │───▶│ Charge  │───▶│  Send   │             │
│   │  Order  │    │Inventory│    │ Payment │    │  Email  │             │
│   └─────────┘    └─────────┘    └─────────┘    └─────────┘             │
│        │              │              │              │                  │
│        │              │              │              │                  │
│        │              │              │              ▼                  │
│        │              │              │         ┌─────────┐             │
│        │              │              │         │  SLOW!  │             │
│        │              │              │         │ (5 sec) │             │
│        │              │              │         └─────────┘             │
│        │              │              │              │                  │
│        │              │              ◀──────────────┘                  │
│        │              ◀───────────────── Timeout propagates back       │
│        ◀────────────────────────────────                               │
│        │                                                               │
│        ▼                                                               │
│   User sees: "Error processing your order"                             │
│   Reality: Payment charged, but order not created                      │
│                                                                        │
│   Question: What went wrong?                                           │
│   Answer: Tight coupling. Every service waits for the next.            │
│                                                                        │
└────────────────────────────────────────────────────────────────────────┘

This is the problem asynchronous messaging solves.

Today, we'll learn the fundamental building blocks of async communication: queues and streams. By the end of this session, you'll know exactly when to reach for Kafka, when RabbitMQ is the right choice, and why "just use Kafka" is often the wrong answer.


Part I: Foundations

Chapter 1: What Are Queues and Streams?

1.1 The Simple Definition

A message queue is a buffer that holds messages between a sender and receiver, allowing them to operate independently.

A stream (or log) is an append-only sequence of messages that multiple readers can consume at their own pace.

┌────────────────────────────────────────────────────────────────────────┐
│                    EVERYDAY ANALOGIES                                  │
│                                                                        │
│  QUEUE = Post Office Box                                               │
│  ┌─────────────────────────────────────────────────────────────────┐   │
│  │                                                                 │   │
│  │   📬 You drop off a letter                                      │   │
│  │   📭 Recipient picks it up                                      │   │
│  │   🗑️ Letter is gone after pickup                                │   │
│  │                                                                 │   │
│  │   • One recipient per letter                                    │   │
│  │   • Letter disappears after reading                             │   │
│  │   • Can't re-read the letter                                    │   │
│  │                                                                 │   │
│  └─────────────────────────────────────────────────────────────────┘   │
│                                                                        │
│  STREAM = Newspaper Archive                                            │
│  ┌─────────────────────────────────────────────────────────────────┐   │
│  │                                                                 │   │
│  │   📰 Publisher adds today's paper to the archive                │   │
│  │   👀 Anyone can read any past edition                           │   │
│  │   📚 Papers stay in the archive (until cleanup policy)          │   │
│  │                                                                 │   │
│  │   • Multiple readers, same content                              │   │
│  │   • Content persists after reading                              │   │
│  │   • Can re-read any past edition                                │   │
│  │                                                                 │   │
│  └─────────────────────────────────────────────────────────────────┘   │
│                                                                        │
└────────────────────────────────────────────────────────────────────────┘

1.2 Why Distributed Systems Need Messaging

In a distributed system, services need to communicate. There are two fundamental approaches:

Synchronous Communication (HTTP/RPC):

  • Service A calls Service B and waits for a response
  • Simple to understand and implement
  • Creates tight coupling between services
  • Cascading failures when downstream services are slow

Asynchronous Communication (Messaging):

  • Service A sends a message and continues immediately
  • Services are decoupled in time and availability
  • More complex to implement correctly
  • Resilient to downstream failures

┌────────────────────────────────────────────────────────────────────────┐
│                 SYNCHRONOUS vs ASYNCHRONOUS                            │
│                                                                        │
│  SYNCHRONOUS (HTTP/RPC)                                                │
│                                                                        │
│    Service A                    Service B                              │
│    ┌───────┐                    ┌───────┐                              │
│    │       │ ──── Request ────▶ │       │                              │
│    │       │                    │       │  Processing...               │
│    │ WAIT  │                    │       │  ...                         │
│    │       │ ◀─── Response ──── │       │                              │
│    │       │                    │       │                              │
│    └───────┘                    └───────┘                              │
│                                                                        │
│    Total time = A's time + B's time + network                          │
│    If B is down, A fails immediately                                   │
│                                                                        │
│  ASYNCHRONOUS (Messaging)                                              │
│                                                                        │
│    Service A       Queue/Stream       Service B                        │
│    ┌───────┐       ┌─────────┐       ┌───────┐                         │
│    │       │ ────▶ │ ░░░░░░░ │       │       │                         │
│    │ DONE! │       │ ░░░░░░░ │ ────▶ │       │                         │
│    │       │       │ ░░░░░░░ │       │       │                         │
│    └───────┘       └─────────┘       └───────┘                         │
│                                                                        │
│    A completes immediately after sending                               │
│    If B is down, messages queue up and wait                            │
│                                                                        │
└────────────────────────────────────────────────────────────────────────┘
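
To make the contrast concrete, here is a minimal sketch of the same "place order" step written both ways. The service URLs, the timeout values, and the queue.publish() helper are illustrative assumptions, not a specific framework.

# Sketch: synchronous vs asynchronous handling of a new order
import requests

def create_order_sync(order: dict) -> None:
    # Synchronous: we block on every downstream call. If the email service
    # takes 5 seconds, so does this function, and a timeout fails the order.
    requests.post("http://inventory/reserve", json=order, timeout=2)
    requests.post("http://payments/charge", json=order, timeout=2)
    requests.post("http://email/send-confirmation", json=order, timeout=2)

def create_order_async(order: dict, queue) -> None:
    # Asynchronous: record the order, emit one message, and return.
    # Inventory, payments, and email each consume the message independently,
    # so a slow email service no longer blocks order creation.
    queue.publish({"type": "order_created", "order": order})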

1.3 The Formal Definitions

Let's be precise about terminology:

Term                  Definition
────────────────────  ─────────────────────────────────────────────────────
Producer              Service that sends messages
Consumer              Service that receives and processes messages
Broker                The messaging infrastructure (Kafka, RabbitMQ, etc.)
Topic/Queue           Named destination for messages
Partition             Subdivision of a topic for parallelism (Kafka)
Consumer Group        Set of consumers that share message processing
Offset                Position in a stream (how far you've read)
Acknowledgment (Ack)  Confirmation that a message was processed

1.4 The Two Mental Models

This is the most important concept of the day. There are two fundamentally different mental models:

┌────────────────────────────────────────────────────────────────────────┐
│                    TWO MENTAL MODELS                                   │
├────────────────────────────────────────────────────────────────────────┤
│                                                                        │
│  MODEL 1: QUEUE (Job Distribution)                                     │
│  ═══════════════════════════════                                       │
│                                                                        │
│  Think: "Work to be done"                                              │
│                                                                        │
│    Producer ───▶ [Task][Task][Task][Task] ───▶ Worker                  │
│                                                                        │
│    • Each task goes to exactly ONE worker                              │
│    • Task is REMOVED after processing                                  │
│    • Workers compete for tasks                                         │
│    • No history—once done, it's gone                                   │
│                                                                        │
│  Examples: RabbitMQ, SQS, Redis LPUSH/BRPOP, Celery                    │
│                                                                        │
│  Use cases:                                                            │
│    - Background job processing                                         │
│    - Task distribution across workers                                  │
│    - Request buffering during traffic spikes                           │
│                                                                        │
├────────────────────────────────────────────────────────────────────────┤
│                                                                        │
│  MODEL 2: STREAM/LOG (Event History)                                   │
│  ══════════════════════════════════                                    │
│                                                                        │
│  Think: "Things that happened"                                         │
│                                                                        │
│    Producer ───▶ [Event][Event][Event][Event][Event]...                │
│                      ▲           ▲           ▲                         │
│                      │           │           │                         │
│                  Reader A    Reader B    Reader C                      │
│                 (offset 1)  (offset 3)  (offset 2)                     │
│                                                                        │
│    • Events are RETAINED (not removed after reading)                   │
│    • Multiple readers can read the same events                         │
│    • Each reader tracks their own position (offset)                    │
│    • Full history available for replay                                 │
│                                                                        │
│  Examples: Kafka, Kinesis, Redpanda, Pulsar, Redis Streams             │
│                                                                        │
│  Use cases:                                                            │
│    - Event sourcing and audit logs                                     │
│    - Real-time analytics                                               │
│    - Multi-consumer fan-out                                            │
│    - Replay for debugging or reprocessing                              │
│                                                                        │
└────────────────────────────────────────────────────────────────────────┘

The key insight: Queues are about work distribution. Streams are about event history.


Chapter 2: Messaging Patterns

2.1 Pattern 1: Point-to-Point (Queue)

In point-to-point messaging, each message is delivered to exactly one consumer.

┌────────────────────────────────────────────────────────────────────────┐
│                    POINT-TO-POINT PATTERN                              │
│                                                                        │
│                         ┌────────────────┐                             │
│                         │     QUEUE      │                             │
│    Producer ──────────▶ │ [M1][M2][M3]   │                             │
│                         └───────┬────────┘                             │
│                                 │                                      │
│              ┌──────────────────┼──────────────────┐                   │
│              │                  │                  │                   │
│              ▼                  ▼                  ▼                   │
│         ┌────────┐         ┌────────┐         ┌────────┐               │
│         │Consumer│         │Consumer│         │Consumer│               │
│         │   A    │         │   B    │         │   C    │               │
│         └────────┘         └────────┘         └────────┘               │
│                                                                        │
│    M1 → Consumer A (removed from queue)                                │
│    M2 → Consumer B (removed from queue)                                │
│    M3 → Consumer C (removed from queue)                                │
│                                                                        │
│    Each message is processed by exactly ONE consumer.                  │
│    Consumers compete for messages.                                     │
│                                                                        │
└────────────────────────────────────────────────────────────────────────┘

How it works:

  1. Producer sends message to queue
  2. Broker holds message until a consumer is available
  3. One consumer receives and processes the message
  4. Consumer acknowledges completion
  5. Message is removed from queue
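
As a rough sketch of the competing-consumers behavior (the queue name and Redis address are assumptions; any broker with a blocking receive behaves the same way):

# Sketch: run this worker in several processes; each message goes to
# exactly one of them, because BLPOP removes it from the list.
import json
import redis

r = redis.from_url("redis://localhost:6379")

def worker(worker_id: str) -> None:
    while True:
        _, raw = r.blpop("orders")   # blocks until a message arrives
        task = json.loads(raw)
        print(f"worker {worker_id} handling order {task['order_id']}")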

Pros:

  • Simple mental model
  • Natural load balancing across consumers
  • Easy to scale by adding consumers
  • Built-in work distribution

Cons:

  • Message is gone after processing (no replay)
  • Only one consumer per message
  • Harder to add new consumers that need historical data

Use when:

  • You have background jobs to process
  • Work needs to be distributed across workers
  • You don't need to replay or re-read messages

2.2 Pattern 2: Publish-Subscribe (Fan-Out)

In pub-sub, each message is delivered to ALL subscribers.

┌────────────────────────────────────────────────────────────────────────┐
│                    PUBLISH-SUBSCRIBE PATTERN                           │
│                                                                        │
│                         ┌────────────────┐                             │
│                         │     TOPIC      │                             │
│    Publisher ─────────▶ │   [Message]    │                             │
│                         └───────┬────────┘                             │
│                                 │                                      │
│              ┌──────────────────┼──────────────────┐                   │
│              │                  │                  │                   │
│              ▼                  ▼                  ▼                   │
│         ┌────────┐         ┌────────┐         ┌────────┐               │
│         │Subscr. │         │Subscr. │         │Subscr. │               │
│         │   A    │         │   B    │         │   C    │               │
│         │(Email) │         │(Mobile)│         │(Analyt)│               │
│         └────────┘         └────────┘         └────────┘               │
│                                                                        │
│    Same message delivered to ALL subscribers.                          │
│    Each subscriber gets a copy.                                        │
│                                                                        │
└────────────────────────────────────────────────────────────────────────┘

How it works:

  1. Publisher sends message to topic
  2. Each subscriber receives a copy of the message
  3. Subscribers process independently
  4. Message may or may not be retained (depends on implementation)
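
A minimal sketch of fan-out using Redis pub/sub (purely illustrative; a production system would more likely use SNS, a RabbitMQ fanout exchange, or a Kafka topic with multiple consumer groups):

# Sketch: every subscriber listening on the channel receives its own copy.
import json
import redis

r = redis.from_url("redis://localhost:6379")

# Publisher side
r.publish("order-events", json.dumps({"order_id": "123", "action": "created"}))

# Subscriber side (run one per interested system: email, mobile, analytics)
p = r.pubsub()
p.subscribe("order-events")
for message in p.listen():
    if message["type"] == "message":
        event = json.loads(message["data"])
        print("received", event)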

Pros:

  • Easy to add new subscribers
  • Decouples publishers from subscribers
  • Same event can trigger multiple actions

Cons:

  • All subscribers must handle the load
  • Can't easily distribute work within a subscriber type

Use when:

  • Multiple systems need to react to the same event
  • You're building event-driven architecture
  • Adding new capabilities shouldn't require changing existing systems

2.3 Pattern 3: Consumer Groups (Partitioned Stream)

Consumer groups combine the best of both: fan-out ACROSS groups, load balancing WITHIN groups.

┌────────────────────────────────────────────────────────────────────────┐
│                    CONSUMER GROUPS PATTERN                             │
│                                                                        │
│    This is Kafka's killer feature.                                     │
│                                                                        │
│                    ┌─────────────────────────────────────┐             │
│                    │            TOPIC                    │             │
│    Producer ─────▶ │  Partition 0: [M1][M4][M7]         │              │
│                    │  Partition 1: [M2][M5][M8]         │              │
│                    │  Partition 2: [M3][M6][M9]         │              │
│                    └──────────────┬──────────────────────┘             │
│                                   │                                    │
│          ┌────────────────────────┼────────────────────────┐           │
│          │                        │                        │           │
│          ▼                        ▼                        ▼           │
│   ┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐    │
│   │ Consumer Group A│    │ Consumer Group B│    │ Consumer Group C│    │
│   │   (Orders)      │    │  (Analytics)    │    │   (Audit)       │    │
│   │                 │    │                 │    │                 │    │
│   │  C1  C2  C3     │    │    C1   C2      │    │       C1        │    │
│   │  ▲   ▲   ▲      │    │    ▲    ▲       │    │       ▲         │    │
│   │  │   │   │      │    │    │    │       │    │       │         │    │
│   │  P0  P1  P2     │    │   P0,1  P2      │    │    P0,P1,P2     │    │
│   └─────────────────┘    └─────────────────┘    └─────────────────┘    │
│                                                                        │
│   Each GROUP gets ALL messages (fan-out)                               │
│   Within a group, messages are DIVIDED (load balance)                  │
│   Each partition goes to exactly one consumer per group                │
│                                                                        │
└────────────────────────────────────────────────────────────────────────┘

How it works:

  1. Topic is divided into partitions
  2. Each consumer group gets all messages
  3. Within a group, each partition is assigned to one consumer
  4. Messages within a partition are processed in order
  5. Parallelism = number of partitions
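
To see the fan-out-across-groups behavior in code, here is a minimal sketch using kafka-python (the broker address and group names are assumptions):

# Sketch: the same topic read by independent consumer groups.
# Start several processes with group_id="order-processor" and the partitions
# are split among them; start one more with group_id="analytics" and it
# independently receives every event as well.
import json
from kafka import KafkaConsumer

def run_consumer(group_id: str) -> None:
    consumer = KafkaConsumer(
        "orders",
        bootstrap_servers="localhost:9092",
        group_id=group_id,
        value_deserializer=lambda v: json.loads(v.decode("utf-8")),
    )
    for msg in consumer:
        print(f"[{group_id}] partition={msg.partition} offset={msg.offset} value={msg.value}")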

Pros:

  • Fan-out to multiple consumer groups
  • Load balancing within each group
  • Ordering guaranteed within partition
  • Replay possible (rewind offset)

Cons:

  • More complex to set up and operate
  • Partition count limits parallelism
  • Rebalancing can cause delays

Use when:

  • Multiple systems need the same events
  • You need both fan-out AND parallel processing
  • Ordering within a key (e.g., per user) matters

2.4 Pattern Comparison

Aspect            Point-to-Point    Pub-Sub             Consumer Groups
────────────────  ────────────────  ──────────────────  ─────────────────────────
Message delivery  One consumer      All subscribers     All groups, one per group
Load balancing    Automatic         None                Within group
Ordering          FIFO (usually)    Per subscriber      Per partition
Replay            No                Depends             Yes
Use case          Job queue         Event notification  Event streaming
Example           RabbitMQ queue    SNS topic           Kafka topic

Chapter 3: Trade-offs and Considerations

3.1 Trade-off Matrix

┌────────────────────────────────────────────────────────────────────────┐
│                    QUEUE VS STREAM TRADE-OFFS                          │
│                                                                        │
│  QUEUE (RabbitMQ, SQS):                                                │
│  ├── ✓ Simpler mental model                                            │
│  ├── ✓ Message-level acknowledgment                                    │
│  ├── ✓ Built-in retry and dead-letter support                          │
│  ├── ✓ Flexible routing (exchanges, bindings)                          │
│  ├── ✗ No replay capability                                            │
│  ├── ✗ Single consumer per message                                     │
│  ├── ✗ Message loss if broker crashes before persist                   │
│  └── Best for: Job queues, task distribution, RPC patterns             │
│                                                                        │
│  STREAM (Kafka, Kinesis):                                              │
│  ├── ✓ Replay capability (rewind offset)                               │
│  ├── ✓ Multiple consumer groups                                        │
│  ├── ✓ High throughput (batching, zero-copy)                           │
│  ├── ✓ Durable storage (configurable retention)                        │
│  ├── ✗ More complex operations                                         │
│  ├── ✗ Partition count limits parallelism                              │
│  ├── ✗ Consumer must track offset                                      │
│  └── Best for: Event sourcing, analytics, multi-consumer fan-out       │
│                                                                        │
└────────────────────────────────────────────────────────────────────────┘

3.2 Delivery Guarantees

This is where things get nuanced. What happens if something fails?

┌────────────────────────────────────────────────────────────────────────┐
│                    DELIVERY GUARANTEES                                 │
│                                                                        │
│  AT-MOST-ONCE                                                          │
│  ════════════                                                          │
│    • Message may be lost                                               │
│    • Message will NOT be duplicated                                    │
│    • Implementation: Fire and forget, no ack                           │
│    • Use when: Loss acceptable (metrics, logs, non-critical)           │
│                                                                        │
│    Producer ──▶ Broker ──▶ Consumer                                    │
│                    │           │                                       │
│                    │     ┌─────┴─────┐                                 │
│                    │     │ Crash!    │  Message lost                   │
│                    │     └───────────┘                                 │
│                                                                        │
│  AT-LEAST-ONCE                                                         │
│  ══════════════                                                        │
│    • Message will NOT be lost                                          │
│    • Message MAY be duplicated                                         │
│    • Implementation: Ack after processing, retry on failure            │
│    • Use when: Loss unacceptable, can handle duplicates                │
│                                                                        │
│    Producer ──▶ Broker ──▶ Consumer                                    │
│                    │           │                                       │
│                    │     ┌─────┴─────┐                                 │
│                    │     │ Processed │                                 │
│                    │     │ but ack   │  Retry = duplicate              │
│                    │     │ lost      │                                 │
│                    │     └───────────┘                                 │
│                                                                        │
│  EXACTLY-ONCE                                                          │
│  ═════════════                                                         │
│    • Message will NOT be lost                                          │
│    • Message will NOT be duplicated                                    │
│    • Implementation: Transactional processing + deduplication          │
│    • Reality: Very hard. Usually "effectively once" via idempotency.   │
│                                                                        │
│    This is often a LIE or requires special support from both           │
│    the broker AND the consumer. Kafka 0.11+ supports it with           │
│    transactions, but your consumer must participate.                   │
│                                                                        │
└────────────────────────────────────────────────────────────────────────┘

The practical reality:

Most systems use at-least-once delivery with idempotent consumers. This is what you learned in Week 2, Day 2 (Idempotency). The broker guarantees delivery, and your consumer handles duplicates.
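
A minimal sketch of that combination, assuming each message carries a unique message_id and using Redis as the deduplication store (both are assumptions for illustration):

# Sketch: at-least-once delivery made safe by an idempotent consumer.
import redis

r = redis.from_url("redis://localhost:6379")

def handle_order_event(event: dict) -> None:
    message_id = event["message_id"]
    # SET ... NX succeeds only the first time we see this id.
    first_time = r.set(f"processed:{message_id}", 1, nx=True, ex=86400)
    if not first_time:
        return  # duplicate delivery: already handled, safe to skip
    process_order(event)  # actual business logic (not shown here)

In real code the message_id would typically be recorded in the same database transaction as the side effect, so a crash between the two cannot strand a half-processed message.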

3.3 Ordering Guarantees

When do messages arrive in order?

┌────────────────────────────────────────────────────────────────────────┐
│                    ORDERING GUARANTEES                                 │
│                                                                        │
│  GLOBAL ORDERING (All messages in order)                               │
│  ═══════════════                                                       │
│                                                                        │
│    [M1] → [M2] → [M3] → [M4] → [M5]                                    │
│                                                                        │
│    • Single partition/queue                                            │
│    • Single consumer                                                   │
│    • No parallelism possible                                           │
│    • Throughput limited                                                │
│                                                                        │
│  PARTITION ORDERING (Order within partition)                           │
│  ══════════════════                                                    │
│                                                                        │
│    Partition 0: [M1] → [M3] → [M5]   (User A's messages in order)      │
│    Partition 1: [M2] → [M4] → [M6]   (User B's messages in order)      │
│                                                                        │
│    • Messages with same key go to same partition                       │
│    • Order guaranteed within partition                                 │
│    • Parallelism = number of partitions                                │
│    • This is Kafka's model                                             │
│                                                                        │
│  NO ORDERING                                                           │
│  ═══════════                                                           │
│                                                                        │
│    Messages processed in any order                                     │
│                                                                        │
│    • Maximum parallelism                                               │
│    • Consumer must handle out-of-order                                 │
│    • Simpler infrastructure                                            │
│                                                                        │
└────────────────────────────────────────────────────────────────────────┘

Common pattern: Use partition key to ensure related messages stay together.

# Example: Order events for same user stay in order
producer.send(
    topic="orders",
    key=user_id,        # Partition key
    value=order_event
)

# All events for user_123 go to same partition
# Processed in order by one consumer

3.4 Decision Framework

When choosing between queue and stream:

┌────────────────────────────────────────────────────────────────────────┐
│                    DECISION FRAMEWORK                                  │
│                                                                        │
│  START HERE                                                            │
│      │                                                                 │
│      ▼                                                                 │
│  ┌─────────────────────────────────────────┐                           │
│  │ Do you need to replay messages?         │                           │
│  └─────────────────┬───────────────────────┘                           │
│                    │                                                   │
│         YES ───────┼─────── NO                                         │
│          │         │        │                                          │
│          ▼         │        ▼                                          │
│       STREAM       │   ┌─────────────────────────────────────────┐     │
│                    │   │ Do multiple systems need the same data? │     │
│                    │   └─────────────────┬───────────────────────┘     │
│                    │                     │                             │
│                    │          YES ───────┼─────── NO                   │
│                    │           │         │        │                    │
│                    │           ▼         │        ▼                    │
│                    │        STREAM       │   ┌────────────────────┐    │
│                    │                     │   │ Is this a job/task │    │
│                    │                     │   │ to be processed?   │    │
│                    │                     │   └─────────┬──────────┘    │
│                    │                     │             │               │
│                    │                     │  YES ───────┼─────── NO     │
│                    │                     │   │         │        │      │
│                    │                     │   ▼         │        ▼      │
│                    │                     │ QUEUE       │    STREAM     │
│                    │                     │             │               │
│                    │                     │             │               │
│                    │                     │             │               │
└────────────────────────────────────────────────────────────────────────┘

SUMMARY:
────────
• Need replay? → Stream
• Multiple consumers need same data? → Stream  
• Task/job processing? → Queue
• Event notification? → Stream (or pub-sub)
• Uncertain? → Start with queue (simpler), migrate if needed

Part II: Implementation

Chapter 4: Basic Implementation

4.1 Simple Queue with Redis

Let's start with the simplest possible queue using Redis. This is educational—not production-ready.

# Basic Queue Implementation with Redis
# WARNING: Not production-ready - for learning only

import redis
import json
from typing import Any, Optional

class SimpleRedisQueue:
    """
    Basic queue using Redis lists.
    Messages are pushed to the right, popped from the left (FIFO).
    """
    
    def __init__(self, redis_url: str, queue_name: str):
        self.redis = redis.from_url(redis_url)
        self.queue_name = queue_name
    
    def publish(self, message: dict) -> None:
        """
        Add a message to the queue.
        """
        serialized = json.dumps(message)
        self.redis.rpush(self.queue_name, serialized)
    
    def consume(self, timeout: int = 0) -> Optional[dict]:
        """
        Get and remove a message from the queue.
        Blocks until a message is available or timeout.
        
        Args:
            timeout: Seconds to wait (0 = wait forever)
            
        Returns:
            Message dict or None if timeout
        """
        # BLPOP blocks until message available
        result = self.redis.blpop(self.queue_name, timeout=timeout)
        
        if result is None:
            return None
        
        _, message = result
        return json.loads(message)


# Usage example
def producer_example():
    queue = SimpleRedisQueue("redis://localhost:6379", "orders")
    
    # Publish some orders
    queue.publish({"order_id": "123", "user": "alice", "amount": 99.99})
    queue.publish({"order_id": "124", "user": "bob", "amount": 49.99})
    
    print("Published 2 orders")


def consumer_example():
    queue = SimpleRedisQueue("redis://localhost:6379", "orders")
    
    while True:
        message = queue.consume(timeout=5)
        
        if message is None:
            print("No messages, waiting...")
            continue
        
        print(f"Processing order: {message['order_id']}")
        # Process the order...
        
        # NOTE: If we crash here, message is LOST!
        # No acknowledgment, no retry.

What's wrong with this implementation?

  1. No acknowledgment: If consumer crashes after BLPOP but before processing, message is lost
  2. No retry: Failed messages disappear forever
  3. No dead letter queue: No way to handle poison pills
  4. No visibility timeout: Can't handle slow consumers
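
A first incremental fix is an acknowledgment step. The sketch below uses Redis's BRPOPLPUSH to atomically move each message into a per-consumer "processing" list, so a crash leaves the message recoverable instead of lost (RabbitMQ and SQS give you this behavior out of the box via acks and visibility timeouts). It is still only a sketch: the producer must LPUSH so that popping from the right stays FIFO, and a separate reaper process would have to re-queue messages stranded in processing lists.

# Sketch: queue with an explicit acknowledgment step (still not production-ready)
import json
from typing import Optional

import redis


class AckingRedisQueue:
    """
    Producer LPUSHes; consumer BRPOPLPUSHes the message into a processing
    list and LREMs it only after handling succeeds.
    """

    def __init__(self, redis_url: str, queue_name: str, consumer_id: str):
        self.redis = redis.from_url(redis_url)
        self.queue_name = queue_name
        self.processing_list = f"{queue_name}:processing:{consumer_id}"

    def publish(self, message: dict) -> None:
        # LPUSH pairs with the right-pop below to preserve FIFO order
        self.redis.lpush(self.queue_name, json.dumps(message))

    def consume(self, timeout: int = 0) -> Optional[tuple]:
        # Atomically move the message into this consumer's processing list
        raw = self.redis.brpoplpush(self.queue_name, self.processing_list, timeout=timeout)
        if raw is None:
            return None
        return raw, json.loads(raw)  # keep raw bytes so ack can remove the exact entry

    def ack(self, raw) -> None:
        # Remove the message from the processing list once handling succeeded
        self.redis.lrem(self.processing_list, 1, raw)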

4.2 Simple Stream with Kafka (Conceptual)

Here's a conceptual Kafka producer/consumer:

# Basic Kafka Producer/Consumer
# WARNING: Not production-ready - for learning only

from kafka import KafkaProducer, KafkaConsumer
import json

class SimpleKafkaProducer:
    """
    Basic Kafka producer.
    """
    
    def __init__(self, bootstrap_servers: str):
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            key_serializer=lambda k: k.encode('utf-8') if k else None
        )
    
    def publish(self, topic: str, message: dict, key: str = None) -> None:
        """
        Send message to topic.
        Key determines which partition receives the message.
        """
        self.producer.send(topic, value=message, key=key)
        self.producer.flush()  # Wait for send to complete


class SimpleKafkaConsumer:
    """
    Basic Kafka consumer.
    """
    
    def __init__(self, bootstrap_servers: str, topic: str, group_id: str):
        self.consumer = KafkaConsumer(
            topic,
            bootstrap_servers=bootstrap_servers,
            group_id=group_id,
            auto_offset_reset='earliest',  # Start from beginning if new
            enable_auto_commit=True,        # Automatically commit offsets
            value_deserializer=lambda v: json.loads(v.decode('utf-8'))
        )
    
    def consume(self):
        """
        Yield messages from the topic.
        """
        for message in self.consumer:
            yield {
                'topic': message.topic,
                'partition': message.partition,
                'offset': message.offset,
                'key': message.key.decode('utf-8') if message.key else None,
                'value': message.value
            }


# Usage example
def kafka_producer_example():
    producer = SimpleKafkaProducer("localhost:9092")
    
    # Messages with same key go to same partition (ordered)
    producer.publish("orders", {"order_id": "123", "action": "created"}, key="user_alice")
    producer.publish("orders", {"order_id": "123", "action": "paid"}, key="user_alice")
    producer.publish("orders", {"order_id": "124", "action": "created"}, key="user_bob")


def kafka_consumer_example():
    consumer = SimpleKafkaConsumer(
        "localhost:9092", 
        "orders", 
        group_id="order-processor"
    )
    
    for message in consumer.consume():
        print(f"Partition {message['partition']}, Offset {message['offset']}")
        print(f"Key: {message['key']}, Value: {message['value']}")
        # Process the message...
        
        # NOTE: With enable_auto_commit=True, offsets are committed on a timer,
        # not per message. A crash can therefore reprocess messages (commit
        # lagged behind) or skip them (commit raced ahead of processing).
        # The production consumer in Chapter 5 commits manually instead.

4.3 Understanding the Flow

┌────────────────────────────────────────────────────────────────────────┐
│                    KAFKA MESSAGE FLOW                                  │
│                                                                        │
│  Step 1: Producer sends with key                                       │
│  ┌──────────────────────────────────────────────────────────────────┐  │
│  │  producer.send("orders", key="user_123", value={...})            │  │
│  └──────────────────────────────────────────────────────────────────┘  │
│                              │                                         │
│                              ▼                                         │
│  Step 2: Key is hashed to determine partition                          │
│  ┌──────────────────────────────────────────────────────────────────┐  │
│  │  partition = hash("user_123") % num_partitions                   │  │
│  │  partition = 2847293847 % 6 = 3                                  │  │
│  └──────────────────────────────────────────────────────────────────┘  │
│                              │                                         │
│                              ▼                                         │
│  Step 3: Message appended to partition log                             │
│  ┌──────────────────────────────────────────────────────────────────┐  │
│  │  Partition 3: [...][msg][msg][NEW MSG]                           │  │
│  │                                   ▲                              │  │
│  │                              offset 847                          │  │
│  └──────────────────────────────────────────────────────────────────┘  │
│                              │                                         │
│                              ▼                                         │
│  Step 4: Consumer group assigns partitions                             │
│  ┌──────────────────────────────────────────────────────────────────┐  │
│  │  Consumer Group "order-processor"                                │  │
│  │    Consumer A: Partitions 0, 1                                   │  │
│  │    Consumer B: Partitions 2, 3  ◄── Gets partition 3             │  │
│  │    Consumer C: Partitions 4, 5                                   │  │
│  └──────────────────────────────────────────────────────────────────┘  │
│                              │                                         │
│                              ▼                                         │
│  Step 5: Consumer reads and commits offset                             │
│  ┌──────────────────────────────────────────────────────────────────┐  │
│  │  Consumer B reads offset 847                                     │  │
│  │  Processes message                                               │  │
│  │  Commits offset 848 (next to read)                               │  │
│  └──────────────────────────────────────────────────────────────────┘  │
│                                                                        │
└────────────────────────────────────────────────────────────────────────┘
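
The partition choice in Step 2 is just a deterministic function of the key. Conceptually it looks like the sketch below; real clients use a stable hash such as murmur2, not Python's hash(), which varies between processes.

# Conceptual sketch of key-based partitioning: same key -> same partition
def pick_partition(key: str, num_partitions: int) -> int:
    return hash(key) % num_partitions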

Chapter 5: Production-Ready Implementation

5.1 Requirements for Production

Our production implementation needs:

  1. Reliable delivery — No message loss
  2. Acknowledgment — Confirm processing before removing
  3. Retry with backoff — Handle transient failures
  4. Dead letter queue — Handle permanent failures
  5. Monitoring — Track lag, errors, throughput
  6. Graceful shutdown — Don't lose in-flight messages

5.2 Production Kafka Consumer

# Production-Ready Kafka Consumer
# With proper error handling, retries, and monitoring

import asyncio
from dataclasses import dataclass, field
from typing import Callable, Optional, Dict, Any, List
from datetime import datetime, timedelta
from enum import Enum
import logging
import json
from contextlib import asynccontextmanager

from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
from aiokafka.errors import KafkaError

logger = logging.getLogger(__name__)


class ProcessingResult(Enum):
    SUCCESS = "success"
    RETRY = "retry"           # Transient failure, retry later
    DEAD_LETTER = "dead_letter"  # Permanent failure, send to DLQ


@dataclass
class ConsumerConfig:
    """Configuration for Kafka consumer."""
    bootstrap_servers: str
    topic: str
    group_id: str
    
    # Retry configuration
    max_retries: int = 3
    retry_backoff_ms: int = 1000
    max_retry_backoff_ms: int = 30000
    
    # Consumer configuration
    auto_offset_reset: str = "earliest"
    enable_auto_commit: bool = False  # Manual commit for reliability
    max_poll_records: int = 100
    session_timeout_ms: int = 30000
    
    # Dead letter topic
    dead_letter_topic: Optional[str] = None


@dataclass
class MessageContext:
    """Context for message processing."""
    topic: str
    partition: int
    offset: int
    key: Optional[str]
    value: Dict[str, Any]
    timestamp: datetime
    headers: Dict[str, str] = field(default_factory=dict)
    retry_count: int = 0


class KafkaConsumerService:
    """
    Production-ready Kafka consumer with:
    - Manual offset commits (at-least-once delivery)
    - Retry with exponential backoff
    - Dead letter queue for failed messages
    - Graceful shutdown
    - Metrics and logging
    """
    
    def __init__(
        self,
        config: ConsumerConfig,
        message_handler: Callable[[MessageContext], ProcessingResult]
    ):
        self.config = config
        self.message_handler = message_handler
        self.consumer: Optional[AIOKafkaConsumer] = None
        self.dlq_producer: Optional[AIOKafkaProducer] = None
        self._running = False
        self._shutdown_event = asyncio.Event()
        
        # Metrics
        self.messages_processed = 0
        self.messages_failed = 0
        self.messages_to_dlq = 0
    
    async def start(self) -> None:
        """Start the consumer."""
        logger.info(f"Starting consumer for topic {self.config.topic}")
        
        self.consumer = AIOKafkaConsumer(
            self.config.topic,
            bootstrap_servers=self.config.bootstrap_servers,
            group_id=self.config.group_id,
            auto_offset_reset=self.config.auto_offset_reset,
            enable_auto_commit=self.config.enable_auto_commit,
            max_poll_records=self.config.max_poll_records,
            session_timeout_ms=self.config.session_timeout_ms,
            value_deserializer=lambda v: json.loads(v.decode('utf-8')),
            key_deserializer=lambda k: k.decode('utf-8') if k else None
        )
        
        await self.consumer.start()
        
        # Start DLQ producer if configured
        if self.config.dead_letter_topic:
            self.dlq_producer = AIOKafkaProducer(
                bootstrap_servers=self.config.bootstrap_servers,
                value_serializer=lambda v: json.dumps(v).encode('utf-8'),
                key_serializer=lambda k: k.encode('utf-8') if k else None
            )
            await self.dlq_producer.start()
        
        self._running = True
        logger.info(f"Consumer started for topic {self.config.topic}")
    
    async def stop(self) -> None:
        """Gracefully stop the consumer."""
        logger.info("Stopping consumer...")
        self._running = False
        self._shutdown_event.set()
        
        if self.consumer:
            await self.consumer.stop()
        
        if self.dlq_producer:
            await self.dlq_producer.stop()
        
        logger.info(f"Consumer stopped. Processed: {self.messages_processed}, "
                   f"Failed: {self.messages_failed}, DLQ: {self.messages_to_dlq}")
    
    async def run(self) -> None:
        """Main consumer loop."""
        try:
            async for message in self.consumer:
                if not self._running:
                    break
                
                ctx = MessageContext(
                    topic=message.topic,
                    partition=message.partition,
                    offset=message.offset,
                    key=message.key,
                    value=message.value,
                    timestamp=datetime.fromtimestamp(message.timestamp / 1000),
                    headers={k: v.decode('utf-8') for k, v in (message.headers or [])},
                    retry_count=self._get_retry_count(message.headers)
                )
                
                await self._process_message(ctx)
                
        except asyncio.CancelledError:
            logger.info("Consumer loop cancelled")
        except Exception as e:
            logger.error(f"Consumer loop error: {e}", exc_info=True)
            raise
    
    async def _process_message(self, ctx: MessageContext) -> None:
        """Process a single message with retry logic."""
        try:
            result = await self._execute_handler(ctx)
            
            if result == ProcessingResult.SUCCESS:
                # Commit offset on success
                await self.consumer.commit()
                self.messages_processed += 1
                logger.debug(f"Processed message at offset {ctx.offset}")
                
            elif result == ProcessingResult.RETRY:
                await self._handle_retry(ctx)
                
            elif result == ProcessingResult.DEAD_LETTER:
                await self._send_to_dlq(ctx, "Handler returned DEAD_LETTER")
                await self.consumer.commit()
                
        except Exception as e:
            logger.error(f"Error processing message: {e}", exc_info=True)
            await self._handle_retry(ctx)
    
    async def _execute_handler(self, ctx: MessageContext) -> ProcessingResult:
        """Execute the message handler with error handling."""
        try:
            # Handler might be sync or async
            result = self.message_handler(ctx)
            if asyncio.iscoroutine(result):
                result = await result
            return result
        except Exception as e:
            logger.warning(f"Handler raised exception: {e}")
            return ProcessingResult.RETRY
    
    async def _handle_retry(self, ctx: MessageContext) -> None:
        """Handle message retry with exponential backoff."""
        if ctx.retry_count >= self.config.max_retries:
            logger.warning(f"Max retries ({self.config.max_retries}) exceeded "
                          f"for message at offset {ctx.offset}")
            await self._send_to_dlq(ctx, f"Max retries exceeded after {ctx.retry_count} attempts")
            await self.consumer.commit()
            return
        
        # Calculate backoff
        backoff_ms = min(
            self.config.retry_backoff_ms * (2 ** ctx.retry_count),
            self.config.max_retry_backoff_ms
        )
        
        logger.info(f"Retrying message at offset {ctx.offset} "
                   f"(attempt {ctx.retry_count + 1}/{self.config.max_retries}) "
                   f"after {backoff_ms}ms")
        
        await asyncio.sleep(backoff_ms / 1000)
        
        # Re-process with incremented retry count
        ctx.retry_count += 1
        await self._process_message(ctx)
    
    async def _send_to_dlq(self, ctx: MessageContext, reason: str) -> None:
        """Send failed message to dead letter queue."""
        if not self.dlq_producer or not self.config.dead_letter_topic:
            logger.warning(f"No DLQ configured, dropping message at offset {ctx.offset}")
            self.messages_failed += 1
            return
        
        dlq_message = {
            "original_topic": ctx.topic,
            "original_partition": ctx.partition,
            "original_offset": ctx.offset,
            "original_key": ctx.key,
            "original_value": ctx.value,
            "original_timestamp": ctx.timestamp.isoformat(),
            "failure_reason": reason,
            "retry_count": ctx.retry_count,
            "failed_at": datetime.utcnow().isoformat()
        }
        
        await self.dlq_producer.send_and_wait(
            self.config.dead_letter_topic,
            value=dlq_message,
            key=ctx.key
        )
        
        self.messages_to_dlq += 1
        logger.info(f"Sent message at offset {ctx.offset} to DLQ: {reason}")
    
    def _get_retry_count(self, headers: Optional[List]) -> int:
        """Extract retry count from message headers."""
        if not headers:
            return 0
        for key, value in headers:
            if key == "retry_count":
                return int(value.decode('utf-8'))
        return 0


# =============================================================================
# Example Usage
# =============================================================================

async def order_handler(ctx: MessageContext) -> ProcessingResult:
    """Example message handler for order events."""
    try:
        order = ctx.value
        logger.info(f"Processing order {order.get('order_id')} "
                   f"for user {ctx.key}")
        
        # Simulate processing
        if order.get("amount", 0) < 0:
            # Invalid order - send to DLQ
            logger.warning(f"Invalid order amount: {order.get('amount')}")
            return ProcessingResult.DEAD_LETTER
        
        if order.get("action") == "fail_temporary":
            # Simulate transient failure
            return ProcessingResult.RETRY
        
        # Process the order...
        await asyncio.sleep(0.1)  # Simulate work
        
        return ProcessingResult.SUCCESS
        
    except KeyError as e:
        # Missing required field - permanent failure
        logger.error(f"Missing required field: {e}")
        return ProcessingResult.DEAD_LETTER
    except Exception as e:
        # Unknown error - retry
        logger.error(f"Processing error: {e}")
        return ProcessingResult.RETRY


async def main():
    """Example main function."""
    config = ConsumerConfig(
        bootstrap_servers="localhost:9092",
        topic="orders",
        group_id="order-processor",
        dead_letter_topic="orders-dlq",
        max_retries=3,
        retry_backoff_ms=1000
    )
    
    consumer = KafkaConsumerService(config, order_handler)
    
    try:
        await consumer.start()
        await consumer.run()
    except KeyboardInterrupt:
        pass
    finally:
        await consumer.stop()


if __name__ == "__main__":
    asyncio.run(main())

5.3 Production Kafka Producer

# Production-Ready Kafka Producer
# With batching, retries, and delivery confirmation

import asyncio
from dataclasses import dataclass
from typing import Optional, Dict, Any, Callable
import logging
import json
from datetime import datetime

from aiokafka import AIOKafkaProducer
from aiokafka.errors import KafkaError

logger = logging.getLogger(__name__)


@dataclass
class ProducerConfig:
    """Configuration for Kafka producer."""
    bootstrap_servers: str
    
    # Reliability settings
    acks: str = "all"                    # Wait for all replicas
    retries: int = 3                      # Retry on failure
    retry_backoff_ms: int = 100          # Backoff between retries
    
    # Batching settings (for throughput)
    linger_ms: int = 5                   # Wait up to 5ms to batch
    batch_size: int = 16384              # 16KB batch size
    
    # Idempotence (prevent duplicates on retry)
    enable_idempotence: bool = True


class KafkaProducerService:
    """
    Production-ready Kafka producer with:
    - Idempotent delivery (no duplicates)
    - Delivery confirmation
    - Batching for throughput
    - Proper error handling
    """
    
    def __init__(self, config: ProducerConfig):
        self.config = config
        self.producer: Optional[AIOKafkaProducer] = None
        
        # Metrics
        self.messages_sent = 0
        self.messages_failed = 0
    
    async def start(self) -> None:
        """Start the producer."""
        self.producer = AIOKafkaProducer(
            bootstrap_servers=self.config.bootstrap_servers,
            acks=self.config.acks,
            retries=self.config.retries,
            retry_backoff_ms=self.config.retry_backoff_ms,
            linger_ms=self.config.linger_ms,
            max_batch_size=self.config.batch_size,
            enable_idempotence=self.config.enable_idempotence,
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            key_serializer=lambda k: k.encode('utf-8') if k else None
        )
        await self.producer.start()
        logger.info("Producer started")
    
    async def stop(self) -> None:
        """Stop the producer, flushing pending messages."""
        if self.producer:
            await self.producer.stop()
        logger.info(f"Producer stopped. Sent: {self.messages_sent}, "
                   f"Failed: {self.messages_failed}")
    
    async def send(
        self,
        topic: str,
        value: Dict[str, Any],
        key: Optional[str] = None,
        headers: Optional[Dict[str, str]] = None
    ) -> bool:
        """
        Send a message to Kafka.
        
        Args:
            topic: Target topic
            value: Message payload
            key: Partition key (messages with same key go to same partition)
            headers: Optional message headers
            
        Returns:
            True if sent successfully, False otherwise
        """
        try:
            # Convert headers to Kafka format
            kafka_headers = None
            if headers:
                kafka_headers = [(k, v.encode('utf-8')) for k, v in headers.items()]
            
            # Send and wait for acknowledgment
            result = await self.producer.send_and_wait(
                topic,
                value=value,
                key=key,
                headers=kafka_headers
            )
            
            self.messages_sent += 1
            logger.debug(f"Sent message to {topic}:{result.partition}@{result.offset}")
            return True
            
        except KafkaError as e:
            self.messages_failed += 1
            logger.error(f"Failed to send message: {e}")
            return False
    
    async def send_batch(
        self,
        topic: str,
        messages: list[tuple[Optional[str], Dict[str, Any]]]
    ) -> int:
        """
        Send multiple messages efficiently.
        
        Args:
            topic: Target topic
            messages: List of (key, value) tuples
            
        Returns:
            Number of successfully sent messages
        """
        tasks = []
        for key, value in messages:
            # Don't wait for each message individually
            future = await self.producer.send(topic, value=value, key=key)
            tasks.append(future)
        
        # Wait for all to complete
        results = await asyncio.gather(*tasks, return_exceptions=True)
        
        success_count = sum(1 for r in results if not isinstance(r, Exception))
        self.messages_sent += success_count
        self.messages_failed += len(results) - success_count
        
        return success_count


# =============================================================================
# Example Usage
# =============================================================================

async def producer_example():
    """Example of using the producer."""
    config = ProducerConfig(
        bootstrap_servers="localhost:9092",
        acks="all",
        enable_idempotence=True
    )
    
    producer = KafkaProducerService(config)
    await producer.start()
    
    try:
        # Send single message
        await producer.send(
            topic="orders",
            key="user_123",
            value={
                "order_id": "order_456",
                "action": "created",
                "amount": 99.99,
                "timestamp": datetime.utcnow().isoformat()
            },
            headers={"source": "api", "version": "1"}
        )
        
        # Send batch
        messages = [
            ("user_123", {"order_id": "order_457", "action": "created"}),
            ("user_456", {"order_id": "order_458", "action": "created"}),
            ("user_789", {"order_id": "order_459", "action": "created"}),
        ]
        sent = await producer.send_batch("orders", messages)
        print(f"Sent {sent} messages")
        
    finally:
        await producer.stop()

Chapter 6: Edge Cases and Error Handling

6.1 Edge Case 1: Consumer Crashes Mid-Processing

┌────────────────────────────────────────────────────────────────────────┐
│  SCENARIO: Consumer crashes after reading but before committing        │
│                                                                        │
│    Consumer reads message at offset 100                                │
│              │                                                         │
│              ▼                                                         │
│    Consumer starts processing                                          │
│              │                                                         │
│              ▼                                                         │
│    ┌─────────────────┐                                                 │
│    │     CRASH!      │  ◄── Process dies                               │
│    └─────────────────┘                                                 │
│              │                                                         │
│              ▼                                                         │
│    Offset NOT committed (still at 99)                                  │
│              │                                                         │
│              ▼                                                         │
│    New consumer starts, reads offset 100 again                         │
│              │                                                         │
│              ▼                                                         │
│    MESSAGE IS REPROCESSED (at-least-once)                              │
│                                                                        │
│  SOLUTION: Consumer must be IDEMPOTENT                                 │
│                                                                        │
│    Option 1: Deduplicate by message ID                                 │
│    Option 2: Use database transaction with offset                      │
│    Option 3: Make processing naturally idempotent                      │
│                                                                        │
└────────────────────────────────────────────────────────────────────────┘

Solution: Idempotent Consumer

class IdempotentOrderProcessor:
    """
    Process orders idempotently using a processed_ids table.
    """
    
    def __init__(self, db_pool):
        self.db = db_pool
    
    async def process_order(self, ctx: MessageContext) -> ProcessingResult:
        """Process order with idempotency check."""
        order = ctx.value
        message_id = f"{ctx.topic}:{ctx.partition}:{ctx.offset}"
        
        async with self.db.acquire() as conn:
            async with conn.transaction():
                # Check if already processed
                existing = await conn.fetchval(
                    "SELECT 1 FROM processed_messages WHERE message_id = $1",
                    message_id
                )
                
                if existing:
                    logger.info(f"Message {message_id} already processed, skipping")
                    return ProcessingResult.SUCCESS
                
                # Process the order
                await conn.execute(
                    """
                    INSERT INTO orders (order_id, user_id, amount, status)
                    VALUES ($1, $2, $3, 'created')
                    """,
                    order['order_id'], order['user_id'], order['amount']
                )
                
                # Mark as processed (in same transaction!)
                await conn.execute(
                    """
                    INSERT INTO processed_messages (message_id, processed_at)
                    VALUES ($1, NOW())
                    """,
                    message_id
                )
        
        return ProcessingResult.SUCCESS
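
The processor above relies on a small deduplication table. A minimal sketch of the schema it assumes (asyncpg-style, run once as a migration; table and column names match the code above):

async def create_dedup_table(db_pool) -> None:
    """Create the table used for idempotency checks (if it doesn't exist)."""
    async with db_pool.acquire() as conn:
        await conn.execute(
            """
            CREATE TABLE IF NOT EXISTS processed_messages (
                message_id   TEXT PRIMARY KEY,
                processed_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
            )
            """
        )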

6.2 Edge Case 2: Consumer Rebalance

┌────────────────────────────────────────────────────────────────────────┐
│  SCENARIO: Partition reassignment during processing                    │
│                                                                        │
│  BEFORE:                                                               │
│    Consumer A: Partitions 0, 1, 2                                      │
│    Consumer B: Partitions 3, 4, 5                                      │
│                                                                        │
│  Consumer C joins the group...                                         │
│                                                                        │
│  REBALANCE TRIGGERED:                                                  │
│    1. All consumers stop fetching                                      │
│    2. Partitions are reassigned                                        │
│    3. Uncommitted offsets may be lost                                  │
│                                                                        │
│  AFTER:                                                                │
│    Consumer A: Partitions 0, 1                                         │
│    Consumer B: Partitions 2, 3                                         │
│    Consumer C: Partitions 4, 5                                         │
│                                                                        │
│  PROBLEM: Consumer A was processing partition 2 during rebalance       │
│           Work may be lost or duplicated                               │
│                                                                        │
│  SOLUTIONS:                                                            │
│    1. Commit offsets frequently                                        │
│    2. Use cooperative rebalancing (Kafka 2.4+)                         │
│    3. Implement rebalance listener to commit before revoke             │
│                                                                        │
└────────────────────────────────────────────────────────────────────────┘

Solution: Rebalance Listener

from aiokafka import ConsumerRebalanceListener

class GracefulRebalanceListener(ConsumerRebalanceListener):
    """
    Handle partition rebalance gracefully.
    Commit offsets before partitions are revoked.
    """
    
    def __init__(self, consumer: AIOKafkaConsumer, processor):
        self.consumer = consumer
        self.processor = processor
    
    async def on_partitions_revoked(self, revoked):
        """Called before partitions are taken away."""
        logger.info(f"Partitions being revoked: {revoked}")
        
        # Finish any in-progress work
        await self.processor.finish_current_batch()
        
        # Commit current offsets
        await self.consumer.commit()
        
        logger.info("Committed offsets before rebalance")
    
    async def on_partitions_assigned(self, assigned):
        """Called after new partitions are assigned."""
        logger.info(f"Partitions assigned: {assigned}")
        
        # Optionally seek to specific offsets
        # for tp in assigned:
        #     await self.consumer.seek(tp, specific_offset)
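
Wiring the listener up, a minimal sketch (the topic, group id, and the processor's handle() method are assumptions). The listener is passed to subscribe(), so offsets are committed before partitions are revoked:

async def run_consumer(processor) -> None:
    consumer = AIOKafkaConsumer(
        bootstrap_servers="localhost:9092",
        group_id="order-processors",
        enable_auto_commit=False
    )
    await consumer.start()
    consumer.subscribe(
        ["orders"],
        listener=GracefulRebalanceListener(consumer, processor)
    )
    try:
        async for message in consumer:
            await processor.handle(message)  # assumed processor interface
            await consumer.commit()
    finally:
        await consumer.stop()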

6.3 Edge Case 3: Slow Consumer (Consumer Lag)

┌────────────────────────────────────────────────────────────────────────┐
│  SCENARIO: Consumer can't keep up with producer                        │
│                                                                        │
│    Producer rate: 10,000 messages/sec                                  │
│    Consumer rate: 1,000 messages/sec                                   │
│                                                                        │
│    Lag grows by 9,000 messages every second!                           │
│                                                                        │
│    After 1 hour: 32,400,000 messages behind                            │
│                                                                        │
│  SYMPTOMS:                                                             │
│    • Consumer lag metric increasing                                    │
│    • Memory pressure on broker (retaining more data)                   │
│    • Data becoming "stale" before processing                           │
│                                                                        │
│  SOLUTIONS:                                                            │
│    1. Add more consumers (up to partition count)                       │
│    2. Increase partition count (requires topic recreation)             │
│    3. Optimize consumer processing                                     │
│    4. Use batch processing                                             │
│    5. Implement backpressure on producer                               │
│                                                                        │
└────────────────────────────────────────────────────────────────────────┘

Solution: Monitor and Alert on Lag

from dataclasses import dataclass
from typing import Dict
import asyncio

@dataclass
class LagMetrics:
    partition: int
    current_offset: int
    end_offset: int
    lag: int


class ConsumerLagMonitor:
    """Monitor consumer lag and alert when thresholds exceeded."""
    
    def __init__(
        self,
        consumer: AIOKafkaConsumer,
        warning_threshold: int = 10000,
        critical_threshold: int = 100000
    ):
        self.consumer = consumer
        self.warning_threshold = warning_threshold
        self.critical_threshold = critical_threshold
    
    async def get_lag(self) -> Dict[int, LagMetrics]:
        """Get lag for all assigned partitions."""
        lag_metrics = {}
        
        # Get end offsets (latest)
        partitions = self.consumer.assignment()
        end_offsets = await self.consumer.end_offsets(partitions)
        
        for tp in partitions:
            current = await self.consumer.position(tp)
            end = end_offsets[tp]
            lag = end - current
            
            lag_metrics[tp.partition] = LagMetrics(
                partition=tp.partition,
                current_offset=current,
                end_offset=end,
                lag=lag
            )
        
        return lag_metrics
    
    async def check_lag(self) -> None:
        """Check lag and log warnings/errors."""
        lag_metrics = await self.get_lag()
        
        total_lag = sum(m.lag for m in lag_metrics.values())
        
        if total_lag >= self.critical_threshold:
            logger.error(f"CRITICAL: Consumer lag is {total_lag:,} messages!")
            # Send alert to PagerDuty/Slack
        elif total_lag >= self.warning_threshold:
            logger.warning(f"WARNING: Consumer lag is {total_lag:,} messages")
        else:
            logger.debug(f"Consumer lag: {total_lag:,} messages")
        
        # Log per-partition metrics
        for partition, metrics in lag_metrics.items():
            logger.debug(f"Partition {partition}: lag={metrics.lag:,}")
    
    async def monitor_loop(self, interval_seconds: int = 30):
        """Continuously monitor lag."""
        while True:
            try:
                await self.check_lag()
            except Exception as e:
                logger.error(f"Error checking lag: {e}")
            
            await asyncio.sleep(interval_seconds)
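
Example usage, a sketch assuming the consumer has already been started and joined its group:

monitor = ConsumerLagMonitor(
    consumer,
    warning_threshold=10_000,
    critical_threshold=100_000
)
# Run the monitor as a background task alongside the main consume loop
lag_task = asyncio.create_task(monitor.monitor_loop(interval_seconds=30))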

6.4 Error Handling Matrix

Error Type              Cause                    Handling                Retry?
──────────────────────────────────────────────────────────────────────────────
Deserialization error   Malformed message        Send to DLQ, skip       No
Validation error        Missing required field   Send to DLQ, skip       No
Database timeout        DB overloaded            Retry with backoff      Yes
Network error           Connection lost          Retry with backoff      Yes
Out of memory           Message too large        Send to DLQ, skip       No
Rate limited            Downstream limit hit     Retry with backoff      Yes
Consumer crash          Bug in handler           Restart, reprocess      Auto
Broker unavailable      Kafka down               Retry until available   Yes
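
A minimal sketch of encoding this matrix in the consumer, so the handler can decide between retrying and dead-lettering (ValidationError is an assumed application exception; KafkaConnectionError comes from aiokafka):

import asyncio
import json
from enum import Enum

from aiokafka.errors import KafkaConnectionError


class ValidationError(Exception):
    """Raised by the application when a message fails field/schema validation."""


class ErrorAction(Enum):
    RETRY = "retry_with_backoff"
    DLQ = "send_to_dlq"


def classify_error(exc: Exception) -> ErrorAction:
    # Permanent problems with the message itself: retrying will never help
    if isinstance(exc, (json.JSONDecodeError, ValidationError, MemoryError)):
        return ErrorAction.DLQ
    # Transient infrastructure problems: retry with backoff
    if isinstance(exc, (asyncio.TimeoutError, ConnectionError, KafkaConnectionError)):
        return ErrorAction.RETRY
    # Unknown errors: retry a bounded number of times, then dead-letter
    return ErrorAction.RETRY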

Part III: Real-World Application

Chapter 7: How Big Tech Does It

7.1 Case Study: LinkedIn — The Birthplace of Kafka

┌────────────────────────────────────────────────────────────────────────┐
│                    LINKEDIN'S KAFKA ARCHITECTURE                       │
│                                                                        │
│  SCALE:                                                                │
│    • 7+ trillion messages per day                                      │
│    • 100+ petabytes of data                                            │
│    • Thousands of producers and consumers                              │
│                                                                        │
│  USE CASES:                                                            │
│    ┌─────────────────────────────────────────────────────────────┐     │
│    │  Activity Tracking                                          │     │
│    │  • Page views, searches, profile views                      │     │
│    │  • Billions of events per day                               │     │
│    │  • Powers recommendations and analytics                     │     │
│    └─────────────────────────────────────────────────────────────┘     │
│                                                                        │
│    ┌─────────────────────────────────────────────────────────────┐     │
│    │  Data Pipeline                                              │     │
│    │  • Stream data from OLTP to OLAP                            │     │
│    │  • Real-time ETL                                            │     │
│    │  • Feed machine learning models                             │     │
│    └─────────────────────────────────────────────────────────────┘     │
│                                                                        │
│    ┌─────────────────────────────────────────────────────────────┐     │
│    │  Metrics and Monitoring                                     │     │
│    │  • Application metrics piped through Kafka                  │     │
│    │  • Aggregated in real-time                                  │     │
│    │  • Powers dashboards and alerts                             │     │
│    └─────────────────────────────────────────────────────────────┘     │
│                                                                        │
│  KEY DECISIONS:                                                        │
│    • Chose stream (Kafka) over queue for replay capability             │
│    • Partition by user ID for user-centric ordering                    │
│    • Multiple consumer groups for different use cases                  │
│    • 7-day retention for replay, then archive to HDFS                  │
│                                                                        │
│  LESSONS:                                                              │
│    • "If in doubt, log it" — capture everything, decide later          │
│    • Kafka became the "central nervous system" of LinkedIn             │
│    • Consumer groups allow adding new features without changes         │
│                                                                        │
└────────────────────────────────────────────────────────────────────────┘

Reference: "The Log: What every software engineer should know about real-time data's unifying abstraction" — Jay Kreps (LinkedIn, co-creator of Kafka)

7.2 Case Study: Uber — Kafka for Real-Time Matching

┌────────────────────────────────────────────────────────────────────────┐
│                    UBER'S MESSAGING ARCHITECTURE                       │
│                                                                        │
│  CHALLENGE:                                                            │
│    Match riders with drivers in real-time                              │
│    Millions of events per second                                       │
│    Strong ordering requirements per trip                               │
│                                                                        │
│  ARCHITECTURE:                                                         │
│                                                                        │
│    ┌──────────┐     ┌───────────────────────────────────┐              │
│    │ Rider    │────▶│                                   │              │
│    │   App    │     │         KAFKA                     │              │
│    └──────────┘     │                                   │              │
│                     │  Trip Events Topic                │              │
│    ┌──────────┐     │  ┌───────────────────────────┐    │              │
│    │ Driver   │────▶│  │ Key: trip_id              │    │              │
│    │   App    │     │  │ All events for same trip  │    │              │
│    └──────────┘     │  │ go to same partition      │    │              │
│                     │  └───────────────────────────┘    │              │
│    ┌──────────┐     │                                   │              │
│    │ Dispatch │────▶│  Location Updates Topic           │              │
│    │ Service  │     │  ┌───────────────────────────┐    │              │
│    └──────────┘     │  │ Key: driver_id            │    │              │
│                     │  │ 4 billion/day             │    │              │
│                     │  └───────────────────────────┘    │              │
│                     └───────────────────────────────────┘              │
│                                   │                                    │
│           ┌───────────────────────┼───────────────────────┐            │
│           ▼                       ▼                       ▼            │
│    ┌─────────────┐        ┌─────────────┐        ┌─────────────┐       │
│    │  Matching   │        │  Pricing    │        │  Analytics  │       │
│    │  Service    │        │  Service    │        │  Pipeline   │       │
│    └─────────────┘        └─────────────┘        └─────────────┘       │
│                                                                        │
│  KEY DECISIONS:                                                        │
│    • Partition by trip_id: All trip events in order                    │
│    • Consumer groups: Matching, pricing, analytics read same data      │
│    • Exactly-once via idempotent writes to database                    │
│    • Multi-region Kafka for disaster recovery                          │
│                                                                        │
│  SCALE:                                                                │
│    • Trillions of messages per day across all topics                   │
│    • < 10ms latency for trip updates                                   │
│    • 99.99% availability requirement                                   │
│                                                                        │
└────────────────────────────────────────────────────────────────────────┘

7.3 Case Study: Stripe — SQS for Webhook Delivery

┌────────────────────────────────────────────────────────────────────────┐
│                    STRIPE'S WEBHOOK ARCHITECTURE                       │
│                                                                        │
│  CHALLENGE:                                                            │
│    Deliver webhooks reliably to millions of merchants                  │
│    Retry failed deliveries with exponential backoff                    │
│    Handle merchants with slow/broken endpoints                         │
│                                                                        │
│  WHY SQS (Queue) NOT Kafka (Stream)?                                   │
│                                                                        │
│    ✓ Each webhook is a "job to be done" (queue mental model)           │
│    ✓ Need per-message acknowledgment                                   │
│    ✓ Need visibility timeout for slow processing                       │
│    ✓ Built-in dead letter queue                                        │
│    ✓ Don't need replay (webhooks are delivered, not replayed)          │
│                                                                        │
│  ARCHITECTURE:                                                         │
│                                                                        │
│    ┌──────────┐     ┌─────────────────────────────────────┐            │
│    │ Payment  │     │              SQS                    │            │
│    │ Service  │────▶│  ┌─────────────────────────────┐    │            │
│    └──────────┘     │  │  Webhook Queue              │    │            │
│                     │  │  [hook][hook][hook][hook]   │    │            │
│                     │  └─────────────────────────────┘    │            │
│                     │                 │                   │            │
│                     │                 ▼                   │            │
│                     │  ┌─────────────────────────────┐    │            │
│                     │  │  Visibility Timeout: 30s    │    │            │
│                     │  │  (Message invisible while   │    │            │
│                     │  │   being processed)          │    │            │
│                     │  └─────────────────────────────┘    │            │
│                     │                 │                   │            │
│                     │         Failed? │                   │            │
│                     │                 ▼                   │            │
│                     │  ┌─────────────────────────────┐    │            │
│                     │  │  Dead Letter Queue          │    │            │
│                     │  │  (After 5 retries)          │    │            │
│                     │  └─────────────────────────────┘    │            │
│                     └─────────────────────────────────────┘            │
│                                                                        │
│  KEY DECISIONS:                                                        │
│    • SQS for simple job queue semantics                                │
│    • Visibility timeout prevents duplicate delivery                    │
│    • Dead letter queue for investigation                               │
│    • Exponential backoff in application code                           │
│                                                                        │
│  LESSONS:                                                              │
│    • Not everything needs Kafka                                        │
│    • Queue semantics fit webhook delivery perfectly                    │
│    • Managed service (SQS) reduces operational burden                  │
│                                                                        │
└────────────────────────────────────────────────────────────────────────┘

7.4 Comparison: When Each Company Chose What

Company    Use Case           Choice         Why
─────────────────────────────────────────────────────────────────────
LinkedIn   Activity tracking  Kafka          Replay, multiple consumers
Uber       Trip events        Kafka          Ordering per trip, fan-out
Stripe     Webhooks           SQS            Job queue, visibility timeout
Netflix    Video encoding     SQS            Job distribution to workers
Shopify    Order events       Kafka          Event sourcing, audit
Slack      Message delivery   Custom queue   Per-workspace isolation
Segment    Event routing      Kafka          Fan-out to 300+ destinations

Chapter 8: Common Mistakes to Avoid

8.1 Mistake 1: Using Kafka When a Queue Would Do

┌────────────────────────────────────────────────────────────────────────┐
│  ❌ WRONG: Using Kafka for simple background jobs                      │
│                                                                        │
│    "We need to process image thumbnails. Let's use Kafka!"             │
│                                                                        │
│    Problems:                                                           │
│    • Kafka complexity not needed                                       │
│    • Partition count limits parallelism                                │
│    • Operational overhead (ZooKeeper, brokers)                         │
│    • No built-in retry/DLQ support                                     │
│                                                                        │
├────────────────────────────────────────────────────────────────────────┤
│  ✅ CORRECT: Use a simple queue (Redis, SQS, Celery)                   │
│                                                                        │
│    • Jobs processed by available workers                               │
│    • Easy to scale workers up/down                                     │
│    • Built-in retry support                                            │
│    • Simpler operations                                                │
│                                                                        │
│  Rule: If you don't need replay or multiple consumers,                 │
│        a queue is probably simpler.                                    │
│                                                                        │
└────────────────────────────────────────────────────────────────────────┘
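
For comparison, here is what the thumbnail job might look like on a plain task queue. This is a sketch using Celery with a Redis broker; the broker URL and the resize_image() helper are illustrative assumptions, not part of the original example:

from celery import Celery

app = Celery("media", broker="redis://localhost:6379/0")


def resize_image(path: str, width: int) -> str:
    """Placeholder for real image processing (e.g. Pillow); returns the output path."""
    return f"{path}.thumb{width}.jpg"


@app.task(bind=True, max_retries=3, default_retry_delay=10)
def generate_thumbnail(self, image_path: str) -> str:
    try:
        return resize_image(image_path, width=200)
    except OSError as exc:
        # Built-in retry support: re-queue the job, up to max_retries times
        raise self.retry(exc=exc)

Producers enqueue work with generate_thumbnail.delay("/uploads/photo.jpg"), and workers can be added or removed at any time without repartitioning anything.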

8.2 Mistake 2: Forgetting Consumer Idempotency

┌────────────────────────────────────────────────────────────────────────┐
│  ❌ WRONG: Assuming exactly-once delivery                              │
│                                                                        │
│    async def process_order(message):                                   │
│        order = message.value                                           │
│        db.execute("INSERT INTO orders VALUES (...)")  # Duplicate!     │
│        send_email(order.user, "Order confirmed")       # Duplicate!    │
│                                                                        │
│    If consumer crashes and restarts, message is redelivered.           │
│    Order inserted twice. Two emails sent.                              │
│                                                                        │
├────────────────────────────────────────────────────────────────────────┤
│  ✅ CORRECT: Make consumer idempotent                                   │
│                                                                        │
│    async def process_order(message):                                   │
│        order = message.value                                           │
│        message_id = f"{message.partition}:{message.offset}"            │
│                                                                        │
│        # Check if already processed                                    │
│        if await is_processed(message_id):                              │
│            return  # Skip duplicate                                    │
│                                                                        │
│        # Process in transaction                                        │
│        async with db.transaction():                                    │
│            db.execute("INSERT INTO orders VALUES (...)")               │
│            db.execute("INSERT INTO processed_ids VALUES (...)")        │
│                                                                        │
│        # Idempotent email (check if sent)                              │
│        await send_email_if_not_sent(order.id, order.user)              │
│                                                                        │
└────────────────────────────────────────────────────────────────────────┘

8.3 Mistake 3: Wrong Partition Key

┌────────────────────────────────────────────────────────────────────────┐
│  ❌ WRONG: Random or no partition key                                  │
│                                                                        │
│    # No key = round-robin to partitions                                │
│    producer.send("orders", value=order)                                │
│                                                                        │
│    Problem: Order created → Order paid → Order shipped                 │
│    These might go to different partitions!                             │
│    Consumer might process "shipped" before "created"                   │
│                                                                        │
├────────────────────────────────────────────────────────────────────────┤
│  ✅ CORRECT: Key by the entity that needs ordering                     │
│                                                                        │
│    # All events for same order go to same partition                    │
│    producer.send("orders", key=order.id, value=order)                  │
│                                                                        │
│    # OR key by user if user-level ordering needed                      │
│    producer.send("orders", key=order.user_id, value=order)             │
│                                                                        │
│  Rule: Key by whatever entity needs events processed in order          │
│                                                                        │
└────────────────────────────────────────────────────────────────────────┘

8.4 Mistake 4: Too Few Partitions

┌────────────────────────────────────────────────────────────────────────┐
│  ❌ WRONG: Creating topic with 1 partition                              │
│                                                                        │
│    # Only 1 consumer can process at a time                             │
│    # Maximum throughput = 1 consumer's speed                           │
│                                                                        │
│    Partition 0: [msg][msg][msg][msg][msg][msg]...                      │
│                   ▲                                                    │
│                   │                                                    │
│              Only Consumer                                             │
│                                                                        │
├────────────────────────────────────────────────────────────────────────┤
│  ✅ CORRECT: Plan partition count for growth                           │
│                                                                        │
│    # Rule of thumb: partitions >= expected max consumers               │
│    # Can always have more partitions than consumers                    │
│    # Cannot easily reduce partitions later                             │
│                                                                        │
│    Partition 0: [msg][msg]     → Consumer A                            │
│    Partition 1: [msg][msg]     → Consumer B                            │
│    Partition 2: [msg][msg]     → Consumer C                            │
│    Partition 3: [msg][msg]     → Consumer D                            │
│    ...                                                                 │
│                                                                        │
│    Start with 6-12 partitions for new topics.                          │
│    High-volume topics: 50-100+ partitions.                             │
│                                                                        │
└────────────────────────────────────────────────────────────────────────┘

8.5 Mistake Checklist

Before deploying a messaging system, verify:

  • Consumer is idempotent — Can handle duplicate messages
  • Partition key chosen correctly — Related messages stay together
  • Enough partitions — Can scale consumers as needed
  • Dead letter queue configured — Failed messages go somewhere
  • Monitoring in place — Consumer lag, error rates visible
  • Retention configured — Data kept long enough (but not forever)
  • Serialization tested — Schema evolution won't break consumers
  • Graceful shutdown — In-flight messages not lost on deploy (see the sketch below)
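
A minimal sketch of the last item, graceful shutdown (aiokafka; the topic and group names are assumptions). The deploy system sends SIGTERM, the loop stops fetching, finishes the current batch, commits, and leaves the group:

import asyncio
import signal

from aiokafka import AIOKafkaConsumer


async def main() -> None:
    consumer = AIOKafkaConsumer(
        "orders",
        bootstrap_servers="localhost:9092",
        group_id="order-processors",
        enable_auto_commit=False
    )
    await consumer.start()

    stop = asyncio.Event()
    loop = asyncio.get_running_loop()
    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, stop.set)  # deploys send SIGTERM first

    try:
        while not stop.is_set():
            batch = await consumer.getmany(timeout_ms=1000)
            for tp, messages in batch.items():
                for message in messages:
                    ...  # process the message (idempotently)
            await consumer.commit()  # commit only after the batch is processed
    finally:
        await consumer.stop()  # leaves the group so partitions are reassigned quickly


asyncio.run(main())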

Part IV: Interview Preparation

Chapter 9: Interview Tips and Phrases

9.1 When to Bring Up Queues/Streams

Bring up messaging when:

  • System has components that can operate independently
  • You need to handle traffic spikes (buffer)
  • Multiple services need to react to the same event
  • You need audit/replay capabilities
  • Synchronous calls would create tight coupling

9.2 Key Phrases to Use

┌────────────────────────────────────────────────────────────────────────┐
│                    INTERVIEW PHRASES                                   │
│                                                                        │
│  When proposing async architecture:                                    │
│                                                                        │
│    "Rather than having the API call these services synchronously,      │
│     I'd publish an event to a message queue. This decouples the        │
│     services and allows us to handle spikes by buffering requests."    │
│                                                                        │
│  When choosing between queue and stream:                               │
│                                                                        │
│    "For this use case, I'd use Kafka rather than a traditional queue   │
│     because we need multiple consumer groups to process the same       │
│     events—analytics needs the data, and so does the notification      │
│     service. With a queue, once a message is consumed, it's gone."     │
│                                                                        │
│  When discussing ordering:                                             │
│                                                                        │
│    "I'd partition by user_id to ensure all events for a given user     │
│     are processed in order. Kafka guarantees ordering within a         │
│     partition, so this gives us user-level ordering while allowing     │
│     parallelism across users."                                         │
│                                                                        │
│  When discussing reliability:                                          │
│                                                                        │
│    "The consumer needs to be idempotent because Kafka provides         │
│     at-least-once delivery. I'd use the message offset as a            │
│     deduplication key, storing it in the same transaction as the       │
│     business operation."                                               │
│                                                                        │
│  When discussing scale:                                                │
│                                                                        │
│    "With 6 partitions, we can have up to 6 consumers processing        │
│     in parallel. If we need more throughput, we'd increase the         │
│     partition count, though this requires careful planning since       │
│     you can't easily reduce partitions later."                         │
│                                                                        │
└────────────────────────────────────────────────────────────────────────┘

9.3 Questions to Ask Interviewer

  • "What are the ordering requirements? Do events need to be processed in a specific order?"
  • "How many consumers will need this data? Just one system, or multiple?"
  • "Do we need replay capability? If something goes wrong, can we reprocess?"
  • "What's the acceptable latency for message delivery?"
  • "How bursty is the traffic? Are there predictable spikes?"

9.4 Common Follow-up Questions

Q: "What if Kafka is down?"
A: "Producers would buffer locally and retry. We'd have monitoring to alert immediately. For critical paths, we might have a fallback to synchronous calls with circuit breaker."

Q: "How do you handle poison pills?"
A: "Messages that fail repeatedly go to a dead letter queue after N retries. We have tooling to inspect, fix, and replay them. The key is capturing why they failed."

Q: "What about message ordering?"
A: "Kafka guarantees ordering within a partition. I'd partition by the entity that needs ordering—like user_id or order_id. Across partitions, we'd either design for out-of-order or use timestamps."

Q: "How do you scale consumers?"
A: "Add more consumer instances up to the partition count. If we need more parallelism, increase partitions. We'd monitor consumer lag to know when to scale."

Chapter 10: Practice Problems

Problem 1: Order Processing System

Setup: Design an order processing system for an e-commerce platform. When a user places an order:

  1. Validate inventory
  2. Process payment
  3. Update inventory
  4. Send confirmation email
  5. Notify warehouse

Requirements:

  • 10,000 orders per minute at peak
  • Orders for same user must be processed in order
  • No order can be lost
  • Email and warehouse notification can be delayed

Questions to consider:

  1. Would you use a queue or stream? Why?
  2. How would you partition the messages?
  3. What happens if payment processing fails?
  4. How do you ensure emails aren't sent twice?

Hints:

  • Think about which operations MUST happen together vs can be async
  • Consider using multiple topics for different stages
  • Payment and inventory are critical; email is not
  • Partition by user_id for user-level ordering

Architecture:

  1. API writes order to database + outbox in one transaction
  2. Outbox publisher sends to Kafka topic "orders" (key: user_id)
  3. Order processor validates inventory, processes payment
  4. On success, publishes to "orders.completed" topic
  5. Email service and warehouse service consume from "orders.completed"

Key decisions:

  • Kafka for replay and multiple consumers
  • Partition by user_id for ordering
  • Transactional outbox to avoid dual-write (see the sketch after this list)
  • Separate topics for different stages
  • Idempotent consumers with deduplication
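
A minimal sketch of step 1, the transactional outbox write (asyncpg-style; the table names and columns are assumptions, and the pattern itself is covered in depth tomorrow):

async def place_order(db_pool, order_id: str, user_id: str, amount: float) -> None:
    """Write the order and the outbox record in one transaction (no dual-write)."""
    async with db_pool.acquire() as conn:
        async with conn.transaction():
            await conn.execute(
                """
                INSERT INTO orders (order_id, user_id, amount, status)
                VALUES ($1, $2, $3, 'created')
                """,
                order_id, user_id, amount
            )
            await conn.execute(
                """
                INSERT INTO outbox (topic, key, payload)
                VALUES ('orders', $1, $2)
                """,
                user_id,
                json.dumps({"order_id": order_id, "action": "created"})
            )
    # A separate outbox publisher reads this table and sends to Kafka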

Problem 2: Real-Time Analytics Pipeline

Setup: Build a system to track user activity on a website. Events include page views, clicks, searches, and purchases. The data feeds into:

  • Real-time dashboard (< 5 second delay)
  • Daily reports
  • ML recommendation engine

Requirements:

  • 100,000 events per second at peak
  • Events must not be lost
  • Multiple consumers need the same data
  • Need to replay last 7 days of data

Questions to consider:

  1. Why is this a stream (not queue) use case?
  2. How would you handle the different latency requirements?
  3. What partition strategy would you use?

Hints:

  • Stream is essential: multiple consumers, replay needed
  • Consider consumer groups for each use case
  • Think about what needs ordering vs what doesn't
  • Consider data volume for partitioning decisions

Architecture:

  • Kafka with 100+ partitions for throughput
  • Partition by user_id (enables user-level analysis)
  • Three consumer groups:
    • dashboard-consumer: Aggregates in real-time
    • daily-reports-consumer: Batch processes at end of day
    • ml-consumer: Feeds recommendation model
  • 7-day retention, then archive to S3

Key decisions:

  • High partition count for parallelism
  • Consumer groups for isolation
  • Each consumer tracks its own offset
  • Replay by resetting consumer offset (see the sketch below)
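
A minimal sketch of the replay step (aiokafka; assumes the consumer has started and received its partition assignment). It rewinds to the first offset at or after a timestamp seven days ago:

from datetime import datetime, timedelta, timezone

async def rewind_seven_days(consumer: AIOKafkaConsumer) -> None:
    start = datetime.now(timezone.utc) - timedelta(days=7)
    start_ms = int(start.timestamp() * 1000)

    partitions = consumer.assignment()
    # Look up, per partition, the earliest offset at or after the target time
    offsets = await consumer.offsets_for_times({tp: start_ms for tp in partitions})

    for tp, offset_and_ts in offsets.items():
        if offset_and_ts is not None:  # None means no data after that timestamp
            consumer.seek(tp, offset_and_ts.offset)  # seek() is synchronous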

Problem 3: Notification Service

Setup: Design a notification service that sends emails, SMS, and push notifications. Notifications come from various services (orders, marketing, security alerts).

Requirements:

  • Marketing can trigger 5 million notifications at once
  • Security alerts (password reset) must be delivered within 30 seconds
  • Email provider allows 1,000 sends/second
  • Must handle provider outages gracefully

Questions to consider:

  1. How do you prioritize security alerts over marketing?
  2. How do you handle the rate limit to external providers?
  3. Queue or stream? Why?

Hints:

  • Consider multiple queues with different priorities
  • Rate limiting needs to happen at the consumer level
  • Queue fits better: job processing, no replay needed
  • Think about circuit breakers for provider outages

Architecture:

  • Priority queues: HIGH (security), MEDIUM (transactional), LOW (marketing)
  • Separate queues per channel (email, SMS, push)
  • Rate limiter before external provider calls
  • Circuit breaker per provider with fallback
  • Dead letter queue for failed deliveries

Key decisions:

  • Queue (not stream): job processing, no replay needed
  • Priority queues ensure security alerts not delayed
  • Rate limiting at consumer level (see the sketch below)
  • Provider abstraction allows fallback
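
A minimal sketch of the consumer side (in-process asyncio queues and an illustrative send() callable; a real deployment would use SQS or similar). It drains the queues in strict priority order while staying under the provider's 1,000 sends/second limit:

import asyncio

async def drain_notifications(queues: dict, send, rate_per_sec: int = 1000) -> None:
    """queues maps priority name -> asyncio.Queue; send() calls the external provider."""
    interval = 1.0 / rate_per_sec
    while True:
        for priority in ("high", "medium", "low"):  # security > transactional > marketing
            queue = queues[priority]
            if not queue.empty():
                notification = queue.get_nowait()
                await send(notification)  # external provider call
                break  # re-check HIGH after every send so alerts never wait
        await asyncio.sleep(interval)  # crude rate limit: at most one send per interval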

Chapter 11: Mock Interview Dialogue

Scenario: Design an Event-Driven Order System

Interviewer: "We need to design an order processing system. When a user places an order, several things need to happen: validate inventory, charge payment, reserve inventory, send confirmation email, and notify the warehouse. How would you approach this?"

You: "Great question. Before I dive into the design, let me make sure I understand the requirements.

First, what's the expected scale? How many orders per second?"

Interviewer: "Let's say 1,000 orders per second at peak, with average around 100."

You: "And for reliability—is it acceptable to lose orders, or must every order be processed?"

Interviewer: "No order loss is acceptable. Payments are involved."

You: "Got it. Last question: do all these operations need to happen synchronously, or can some be delayed? For example, must the email be sent before we return a response to the user?"

Interviewer: "The user just needs to know their order was received. Email can be delayed by a few seconds, same with warehouse notification."

You: "Perfect. Based on these requirements, I'd use an event-driven architecture with a message stream—specifically Kafka. Let me draw this out.

┌────────────────────────────────────────────────────────────────────────┐
│                    PROPOSED ARCHITECTURE                               │
│                                                                        │
│    User ──► API ──► Database + Outbox (single transaction)             │
│                           │                                            │
│                           ▼                                            │
│                    ┌─────────────┐                                     │
│                    │   Kafka     │                                     │
│                    │  "orders"   │                                     │
│                    └──────┬──────┘                                     │
│                           │                                            │
│           ┌───────────────┼───────────────┐                            │
│           ▼               ▼               ▼                            │
│    ┌─────────────┐ ┌─────────────┐ ┌─────────────┐                     │
│    │   Order     │ │   Email     │ │  Warehouse  │                     │
│    │  Processor  │ │   Service   │ │   Service   │                     │
│    └──────┬──────┘ └─────────────┘ └─────────────┘                     │
│           │                                                            │
│           ▼                                                            │
│    Validate → Payment → Reserve Inventory                              │
│                                                                        │
└────────────────────────────────────────────────────────────────────────┘

Here's why I chose this approach:

  1. Transactional outbox for the API: When the user submits an order, we write to our orders table AND an outbox table in a single database transaction. This avoids the dual-write problem where we might save to the database but fail to publish.

  2. Kafka over a simple queue because multiple services need the same events—order processor, email, warehouse. With Kafka consumer groups, each service gets every message.

  3. Partition by user_id so all orders from the same user go to the same partition. This ensures ordering per user."

Interviewer: "What happens if the payment processing fails?"

You: "Good question. The order processor would implement a saga pattern with compensation.

If payment fails:

  1. The order is marked as 'payment_failed' in the database
  2. Any reserved inventory is released (compensation)
  3. User is notified of the failure

For transient failures—like the payment service timing out—we'd retry with exponential backoff. After 3 retries, the message goes to a dead letter queue for manual investigation.

The key is that each step must be idempotent. If we retry payment and it already succeeded, the payment service should recognize the idempotency key and return the existing result rather than charging twice."

Interviewer: "How do you ensure the email isn't sent multiple times?"

You: "Two approaches:

First, the email service consumer is idempotent. It tracks which order IDs have been emailed using a simple database table. Before sending, it checks if a confirmation was already sent.

Second, the email service provider probably supports idempotency keys. We'd include the order ID in that key so even if we call the API twice, only one email goes out.

This is important because Kafka provides at-least-once delivery. Messages can be redelivered if a consumer crashes after processing but before committing the offset."

Interviewer: "How would you handle scaling this system?"

You: "A few dimensions to scale:

  1. More partitions: Currently with 10 partitions, we can have 10 consumers per consumer group processing in parallel. At 1,000 orders/second, that's 100/second per consumer—very manageable. If we need more, we'd increase partition count.

  2. More consumer instances: Within each consumer group, we can add instances up to the partition count. Kubernetes would auto-scale based on consumer lag metrics.

  3. Separate topics by priority: If we later have high-priority orders (like same-day delivery), we might route those to a separate topic with more consumers.

The nice thing about this architecture is that each component scales independently. Email service slow? Add more email consumers. They don't affect order processing."

Interviewer: "Great. What metrics would you monitor?"

You: "Critical metrics:

  1. Consumer lag — Are we falling behind? If lag grows, we need more consumers.
  2. Processing latency — p99 of time from message publish to processing complete.
  3. Error rate — What percentage of messages fail and go to DLQ?
  4. DLQ depth — Growing DLQ means something systematic is wrong.

I'd have alerts on all of these. Consumer lag above 10,000 messages would page someone. DLQ growth of more than 100 messages in 5 minutes would alert immediately."


Summary

┌────────────────────────────────────────────────────────────────────────┐
│                    DAY 1 KEY TAKEAWAYS                                 │
│                                                                        │
│  CORE CONCEPT:                                                         │
│  • QUEUE = Work to be done (consumed and deleted)                      │
│  • STREAM = Events that happened (retained, multiple readers)          │
│  • Choose based on whether you need replay and multiple consumers      │
│                                                                        │
│  PATTERNS:                                                             │
│  • Point-to-Point: One consumer per message (job queues)               │
│  • Pub-Sub: All subscribers get all messages (notifications)           │
│  • Consumer Groups: Fan-out + load balance (Kafka's model)             │
│                                                                        │
│  IMPLEMENTATION:                                                       │
│  • Producers need acks=all for durability                              │
│  • Consumers must be idempotent (at-least-once delivery)               │
│  • Partition key determines ordering and parallelism                   │
│  • Dead letter queues for failed messages                              │
│                                                                        │
│  TRADE-OFFS:                                                           │
│  • Queue: Simpler, but no replay. Use for job processing.              │
│  • Stream: Replay + fan-out, but more complex. Use for events.         │
│                                                                        │
│  INTERVIEW TIPS:                                                       │
│  • Always ask about ordering requirements                              │
│  • Clarify if multiple systems need the same data                      │
│  • Mention idempotency when discussing reliability                     │
│                                                                        │
│  DEFAULT CHOICE:                                                       │
│  • Uncertain? Start with a queue (simpler)                             │
│  • Need replay or multiple consumers? Use a stream                     │
│                                                                        │
└────────────────────────────────────────────────────────────────────────┘

📚 Further Reading

Books

  • "Designing Data-Intensive Applications" by Martin Kleppmann — Chapter 11 covers stream processing
  • "Kafka: The Definitive Guide" by Neha Narkhede — Comprehensive Kafka reference
  • "Enterprise Integration Patterns" by Hohpe & Woolf — Classic patterns book

Papers

  • "Kafka: a Distributed Messaging System for Log Processing" — Original Kafka paper from LinkedIn

End of Day 1: Queue vs Stream

Tomorrow: Day 2 — Transactional Outbox. We saw today that publishing to a queue after writing to a database is dangerous (dual-write problem). Tomorrow we'll solve this properly with the transactional outbox pattern, ensuring no message is ever lost between your database and message broker.