Week 3 — Day 1: Queue vs Stream
System Design Mastery Series
Preface
It's 3 AM, and your phone buzzes with a PagerDuty alert. Your e-commerce platform's order processing system is down. Orders are being lost. Customer payments are going through, but confirmations aren't being sent. The support queue is exploding.
You SSH into the server and find the culprit: your synchronous order processing chain has collapsed. The email service is slow, causing timeouts that cascade back through payment processing, inventory updates, and order creation. One slow service brought down everything.
Last week, we learned about building resilient systems—timeouts, retries, circuit breakers, idempotency. But we left a dangerous hole:
┌────────────────────────────────────────────────────────────────────────┐
│ THE SYNCHRONOUS TRAP │
│ │
│ User Request │
│ │ │
│ ▼ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ Create │───▶│ Reserve │───▶│ Charge │───▶│ Send │ │
│ │ Order │ │Inventory│ │ Payment │ │ Email │ │
│ └─────────┘ └─────────┘ └─────────┘ └─────────┘ │
│ │ │ │ │ │
│ │ │ │ │ │
│ │ │ │ ▼ │
│ │ │ │ ┌─────────┐ │
│ │ │ │ │ SLOW! │ │
│ │ │ │ │ (5 sec) │ │
│ │ │ │ └─────────┘ │
│ │ │ │ │ │
│ │ │ ◀──────────────┘ │
│ │ ◀───────────────── Timeout propagates back │
│ ◀──────────────────────────────── │
│ │ │
│ ▼ │
│ User sees: "Error processing your order" │
│ Reality: Payment charged, but order not created │
│ │
│ Question: What went wrong? │
│ Answer: Tight coupling. Every service waits for the next. │
│ │
└────────────────────────────────────────────────────────────────────────┘
This is the problem asynchronous messaging solves.
Today, we'll learn the fundamental building blocks of async communication: queues and streams. By the end of this session, you'll know exactly when to reach for Kafka, when RabbitMQ is the right choice, and why "just use Kafka" is often the wrong answer.
Part I: Foundations
Chapter 1: What Are Queues and Streams?
1.1 The Simple Definition
A message queue is a buffer that holds messages between a sender and receiver, allowing them to operate independently.
A stream (or log) is an append-only sequence of messages that multiple readers can consume at their own pace.
┌────────────────────────────────────────────────────────────────────────┐
│ EVERYDAY ANALOGIES │
│ │
│ QUEUE = Post Office Box │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ 📬 You drop off a letter │ │
│ │ 📭 Recipient picks it up │ │
│ │ 🗑️ Letter is gone after pickup │ │
│ │ │ │
│ │ • One recipient per letter │ │
│ │ • Letter disappears after reading │ │
│ │ • Can't re-read the letter │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────────────┘ │
│ │
│ STREAM = Newspaper Archive │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ 📰 Publisher adds today's paper to the archive │ │
│ │ 👀 Anyone can read any past edition │ │
│ │ 📚 Papers stay in the archive (until cleanup policy) │ │
│ │ │ │
│ │ • Multiple readers, same content │ │
│ │ • Content persists after reading │ │
│ │ • Can re-read any past edition │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────────────┘ │
│ │
└────────────────────────────────────────────────────────────────────────┘
1.2 Why Distributed Systems Need Messaging
In a distributed system, services need to communicate. There are two fundamental approaches:
Synchronous Communication (HTTP/RPC):
- Service A calls Service B and waits for a response
- Simple to understand and implement
- Creates tight coupling between services
- Cascading failures when downstream services are slow
Asynchronous Communication (Messaging):
- Service A sends a message and continues immediately
- Services are decoupled in time and availability
- More complex to implement correctly
- Resilient to downstream failures
┌────────────────────────────────────────────────────────────────────────┐
│ SYNCHRONOUS vs ASYNCHRONOUS │
│ │
│ SYNCHRONOUS (HTTP/RPC) │
│ │
│ Service A Service B │
│ ┌───────┐ ┌───────┐ │
│ │ │ ──── Request ────▶ │ │ │
│ │ │ │ │ Processing... │
│ │ WAIT │ │ │ ... │
│ │ │ ◀─── Response ──── │ │ │
│ │ │ │ │ │
│ └───────┘ └───────┘ │
│ │
│ Total time = A's time + B's time + network │
│ If B is down, A fails immediately │
│ │
│ ASYNCHRONOUS (Messaging) │
│ │
│ Service A Queue/Stream Service B │
│ ┌───────┐ ┌─────────┐ ┌───────┐ │
│ │ │ ────▶ │ ░░░░░░░ │ │ │ │
│ │ DONE! │ │ ░░░░░░░ │ ────▶ │ │ │
│ │ │ │ ░░░░░░░ │ │ │ │
│ └───────┘ └─────────┘ └───────┘ │
│ │
│ A completes immediately after sending │
│ If B is down, messages queue up and wait │
│ │
└────────────────────────────────────────────────────────────────────────┘
1.3 The Formal Definitions
Let's be precise about terminology:
| Term | Definition |
|---|---|
| Producer | Service that sends messages |
| Consumer | Service that receives and processes messages |
| Broker | The messaging infrastructure (Kafka, RabbitMQ, etc.) |
| Topic/Queue | Named destination for messages |
| Partition | Subdivision of a topic for parallelism (Kafka) |
| Consumer Group | Set of consumers that share message processing |
| Offset | Position in a stream (how far you've read) |
| Acknowledgment (Ack) | Confirmation that a message was processed |
1.4 The Two Mental Models
This is the most important concept of the day. There are two fundamentally different mental models:
┌────────────────────────────────────────────────────────────────────────┐
│ TWO MENTAL MODELS │
├────────────────────────────────────────────────────────────────────────┤
│ │
│ MODEL 1: QUEUE (Job Distribution) │
│ ═══════════════════════════════ │
│ │
│ Think: "Work to be done" │
│ │
│ Producer ───▶ [Task][Task][Task][Task] ───▶ Worker │
│ │
│ • Each task goes to exactly ONE worker │
│ • Task is REMOVED after processing │
│ • Workers compete for tasks │
│ • No history—once done, it's gone │
│ │
│ Examples: RabbitMQ, SQS, Redis LPUSH/BRPOP, Celery │
│ │
│ Use cases: │
│ - Background job processing │
│ - Task distribution across workers │
│ - Request buffering during traffic spikes │
│ │
├────────────────────────────────────────────────────────────────────────┤
│ │
│ MODEL 2: STREAM/LOG (Event History) │
│ ══════════════════════════════════ │
│ │
│ Think: "Things that happened" │
│ │
│ Producer ───▶ [Event][Event][Event][Event][Event]... │
│ ▲ ▲ ▲ │
│ │ │ │ │
│ Reader A Reader B Reader C │
│ (offset 1) (offset 3) (offset 2) │
│ │
│ • Events are RETAINED (not removed after reading) │
│ • Multiple readers can read the same events │
│ • Each reader tracks their own position (offset) │
│ • Full history available for replay │
│ │
│ Examples: Kafka, Kinesis, Redpanda, Pulsar, Redis Streams │
│ │
│ Use cases: │
│ - Event sourcing and audit logs │
│ - Real-time analytics │
│ - Multi-consumer fan-out │
│ - Replay for debugging or reprocessing │
│ │
└────────────────────────────────────────────────────────────────────────┘
The key insight: Queues are about work distribution. Streams are about event history.
Chapter 2: Messaging Patterns
2.1 Pattern 1: Point-to-Point (Queue)
In point-to-point messaging, each message is delivered to exactly one consumer.
┌────────────────────────────────────────────────────────────────────────┐
│ POINT-TO-POINT PATTERN │
│ │
│ ┌────────────────┐ │
│ │ QUEUE │ │
│ Producer ──────────▶ │ [M1][M2][M3] │ │
│ └───────┬────────┘ │
│ │ │
│ ┌──────────────────┼──────────────────┐ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌────────┐ ┌────────┐ ┌────────┐ │
│ │Consumer│ │Consumer│ │Consumer│ │
│ │ A │ │ B │ │ C │ │
│ └────────┘ └────────┘ └────────┘ │
│ │
│ M1 → Consumer A (removed from queue) │
│ M2 → Consumer B (removed from queue) │
│ M3 → Consumer C (removed from queue) │
│ │
│ Each message is processed by exactly ONE consumer. │
│ Consumers compete for messages. │
│ │
└────────────────────────────────────────────────────────────────────────┘
How it works:
- Producer sends message to queue
- Broker holds message until a consumer is available
- One consumer receives and processes the message
- Consumer acknowledges completion
- Message is removed from queue
Pros:
- Simple mental model
- Natural load balancing across consumers
- Easy to scale by adding consumers
- Built-in work distribution
Cons:
- Message is gone after processing (no replay)
- Only one consumer per message
- Harder to add new consumers that need historical data
Use when:
- You have background jobs to process
- Work needs to be distributed across workers
- You don't need to replay or re-read messages
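Below is a minimal point-to-point sketch using RabbitMQ's Python client, pika. The broker address, queue name, and payload are illustrative assumptions, and this is a learning sketch rather than a production setup.

# Point-to-point sketch with RabbitMQ (pika)
# Assumes a broker on localhost and a queue named "jobs" - illustrative only
import json
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()
channel.queue_declare(queue="jobs", durable=True)

# Producer side: publish one task
channel.basic_publish(
    exchange="",
    routing_key="jobs",
    body=json.dumps({"task": "resize_image", "image_id": 42}),
    properties=pika.BasicProperties(delivery_mode=2),  # persist message to disk
)

# Consumer side: exactly one worker receives each task, then acks it
def handle(ch, method, properties, body):
    print("processing", json.loads(body))
    ch.basic_ack(delivery_tag=method.delivery_tag)  # message removed after ack

channel.basic_qos(prefetch_count=1)  # at most one unacked task per worker
channel.basic_consume(queue="jobs", on_message_callback=handle)
channel.start_consuming()

Running several copies of the consumer gives the competing-workers behavior described above: the broker hands each task to whichever worker is free.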
2.2 Pattern 2: Publish-Subscribe (Fan-Out)
In pub-sub, each message is delivered to ALL subscribers.
┌────────────────────────────────────────────────────────────────────────┐
│ PUBLISH-SUBSCRIBE PATTERN │
│ │
│ ┌────────────────┐ │
│ │ TOPIC │ │
│ Publisher ─────────▶ │ [Message] │ │
│ └───────┬────────┘ │
│ │ │
│ ┌──────────────────┼──────────────────┐ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌────────┐ ┌────────┐ ┌────────┐ │
│ │Subscr. │ │Subscr. │ │Subscr. │ │
│ │ A │ │ B │ │ C │ │
│ │(Email) │ │(Mobile)│ │(Analytics) │
│ └────────┘ └────────┘ └────────┘ │
│ │
│ Same message delivered to ALL subscribers. │
│ Each subscriber gets a copy. │
│ │
└────────────────────────────────────────────────────────────────────────┘
How it works:
- Publisher sends message to topic
- Each subscriber receives a copy of the message
- Subscribers process independently
- Message may or may not be retained (depends on implementation)
Pros:
- Easy to add new subscribers
- Decouples publishers from subscribers
- Same event can trigger multiple actions
Cons:
- All subscribers must handle the load
- Can't easily distribute work within a subscriber type
Use when:
- Multiple systems need to react to the same event
- You're building event-driven architecture
- Adding new capabilities shouldn't require changing existing systems
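As a tiny illustration of fan-out, here is a pub-sub sketch using Redis pub/sub (host, channel name, and payload are assumptions). Note that plain Redis pub/sub does not retain messages for subscribers that are offline, which matches the "may or may not be retained" caveat above.

# Pub-sub sketch with Redis pub/sub - every connected subscriber gets a copy
import json
import redis

r = redis.Redis()  # assumes Redis on localhost:6379

def publisher():
    # Delivered to ALL currently subscribed services
    r.publish("order-events", json.dumps({"order_id": "123", "status": "created"}))

def subscriber(name: str):
    # Run one of these per interested service (email, mobile, analytics, ...)
    pubsub = r.pubsub()
    pubsub.subscribe("order-events")
    for msg in pubsub.listen():
        if msg["type"] == "message":
            print(f"{name} received:", json.loads(msg["data"]))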
2.3 Pattern 3: Consumer Groups (Partitioned Stream)
Consumer groups combine the best of both: fan-out ACROSS groups, load balancing WITHIN groups.
┌────────────────────────────────────────────────────────────────────────┐
│ CONSUMER GROUPS PATTERN │
│ │
│ This is Kafka's killer feature. │
│ │
│ ┌─────────────────────────────────────┐ │
│ │ TOPIC │ │
│ Producer ─────▶ │ Partition 0: [M1][M4][M7] │ │
│ │ Partition 1: [M2][M5][M8] │ │
│ │ Partition 2: [M3][M6][M9] │ │
│ └──────────────┬──────────────────────┘ │
│ │ │
│ ┌────────────────────────┼────────────────────────┐ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │
│ │ Consumer Group A│ │ Consumer Group B│ │ Consumer Group C│ │
│ │ (Orders) │ │ (Analytics) │ │ (Audit) │ │
│ │ │ │ │ │ │ │
│ │ C1 C2 C3 │ │ C1 C2 │ │ C1 │ │
│ │ ▲ ▲ ▲ │ │ ▲ ▲ │ │ ▲ │ │
│ │ │ │ │ │ │ │ │ │ │ │ │ │
│ │ P0 P1 P2 │ │ P0,1 P2 │ │ P0,P1,P2 │ │
│ └─────────────────┘ └─────────────────┘ └─────────────────┘ │
│ │
│ Each GROUP gets ALL messages (fan-out) │
│ Within a group, messages are DIVIDED (load balance) │
│ Each partition goes to exactly one consumer per group │
│ │
└────────────────────────────────────────────────────────────────────────┘
How it works:
- Topic is divided into partitions
- Each consumer group gets all messages
- Within a group, each partition is assigned to one consumer
- Messages within a partition are processed in order
- Parallelism = number of partitions
Pros:
- Fan-out to multiple consumer groups
- Load balancing within each group
- Ordering guaranteed within partition
- Replay possible (rewind offset)
Cons:
- More complex to set up and operate
- Partition count limits parallelism
- Rebalancing can cause delays
Use when:
- Multiple systems need the same events
- You need both fan-out AND parallel processing
- Ordering within a key (e.g., per user) matters
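A short sketch of consumer groups with the kafka-python client (topic, server, and group names are illustrative): each group_id sees every message, and consumers sharing a group_id split the partitions between them.

# Consumer-group sketch with kafka-python
import json
from kafka import KafkaConsumer

def run_consumer(group_id: str) -> None:
    consumer = KafkaConsumer(
        "orders",
        bootstrap_servers="localhost:9092",
        group_id=group_id,  # e.g. "order-processor" or "analytics"
        value_deserializer=lambda v: json.loads(v.decode("utf-8")),
    )
    for msg in consumer:
        print(f"[{group_id}] partition={msg.partition} offset={msg.offset} value={msg.value}")

# Start run_consumer("order-processor") and run_consumer("analytics") in
# separate processes: both groups receive every event (fan-out). Start a
# second "analytics" process and the topic's partitions are split between
# the two (load balancing within the group).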
2.4 Pattern Comparison
| Aspect | Point-to-Point | Pub-Sub | Consumer Groups |
|---|---|---|---|
| Message delivery | One consumer | All subscribers | All groups, one per group |
| Load balancing | Automatic | None | Within group |
| Ordering | FIFO (usually) | Per subscriber | Per partition |
| Replay | No | Depends | Yes |
| Use case | Job queue | Event notification | Event streaming |
| Example | RabbitMQ queue | SNS topic | Kafka topic |
Chapter 3: Trade-offs and Considerations
3.1 Trade-off Matrix
┌────────────────────────────────────────────────────────────────────────┐
│ QUEUE VS STREAM TRADE-OFFS │
│ │
│ QUEUE (RabbitMQ, SQS): │
│ ├── ✓ Simpler mental model │
│ ├── ✓ Message-level acknowledgment │
│ ├── ✓ Built-in retry and dead-letter support │
│ ├── ✓ Flexible routing (exchanges, bindings) │
│ ├── ✗ No replay capability │
│ ├── ✗ Single consumer per message │
│ ├── ✗ Message loss if broker crashes before persist │
│ └── Best for: Job queues, task distribution, RPC patterns │
│ │
│ STREAM (Kafka, Kinesis): │
│ ├── ✓ Replay capability (rewind offset) │
│ ├── ✓ Multiple consumer groups │
│ ├── ✓ High throughput (batching, zero-copy) │
│ ├── ✓ Durable storage (configurable retention) │
│ ├── ✗ More complex operations │
│ ├── ✗ Partition count limits parallelism │
│ ├── ✗ Consumer must track offset │
│ └── Best for: Event sourcing, analytics, multi-consumer fan-out │
│ │
└────────────────────────────────────────────────────────────────────────┘
3.2 Delivery Guarantees
This is where things get nuanced. What happens if something fails?
┌────────────────────────────────────────────────────────────────────────┐
│ DELIVERY GUARANTEES │
│ │
│ AT-MOST-ONCE │
│ ════════════ │
│ • Message may be lost │
│ • Message will NOT be duplicated │
│ • Implementation: Fire and forget, no ack │
│ • Use when: Loss acceptable (metrics, logs, non-critical) │
│ │
│ Producer ──▶ Broker ──▶ Consumer │
│ │ │ │
│ │ ┌─────┴─────┐ │
│ │ │ Crash! │ Message lost │
│ │ └───────────┘ │
│ │
│ AT-LEAST-ONCE │
│ ══════════════ │
│ • Message will NOT be lost │
│ • Message MAY be duplicated │
│ • Implementation: Ack after processing, retry on failure │
│ • Use when: Loss unacceptable, can handle duplicates │
│ │
│ Producer ──▶ Broker ──▶ Consumer │
│ │ │ │
│ │ ┌─────┴─────┐ │
│ │ │ Processed │ │
│ │ │ but ack │ Retry = duplicate │
│ │ │ lost │ │
│ │ └───────────┘ │
│ │
│ EXACTLY-ONCE │
│ ═════════════ │
│ • Message will NOT be lost │
│ • Message will NOT be duplicated │
│ • Implementation: Transactional processing + deduplication │
│ • Reality: Very hard. Usually "effectively once" via idempotency. │
│ │
│ This is often a LIE or requires special support from both │
│ the broker AND the consumer. Kafka 0.11+ supports it with │
│ transactions, but your consumer must participate. │
│ │
└────────────────────────────────────────────────────────────────────────┘
The practical reality:
Most systems use at-least-once delivery with idempotent consumers. This is what you learned in Week 2, Day 2 (Idempotency). The broker guarantees delivery, and your consumer handles duplicates.
3.3 Ordering Guarantees
When do messages arrive in order?
┌────────────────────────────────────────────────────────────────────────┐
│ ORDERING GUARANTEES │
│ │
│ GLOBAL ORDERING (All messages in order) │
│ ═══════════════ │
│ │
│ [M1] → [M2] → [M3] → [M4] → [M5] │
│ │
│ • Single partition/queue │
│ • Single consumer │
│ • No parallelism possible │
│ • Throughput limited │
│ │
│ PARTITION ORDERING (Order within partition) │
│ ══════════════════ │
│ │
│ Partition 0: [M1] → [M3] → [M5] (User A's messages in order) │
│ Partition 1: [M2] → [M4] → [M6] (User B's messages in order) │
│ │
│ • Messages with same key go to same partition │
│ • Order guaranteed within partition │
│ • Parallelism = number of partitions │
│ • This is Kafka's model │
│ │
│ NO ORDERING │
│ ═══════════ │
│ │
│ Messages processed in any order │
│ │
│ • Maximum parallelism │
│ • Consumer must handle out-of-order │
│ • Simpler infrastructure │
│ │
└────────────────────────────────────────────────────────────────────────┘
Common pattern: Use partition key to ensure related messages stay together.
# Example: Order events for same user stay in order
producer.send(
topic="orders",
key=user_id, # Partition key
value=order_event
)
# All events for user_123 go to same partition
# Processed in order by one consumer
3.4 Decision Framework
When choosing between queue and stream:
┌────────────────────────────────────────────────────────────────────────┐
│ DECISION FRAMEWORK │
│ │
│ START HERE │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────┐ │
│ │ Do you need to replay messages? │ │
│ └─────────────────┬───────────────────────┘ │
│ │ │
│ YES ───────┼─────── NO │
│ │ │ │ │
│ ▼ │ ▼ │
│ STREAM │ ┌─────────────────────────────────────────┐ │
│ │ │ Do multiple systems need the same data? │ │
│ │ └─────────────────┬───────────────────────┘ │
│ │ │ │
│ │ YES ───────┼─────── NO │
│ │ │ │ │ │
│ │ ▼ │ ▼ │
│ │ STREAM │ ┌────────────────────┐ │
│ │ │ │ Is this a job/task │ │
│ │ │ │ to be processed? │ │
│ │ │ └─────────┬──────────┘ │
│ │ │ │ │
│ │ │ YES ───────┼─────── NO │
│ │ │ │ │ │ │
│ │ │ ▼ │ ▼ │
│ │ │ QUEUE │ STREAM │
│ │ │ │ │
│ │ │ │ │
│ │ │ │ │
└────────────────────────────────────────────────────────────────────────┘
SUMMARY:
────────
• Need replay? → Stream
• Multiple consumers need same data? → Stream
• Task/job processing? → Queue
• Event notification? → Stream (or pub-sub)
• Uncertain? → Start with queue (simpler), migrate if needed
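For illustration only, the same summary can be written as a tiny helper whose flags mirror the questions in the flowchart:

# Illustrative encoding of the decision framework above
def choose_transport(need_replay: bool, multiple_consumers: bool, is_job: bool) -> str:
    if need_replay or multiple_consumers:
        return "stream"  # Kafka, Kinesis, ...
    if is_job:
        return "queue"   # RabbitMQ, SQS, ...
    return "stream"      # event notification: stream or pub-sub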
Part II: Implementation
Chapter 4: Basic Implementation
4.1 Simple Queue with Redis
Let's start with the simplest possible queue using Redis. This is educational—not production-ready.
# Basic Queue Implementation with Redis
# WARNING: Not production-ready - for learning only
import redis
import json
from typing import Any, Optional
class SimpleRedisQueue:
"""
Basic queue using Redis lists.
Messages are pushed to the right, popped from the left (FIFO).
"""
def __init__(self, redis_url: str, queue_name: str):
self.redis = redis.from_url(redis_url)
self.queue_name = queue_name
def publish(self, message: dict) -> None:
"""
Add a message to the queue.
"""
serialized = json.dumps(message)
self.redis.rpush(self.queue_name, serialized)
def consume(self, timeout: int = 0) -> Optional[dict]:
"""
Get and remove a message from the queue.
Blocks until a message is available or timeout.
Args:
timeout: Seconds to wait (0 = wait forever)
Returns:
Message dict or None if timeout
"""
# BLPOP blocks until message available
result = self.redis.blpop(self.queue_name, timeout=timeout)
if result is None:
return None
_, message = result
return json.loads(message)
# Usage example
def producer_example():
queue = SimpleRedisQueue("redis://localhost:6379", "orders")
# Publish some orders
queue.publish({"order_id": "123", "user": "alice", "amount": 99.99})
queue.publish({"order_id": "124", "user": "bob", "amount": 49.99})
print("Published 2 orders")
def consumer_example():
queue = SimpleRedisQueue("redis://localhost:6379", "orders")
while True:
message = queue.consume(timeout=5)
if message is None:
print("No messages, waiting...")
continue
print(f"Processing order: {message['order_id']}")
# Process the order...
# NOTE: If we crash here, message is LOST!
# No acknowledgment, no retry.
What's wrong with this implementation?
- No acknowledgment: If consumer crashes after BLPOP but before processing, message is lost
- No retry: Failed messages disappear forever
- No dead letter queue: No way to handle poison pills
- No visibility timeout: Can't handle slow consumers
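One common way to close these gaps while staying on Redis is the "reliable queue" pattern: atomically move each message onto a processing list, and only delete it after the handler succeeds. A rough sketch (names are assumptions, and a real setup still needs a recovery job for stale entries):

# Reliable-queue sketch: BRPOPLPUSH + LREM as a poor man's ack
import json
import redis

r = redis.from_url("redis://localhost:6379")
QUEUE = "orders"
PROCESSING = "orders:processing"

def consume_reliably(handler) -> None:
    while True:
        # Atomically pop from the queue and park on the processing list
        raw = r.brpoplpush(QUEUE, PROCESSING, timeout=5)
        if raw is None:
            continue  # timed out, loop again
        try:
            handler(json.loads(raw))
            r.lrem(PROCESSING, 1, raw)  # "ack": remove only after success
        except Exception:
            # Leave the message on PROCESSING; a recovery job can push stale
            # entries back onto QUEUE for retry instead of losing them.
            pass

This is essentially the visibility-timeout idea that managed queues such as SQS provide natively, along with built-in retries and dead letter queues.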
4.2 Simple Stream with Kafka (Conceptual)
Here's a conceptual Kafka producer/consumer:
# Basic Kafka Producer/Consumer
# WARNING: Not production-ready - for learning only
from kafka import KafkaProducer, KafkaConsumer
import json
class SimpleKafkaProducer:
"""
Basic Kafka producer.
"""
def __init__(self, bootstrap_servers: str):
self.producer = KafkaProducer(
bootstrap_servers=bootstrap_servers,
value_serializer=lambda v: json.dumps(v).encode('utf-8'),
key_serializer=lambda k: k.encode('utf-8') if k else None
)
def publish(self, topic: str, message: dict, key: str = None) -> None:
"""
Send message to topic.
Key determines which partition receives the message.
"""
self.producer.send(topic, value=message, key=key)
self.producer.flush() # Wait for send to complete
class SimpleKafkaConsumer:
"""
Basic Kafka consumer.
"""
def __init__(self, bootstrap_servers: str, topic: str, group_id: str):
self.consumer = KafkaConsumer(
topic,
bootstrap_servers=bootstrap_servers,
group_id=group_id,
auto_offset_reset='earliest', # Start from beginning if new
enable_auto_commit=True, # Automatically commit offsets
value_deserializer=lambda v: json.loads(v.decode('utf-8'))
)
def consume(self):
"""
Yield messages from the topic.
"""
for message in self.consumer:
yield {
'topic': message.topic,
'partition': message.partition,
'offset': message.offset,
'key': message.key.decode('utf-8') if message.key else None,
'value': message.value
}
# Usage example
def kafka_producer_example():
producer = SimpleKafkaProducer("localhost:9092")
# Messages with same key go to same partition (ordered)
producer.publish("orders", {"order_id": "123", "action": "created"}, key="user_alice")
producer.publish("orders", {"order_id": "123", "action": "paid"}, key="user_alice")
producer.publish("orders", {"order_id": "124", "action": "created"}, key="user_bob")
def kafka_consumer_example():
consumer = SimpleKafkaConsumer(
"localhost:9092",
"orders",
group_id="order-processor"
)
for message in consumer.consume():
print(f"Partition {message['partition']}, Offset {message['offset']}")
print(f"Key: {message['key']}, Value: {message['value']}")
# Process the message...
# NOTE: With auto_commit=True, offset is committed automatically
# If we crash, we might reprocess (at-least-once)
4.3 Understanding the Flow
┌────────────────────────────────────────────────────────────────────────┐
│ KAFKA MESSAGE FLOW │
│ │
│ Step 1: Producer sends with key │
│ ┌──────────────────────────────────────────────────────────────────┐ │
│ │ producer.send("orders", key="user_123", value={...}) │ │
│ └──────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ Step 2: Key is hashed to determine partition │
│ ┌──────────────────────────────────────────────────────────────────┐ │
│ │ partition = hash("user_123") % num_partitions │ │
│ │ partition = 2847293847 % 6 = 3 │ │
│ └──────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ Step 3: Message appended to partition log │
│ ┌──────────────────────────────────────────────────────────────────┐ │
│ │ Partition 3: [...][msg][msg][NEW MSG] │ │
│ │ ▲ │ │
│ │ offset 847 │ │
│ └──────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ Step 4: Consumer group assigns partitions │
│ ┌──────────────────────────────────────────────────────────────────┐ │
│ │ Consumer Group "order-processor" │ │
│ │ Consumer A: Partitions 0, 1 │ │
│ │ Consumer B: Partitions 2, 3 ◄── Gets partition 3 │ │
│ │ Consumer C: Partitions 4, 5 │ │
│ └──────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ Step 5: Consumer reads and commits offset │
│ ┌──────────────────────────────────────────────────────────────────┐ │
│ │ Consumer B reads offset 847 │ │
│ │ Processes message │ │
│ │ Commits offset 848 (next to read) │ │
│ └──────────────────────────────────────────────────────────────────┘ │
│ │
└────────────────────────────────────────────────────────────────────────┘
Chapter 5: Production-Ready Implementation
5.1 Requirements for Production
Our production implementation needs:
- Reliable delivery — No message loss
- Acknowledgment — Confirm processing before removing
- Retry with backoff — Handle transient failures
- Dead letter queue — Handle permanent failures
- Monitoring — Track lag, errors, throughput
- Graceful shutdown — Don't lose in-flight messages
5.2 Production Kafka Consumer
# Production-Ready Kafka Consumer
# With proper error handling, retries, and monitoring
import asyncio
from dataclasses import dataclass, field
from typing import Callable, Optional, Dict, Any, List
from datetime import datetime, timedelta
from enum import Enum
import logging
import json
from contextlib import asynccontextmanager
from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
from aiokafka.errors import KafkaError
logger = logging.getLogger(__name__)
class ProcessingResult(Enum):
SUCCESS = "success"
RETRY = "retry" # Transient failure, retry later
DEAD_LETTER = "dead_letter" # Permanent failure, send to DLQ
@dataclass
class ConsumerConfig:
"""Configuration for Kafka consumer."""
bootstrap_servers: str
topic: str
group_id: str
# Retry configuration
max_retries: int = 3
retry_backoff_ms: int = 1000
max_retry_backoff_ms: int = 30000
# Consumer configuration
auto_offset_reset: str = "earliest"
enable_auto_commit: bool = False # Manual commit for reliability
max_poll_records: int = 100
session_timeout_ms: int = 30000
# Dead letter topic
dead_letter_topic: Optional[str] = None
@dataclass
class MessageContext:
"""Context for message processing."""
topic: str
partition: int
offset: int
key: Optional[str]
value: Dict[str, Any]
timestamp: datetime
headers: Dict[str, str] = field(default_factory=dict)
retry_count: int = 0
class KafkaConsumerService:
"""
Production-ready Kafka consumer with:
- Manual offset commits (at-least-once delivery)
- Retry with exponential backoff
- Dead letter queue for failed messages
- Graceful shutdown
- Metrics and logging
"""
def __init__(
self,
config: ConsumerConfig,
message_handler: Callable[[MessageContext], ProcessingResult]
):
self.config = config
self.message_handler = message_handler
self.consumer: Optional[AIOKafkaConsumer] = None
self.dlq_producer: Optional[AIOKafkaProducer] = None
self._running = False
self._shutdown_event = asyncio.Event()
# Metrics
self.messages_processed = 0
self.messages_failed = 0
self.messages_to_dlq = 0
async def start(self) -> None:
"""Start the consumer."""
logger.info(f"Starting consumer for topic {self.config.topic}")
self.consumer = AIOKafkaConsumer(
self.config.topic,
bootstrap_servers=self.config.bootstrap_servers,
group_id=self.config.group_id,
auto_offset_reset=self.config.auto_offset_reset,
enable_auto_commit=self.config.enable_auto_commit,
max_poll_records=self.config.max_poll_records,
session_timeout_ms=self.config.session_timeout_ms,
value_deserializer=lambda v: json.loads(v.decode('utf-8')),
key_deserializer=lambda k: k.decode('utf-8') if k else None
)
await self.consumer.start()
# Start DLQ producer if configured
if self.config.dead_letter_topic:
self.dlq_producer = AIOKafkaProducer(
bootstrap_servers=self.config.bootstrap_servers,
value_serializer=lambda v: json.dumps(v).encode('utf-8'),
key_serializer=lambda k: k.encode('utf-8') if k else None
)
await self.dlq_producer.start()
self._running = True
logger.info(f"Consumer started for topic {self.config.topic}")
async def stop(self) -> None:
"""Gracefully stop the consumer."""
logger.info("Stopping consumer...")
self._running = False
self._shutdown_event.set()
if self.consumer:
await self.consumer.stop()
if self.dlq_producer:
await self.dlq_producer.stop()
logger.info(f"Consumer stopped. Processed: {self.messages_processed}, "
f"Failed: {self.messages_failed}, DLQ: {self.messages_to_dlq}")
async def run(self) -> None:
"""Main consumer loop."""
try:
async for message in self.consumer:
if not self._running:
break
ctx = MessageContext(
topic=message.topic,
partition=message.partition,
offset=message.offset,
key=message.key,
value=message.value,
timestamp=datetime.fromtimestamp(message.timestamp / 1000),
headers={k: v.decode('utf-8') for k, v in (message.headers or [])},
retry_count=self._get_retry_count(message.headers)
)
await self._process_message(ctx)
except asyncio.CancelledError:
logger.info("Consumer loop cancelled")
except Exception as e:
logger.error(f"Consumer loop error: {e}", exc_info=True)
raise
async def _process_message(self, ctx: MessageContext) -> None:
"""Process a single message with retry logic."""
try:
result = await self._execute_handler(ctx)
if result == ProcessingResult.SUCCESS:
# Commit offset on success
await self.consumer.commit()
self.messages_processed += 1
logger.debug(f"Processed message at offset {ctx.offset}")
elif result == ProcessingResult.RETRY:
await self._handle_retry(ctx)
elif result == ProcessingResult.DEAD_LETTER:
await self._send_to_dlq(ctx, "Handler returned DEAD_LETTER")
await self.consumer.commit()
except Exception as e:
logger.error(f"Error processing message: {e}", exc_info=True)
await self._handle_retry(ctx)
async def _execute_handler(self, ctx: MessageContext) -> ProcessingResult:
"""Execute the message handler with error handling."""
try:
# Handler might be sync or async
result = self.message_handler(ctx)
if asyncio.iscoroutine(result):
result = await result
return result
except Exception as e:
logger.warning(f"Handler raised exception: {e}")
return ProcessingResult.RETRY
async def _handle_retry(self, ctx: MessageContext) -> None:
"""Handle message retry with exponential backoff."""
if ctx.retry_count >= self.config.max_retries:
logger.warning(f"Max retries ({self.config.max_retries}) exceeded "
f"for message at offset {ctx.offset}")
await self._send_to_dlq(ctx, f"Max retries exceeded after {ctx.retry_count} attempts")
await self.consumer.commit()
return
# Calculate backoff
backoff_ms = min(
self.config.retry_backoff_ms * (2 ** ctx.retry_count),
self.config.max_retry_backoff_ms
)
logger.info(f"Retrying message at offset {ctx.offset} "
f"(attempt {ctx.retry_count + 1}/{self.config.max_retries}) "
f"after {backoff_ms}ms")
await asyncio.sleep(backoff_ms / 1000)
# Re-process with incremented retry count
ctx.retry_count += 1
await self._process_message(ctx)
async def _send_to_dlq(self, ctx: MessageContext, reason: str) -> None:
"""Send failed message to dead letter queue."""
if not self.dlq_producer or not self.config.dead_letter_topic:
logger.warning(f"No DLQ configured, dropping message at offset {ctx.offset}")
self.messages_failed += 1
return
dlq_message = {
"original_topic": ctx.topic,
"original_partition": ctx.partition,
"original_offset": ctx.offset,
"original_key": ctx.key,
"original_value": ctx.value,
"original_timestamp": ctx.timestamp.isoformat(),
"failure_reason": reason,
"retry_count": ctx.retry_count,
"failed_at": datetime.utcnow().isoformat()
}
await self.dlq_producer.send_and_wait(
self.config.dead_letter_topic,
value=dlq_message,
key=ctx.key
)
self.messages_to_dlq += 1
logger.info(f"Sent message at offset {ctx.offset} to DLQ: {reason}")
def _get_retry_count(self, headers: Optional[List]) -> int:
"""Extract retry count from message headers."""
if not headers:
return 0
for key, value in headers:
if key == "retry_count":
return int(value.decode('utf-8'))
return 0
# =============================================================================
# Example Usage
# =============================================================================
async def order_handler(ctx: MessageContext) -> ProcessingResult:
"""Example message handler for order events."""
try:
order = ctx.value
logger.info(f"Processing order {order.get('order_id')} "
f"for user {ctx.key}")
# Simulate processing
if order.get("amount", 0) < 0:
# Invalid order - send to DLQ
logger.warning(f"Invalid order amount: {order.get('amount')}")
return ProcessingResult.DEAD_LETTER
if order.get("action") == "fail_temporary":
# Simulate transient failure
return ProcessingResult.RETRY
# Process the order...
await asyncio.sleep(0.1) # Simulate work
return ProcessingResult.SUCCESS
except KeyError as e:
# Missing required field - permanent failure
logger.error(f"Missing required field: {e}")
return ProcessingResult.DEAD_LETTER
except Exception as e:
# Unknown error - retry
logger.error(f"Processing error: {e}")
return ProcessingResult.RETRY
async def main():
"""Example main function."""
config = ConsumerConfig(
bootstrap_servers="localhost:9092",
topic="orders",
group_id="order-processor",
dead_letter_topic="orders-dlq",
max_retries=3,
retry_backoff_ms=1000
)
consumer = KafkaConsumerService(config, order_handler)
try:
await consumer.start()
await consumer.run()
except KeyboardInterrupt:
pass
finally:
await consumer.stop()
if __name__ == "__main__":
asyncio.run(main())
5.3 Production Kafka Producer
# Production-Ready Kafka Producer
# With batching, retries, and delivery confirmation
import asyncio
from dataclasses import dataclass
from typing import Optional, Dict, Any, Callable
import logging
import json
from datetime import datetime
from aiokafka import AIOKafkaProducer
from aiokafka.errors import KafkaError
logger = logging.getLogger(__name__)
@dataclass
class ProducerConfig:
"""Configuration for Kafka producer."""
bootstrap_servers: str
# Reliability settings
acks: str = "all" # Wait for all replicas
retries: int = 3 # Retry on failure
retry_backoff_ms: int = 100 # Backoff between retries
# Batching settings (for throughput)
linger_ms: int = 5 # Wait up to 5ms to batch
batch_size: int = 16384 # 16KB batch size
# Idempotence (prevent duplicates on retry)
enable_idempotence: bool = True
class KafkaProducerService:
"""
Production-ready Kafka producer with:
- Idempotent delivery (no duplicates)
- Delivery confirmation
- Batching for throughput
- Proper error handling
"""
def __init__(self, config: ProducerConfig):
self.config = config
self.producer: Optional[AIOKafkaProducer] = None
# Metrics
self.messages_sent = 0
self.messages_failed = 0
async def start(self) -> None:
"""Start the producer."""
self.producer = AIOKafkaProducer(
bootstrap_servers=self.config.bootstrap_servers,
acks=self.config.acks,
retries=self.config.retries,
retry_backoff_ms=self.config.retry_backoff_ms,
linger_ms=self.config.linger_ms,
max_batch_size=self.config.batch_size,
enable_idempotence=self.config.enable_idempotence,
value_serializer=lambda v: json.dumps(v).encode('utf-8'),
key_serializer=lambda k: k.encode('utf-8') if k else None
)
await self.producer.start()
logger.info("Producer started")
async def stop(self) -> None:
"""Stop the producer, flushing pending messages."""
if self.producer:
await self.producer.stop()
logger.info(f"Producer stopped. Sent: {self.messages_sent}, "
f"Failed: {self.messages_failed}")
async def send(
self,
topic: str,
value: Dict[str, Any],
key: Optional[str] = None,
headers: Optional[Dict[str, str]] = None
) -> bool:
"""
Send a message to Kafka.
Args:
topic: Target topic
value: Message payload
key: Partition key (messages with same key go to same partition)
headers: Optional message headers
Returns:
True if sent successfully, False otherwise
"""
try:
# Convert headers to Kafka format
kafka_headers = None
if headers:
kafka_headers = [(k, v.encode('utf-8')) for k, v in headers.items()]
# Send and wait for acknowledgment
result = await self.producer.send_and_wait(
topic,
value=value,
key=key,
headers=kafka_headers
)
self.messages_sent += 1
logger.debug(f"Sent message to {topic}:{result.partition}@{result.offset}")
return True
except KafkaError as e:
self.messages_failed += 1
logger.error(f"Failed to send message: {e}")
return False
async def send_batch(
self,
topic: str,
messages: list[tuple[Optional[str], Dict[str, Any]]]
) -> int:
"""
Send multiple messages efficiently.
Args:
topic: Target topic
messages: List of (key, value) tuples
Returns:
Number of successfully sent messages
"""
tasks = []
for key, value in messages:
# Don't wait for each message individually
future = await self.producer.send(topic, value=value, key=key)
tasks.append(future)
# Wait for all to complete
results = await asyncio.gather(*tasks, return_exceptions=True)
success_count = sum(1 for r in results if not isinstance(r, Exception))
self.messages_sent += success_count
self.messages_failed += len(results) - success_count
return success_count
# =============================================================================
# Example Usage
# =============================================================================
async def producer_example():
"""Example of using the producer."""
config = ProducerConfig(
bootstrap_servers="localhost:9092",
acks="all",
enable_idempotence=True
)
producer = KafkaProducerService(config)
await producer.start()
try:
# Send single message
await producer.send(
topic="orders",
key="user_123",
value={
"order_id": "order_456",
"action": "created",
"amount": 99.99,
"timestamp": datetime.utcnow().isoformat()
},
headers={"source": "api", "version": "1"}
)
# Send batch
messages = [
("user_123", {"order_id": "order_457", "action": "created"}),
("user_456", {"order_id": "order_458", "action": "created"}),
("user_789", {"order_id": "order_459", "action": "created"}),
]
sent = await producer.send_batch("orders", messages)
print(f"Sent {sent} messages")
finally:
await producer.stop()
Chapter 6: Edge Cases and Error Handling
6.1 Edge Case 1: Consumer Crashes Mid-Processing
┌────────────────────────────────────────────────────────────────────────┐
│ SCENARIO: Consumer crashes after reading but before committing │
│ │
│ Consumer reads message at offset 100 │
│ │ │
│ ▼ │
│ Consumer starts processing │
│ │ │
│ ▼ │
│ ┌─────────────────┐ │
│ │ CRASH! │ ◄── Process dies │
│ └─────────────────┘ │
│ │ │
│ ▼ │
│ Offset NOT committed (still at 99) │
│ │ │
│ ▼ │
│ New consumer starts, reads offset 100 again │
│ │ │
│ ▼ │
│ MESSAGE IS REPROCESSED (at-least-once) │
│ │
│ SOLUTION: Consumer must be IDEMPOTENT │
│ │
│ Option 1: Deduplicate by message ID │
│ Option 2: Use database transaction with offset │
│ Option 3: Make processing naturally idempotent │
│ │
└────────────────────────────────────────────────────────────────────────┘
Solution: Idempotent Consumer
class IdempotentOrderProcessor:
"""
Process orders idempotently using a processed_ids table.
"""
def __init__(self, db_pool):
self.db = db_pool
async def process_order(self, ctx: MessageContext) -> ProcessingResult:
"""Process order with idempotency check."""
order = ctx.value
message_id = f"{ctx.topic}:{ctx.partition}:{ctx.offset}"
async with self.db.acquire() as conn:
async with conn.transaction():
# Check if already processed
existing = await conn.fetchval(
"SELECT 1 FROM processed_messages WHERE message_id = $1",
message_id
)
if existing:
logger.info(f"Message {message_id} already processed, skipping")
return ProcessingResult.SUCCESS
# Process the order
await conn.execute(
"""
INSERT INTO orders (order_id, user_id, amount, status)
VALUES ($1, $2, $3, 'created')
""",
order['order_id'], order['user_id'], order['amount']
)
# Mark as processed (in same transaction!)
await conn.execute(
"""
INSERT INTO processed_messages (message_id, processed_at)
VALUES ($1, NOW())
""",
message_id
)
return ProcessingResult.SUCCESS
6.2 Edge Case 2: Consumer Rebalance
┌────────────────────────────────────────────────────────────────────────┐
│ SCENARIO: Partition reassignment during processing │
│ │
│ BEFORE: │
│ Consumer A: Partitions 0, 1, 2 │
│ Consumer B: Partitions 3, 4, 5 │
│ │
│ Consumer C joins the group... │
│ │
│ REBALANCE TRIGGERED: │
│ 1. All consumers stop fetching │
│ 2. Partitions are reassigned │
│ 3. Uncommitted offsets may be lost │
│ │
│ AFTER: │
│ Consumer A: Partitions 0, 1 │
│ Consumer B: Partitions 2, 3 │
│ Consumer C: Partitions 4, 5 │
│ │
│ PROBLEM: Consumer A was processing partition 2 during rebalance │
│ Work may be lost or duplicated │
│ │
│ SOLUTIONS: │
│ 1. Commit offsets frequently │
│ 2. Use cooperative rebalancing (Kafka 2.4+) │
│ 3. Implement rebalance listener to commit before revoke │
│ │
└────────────────────────────────────────────────────────────────────────┘
Solution: Rebalance Listener
from aiokafka import ConsumerRebalanceListener
class GracefulRebalanceListener(ConsumerRebalanceListener):
"""
Handle partition rebalance gracefully.
Commit offsets before partitions are revoked.
"""
def __init__(self, consumer: AIOKafkaConsumer, processor):
self.consumer = consumer
self.processor = processor
async def on_partitions_revoked(self, revoked):
"""Called before partitions are taken away."""
logger.info(f"Partitions being revoked: {revoked}")
# Finish any in-progress work
await self.processor.finish_current_batch()
# Commit current offsets
await self.consumer.commit()
logger.info("Committed offsets before rebalance")
async def on_partitions_assigned(self, assigned):
"""Called after new partitions are assigned."""
logger.info(f"Partitions assigned: {assigned}")
# Optionally seek to specific offsets
# for tp in assigned:
# await self.consumer.seek(tp, specific_offset)
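To take effect, the listener is attached via subscribe() rather than by passing the topic to the constructor. A minimal wiring sketch (processor stands for whatever object owns your in-flight work):

# Wiring sketch: attach the rebalance listener to an aiokafka consumer
async def start_consumer_with_listener(processor):
    consumer = AIOKafkaConsumer(
        bootstrap_servers="localhost:9092",
        group_id="order-processor",
        enable_auto_commit=False,
    )
    await consumer.start()
    consumer.subscribe(
        topics=["orders"],
        listener=GracefulRebalanceListener(consumer, processor),
    )
    return consumer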
6.3 Edge Case 3: Slow Consumer (Consumer Lag)
┌────────────────────────────────────────────────────────────────────────┐
│ SCENARIO: Consumer can't keep up with producer │
│ │
│ Producer rate: 10,000 messages/sec │
│ Consumer rate: 1,000 messages/sec │
│ │
│ Lag grows by 9,000 messages every second! │
│ │
│ After 1 hour: 32,400,000 messages behind │
│ │
│ SYMPTOMS: │
│ • Consumer lag metric increasing │
│ • Memory pressure on broker (retaining more data) │
│ • Data becoming "stale" before processing │
│ │
│ SOLUTIONS: │
│ 1. Add more consumers (up to partition count) │
│ 2. Increase partition count (requires topic recreation) │
│ 3. Optimize consumer processing │
│ 4. Use batch processing │
│ 5. Implement backpressure on producer │
│ │
└────────────────────────────────────────────────────────────────────────┘
Solution: Monitor and Alert on Lag
from dataclasses import dataclass
from typing import Dict
import asyncio
@dataclass
class LagMetrics:
partition: int
current_offset: int
end_offset: int
lag: int
class ConsumerLagMonitor:
"""Monitor consumer lag and alert when thresholds exceeded."""
def __init__(
self,
consumer: AIOKafkaConsumer,
warning_threshold: int = 10000,
critical_threshold: int = 100000
):
self.consumer = consumer
self.warning_threshold = warning_threshold
self.critical_threshold = critical_threshold
async def get_lag(self) -> Dict[int, LagMetrics]:
"""Get lag for all assigned partitions."""
lag_metrics = {}
# Get end offsets (latest)
partitions = self.consumer.assignment()
end_offsets = await self.consumer.end_offsets(partitions)
for tp in partitions:
current = await self.consumer.position(tp)
end = end_offsets[tp]
lag = end - current
lag_metrics[tp.partition] = LagMetrics(
partition=tp.partition,
current_offset=current,
end_offset=end,
lag=lag
)
return lag_metrics
async def check_lag(self) -> None:
"""Check lag and log warnings/errors."""
lag_metrics = await self.get_lag()
total_lag = sum(m.lag for m in lag_metrics.values())
if total_lag >= self.critical_threshold:
logger.error(f"CRITICAL: Consumer lag is {total_lag:,} messages!")
# Send alert to PagerDuty/Slack
elif total_lag >= self.warning_threshold:
logger.warning(f"WARNING: Consumer lag is {total_lag:,} messages")
else:
logger.debug(f"Consumer lag: {total_lag:,} messages")
# Log per-partition metrics
for partition, metrics in lag_metrics.items():
logger.debug(f"Partition {partition}: lag={metrics.lag:,}")
async def monitor_loop(self, interval_seconds: int = 30):
"""Continuously monitor lag."""
while True:
try:
await self.check_lag()
except Exception as e:
logger.error(f"Error checking lag: {e}")
await asyncio.sleep(interval_seconds)
6.4 Error Handling Matrix
| Error Type | Cause | Handling | Retry? |
|---|---|---|---|
| Deserialization error | Malformed message | Send to DLQ, skip | No |
| Validation error | Missing required field | Send to DLQ, skip | No |
| Database timeout | DB overloaded | Retry with backoff | Yes |
| Network error | Connection lost | Retry with backoff | Yes |
| Out of memory | Message too large | Send to DLQ, skip | No |
| Rate limited | Downstream limit hit | Retry with backoff | Yes |
| Consumer crash | Bug in handler | Restart, reprocess | Auto |
| Broker unavailable | Kafka down | Retry until available | Yes |
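Inside a handler, the matrix above often ends up as a small classifier that maps exception types to a ProcessingResult. A sketch (the concrete exception classes are assumptions; substitute whatever your stack actually raises):

# Sketch: map exceptions to ProcessingResult per the error handling matrix
import json

def classify_error(exc: Exception) -> ProcessingResult:
    if isinstance(exc, (json.JSONDecodeError, UnicodeDecodeError)):
        return ProcessingResult.DEAD_LETTER  # deserialization error: skip
    if isinstance(exc, (KeyError, ValueError)):
        return ProcessingResult.DEAD_LETTER  # validation error: skip
    if isinstance(exc, (TimeoutError, ConnectionError)):
        return ProcessingResult.RETRY        # transient infrastructure failure
    if isinstance(exc, MemoryError):
        return ProcessingResult.DEAD_LETTER  # message too large to handle
    return ProcessingResult.RETRY            # unknown: retry, DLQ after max retries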
Part III: Real-World Application
Chapter 7: How Big Tech Does It
7.1 Case Study: LinkedIn — The Birthplace of Kafka
┌────────────────────────────────────────────────────────────────────────┐
│ LINKEDIN'S KAFKA ARCHITECTURE │
│ │
│ SCALE: │
│ • 7+ trillion messages per day │
│ • 100+ petabytes of data │
│ • Thousands of producers and consumers │
│ │
│ USE CASES: │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ Activity Tracking │ │
│ │ • Page views, searches, profile views │ │
│ │ • Billions of events per day │ │
│ │ • Powers recommendations and analytics │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ Data Pipeline │ │
│ │ • Stream data from OLTP to OLAP │ │
│ │ • Real-time ETL │ │
│ │ • Feed machine learning models │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ Metrics and Monitoring │ │
│ │ • Application metrics piped through Kafka │ │
│ │ • Aggregated in real-time │ │
│ │ • Powers dashboards and alerts │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │
│ KEY DECISIONS: │
│ • Chose stream (Kafka) over queue for replay capability │
│ • Partition by user ID for user-centric ordering │
│ • Multiple consumer groups for different use cases │
│ • 7-day retention for replay, then archive to HDFS │
│ │
│ LESSONS: │
│ • "If in doubt, log it" — capture everything, decide later │
│ • Kafka became the "central nervous system" of LinkedIn │
│ • Consumer groups allow adding new features without changes │
│ │
└────────────────────────────────────────────────────────────────────────┘
Reference: "The Log: What every software engineer should know about real-time data's unifying abstraction" — Jay Kreps (LinkedIn, co-creator of Kafka)
7.2 Case Study: Uber — Kafka for Real-Time Matching
┌────────────────────────────────────────────────────────────────────────┐
│ UBER'S MESSAGING ARCHITECTURE │
│ │
│ CHALLENGE: │
│ Match riders with drivers in real-time │
│ Millions of events per second │
│ Strong ordering requirements per trip │
│ │
│ ARCHITECTURE: │
│ │
│ ┌──────────┐ ┌───────────────────────────────────┐ │
│ │ Rider │────▶│ │ │
│ │ App │ │ KAFKA │ │
│ └──────────┘ │ │ │
│ │ Trip Events Topic │ │
│ ┌──────────┐ │ ┌───────────────────────────┐ │ │
│ │ Driver │────▶│ │ Key: trip_id │ │ │
│ │ App │ │ │ All events for same trip │ │ │
│ └──────────┘ │ │ go to same partition │ │ │
│ │ └───────────────────────────┘ │ │
│ ┌──────────┐ │ │ │
│ │ Dispatch │────▶│ Location Updates Topic │ │
│ │ Service │ │ ┌───────────────────────────┐ │ │
│ └──────────┘ │ │ Key: driver_id │ │ │
│ │ │ 4 billion/day │ │ │
│ │ └───────────────────────────┘ │ │
│ └───────────────────────────────────┘ │
│ │ │
│ ┌───────────────────────┼───────────────────────┐ │
│ ▼ ▼ ▼ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Matching │ │ Pricing │ │ Analytics │ │
│ │ Service │ │ Service │ │ Pipeline │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │
│ KEY DECISIONS: │
│ • Partition by trip_id: All trip events in order │
│ • Consumer groups: Matching, pricing, analytics read same data │
│ • Exactly-once via idempotent writes to database │
│ • Multi-region Kafka for disaster recovery │
│ │
│ SCALE: │
│ • Trillions of messages per day across all topics │
│ • < 10ms latency for trip updates │
│ • 99.99% availability requirement │
│ │
└────────────────────────────────────────────────────────────────────────┘
7.3 Case Study: Stripe — SQS for Webhook Delivery
┌────────────────────────────────────────────────────────────────────────┐
│ STRIPE'S WEBHOOK ARCHITECTURE │
│ │
│ CHALLENGE: │
│ Deliver webhooks reliably to millions of merchants │
│ Retry failed deliveries with exponential backoff │
│ Handle merchants with slow/broken endpoints │
│ │
│ WHY SQS (Queue) NOT Kafka (Stream)? │
│ │
│ ✓ Each webhook is a "job to be done" (queue mental model) │
│ ✓ Need per-message acknowledgment │
│ ✓ Need visibility timeout for slow processing │
│ ✓ Built-in dead letter queue │
│ ✓ Don't need replay (webhooks are delivered, not replayed) │
│ │
│ ARCHITECTURE: │
│ │
│ ┌──────────┐ ┌─────────────────────────────────────┐ │
│ │ Payment │ │ SQS │ │
│ │ Service │────▶│ ┌─────────────────────────────┐ │ │
│ └──────────┘ │ │ Webhook Queue │ │ │
│ │ │ [hook][hook][hook][hook] │ │ │
│ │ └─────────────────────────────┘ │ │
│ │ │ │ │
│ │ ▼ │ │
│ │ ┌─────────────────────────────┐ │ │
│ │ │ Visibility Timeout: 30s │ │ │
│ │ │ (Message invisible while │ │ │
│ │ │ being processed) │ │ │
│ │ └─────────────────────────────┘ │ │
│ │ │ │ │
│ │ Failed? │ │ │
│ │ ▼ │ │
│ │ ┌─────────────────────────────┐ │ │
│ │ │ Dead Letter Queue │ │ │
│ │ │ (After 5 retries) │ │ │
│ │ └─────────────────────────────┘ │ │
│ └─────────────────────────────────────┘ │
│ │
│ KEY DECISIONS: │
│ • SQS for simple job queue semantics │
│ • Visibility timeout prevents duplicate delivery │
│ • Dead letter queue for investigation │
│ • Exponential backoff in application code │
│ │
│ LESSONS: │
│ • Not everything needs Kafka │
│ • Queue semantics fit webhook delivery perfectly │
│ • Managed service (SQS) reduces operational burden │
│ │
└────────────────────────────────────────────────────────────────────────┘
7.4 Comparison: When Each Company Chose What
| Company | Use Case | Choice | Why |
|---|---|---|---|
| LinkedIn | Activity tracking | Kafka | Replay, multiple consumers |
| Uber | Trip events | Kafka | Ordering per trip, fan-out |
| Stripe | Webhooks | SQS | Job queue, visibility timeout |
| Netflix | Video encoding | SQS | Job distribution to workers |
| Shopify | Order events | Kafka | Event sourcing, audit |
| Slack | Message delivery | Custom queue | Per-workspace isolation |
| Segment | Event routing | Kafka | Fan-out to 300+ destinations |
Chapter 8: Common Mistakes to Avoid
8.1 Mistake 1: Using Kafka When a Queue Would Do
┌────────────────────────────────────────────────────────────────────────┐
│ ❌ WRONG: Using Kafka for simple background jobs │
│ │
│ "We need to process image thumbnails. Let's use Kafka!" │
│ │
│ Problems: │
│ • Kafka complexity not needed │
│ • Partition count limits parallelism │
│ • Operational overhead (ZooKeeper, brokers) │
│ • No built-in retry/DLQ support │
│ │
├────────────────────────────────────────────────────────────────────────┤
│ ✅ CORRECT: Use a simple queue (Redis, SQS, Celery) │
│ │
│ • Jobs processed by available workers │
│ • Easy to scale workers up/down │
│ • Built-in retry support │
│ • Simpler operations │
│ │
│ Rule: If you don't need replay or multiple consumers, │
│ a queue is probably simpler. │
│ │
└────────────────────────────────────────────────────────────────────────┘
8.2 Mistake 2: Forgetting Consumer Idempotency
┌────────────────────────────────────────────────────────────────────────┐
│ ❌ WRONG: Assuming exactly-once delivery │
│ │
│ async def process_order(message): │
│ order = message.value │
│ db.execute("INSERT INTO orders VALUES (...)") # Duplicate! │
│ send_email(order.user, "Order confirmed") # Duplicate! │
│ │
│ If consumer crashes and restarts, message is redelivered. │
│ Order inserted twice. Two emails sent. │
│ │
├────────────────────────────────────────────────────────────────────────┤
│ ✅ CORRECT: Make consumer idempotent │
│ │
│ async def process_order(message): │
│ order = message.value │
│ message_id = f"{message.partition}:{message.offset}" │
│ │
│ # Check if already processed │
│ if await is_processed(message_id): │
│ return # Skip duplicate │
│ │
│ # Process in transaction │
│ async with db.transaction(): │
│ db.execute("INSERT INTO orders VALUES (...)") │
│ db.execute("INSERT INTO processed_ids VALUES (...)") │
│ │
│ # Idempotent email (check if sent) │
│ await send_email_if_not_sent(order.id, order.user) │
│ │
└────────────────────────────────────────────────────────────────────────┘
8.3 Mistake 3: Wrong Partition Key
┌────────────────────────────────────────────────────────────────────────┐
│ ❌ WRONG: Random or no partition key │
│ │
│ # No key = round-robin to partitions │
│ producer.send("orders", value=order) │
│ │
│ Problem: Order created → Order paid → Order shipped │
│ These might go to different partitions! │
│ Consumer might process "shipped" before "created" │
│ │
├────────────────────────────────────────────────────────────────────────┤
│ ✅ CORRECT: Key by the entity that needs ordering │
│ │
│ # All events for same order go to same partition │
│ producer.send("orders", key=order.id, value=order) │
│ │
│ # OR key by user if user-level ordering needed │
│ producer.send("orders", key=order.user_id, value=order) │
│ │
│ Rule: Key by whatever entity needs events processed in order │
│ │
└────────────────────────────────────────────────────────────────────────┘
8.4 Mistake 4: Too Few Partitions
┌────────────────────────────────────────────────────────────────────────┐
│ ❌ WRONG: Creating topic with 1 partition │
│ │
│ # Only 1 consumer can process at a time │
│ # Maximum throughput = 1 consumer's speed │
│ │
│ Partition 0: [msg][msg][msg][msg][msg][msg]... │
│ ▲ │
│ │ │
│ Only Consumer │
│ │
├────────────────────────────────────────────────────────────────────────┤
│ ✅ CORRECT: Plan partition count for growth │
│ │
│ # Rule of thumb: partitions >= expected max consumers │
│ # Can always have more partitions than consumers │
│ # Cannot easily reduce partitions later │
│ │
│ Partition 0: [msg][msg] → Consumer A │
│ Partition 1: [msg][msg] → Consumer B │
│ Partition 2: [msg][msg] → Consumer C │
│ Partition 3: [msg][msg] → Consumer D │
│ ... │
│ │
│ Start with 6-12 partitions for new topics. │
│ High-volume topics: 50-100+ partitions. │
│ │
└────────────────────────────────────────────────────────────────────────┘
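Partition count is set when the topic is created. A sketch with kafka-python's admin client (names and counts are illustrative):

# Sketch: create the topic with room to grow
from kafka.admin import KafkaAdminClient, NewTopic

admin = KafkaAdminClient(bootstrap_servers="localhost:9092")
admin.create_topics([
    NewTopic(name="orders", num_partitions=12, replication_factor=3),
])
# Partitions can be added later (which changes key-to-partition mapping for
# new messages), but they can never be removed - hence planning up front.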
8.5 Mistake Checklist
Before deploying a messaging system, verify:
- Consumer is idempotent — Can handle duplicate messages
- Partition key chosen correctly — Related messages stay together
- Enough partitions — Can scale consumers as needed
- Dead letter queue configured — Failed messages go somewhere
- Monitoring in place — Consumer lag, error rates visible
- Retention configured — Data kept long enough (but not forever)
- Serialization tested — Schema evolution won't break consumers
- Graceful shutdown — In-flight messages not lost on deploy
Part IV: Interview Preparation
Chapter 9: Interview Tips and Phrases
9.1 When to Bring Up Queues/Streams
Bring up messaging when:
- System has components that can operate independently
- You need to handle traffic spikes (buffer)
- Multiple services need to react to the same event
- You need audit/replay capabilities
- Synchronous calls would create tight coupling
9.2 Key Phrases to Use
┌────────────────────────────────────────────────────────────────────────┐
│ INTERVIEW PHRASES │
│ │
│ When proposing async architecture: │
│ │
│ "Rather than having the API call these services synchronously, │
│ I'd publish an event to a message queue. This decouples the │
│ services and allows us to handle spikes by buffering requests." │
│ │
│ When choosing between queue and stream: │
│ │
│ "For this use case, I'd use Kafka rather than a traditional queue │
│ because we need multiple consumer groups to process the same │
│ events—analytics needs the data, and so does the notification │
│ service. With a queue, once a message is consumed, it's gone." │
│ │
│ When discussing ordering: │
│ │
│ "I'd partition by user_id to ensure all events for a given user │
│ are processed in order. Kafka guarantees ordering within a │
│ partition, so this gives us user-level ordering while allowing │
│ parallelism across users." │
│ │
│ When discussing reliability: │
│ │
│ "The consumer needs to be idempotent because Kafka provides │
│ at-least-once delivery. I'd use the message offset as a │
│ deduplication key, storing it in the same transaction as the │
│ business operation." │
│ │
│ When discussing scale: │
│ │
│ "With 6 partitions, we can have up to 6 consumers processing │
│ in parallel. If we need more throughput, we'd increase the │
│ partition count, though this requires careful planning since │
│ you can't easily reduce partitions later." │
│ │
└────────────────────────────────────────────────────────────────────────┘
9.3 Questions to Ask Interviewer
- "What are the ordering requirements? Do events need to be processed in a specific order?"
- "How many consumers will need this data? Just one system, or multiple?"
- "Do we need replay capability? If something goes wrong, can we reprocess?"
- "What's the acceptable latency for message delivery?"
- "How bursty is the traffic? Are there predictable spikes?"
9.4 Common Follow-up Questions
| Question | Good Answer |
|---|---|
| "What if Kafka is down?" | "Producers would buffer locally and retry. We'd have monitoring to alert immediately. For critical paths, we might have a fallback to synchronous calls with circuit breaker." |
| "How do you handle poison pills?" | "Messages that fail repeatedly go to a dead letter queue after N retries. We have tooling to inspect, fix, and replay them. The key is capturing why they failed." |
| "What about message ordering?" | "Kafka guarantees ordering within a partition. I'd partition by the entity that needs ordering—like user_id or order_id. Across partitions, we'd either design for out-of-order or use timestamps." |
| "How do you scale consumers?" | "Add more consumer instances up to the partition count. If we need more parallelism, increase partitions. We'd monitor consumer lag to know when to scale." |
Chapter 10: Practice Problems
Problem 1: Order Processing System
Setup: Design an order processing system for an e-commerce platform. When a user places an order:
- Validate inventory
- Process payment
- Update inventory
- Send confirmation email
- Notify warehouse
Requirements:
- 10,000 orders per minute at peak
- Orders for the same user must be processed in order
- No order can be lost
- Email and warehouse notification can be delayed
Questions to consider:
- Would you use a queue or stream? Why?
- How would you partition the messages?
- What happens if payment processing fails?
- How do you ensure emails aren't sent twice?
Hints:
- Think about which operations must happen together and which can be asynchronous
- Consider using multiple topics for different stages
- Payment and inventory are critical; email is not
- Partition by user_id for user-level ordering
Architecture:
- API writes order to database + outbox in one transaction
- Outbox publisher sends to Kafka topic "orders" (key: user_id)
- Order processor validates inventory, processes payment
- On success, publishes to "orders.completed" topic
- Email service and warehouse service consume from "orders.completed"
Key decisions:
- Kafka for replay and multiple consumers
- Partition by user_id for ordering
- Transactional outbox to avoid the dual-write problem (see the sketch after this list)
- Separate topics for different stages
- Idempotent consumers with deduplication
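A minimal sketch of that outbox write, using SQLite for brevity; the schema, topic name, and event shape are assumptions, and the full pattern is tomorrow's topic. The point is that the order row and the event row either both commit or neither does.

```python
import json
import sqlite3
import uuid

conn = sqlite3.connect("shop.db")
conn.execute("""CREATE TABLE IF NOT EXISTS orders (
    id TEXT PRIMARY KEY, user_id TEXT, total_cents INTEGER)""")
conn.execute("""CREATE TABLE IF NOT EXISTS outbox (
    id TEXT PRIMARY KEY, topic TEXT, partition_key TEXT,
    payload TEXT, published INTEGER DEFAULT 0)""")

def place_order(user_id, total_cents):
    """Write the order and its event in one transaction (no dual-write)."""
    order_id = str(uuid.uuid4())
    event = json.dumps({"type": "order_placed", "order_id": order_id,
                        "user_id": user_id, "total_cents": total_cents})
    with conn:  # the order row and the outbox row commit together, or not at all
        conn.execute("INSERT INTO orders (id, user_id, total_cents) VALUES (?, ?, ?)",
                     (order_id, user_id, total_cents))
        conn.execute("INSERT INTO outbox (id, topic, partition_key, payload) "
                     "VALUES (?, ?, ?, ?)",
                     (str(uuid.uuid4()), "orders", user_id, event))
    return order_id

# A separate outbox publisher polls unpublished rows and sends them to Kafka,
# keyed by user_id so one user's orders land on one partition (per-user ordering).
print(place_order("user-42", 1999))
```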
Problem 2: Real-Time Analytics Pipeline
Setup: Build a system to track user activity on a website. Events include page views, clicks, searches, and purchases. The data feeds into:
- Real-time dashboard (< 5 second delay)
- Daily reports
- ML recommendation engine
Requirements:
- 100,000 events per second at peak
- Events must not be lost
- Multiple consumers need the same data
- Need to replay last 7 days of data
Questions to consider:
- Why is this a stream (not queue) use case?
- How would you handle the different latency requirements?
- What partition strategy would you use?
Hints:
- Stream is essential: multiple consumers, replay needed
- Consider consumer groups for each use case
- Think about what needs ordering vs what doesn't
- Consider data volume for partitioning decisions
Architecture:
- Kafka with 100+ partitions for throughput
- Partition by user_id (enables user-level analysis)
- Three consumer groups:
  - dashboard-consumer: Aggregates in real time
  - daily-reports-consumer: Batch processes at end of day
  - ml-consumer: Feeds recommendation model
- 7-day retention, then archive to S3
Key decisions:
- High partition count for parallelism
- Consumer groups for isolation
- Each consumer tracks its own offset
- Replay by resetting consumer offset
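Replay by offset reset might look roughly like this with the kafka-python client; the topic, group name, and 24-hour window are illustrative assumptions, and the group's regular consumers would be stopped while rewinding.

```python
import time

from kafka import KafkaConsumer, TopicPartition  # pip install kafka-python

# Rewind the daily-reports consumer group by 24 hours (names are illustrative).
consumer = KafkaConsumer(bootstrap_servers="localhost:9092",
                         group_id="daily-reports-consumer",
                         enable_auto_commit=False)

topic = "user-activity"
partitions = [TopicPartition(topic, p) for p in consumer.partitions_for_topic(topic)]
consumer.assign(partitions)  # manual assignment while the group is otherwise idle

target_ms = int((time.time() - 24 * 3600) * 1000)
offsets = consumer.offsets_for_times({tp: target_ms for tp in partitions})

for tp, found in offsets.items():
    if found is not None:
        consumer.seek(tp, found.offset)  # first offset at or after the target timestamp

# From here, polling re-reads the last 24 hours; committed offsets advance as usual.
```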
Problem 3: Notification Service
Setup: Design a notification service that sends emails, SMS, and push notifications. Notifications come from various services (orders, marketing, security alerts).
Requirements:
- Marketing can trigger 5 million notifications at once
- Security alerts (password reset) must be delivered within 30 seconds
- Email provider allows 1,000 sends/second
- Must handle provider outages gracefully
Questions to consider:
- How do you prioritize security alerts over marketing?
- How do you handle the rate limit to external providers?
- Queue or stream? Why?
Hints:
- Consider multiple queues with different priorities
- Rate limiting needs to happen at the consumer level
- Queue fits better: job processing, no replay needed
- Think about circuit breakers for provider outages
Architecture:
- Priority queues: HIGH (security), MEDIUM (transactional), LOW (marketing)
- Separate queues per channel (email, SMS, push)
- Rate limiter before external provider calls
- Circuit breaker per provider with fallback
- Dead letter queue for failed deliveries
Key decisions:
- Queue (not stream): job processing, no replay needed
- Priority queues ensure security alerts aren't delayed behind marketing sends
- Rate limiting at the consumer level (see the sketch after this list)
- Provider abstraction allows fallback
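A rough sketch of that consumer-level rate limiter: a token bucket sized to the provider's 1,000 sends/second limit. The `provider_send` call is a placeholder for a real provider SDK and would sit behind a circuit breaker in practice.

```python
import time

class TokenBucket:
    """Simple token bucket: refill at `rate` tokens per second, up to `capacity`."""
    def __init__(self, rate, capacity):
        self.rate = rate
        self.capacity = capacity
        self.tokens = capacity
        self.last = time.monotonic()

    def acquire(self):
        while True:
            now = time.monotonic()
            self.tokens = min(self.capacity,
                              self.tokens + (now - self.last) * self.rate)
            self.last = now
            if self.tokens >= 1:
                self.tokens -= 1
                return
            time.sleep((1 - self.tokens) / self.rate)  # wait for the next token

limiter = TokenBucket(rate=1000, capacity=1000)  # provider allows 1,000 sends/second

def provider_send(notification):
    ...  # placeholder for the real email provider call

def send_email(notification):
    limiter.acquire()             # throttle at the consumer, before the external call
    provider_send(notification)
```

Priority is handled upstream of this: the consumer drains the HIGH queue before touching MEDIUM or LOW, so security alerts never wait behind a marketing blast for a token.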
Chapter 11: Mock Interview Dialogue
Scenario: Design an Event-Driven Order System
Interviewer: "We need to design an order processing system. When a user places an order, several things need to happen: validate inventory, charge payment, reserve inventory, send confirmation email, and notify the warehouse. How would you approach this?"
You: "Great question. Before I dive into the design, let me make sure I understand the requirements.
First, what's the expected scale? How many orders per second?"
Interviewer: "Let's say 1,000 orders per second at peak, with average around 100."
You: "And for reliability—is it acceptable to lose orders, or must every order be processed?"
Interviewer: "No order loss is acceptable. Payments are involved."
You: "Got it. Last question: do all these operations need to happen synchronously, or can some be delayed? For example, must the email be sent before we return a response to the user?"
Interviewer: "The user just needs to know their order was received. Email can be delayed by a few seconds, same with warehouse notification."
You: "Perfect. Based on these requirements, I'd use an event-driven architecture with a message stream—specifically Kafka. Let me draw this out.
┌────────────────────────────────────────────────────────────────────────┐
│ PROPOSED ARCHITECTURE │
│ │
│ User ──► API ──► Database + Outbox (single transaction) │
│ │ │
│ ▼ │
│ ┌─────────────┐ │
│ │ Kafka │ │
│ │ "orders" │ │
│ └──────┬──────┘ │
│ │ │
│ ┌───────────────┼───────────────┐ │
│ ▼ ▼ ▼ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Order │ │ Email │ │ Warehouse │ │
│ │ Processor │ │ Service │ │ Service │ │
│ └──────┬──────┘ └─────────────┘ └─────────────┘ │
│ │ │
│ ▼ │
│ Validate → Payment → Reserve Inventory │
│ │
└────────────────────────────────────────────────────────────────────────┘
Here's why I chose this approach:
- Transactional outbox for the API: When the user submits an order, we write to our orders table AND an outbox table in a single database transaction. This avoids the dual-write problem where we might save to the database but fail to publish.
- Kafka over a simple queue because multiple services need the same events—order processor, email, warehouse. With Kafka consumer groups, each service gets every message.
- Partition by user_id so all orders from the same user go to the same partition. This ensures ordering per user."
Interviewer: "What happens if the payment processing fails?"
You: "Good question. The order processor would implement a saga pattern with compensation.
If payment fails:
- The order is marked as 'payment_failed' in the database
- Any reserved inventory is released (compensation)
- User is notified of the failure
For transient failures—like the payment service timing out—we'd retry with exponential backoff. After 3 retries, the message goes to a dead letter queue for manual investigation.
The key is that each step must be idempotent. If we retry payment and it already succeeded, the payment service should recognize the idempotency key and return the existing result rather than charging twice."
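To make the compensation flow in that answer concrete, here is a minimal sketch of the saga step; every function and exception below is a placeholder for a real service call, not an actual API.

```python
class PaymentDeclined(Exception): ...
class PaymentTimeout(Exception): ...

def reserve_inventory(order): ...          # placeholders for real service calls
def release_inventory(order): ...
def charge_payment(order, idempotency_key): ...
def mark_order(order, status): ...
def notify_user(order, reason): ...

def process_order(order):
    """Saga step with compensation: reserve inventory, then charge payment."""
    reserve_inventory(order)
    try:
        charge_payment(order, idempotency_key=order["id"])  # provider dedupes retries
    except PaymentDeclined:
        release_inventory(order)                 # compensate the reservation
        mark_order(order, status="payment_failed")
        notify_user(order, reason="payment declined")
    except PaymentTimeout:
        raise  # transient failure: retry with backoff, then dead letter queue
```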
Interviewer: "How do you ensure the email isn't sent multiple times?"
You: "Two approaches:
First, the email service consumer is idempotent. It tracks which order IDs have been emailed using a simple database table. Before sending, it checks if a confirmation was already sent.
Second, the email service provider probably supports idempotency keys. We'd include the order ID in that key so even if we call the API twice, only one email goes out.
This is important because Kafka provides at-least-once delivery. Messages can be redelivered if a consumer crashes after processing but before committing the offset."
Interviewer: "How would you handle scaling this system?"
You: "A few dimensions to scale:
- More partitions: Currently with 10 partitions, we can have 10 consumers per consumer group processing in parallel. At 1,000 orders/second, that's 100/second per consumer—very manageable. If we need more, we'd increase the partition count.
- More consumer instances: Within each consumer group, we can add instances up to the partition count. Kubernetes would auto-scale based on consumer lag metrics.
- Separate topics by priority: If we later have high-priority orders (like same-day delivery), we might route those to a separate topic with more consumers.
The nice thing about this architecture is that each component scales independently. Email service slow? Add more email consumers. They don't affect order processing."
Interviewer: "Great. What metrics would you monitor?"
You: "Critical metrics:
- Consumer lag — Are we falling behind? If lag grows, we need more consumers.
- Processing latency — p99 of time from message publish to processing complete.
- Error rate — What percentage of messages fail and go to DLQ?
- DLQ depth — Growing DLQ means something systematic is wrong.
I'd have alerts on all of these. Consumer lag above 10,000 messages would page someone. DLQ growth of more than 100 messages in 5 minutes would alert immediately."
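As a rough illustration of the first metric: consumer lag is the latest end offset minus the group's committed offset, per partition. A sketch with kafka-python follows; the group, topic, and threshold come from the dialogue and are used illustratively.

```python
from kafka import KafkaConsumer, TopicPartition  # pip install kafka-python

def consumer_lag(bootstrap, group_id, topic):
    """Lag per partition = latest end offset minus the group's committed offset."""
    consumer = KafkaConsumer(bootstrap_servers=bootstrap, group_id=group_id,
                             enable_auto_commit=False)
    partitions = [TopicPartition(topic, p)
                  for p in consumer.partitions_for_topic(topic)]
    end_offsets = consumer.end_offsets(partitions)   # latest offset per partition
    lag = {}
    for tp in partitions:
        committed = consumer.committed(tp) or 0      # None if the group never committed
        lag[tp.partition] = end_offsets[tp] - committed
    consumer.close()
    return lag

# Page someone if the order processor falls more than 10,000 messages behind.
lags = consumer_lag("localhost:9092", "order-processor", "orders")
if sum(lags.values()) > 10_000:
    print("ALERT: consumer lag above threshold", lags)
```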
Summary
┌────────────────────────────────────────────────────────────────────────┐
│ DAY 1 KEY TAKEAWAYS │
│ │
│ CORE CONCEPT: │
│ • QUEUE = Work to be done (consumed and deleted) │
│ • STREAM = Events that happened (retained, multiple readers) │
│ • Choose based on whether you need replay and multiple consumers │
│ │
│ PATTERNS: │
│ • Point-to-Point: One consumer per message (job queues) │
│ • Pub-Sub: All subscribers get all messages (notifications) │
│ • Consumer Groups: Fan-out + load balance (Kafka's model) │
│ │
│ IMPLEMENTATION: │
│ • Producers need acks=all for durability │
│ • Consumers must be idempotent (at-least-once delivery) │
│ • Partition key determines ordering and parallelism │
│ • Dead letter queues for failed messages │
│ │
│ TRADE-OFFS: │
│ • Queue: Simpler, but no replay. Use for job processing. │
│ • Stream: Replay + fan-out, but more complex. Use for events. │
│ │
│ INTERVIEW TIPS: │
│ • Always ask about ordering requirements │
│ • Clarify if multiple systems need the same data │
│ • Mention idempotency when discussing reliability │
│ │
│ DEFAULT CHOICE: │
│ • Uncertain? Start with a queue (simpler) │
│ • Need replay or multiple consumers? Use a stream │
│ │
└────────────────────────────────────────────────────────────────────────┘
📚 Further Reading
Official Documentation
- Apache Kafka: https://kafka.apache.org/documentation/
- RabbitMQ Tutorials: https://www.rabbitmq.com/getstarted.html
- Amazon SQS: https://docs.aws.amazon.com/sqs/
- Redis Streams: https://redis.io/docs/data-types/streams/
Engineering Blogs
- LinkedIn Engineering: "The Log: What every software engineer should know about real-time data's unifying abstraction" — https://engineering.linkedin.com/distributed-systems/log-what-every-software-engineer-should-know-about-real-time-datas-unifying
- Uber Engineering: "Building Reliable Reprocessing and Dead Letter Queues with Kafka" — https://eng.uber.com/reliable-reprocessing/
- Confluent Blog: "Kafka vs RabbitMQ" — https://www.confluent.io/blog/kafka-vs-rabbitmq/
Books
- "Designing Data-Intensive Applications" by Martin Kleppmann — Chapter 11 covers stream processing
- "Kafka: The Definitive Guide" by Neha Narkhede — Comprehensive Kafka reference
- "Enterprise Integration Patterns" by Hohpe & Woolf — Classic patterns book
Papers
- "Kafka: a Distributed Messaging System for Log Processing" — Original Kafka paper from LinkedIn
End of Day 1: Queue vs Stream
Tomorrow: Day 2 — Transactional Outbox. We saw today that publishing to a queue after writing to a database is dangerous (dual-write problem). Tomorrow we'll solve this properly with the transactional outbox pattern, ensuring no message is ever lost between your database and message broker.