Week 3 — Day 3: Backpressure and Flow Control
System Design Mastery Series
Preface
Black Friday, 2019. An e-commerce platform's marketing team launches their biggest campaign ever: a flash sale notification to 10 million users. Within seconds, 2 million users click through to the site.
The notification service, designed for 5,000 events per second, suddenly receives 200,000 events per second. Here's what happened:
Timeline of Disaster:
09:00:00 - Flash sale notification sent to 10M users
09:00:05 - Click events start flooding in (50K/sec)
09:00:15 - Event queue depth: 500,000 messages
09:00:30 - Queue depth: 2,000,000 messages
09:00:45 - Kafka consumer lag: 3 minutes and growing
09:01:00 - Memory exhaustion on consumer nodes
09:01:15 - Consumers start crashing, rebalancing begins
09:01:30 - Rebalancing causes more lag, cascade begins
09:02:00 - All consumers down, queue depth: 5,000,000
09:05:00 - Database connection pool exhausted
09:05:30 - API servers returning 503s
09:10:00 - Complete site outage
Root cause: No backpressure handling.
The system had no way to say "slow down, I can't keep up."
Yesterday, we learned the transactional outbox pattern—how to reliably publish events. But we left a dangerous question unanswered:
What happens when producers send events faster than consumers can process them?
This is the problem of backpressure, and today we'll learn how to detect it, handle it, and design systems that gracefully degrade instead of catastrophically fail.
Part I: Foundations
Chapter 1: What Is Backpressure?
1.1 The Simple Definition
Backpressure is the pressure that builds up when a downstream system can't keep up with an upstream one, and, by extension, the mechanisms that push back on producers. It's the distributed systems equivalent of a traffic jam.
EVERYDAY ANALOGY: The Highway On-Ramp
Think of a highway during rush hour:
Surface Street (Producer) Highway (Consumer)
Cars arriving: 100/minute Capacity: 60 cars/minute
What happens?
Minute 1: 100 arrive, 60 merge, 40 waiting
Minute 2: 100 arrive, 60 merge, 80 waiting
Minute 3: 100 arrive, 60 merge, 120 waiting
...
Minute 10: 100 arrive, 60 merge, 400 waiting
The on-ramp backs up onto surface streets.
Surface streets back up.
Eventually, the entire area is gridlocked.
This is backpressure without flow control.
SOLUTIONS highways use:
1. METERING LIGHTS: Slow down the on-ramp (rate limiting)
2. RAMP CLOSURE: Stop new cars entirely (load shedding)
3. ALTERNATE ROUTES: Divert traffic elsewhere (spillover)
4. MORE LANES: Increase capacity (scaling)
1.2 Why It Happens in Distributed Systems
In synchronous systems, backpressure is automatic:
SYNCHRONOUS (Self-Regulating):
Client ──request──▶ Server ──response──▶ Client
│
Server busy?
Client waits.
Natural backpressure: clients can't send faster than server responds.
ASYNCHRONOUS (No Built-in Regulation):
Producer ──msg──▶ Queue ──msg──▶ Consumer
│ │
│ Queue grows
│ unbounded!
│
Producer doesn't wait.
Sends as fast as it can.
No natural backpressure: queue absorbs the difference... until it can't.
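To see the difference concretely, here is a minimal asyncio sketch (not from the original system): a bounded queue makes the producer wait when the consumer falls behind, while an unbounded queue just grows.

# Bounded vs unbounded queues (sketch)
import asyncio

async def producer(queue: asyncio.Queue, n: int = 1000) -> None:
    for i in range(n):
        await queue.put(i)          # On a bounded queue, this waits when full
    await queue.put(None)           # Sentinel to stop the consumer

async def consumer(queue: asyncio.Queue) -> None:
    while (item := await queue.get()) is not None:
        await asyncio.sleep(0.001)  # Consumer is slower than the producer

async def main() -> None:
    bounded = asyncio.Queue(maxsize=100)   # Producer is throttled to consumer speed
    await asyncio.gather(producer(bounded), consumer(bounded))
    # With asyncio.Queue() (no maxsize), put() never waits and the backlog
    # simply grows; that's the unbounded case in the diagram above.

asyncio.run(main())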
1.3 The Three Phases of Backpressure
PHASE 1: QUEUE ABSORPTION
─────────────────────────
Producer rate: 1000/sec
Consumer rate: 800/sec
Gap: 200/sec accumulating in queue
Queue is doing its job—buffering the difference.
This is fine... temporarily.
PHASE 2: RESOURCE EXHAUSTION
────────────────────────────
Queue depth growing: 1M, 2M, 5M messages
Memory usage climbing
Consumer lag growing (minutes, then hours)
Warning signs appear. Action needed.
PHASE 3: SYSTEM COLLAPSE
────────────────────────
Queue hits memory limit → OOM kills
Consumers crash → rebalancing → more lag
Timeouts cascade upstream
Entire system fails
The point of no return. Recovery is painful.
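How long Phase 1 lasts is simple arithmetic: the backlog grows at the producer-consumer gap. A back-of-the-envelope sketch, assuming a 1 KB average message and 8 GB of memory available to the queue:

# Estimating time to Phase 3 (sketch; message size and memory budget are assumptions)
producer_rate = 1_000                            # messages/sec
consumer_rate = 800                              # messages/sec
backlog_growth = producer_rate - consumer_rate   # 200 messages/sec accumulate

msg_size_bytes = 1_024                           # assumed average message size
memory_budget_bytes = 8 * 1024**3                # assumed 8 GB available to the queue

seconds_to_exhaustion = memory_budget_bytes / (backlog_growth * msg_size_bytes)
print(f"Phase 3 in ~{seconds_to_exhaustion / 3600:.1f} hours at this gap")   # ~11.7 hours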
1.4 Key Terminology
| Term | Definition |
|---|---|
| Consumer lag | How far behind the consumer is (messages or time) |
| Queue depth | Number of messages waiting to be processed |
| Throughput | Messages processed per unit time |
| Saturation | When a resource is at 100% capacity |
| Load shedding | Deliberately dropping messages to survive |
| Rate limiting | Restricting how fast producers can send |
| Flow control | Mechanisms to match producer and consumer rates |
Chapter 2: Detecting Backpressure
2.1 Key Metrics to Monitor
Before you can handle backpressure, you need to detect it. These are the critical metrics:
METRIC 1: CONSUMER LAG
──────────────────────
What: Difference between latest offset and consumer's current offset
Where: Kafka, Kinesis, any partitioned stream
Alert: Lag > 10,000 messages or > 5 minutes
Example Kafka lag query:
kafka-consumer-groups --describe --group my-consumer
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
orders 0 1000 5000 4000 ← Problem!
orders 1 2000 2100 100 ← OK
orders 2 1500 8000 6500 ← Problem!
METRIC 2: QUEUE DEPTH
─────────────────────
What: Number of messages waiting in queue
Where: RabbitMQ, SQS, Redis queues
Alert: Depth > 100,000 or growing for > 5 minutes
Example RabbitMQ:
rabbitmqctl list_queues name messages
orders_queue 150000 ← Growing fast
METRIC 3: PROCESSING LATENCY
────────────────────────────
What: Time from message publish to processing complete
Where: Measured in consumer, reported to metrics system
Alert: p99 > 10 seconds (depends on SLA)
Latency breakdown:
Queue wait time: 8000ms ← Backpressure signal!
Processing time: 50ms
Total latency: 8050ms
METRIC 4: CONSUMER THROUGHPUT
─────────────────────────────
What: Messages processed per second
Where: Consumer metrics
Alert: Throughput dropping while lag increasing
Healthy: Throughput 1000/sec, Lag stable
Unhealthy: Throughput 500/sec, Lag growing ← Consumer struggling
2.2 Building a Backpressure Detection System
# Backpressure Detection System
import asyncio
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional, List
from enum import Enum
import logging
logger = logging.getLogger(__name__)
class BackpressureLevel(Enum):
NONE = "none" # All clear
WARNING = "warning" # Early signs, monitor closely
CRITICAL = "critical" # Take action now
EMERGENCY = "emergency" # System at risk of failure
@dataclass
class BackpressureConfig:
"""Thresholds for backpressure detection."""
# Lag thresholds
lag_warning: int = 10_000
lag_critical: int = 100_000
lag_emergency: int = 1_000_000
# Queue depth thresholds
depth_warning: int = 50_000
depth_critical: int = 200_000
depth_emergency: int = 1_000_000
# Latency thresholds (seconds)
latency_warning: float = 5.0
latency_critical: float = 30.0
latency_emergency: float = 300.0
# Rate of change thresholds
lag_growth_rate_warning: float = 1000.0 # messages/sec growth
@dataclass
class BackpressureStatus:
"""Current backpressure status."""
level: BackpressureLevel
consumer_lag: int
queue_depth: int
processing_latency_p99: float
lag_growth_rate: float
timestamp: datetime
details: str
class BackpressureMonitor:
"""
Monitors system metrics and detects backpressure conditions.
"""
def __init__(
self,
config: BackpressureConfig,
metrics_client, # Your metrics system (Prometheus, Datadog, etc.)
):
self.config = config
self.metrics = metrics_client
self._lag_history: List[tuple] = [] # (timestamp, lag)
async def check(self) -> BackpressureStatus:
"""Check current backpressure status."""
# Gather metrics
lag = await self.metrics.get_consumer_lag()
depth = await self.metrics.get_queue_depth()
latency = await self.metrics.get_processing_latency_p99()
# Calculate lag growth rate
growth_rate = self._calculate_lag_growth_rate(lag)
# Determine level
level = self._determine_level(lag, depth, latency, growth_rate)
# Build status
status = BackpressureStatus(
level=level,
consumer_lag=lag,
queue_depth=depth,
processing_latency_p99=latency,
lag_growth_rate=growth_rate,
timestamp=datetime.utcnow(),
details=self._build_details(lag, depth, latency, growth_rate)
)
# Log if not healthy
if level != BackpressureLevel.NONE:
logger.warning(f"Backpressure detected: {status}")
return status
def _determine_level(
self, lag: int, depth: int, latency: float, growth_rate: float
) -> BackpressureLevel:
"""Determine backpressure level from metrics."""
# Emergency: Any metric at emergency level
if (lag >= self.config.lag_emergency or
depth >= self.config.depth_emergency or
latency >= self.config.latency_emergency):
return BackpressureLevel.EMERGENCY
# Critical: Any metric at critical level
if (lag >= self.config.lag_critical or
depth >= self.config.depth_critical or
latency >= self.config.latency_critical):
return BackpressureLevel.CRITICAL
# Warning: Any metric at warning level OR lag growing fast
if (lag >= self.config.lag_warning or
depth >= self.config.depth_warning or
latency >= self.config.latency_warning or
growth_rate >= self.config.lag_growth_rate_warning):
return BackpressureLevel.WARNING
return BackpressureLevel.NONE
def _calculate_lag_growth_rate(self, current_lag: int) -> float:
"""Calculate how fast lag is growing (messages/sec)."""
now = datetime.utcnow()
self._lag_history.append((now, current_lag))
# Keep only last 60 seconds
cutoff = now - timedelta(seconds=60)
self._lag_history = [
(t, l) for t, l in self._lag_history if t > cutoff
]
if len(self._lag_history) < 2:
return 0.0
# Calculate rate from oldest to newest
oldest_time, oldest_lag = self._lag_history[0]
newest_time, newest_lag = self._lag_history[-1]
time_diff = (newest_time - oldest_time).total_seconds()
if time_diff == 0:
return 0.0
lag_diff = newest_lag - oldest_lag
return lag_diff / time_diff
def _build_details(
self, lag: int, depth: int, latency: float, growth_rate: float
) -> str:
"""Build human-readable details string."""
parts = []
if lag >= self.config.lag_warning:
parts.append(f"lag={lag:,}")
if depth >= self.config.depth_warning:
parts.append(f"depth={depth:,}")
if latency >= self.config.latency_warning:
parts.append(f"latency_p99={latency:.1f}s")
if growth_rate >= self.config.lag_growth_rate_warning:
parts.append(f"lag_growth={growth_rate:.0f}/sec")
return ", ".join(parts) if parts else "all metrics healthy"
Chapter 3: Backpressure Response Strategies
3.1 Strategy 1: Load Shedding
Drop excess messages to protect the system.
WHEN TO USE:
• Data loss is acceptable (metrics, logs, analytics)
• System survival is more important than completeness
• Processing all messages would cause total failure
HOW IT WORKS:
Normal: [msg][msg][msg][msg] → Consumer → Processed
Overloaded: [msg][msg][msg][msg][msg][msg][msg][msg][msg]
│
▼
Load Shedder
│
┌────┴────┐
▼ ▼
[msg][msg] [dropped]
│
▼
Consumer → Processed (subset)
IMPLEMENTATION OPTIONS:
1. RANDOM SAMPLING: Keep 1 in N messages
Simple, stateless, but loses ordering context
2. PRIORITY-BASED: Drop low-priority first
Keep critical messages, shed best-effort
3. RATE-BASED: Accept up to N messages/second
Predictable load, but bursty traffic gets dropped
4. QUEUE-DEPTH BASED: Shed when queue > threshold
Reactive, may oscillate
# Load Shedding Implementation
from enum import IntEnum
import random
class Priority(IntEnum):
CRITICAL = 0 # Never drop (payments, orders)
HIGH = 1 # Drop only in emergency
MEDIUM = 2 # Drop in critical+
LOW = 3 # Drop in warning+
class LoadShedder:
"""
Drops messages based on priority and backpressure level.
"""
# Which priorities to shed at each level
SHED_MATRIX = {
BackpressureLevel.NONE: set(),
BackpressureLevel.WARNING: {Priority.LOW},
BackpressureLevel.CRITICAL: {Priority.LOW, Priority.MEDIUM},
BackpressureLevel.EMERGENCY: {Priority.LOW, Priority.MEDIUM, Priority.HIGH},
# CRITICAL priority is NEVER shed
}
def __init__(self, monitor: BackpressureMonitor):
self.monitor = monitor
self.shed_count = 0
self.total_count = 0
async def should_process(self, message: dict) -> bool:
"""
Decide whether to process or shed this message.
Returns True if message should be processed.
"""
self.total_count += 1
# Get current backpressure level
status = await self.monitor.check()
# Get message priority
priority = Priority(message.get('priority', Priority.MEDIUM))
# Check if this priority should be shed
priorities_to_shed = self.SHED_MATRIX[status.level]
if priority in priorities_to_shed:
self.shed_count += 1
logger.debug(
f"Shedding message (priority={priority.name}, "
f"level={status.level.name})"
)
return False
return True
def get_shed_rate(self) -> float:
"""Get percentage of messages being shed."""
if self.total_count == 0:
return 0.0
return self.shed_count / self.total_count * 100
3.2 Strategy 2: Rate Limiting Producers
Slow down the source of messages.
WHEN TO USE:
• Producers can tolerate delays
• You control the producer code
• Better to slow down than drop
HOW IT WORKS:
Without rate limiting:
Producer → [msg][msg][msg][msg][msg] → Queue (overflow!)
With rate limiting:
Producer → Rate Limiter → [msg]...[msg]...[msg] → Queue (manageable)
│
"Slow down!"
"Wait 100ms"
IMPLEMENTATION APPROACHES:
1. CLIENT-SIDE RATE LIMITING
Producer checks with rate limiter before sending
Blocks or backs off if limit exceeded
2. API GATEWAY RATE LIMITING
Gateway rejects requests over limit (429 Too Many Requests)
Producer must handle rejection and retry
3. ADAPTIVE RATE LIMITING
Rate limit adjusts based on consumer lag
Tightens when lag grows, loosens when caught up
# Adaptive Rate Limiter
import asyncio
import time
from dataclasses import dataclass
@dataclass
class RateLimitConfig:
"""Configuration for adaptive rate limiting."""
base_rate: float = 1000.0 # Normal rate (messages/sec)
min_rate: float = 100.0 # Minimum rate under pressure
max_rate: float = 5000.0 # Maximum rate when healthy
# Lag thresholds for adjustment
lag_low: int = 1000 # Below this: increase rate
lag_high: int = 10000 # Above this: decrease rate
adjustment_factor: float = 0.1 # How much to adjust (10%)
class AdaptiveRateLimiter:
"""
Rate limiter that adjusts based on consumer lag.
When lag is low: allow higher rate
When lag is high: reduce rate to let consumer catch up
"""
def __init__(
self,
config: RateLimitConfig,
monitor: BackpressureMonitor
):
self.config = config
self.monitor = monitor
self.current_rate = config.base_rate
self.tokens = config.base_rate
self.last_update = time.monotonic()
self._lock = asyncio.Lock()
async def acquire(self, timeout: float = 10.0) -> bool:
"""
Acquire permission to send a message.
Returns True if acquired, False if timed out.
"""
deadline = time.monotonic() + timeout
while time.monotonic() < deadline:
async with self._lock:
self._refill_tokens()
if self.tokens >= 1:
self.tokens -= 1
return True
# Wait a bit before retrying
await asyncio.sleep(0.01)
return False
async def adjust_rate(self):
"""Adjust rate based on current lag. Call periodically."""
status = await self.monitor.check()
lag = status.consumer_lag
async with self._lock:
if lag < self.config.lag_low:
# Lag is low, we can go faster
self.current_rate = min(
self.current_rate * (1 + self.config.adjustment_factor),
self.config.max_rate
)
logger.debug(f"Lag low ({lag}), increasing rate to {self.current_rate:.0f}/sec")
elif lag > self.config.lag_high:
# Lag is high, slow down
self.current_rate = max(
self.current_rate * (1 - self.config.adjustment_factor),
self.config.min_rate
)
logger.warning(f"Lag high ({lag}), decreasing rate to {self.current_rate:.0f}/sec")
def _refill_tokens(self):
"""Refill tokens based on elapsed time."""
now = time.monotonic()
elapsed = now - self.last_update
self.last_update = now
# Add tokens based on current rate
self.tokens = min(
self.tokens + elapsed * self.current_rate,
self.current_rate # Cap at 1 second worth
)
3.3 Strategy 3: Scaling Consumers
Add more processing capacity.
WHEN TO USE:
• You have headroom to scale (cost, partition count)
• Backpressure is due to insufficient consumers
• Traffic increase is sustained, not just a spike
HOW IT WORKS:
Before scaling (lag growing):
Partition 0 ──▶ Consumer A (overloaded)
Partition 1 ──▶ Consumer A (overloaded)
Partition 2 ──▶ Consumer A (overloaded)
After scaling (lag decreasing):
Partition 0 ──▶ Consumer A
Partition 1 ──▶ Consumer B
Partition 2 ──▶ Consumer C
SCALING CONSTRAINTS:
1. KAFKA: Consumers ≤ Partitions
12 partitions = max 12 consumers
Need more? Must increase partition count (disruptive)
2. SQS: No partition limit
Scale consumers freely
But: No ordering guarantees
3. DATABASE: Connection pool limits
Each consumer needs DB connections
Scale DB pool or use connection pooling
# Auto-scaling Consumer Group
from kubernetes import client, config as k8s_config
class ConsumerAutoScaler:
"""
Auto-scales consumer deployment based on lag.
Uses Kubernetes HPA custom metrics.
"""
def __init__(
self,
namespace: str,
deployment_name: str,
min_replicas: int = 1,
max_replicas: int = 10,
lag_per_replica: int = 10000 # Target lag per consumer
):
k8s_config.load_incluster_config()
self.apps_v1 = client.AppsV1Api()
self.namespace = namespace
self.deployment_name = deployment_name
self.min_replicas = min_replicas
self.max_replicas = max_replicas
self.lag_per_replica = lag_per_replica
async def scale_if_needed(self, current_lag: int, partition_count: int):
"""
Calculate and apply optimal replica count.
"""
# Get current replicas
deployment = self.apps_v1.read_namespaced_deployment(
self.deployment_name, self.namespace
)
current_replicas = deployment.spec.replicas
# Calculate desired replicas based on lag
desired_by_lag = max(1, current_lag // self.lag_per_replica)
# Cap at partition count (can't have more consumers than partitions)
desired_replicas = min(
max(self.min_replicas, desired_by_lag),
min(self.max_replicas, partition_count)
)
if desired_replicas != current_replicas:
logger.info(
f"Scaling {self.deployment_name}: "
f"{current_replicas} → {desired_replicas} "
f"(lag={current_lag:,})"
)
# Apply scale
deployment.spec.replicas = desired_replicas
self.apps_v1.patch_namespaced_deployment(
self.deployment_name,
self.namespace,
deployment
)
3.4 Strategy 4: Spillover Buffering
Overflow to secondary storage when primary is full.
WHEN TO USE:
• All messages must eventually be processed
• Primary queue has size limits
• You can tolerate delayed processing for overflow
HOW IT WORKS:
Normal flow:
Producer → Kafka → Consumer
When Kafka consumer is lagging:
Producer → Kafka → Consumer (processing)
│
└── Lag detected
│
▼
Spillover kicks in
│
▼
New messages → S3/Disk → Process later when caught up
SPILLOVER STORAGE OPTIONS:
1. LOCAL DISK
Fast, simple
Limited by disk size
Lost if machine fails
2. OBJECT STORAGE (S3)
Unlimited capacity
Durable
Higher latency
3. SECONDARY QUEUE
Another Kafka cluster
Full queue semantics
More infrastructure
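A minimal spillover sketch for the local-disk option, assuming JSON-serializable messages. Under pressure, messages are appended to a file; a drain step re-publishes them once the primary path has caught up.

# Spillover to local disk (sketch)
import json
from pathlib import Path
from typing import Callable

class DiskSpillover:
    def __init__(self, path: str = "spillover.jsonl"):
        self.path = Path(path)

    def spill(self, message: dict) -> None:
        """Append one overflow message to the spillover file."""
        with self.path.open("a") as f:
            f.write(json.dumps(message) + "\n")

    def drain(self, publish: Callable[[dict], None]) -> int:
        """Re-publish spilled messages in order; returns the count drained."""
        if not self.path.exists():
            return 0
        count = 0
        with self.path.open() as f:
            for line in f:
                publish(json.loads(line))
                count += 1
        self.path.unlink()   # Delete the file only after a full drain
        return count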
3.5 Strategy Comparison
| Strategy | Data Loss | Latency Impact | Complexity | Best For |
|---|---|---|---|---|
| Load Shedding | Yes (controlled) | None | Low | Metrics, logs |
| Rate Limiting | No | Higher producer latency | Medium | API traffic |
| Scaling | No | Minimal | Medium | Sustained load |
| Spillover | No | Higher for overflow | High | All messages critical |
3.6 The Fundamental Trade-off
THE BACKPRESSURE TRILEMMA
You can optimize for two of three:
Completeness
(No data loss)
/\
/ \
/ \
/ \
/ ?? \
/ \
/____________\
Latency Stability
(Fast) (No crashes)
SCENARIOS:
1. Completeness + Latency = Unstable
Process everything fast = overwhelm system during spikes
2. Completeness + Stability = High Latency
Keep all data, don't crash = lag grows during spikes
3. Latency + Stability = Data Loss
Stay fast and stable = must shed load during spikes
REALITY: Most systems choose Completeness + Stability
and accept temporary latency increases.
3.7 Choosing a Strategy
DECISION FRAMEWORK
START: Is data loss acceptable?
│
├── YES: Load Shedding
│ (Simplest, most effective)
│
└── NO: Can you control producers?
│
├── YES: Rate Limiting
│ (Prevents overload at source)
│
└── NO: Can you scale consumers?
│
├── YES: Auto-scaling
│ (Match capacity to demand)
│
└── NO: Spillover Buffering
(Store overflow, process later)
OFTEN: Combine multiple strategies
- Rate limit producers (first line of defense)
- Auto-scale consumers (handle sustained increases)
- Load shed as last resort (survive spikes)
Part II: Implementation
Chapter 4: Basic Implementation
4.1 Simple Backpressure-Aware Consumer
# Basic Backpressure-Aware Consumer
# WARNING: Not production-ready - for learning only
import asyncio
from typing import Any, Awaitable, Callable
class SimpleBackpressureConsumer:
"""
Consumer that monitors its own processing rate
and signals when falling behind.
"""
def __init__(
self,
        process_fn: Callable[[dict], Awaitable[Any]],  # async message handler
target_rate: float = 100.0, # Target messages/sec
max_queue_size: int = 1000
):
self.process_fn = process_fn
self.target_rate = target_rate
self.max_queue_size = max_queue_size
self._queue = asyncio.Queue(maxsize=max_queue_size)
self._processed_count = 0
self._dropped_count = 0
async def submit(self, message: dict) -> bool:
"""
Submit a message for processing.
Returns False if queue is full (backpressure signal).
"""
try:
self._queue.put_nowait(message)
return True
except asyncio.QueueFull:
self._dropped_count += 1
return False # Backpressure signal!
async def run(self):
"""Process messages from queue."""
while True:
message = await self._queue.get()
try:
await self.process_fn(message)
self._processed_count += 1
except Exception as e:
print(f"Processing error: {e}")
finally:
self._queue.task_done()
@property
def queue_depth(self) -> int:
return self._queue.qsize()
@property
def is_under_pressure(self) -> bool:
return self._queue.qsize() > self.max_queue_size * 0.8
# Usage
async def process_message(msg):
await asyncio.sleep(0.01) # Simulate work
print(f"Processed: {msg}")
async def main():
    consumer = SimpleBackpressureConsumer(process_message)
    # Start consumer
    consumer_task = asyncio.create_task(consumer.run())
    # Submit messages, handle backpressure
    for i in range(10000):
        success = await consumer.submit({"id": i})
        if not success:
            print(f"Backpressure! Queue full at message {i}")
            await asyncio.sleep(0.1)  # Back off
    # Let the consumer drain what was accepted, then shut it down
    while consumer.queue_depth > 0:
        await asyncio.sleep(0.1)
    consumer_task.cancel()

asyncio.run(main())
4.2 Understanding the Flow
MESSAGE FLOW WITH BACKPRESSURE HANDLING
Producer Consumer
│ │
│ 1. Try to send │
├──────────────────────────▶│
│ │
│ 2. Queue full?
│ │
│ 3a. Queue has space │
│◀─────── Accept ───────────│
│ │
│ 3b. Queue full │
│◀─────── Reject ───────────│
│ │
│ 4. Handle rejection │
│ - Back off │
│ - Try alternate queue │
│ - Drop if allowed │
│ │
CONSUMER INTERNAL FLOW
┌─────────────────────────────────────┐
│ Consumer │
│ │
Messages ──▶ [Queue] ──▶ [Processor] ──▶ Done
│ │ │ │
│ │ Monitor rate │
│ │ │ │
│ │ Rate dropping? │
│ │ │ │
│ │ YES: Alert! │
│ │ │ │
│ Depth growing? ◀──┘ │
│ │ │
│ YES: Backpressure signal │
│ │
└─────────────────────────────────────┘
Chapter 5: Production-Ready Implementation
5.1 Requirements for Production
- Multi-level backpressure detection — Warning, critical, emergency
- Multiple response strategies — Rate limit, shed, scale
- Graceful degradation — Prioritize critical messages
- Observability — Metrics, alerts, dashboards
- Recovery — Auto-heal when pressure subsides
5.2 Complete Backpressure Management System
# Production-Ready Backpressure Management System
import asyncio
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Optional, Dict, Any, List, Callable, Set
from enum import Enum
import logging
import time
logger = logging.getLogger(__name__)
# =============================================================================
# Configuration
# =============================================================================
@dataclass
class BackpressureManagerConfig:
"""Configuration for backpressure management."""
# Detection thresholds
lag_warning: int = 10_000
lag_critical: int = 100_000
lag_emergency: int = 500_000
queue_depth_warning: int = 50_000
queue_depth_critical: int = 200_000
# Response configuration
enable_load_shedding: bool = True
enable_rate_limiting: bool = True
enable_auto_scaling: bool = True
# Rate limiting
base_rate_limit: float = 10_000.0 # messages/sec
min_rate_limit: float = 1_000.0
# Scaling
min_consumers: int = 1
max_consumers: int = 20
scale_up_threshold: int = 50_000 # Lag to trigger scale up
scale_down_threshold: int = 5_000 # Lag to trigger scale down
scale_cooldown_seconds: int = 300 # Wait between scaling actions
# Priorities that can be shed at each level
shed_on_warning: Set[int] = field(default_factory=lambda: {3}) # LOW only
shed_on_critical: Set[int] = field(default_factory=lambda: {2, 3}) # MEDIUM, LOW
shed_on_emergency: Set[int] = field(default_factory=lambda: {1, 2, 3}) # All except CRITICAL
class Priority(Enum):
CRITICAL = 0 # Never shed (payments, orders)
HIGH = 1 # Shed only in emergency
MEDIUM = 2 # Shed in critical+
LOW = 3 # Shed in warning+
# =============================================================================
# Core Backpressure Manager
# =============================================================================
class BackpressureManager:
"""
Comprehensive backpressure management system.
Handles:
- Detection: Monitors lag, queue depth, latency
- Response: Rate limiting, load shedding, scaling
- Recovery: Auto-adjusts as pressure subsides
"""
def __init__(
self,
config: BackpressureManagerConfig,
metrics_client, # Your metrics system
scaler = None, # Optional: Kubernetes scaler
):
self.config = config
self.metrics = metrics_client
self.scaler = scaler
# State
self.current_level = BackpressureLevel.NONE
self.current_rate_limit = config.base_rate_limit
self.last_scale_time: Optional[datetime] = None
# Rate limiter state
self._tokens = config.base_rate_limit
self._last_token_update = time.monotonic()
self._token_lock = asyncio.Lock()
# Metrics
self.messages_accepted = 0
self.messages_shed = 0
self.messages_rate_limited = 0
async def start(self):
"""Start background monitoring loop."""
asyncio.create_task(self._monitor_loop())
asyncio.create_task(self._adjust_loop())
logger.info("Backpressure manager started")
async def should_accept(self, message: Dict[str, Any]) -> bool:
"""
Decide whether to accept or reject a message.
Applies rate limiting and load shedding based on current pressure.
Args:
message: Message dict with optional 'priority' field
Returns:
True if message should be processed, False if shed/limited
"""
priority = Priority(message.get('priority', Priority.MEDIUM.value))
# 1. Check load shedding
if self.config.enable_load_shedding:
if self._should_shed(priority):
self.messages_shed += 1
logger.debug(f"Shedding message (priority={priority.name})")
return False
# 2. Check rate limiting
if self.config.enable_rate_limiting:
acquired = await self._acquire_token()
if not acquired:
self.messages_rate_limited += 1
logger.debug("Rate limited message")
return False
self.messages_accepted += 1
return True
def _should_shed(self, priority: Priority) -> bool:
"""Check if this priority should be shed at current level."""
if self.current_level == BackpressureLevel.NONE:
return False
if self.current_level == BackpressureLevel.WARNING:
return priority.value in self.config.shed_on_warning
if self.current_level == BackpressureLevel.CRITICAL:
return priority.value in self.config.shed_on_critical
if self.current_level == BackpressureLevel.EMERGENCY:
return priority.value in self.config.shed_on_emergency
return False
async def _acquire_token(self, timeout: float = 0.1) -> bool:
"""Try to acquire a rate limit token."""
async with self._token_lock:
self._refill_tokens()
if self._tokens >= 1:
self._tokens -= 1
return True
return False
def _refill_tokens(self):
"""Refill tokens based on elapsed time and current rate."""
now = time.monotonic()
elapsed = now - self._last_token_update
self._last_token_update = now
self._tokens = min(
self._tokens + elapsed * self.current_rate_limit,
self.current_rate_limit # Cap at 1 second worth
)
async def _monitor_loop(self):
"""Background loop to monitor backpressure metrics."""
while True:
try:
await self._update_level()
except Exception as e:
logger.error(f"Monitor error: {e}")
await asyncio.sleep(1) # Check every second
async def _update_level(self):
"""Update current backpressure level from metrics."""
lag = await self.metrics.get_consumer_lag()
depth = await self.metrics.get_queue_depth()
old_level = self.current_level
# Determine new level
if lag >= self.config.lag_emergency:
self.current_level = BackpressureLevel.EMERGENCY
elif lag >= self.config.lag_critical or depth >= self.config.queue_depth_critical:
self.current_level = BackpressureLevel.CRITICAL
elif lag >= self.config.lag_warning or depth >= self.config.queue_depth_warning:
self.current_level = BackpressureLevel.WARNING
else:
self.current_level = BackpressureLevel.NONE
# Log level changes
if self.current_level != old_level:
logger.warning(
f"Backpressure level changed: {old_level.name} → {self.current_level.name} "
f"(lag={lag:,}, depth={depth:,})"
)
async def _adjust_loop(self):
"""Background loop to adjust rate limits and scaling."""
while True:
try:
await self._adjust_rate_limit()
await self._adjust_scaling()
except Exception as e:
logger.error(f"Adjust error: {e}")
await asyncio.sleep(10) # Adjust every 10 seconds
async def _adjust_rate_limit(self):
"""Adjust rate limit based on current level."""
if not self.config.enable_rate_limiting:
return
old_rate = self.current_rate_limit
if self.current_level == BackpressureLevel.EMERGENCY:
# Minimum rate
self.current_rate_limit = self.config.min_rate_limit
elif self.current_level == BackpressureLevel.CRITICAL:
# Reduce by 50%
self.current_rate_limit = max(
self.config.base_rate_limit * 0.5,
self.config.min_rate_limit
)
elif self.current_level == BackpressureLevel.WARNING:
# Reduce by 25%
self.current_rate_limit = max(
self.config.base_rate_limit * 0.75,
self.config.min_rate_limit
)
else:
# Normal rate
self.current_rate_limit = self.config.base_rate_limit
if abs(self.current_rate_limit - old_rate) > 100:
logger.info(f"Rate limit adjusted: {old_rate:.0f} → {self.current_rate_limit:.0f}/sec")
async def _adjust_scaling(self):
"""Adjust consumer count based on lag."""
if not self.config.enable_auto_scaling or not self.scaler:
return
# Check cooldown
if self.last_scale_time:
cooldown = datetime.utcnow() - self.last_scale_time
if cooldown.total_seconds() < self.config.scale_cooldown_seconds:
return
lag = await self.metrics.get_consumer_lag()
current_count = await self.scaler.get_replica_count()
desired_count = current_count
if lag > self.config.scale_up_threshold and current_count < self.config.max_consumers:
# Scale up
desired_count = min(current_count + 2, self.config.max_consumers)
elif lag < self.config.scale_down_threshold and current_count > self.config.min_consumers:
# Scale down
desired_count = max(current_count - 1, self.config.min_consumers)
if desired_count != current_count:
logger.info(f"Scaling consumers: {current_count} → {desired_count} (lag={lag:,})")
await self.scaler.scale_to(desired_count)
self.last_scale_time = datetime.utcnow()
def get_stats(self) -> Dict[str, Any]:
"""Get current backpressure statistics."""
total = self.messages_accepted + self.messages_shed + self.messages_rate_limited
return {
"level": self.current_level.name,
"rate_limit": self.current_rate_limit,
"accepted": self.messages_accepted,
"shed": self.messages_shed,
"rate_limited": self.messages_rate_limited,
"shed_rate": (self.messages_shed / total * 100) if total > 0 else 0,
"limited_rate": (self.messages_rate_limited / total * 100) if total > 0 else 0,
}
# =============================================================================
# Priority-Aware Producer
# =============================================================================
class BackpressureAwareProducer:
"""
Producer that respects backpressure signals.
"""
def __init__(
self,
backpressure_manager: BackpressureManager,
kafka_producer,
topic: str
):
self.bp_manager = backpressure_manager
self.kafka = kafka_producer
self.topic = topic
async def send(
self,
message: Dict[str, Any],
priority: Priority = Priority.MEDIUM,
timeout: float = 10.0
) -> bool:
"""
Send a message, respecting backpressure.
Returns True if sent, False if rejected due to backpressure.
"""
message['priority'] = priority.value
# Check if we should accept this message
if not await self.bp_manager.should_accept(message):
return False
# Send to Kafka
try:
await self.kafka.send_and_wait(self.topic, message)
return True
except Exception as e:
logger.error(f"Send failed: {e}")
return False
async def send_critical(self, message: Dict[str, Any]) -> bool:
"""Send a critical message (never shed)."""
return await self.send(message, Priority.CRITICAL)
async def send_high(self, message: Dict[str, Any]) -> bool:
"""Send a high-priority message."""
return await self.send(message, Priority.HIGH)
async def send_low(self, message: Dict[str, Any]) -> bool:
"""Send a low-priority message (first to be shed)."""
return await self.send(message, Priority.LOW)
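A wiring sketch showing how the pieces fit together. The kafka_producer is assumed to expose send_and_wait() (as aiokafka's AIOKafkaProducer does); metrics_client and scaler are the same assumed interfaces used by the manager.

# Wiring sketch: manager + producer
async def bootstrap(kafka_producer, metrics_client, scaler=None):
    manager = BackpressureManager(
        BackpressureManagerConfig(),
        metrics_client=metrics_client,
        scaler=scaler,
    )
    await manager.start()

    producer = BackpressureAwareProducer(manager, kafka_producer, topic="orders")

    ok = await producer.send_critical({"order_id": "o-123", "amount_cents": 4200})
    if not ok:
        # Critical messages are never shed, so False here means the broker send failed
        logger.error("Critical send failed")

    logger.info(f"Backpressure stats: {manager.get_stats()}")
    return manager, producer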
5.3 Consumer with Built-in Backpressure Handling
# Consumer with Backpressure-Aware Processing
class BackpressureAwareConsumer:
"""
Kafka consumer that handles its own backpressure.
Features:
- Monitors processing rate and queue depth
- Pauses consumption when falling behind
- Resumes when caught up
- Reports metrics for external monitoring
"""
def __init__(
self,
kafka_consumer,
processor: Callable,
max_batch_size: int = 100,
        max_processing_time: float = 30.0,  # max seconds to accumulate a batch before flushing
pause_threshold: int = 10_000, # Pause if internal queue > this
resume_threshold: int = 1_000 # Resume if internal queue < this
):
self.kafka = kafka_consumer
self.processor = processor
self.max_batch_size = max_batch_size
self.max_processing_time = max_processing_time
self.pause_threshold = pause_threshold
self.resume_threshold = resume_threshold
self._internal_queue: asyncio.Queue = asyncio.Queue()
self._paused = False
self._running = False
# Metrics
self.messages_processed = 0
self.batches_processed = 0
self.processing_time_total = 0.0
async def start(self):
"""Start consumer loops."""
self._running = True
asyncio.create_task(self._fetch_loop())
asyncio.create_task(self._process_loop())
asyncio.create_task(self._backpressure_loop())
logger.info("Backpressure-aware consumer started")
async def stop(self):
"""Stop consumer gracefully."""
self._running = False
logger.info(
f"Consumer stopped. Processed: {self.messages_processed:,}, "
f"Batches: {self.batches_processed:,}, "
f"Avg batch time: {self._avg_batch_time():.2f}s"
)
async def _fetch_loop(self):
"""Fetch messages from Kafka into internal queue."""
while self._running:
if self._paused:
await asyncio.sleep(0.1)
continue
try:
# Fetch batch from Kafka
messages = await self.kafka.getmany(
timeout_ms=1000,
max_records=self.max_batch_size
)
for tp, msgs in messages.items():
for msg in msgs:
await self._internal_queue.put(msg)
except Exception as e:
logger.error(f"Fetch error: {e}")
await asyncio.sleep(1)
async def _process_loop(self):
"""Process messages from internal queue."""
batch = []
batch_start = time.monotonic()
while self._running:
try:
# Get message with timeout
try:
msg = await asyncio.wait_for(
self._internal_queue.get(),
timeout=0.1
)
batch.append(msg)
except asyncio.TimeoutError:
pass
# Process batch if full or time exceeded
should_process = (
len(batch) >= self.max_batch_size or
(len(batch) > 0 and
time.monotonic() - batch_start > self.max_processing_time)
)
if should_process and batch:
await self._process_batch(batch)
batch = []
batch_start = time.monotonic()
except Exception as e:
logger.error(f"Process error: {e}")
async def _process_batch(self, batch: List):
"""Process a batch of messages."""
start = time.monotonic()
try:
await self.processor(batch)
elapsed = time.monotonic() - start
self.messages_processed += len(batch)
self.batches_processed += 1
self.processing_time_total += elapsed
# Commit offsets
await self.kafka.commit()
except Exception as e:
logger.error(f"Batch processing error: {e}")
raise
async def _backpressure_loop(self):
"""Monitor internal queue and pause/resume fetching."""
while self._running:
queue_size = self._internal_queue.qsize()
if not self._paused and queue_size > self.pause_threshold:
self._paused = True
logger.warning(
f"Pausing fetch: internal queue at {queue_size:,} "
f"(threshold: {self.pause_threshold:,})"
)
elif self._paused and queue_size < self.resume_threshold:
self._paused = False
logger.info(
f"Resuming fetch: internal queue at {queue_size:,} "
f"(threshold: {self.resume_threshold:,})"
)
await asyncio.sleep(1)
def _avg_batch_time(self) -> float:
if self.batches_processed == 0:
return 0.0
return self.processing_time_total / self.batches_processed
def get_metrics(self) -> Dict[str, Any]:
return {
"internal_queue_size": self._internal_queue.qsize(),
"is_paused": self._paused,
"messages_processed": self.messages_processed,
"batches_processed": self.batches_processed,
"avg_batch_time_seconds": self._avg_batch_time(),
}
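A usage sketch for the consumer, assuming aiokafka (whose AIOKafkaConsumer provides the getmany()/commit() calls used above). The handle_batch() processor is a placeholder for business logic.

# Usage sketch: running the backpressure-aware consumer with aiokafka
from aiokafka import AIOKafkaConsumer

async def handle_batch(batch) -> None:
    for msg in batch:
        ...   # deserialize msg.value and process it

async def run_consumer() -> None:
    kafka = AIOKafkaConsumer(
        "orders",
        bootstrap_servers="localhost:9092",
        group_id="order-processor",
        enable_auto_commit=False,   # the wrapper above commits after each batch
    )
    await kafka.start()
    consumer = BackpressureAwareConsumer(kafka, processor=handle_batch)
    await consumer.start()
    try:
        await asyncio.Event().wait()   # run until the task is cancelled
    finally:
        await consumer.stop()
        await kafka.stop()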
Chapter 6: Edge Cases and Error Handling
6.1 Edge Case 1: Cascading Backpressure
SCENARIO: Consumer slows down, causing upstream to back up
Service A → Kafka → Service B → Kafka → Service C
│
Slow/down
│
▼
Backpressure propagates upstream!
TIMELINE:
T0: Service C slows down
T1: Service B's output queue backs up
T2: Service B slows down (waiting on C)
T3: Kafka topic B fills up
T4: Service A's output queue backs up
T5: Service A slows down
T6: Entire pipeline degraded
SOLUTIONS:
1. CIRCUIT BREAKER between services
If C is slow, B stops waiting and fails fast
2. INDEPENDENT BUFFERING at each stage
Each service can absorb its own backpressure
3. BULKHEAD ISOLATION
Separate resources per downstream dependency
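A minimal circuit-breaker sketch for solution 1, as one way B could fail fast instead of waiting on a slow C. Thresholds and the wrapped call are illustrative assumptions, not a specific library's API.

# Circuit breaker between services (sketch)
import time
from typing import Optional

class CircuitBreaker:
    def __init__(self, failure_threshold: int = 5, reset_timeout: float = 30.0):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.failures = 0
        self.opened_at: Optional[float] = None

    def _is_open(self) -> bool:
        if self.opened_at is None:
            return False
        if time.monotonic() - self.opened_at > self.reset_timeout:
            self.opened_at = None    # Half-open: allow one trial call through
            self.failures = 0
            return False
        return True

    async def call(self, fn, *args, **kwargs):
        if self._is_open():
            raise RuntimeError("circuit open: downstream treated as unhealthy, failing fast")
        try:
            result = await fn(*args, **kwargs)
            self.failures = 0
            return result
        except Exception:
            self.failures += 1
            if self.failures >= self.failure_threshold:
                self.opened_at = time.monotonic()   # Trip the breaker
            raise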
6.2 Edge Case 2: Recovery Thundering Herd
SCENARIO: System recovers, causing spike from backlog
During outage:
- 1M messages accumulated in queue
- Rate limit reduced to 100/sec
After recovery:
- Rate limit restored to 10,000/sec
- All 1M messages processed at full speed
- Downstream services overwhelmed!
SOLUTION: Gradual rate restoration
Instead of:
rate = 100 → rate = 10,000 (instant)
Do:
rate = 100
rate = 500 (after 1 min stable)
rate = 1,000 (after 1 min stable)
rate = 2,500 (after 1 min stable)
rate = 5,000 (after 1 min stable)
rate = 10,000 (after 1 min stable)
# Gradual Recovery Rate Limiter
class GradualRecoveryRateLimiter:
"""Rate limiter that slowly restores rate after backpressure."""
def __init__(
self,
base_rate: float,
min_rate: float,
recovery_steps: int = 5,
step_duration_seconds: float = 60.0
):
self.base_rate = base_rate
self.min_rate = min_rate
self.recovery_steps = recovery_steps
self.step_duration = step_duration_seconds
self.current_rate = base_rate
self.recovery_start: Optional[datetime] = None
self.in_recovery = False
def reduce_rate(self, factor: float):
"""Reduce rate immediately during backpressure."""
self.current_rate = max(self.base_rate * factor, self.min_rate)
self.in_recovery = False
self.recovery_start = None
def start_recovery(self):
"""Start gradual rate recovery."""
if not self.in_recovery:
self.in_recovery = True
self.recovery_start = datetime.utcnow()
def get_current_rate(self) -> float:
"""Get current rate, accounting for gradual recovery."""
if not self.in_recovery:
return self.current_rate
# Calculate recovery progress
elapsed = (datetime.utcnow() - self.recovery_start).total_seconds()
step = int(elapsed / self.step_duration)
if step >= self.recovery_steps:
# Fully recovered
self.in_recovery = False
self.current_rate = self.base_rate
return self.base_rate
# Interpolate between min and base
progress = step / self.recovery_steps
rate_range = self.base_rate - self.current_rate
return self.current_rate + (rate_range * progress)
6.3 Edge Case 3: Uneven Partition Load
SCENARIO: One partition gets much more traffic than others
Partition distribution:
P0: 100,000 messages (hot partition)
P1: 10,000 messages
P2: 8,000 messages
P3: 12,000 messages
Consumer assignment:
Consumer A → P0 (overloaded!)
Consumer B → P1, P2, P3 (underloaded)
SYMPTOMS:
- Consumer A lagging
- Consumer B idle
- Overall lag growing despite capacity
SOLUTIONS:
1. BETTER PARTITION KEY
Don't use hot keys (celebrity users, popular products)
Add jitter: hash(user_id + random(0-10))
2. MORE PARTITIONS
More partitions = better distribution
But: more complexity, rebalancing overhead
3. WEIGHTED ASSIGNMENT
Assign hot partitions to dedicated consumers
Manual intervention required
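A sketch of solution 1's key-jitter idea: salt hot keys so a single celebrity's events spread over several partitions. The 10 salt buckets are an assumption, and salting gives up strict per-key ordering.

# Salted partition keys for hot users (sketch)
import hashlib
import random

def salted_partition_key(user_id: str, salt_buckets: int = 10) -> str:
    return f"{user_id}:{random.randrange(salt_buckets)}"

def partition_for(key: str, num_partitions: int) -> int:
    digest = hashlib.md5(key.encode()).digest()
    return int.from_bytes(digest[:4], "big") % num_partitions

# One hot user's events now land on up to 10 different partitions:
partitions = {partition_for(salted_partition_key("celebrity-42"), 12) for _ in range(100)}
print(partitions)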
6.4 Error Handling Matrix
| Error | Detection | Response | Recovery |
|---|---|---|---|
| Consumer lag growing | Metric threshold | Rate limit, scale up | Auto when caught up |
| Queue memory exhausted | OOM, health check | Load shed, pause | Restart, clear backlog |
| Processing timeout | Deadline exceeded | Fail message, retry | Retry with backoff |
| Downstream unavailable | Circuit breaker | Pause processing | Resume when healthy |
| Partition imbalance | Per-partition lag | Rebalance, add partitions | Manual tuning |
Part III: Real-World Application
Chapter 7: How Big Tech Does It
7.1 Case Study: Netflix — Backpressure in Video Pipeline
NETFLIX VIDEO ENCODING PIPELINE
Scale:
- 100+ million members
- Thousands of new titles per year
- Each title: 100+ encoding jobs (resolutions, codecs)
Challenge:
- Encoding is CPU-intensive (hours per job)
- New releases cause job spikes
- Can't drop encoding jobs
Architecture:
Upload → Job Queue → Encoder Fleet → Output
│
Backpressure
Management
│
┌────────┴────────┐
│ │
Priority Auto-scaling
Queuing │
│ Spot Instances
│ On-Demand Fallback
Critical ─────────────────────▶ Dedicated
(New releases) Encoders
Backpressure Strategy:
1. PRIORITY QUEUES
- New releases: highest priority
- Re-encodes: medium priority
- Archive optimization: lowest priority
2. AUTO-SCALING ENCODER FLEET
- Scale up Spot instances for surge
- Fall back to On-Demand if Spot unavailable
- Scale down during quiet periods
3. RATE LIMITING INGESTION
- Limit simultaneous uploads
- Queue uploads when encoder fleet saturated
4. NO LOAD SHEDDING
- Every encoding job must complete
- Accept delays during peaks
Results:
- Handle 10x traffic spikes
- No encoding jobs lost
- Acceptable delay during peaks (hours, not days)
7.2 Case Study: LinkedIn — Kafka Quotas
LINKEDIN KAFKA QUOTA SYSTEM
Scale:
- 7+ trillion messages per day
- Thousands of producers
- Multi-tenant clusters
Challenge:
- One bad producer can impact entire cluster
- Need fair sharing of resources
- Must protect cluster stability
Solution: Kafka Quotas
Broker Configuration:
producer_byte_rate = 10MB/sec per client
consumer_byte_rate = 50MB/sec per client
request_percentage = 50% of broker capacity
When quota exceeded:
Broker SLOWS DOWN client (delays response)
Client naturally backs off
No data loss, just delay
Implementation Details:
1. PER-CLIENT QUOTAS
Each producer has byte/sec limit
Tracked at broker level
2. PER-TOPIC QUOTAS
Critical topics get guaranteed bandwidth
Best-effort topics get remainder
3. RESPONSE THROTTLING
Instead of rejecting, broker delays response
Client thinks broker is slow
Natural backpressure without errors
Metrics Tracked:
- throttle_time_ms: How long clients are delayed
- produce_byte_rate: Per-client production rate
- quota_violation: Count of quota violations
Results:
- No cluster-wide outages from runaway producers
- Fair resource sharing
- Graceful degradation under load
7.3 Case Study: Uber — Surge Pricing Backpressure
UBER REAL-TIME PRICING SYSTEM
Challenge:
- Price calculation needs real-time supply/demand
- Events: ride requests, driver locations, completions
- Millions of events per minute in busy cities
Problem:
- New Year's Eve: 10x normal traffic
- If pricing falls behind, prices are stale
- Stale prices = wrong driver incentives
Architecture:
Events ─▶ Kafka ─▶ Pricing Service ─▶ Price Cache
│
Backpressure
│
┌───────────┴───────────┐
│ │
Load Shedding Sampling
(old events) (reduce rate)
Backpressure Strategy:
1. EVENT EXPIRATION
Events older than 30 seconds = dropped
Pricing needs current data, not historical
2. SAMPLING MODE
When lag > 10 seconds:
Process 1 in 10 location updates
Process ALL ride requests (critical)
3. GEOGRAPHIC SHARDING
Each city = separate processing pipeline
NYC backpressure doesn't affect LA
4. GRACEFUL DEGRADATION
If pricing unavailable: use last known price
Always return SOMETHING to riders
Key Insight:
"It's better to have an approximate current price
than an exact price from 5 minutes ago."
Results:
- Handle New Year's Eve traffic
- Pricing lag < 5 seconds during peaks
- No city-wide pricing outages
7.4 Summary: Industry Patterns
| Company | System | Backpressure Strategy |
|---|---|---|
| Netflix | Encoding | Priority queues, auto-scaling, no shedding |
| LinkedIn | Kafka | Quotas, throttling, per-client limits |
| Uber | Pricing | Event expiration, sampling, sharding |
| Twitter | Timeline | Rate limiting, degraded mode, caching |
| Stripe | Webhooks | Retry queues, exponential backoff |
| Discord | Messages | Per-guild rate limits, presence sampling |
Chapter 8: Common Mistakes to Avoid
8.1 Mistake 1: Unbounded Queues
❌ WRONG: Queue with no size limit
messages = [] # Grows forever!
while True:
msg = receive_message()
messages.append(msg) # No limit!
Eventually:
- Memory exhausted
- OOM killer strikes
- All messages lost
✅ CORRECT: Bounded queue with rejection
from asyncio import Queue, QueueFull

messages = Queue(maxsize=10000)  # Bounded!

async def receive():
    msg = get_message()
    try:
        messages.put_nowait(msg)
    except QueueFull:
        handle_backpressure(msg)  # Reject, shed, or buffer elsewhere
8.2 Mistake 2: Ignoring Consumer Lag
❌ WRONG: No lag monitoring
# Consume forever, hope for the best
while True:
msg = consumer.poll()
process(msg)
# No visibility into:
# - How far behind are we?
# - Is lag growing?
# - Will we ever catch up?
✅ CORRECT: Monitor and alert on lag
async def consume_with_monitoring():
while True:
# Check lag periodically
lag = get_consumer_lag()
if lag > CRITICAL_THRESHOLD:
alert_oncall("Consumer lag critical!")
trigger_scale_up()
elif lag > WARNING_THRESHOLD:
log_warning(f"Consumer lag warning: {lag}")
# Process messages
msg = await consumer.poll()
await process(msg)
8.3 Mistake 3: Rate Limiting Only at Ingestion
❌ WRONG: Rate limit at API, ignore downstream
API Gateway: 1000 req/sec ✓
But internally:
API → ServiceA → ServiceB → Database
│ │
No limit No limit
ServiceB can still overwhelm Database!
✅ CORRECT: Rate limit at every boundary
API Gateway: 1000 req/sec
ServiceA → ServiceB: 500 req/sec per client
ServiceB → Database: Connection pool limit
Each service protects itself from overload.
Defense in depth.
8.4 Mistake 4: Instant Recovery
❌ WRONG: Restore full rate immediately after recovery
During outage:
rate_limit = 100/sec
After recovery:
rate_limit = 10000/sec # INSTANT!
Result:
- 1M queued messages flood downstream
- Downstream overwhelmed
- New outage!
✅ CORRECT: Gradual rate restoration
After recovery:
rate_limit = 100/sec
# Increase gradually
wait 60 seconds, check stability
rate_limit = 500/sec
wait 60 seconds, check stability
rate_limit = 2000/sec
wait 60 seconds, check stability
rate_limit = 5000/sec
wait 60 seconds, check stability
rate_limit = 10000/sec # Full rate
8.5 Mistake Checklist
Before deploying, verify:
- Queues are bounded — Size limits prevent memory exhaustion
- Lag is monitored — Alert before it becomes critical
- Shedding strategy defined — Know what to drop and when
- Priorities assigned — Critical messages protected
- Recovery is gradual — No thundering herd after outage
- Each boundary protected — Defense in depth
- Scaling configured — Auto-scale before manual intervention needed
- Runbooks written — Clear steps for each backpressure level
Part IV: Interview Preparation
Chapter 9: Interview Tips and Phrases
9.1 When to Bring Up Backpressure
Bring up backpressure when:
- Designing async/queue-based systems
- Discussing traffic spikes or burst handling
- The interviewer asks about failure modes
- System has multiple components with different throughputs
- Scale requirements mention "peaks" or "spikes"
9.2 Key Phrases to Use
INTRODUCING BACKPRESSURE:
"One thing we need to consider is backpressure—what happens
when producers send faster than consumers can process.
Without handling this, queues grow unbounded and the system
can crash under load."
DESCRIBING DETECTION:
"I'd monitor three key metrics for backpressure: consumer lag,
queue depth, and processing latency. When any of these exceed
thresholds, we need to respond—either by rate limiting,
shedding load, or scaling up."
EXPLAINING LOAD SHEDDING:
"For this system, I'd implement priority-based load shedding.
Critical messages like orders are never dropped, but
low-priority messages like analytics events can be shed
during peaks. It's better to lose some metrics than crash
the whole system."
DISCUSSING RATE LIMITING:
"At the API level, we'd rate limit producers. But I'd also
use adaptive rate limiting that tightens when consumer lag
grows. This creates natural backpressure—producers slow
down automatically when the system is stressed."
ADDRESSING RECOVERY:
"After an outage, we wouldn't restore full rate immediately—
that would cause a thundering herd as the backlog floods
downstream. Instead, we'd gradually increase the rate,
checking system stability at each step."
9.3 Questions to Ask Interviewer
- "What's the expected traffic pattern—steady or bursty?"
- "Is it acceptable to drop some messages during peaks, or must we process everything?"
- "What's the latency requirement—real-time or eventual?"
- "Can we rate limit at the source, or do we not control the producers?"
9.4 Common Follow-up Questions
| Question | Good Answer |
|---|---|
| "How do you detect backpressure?" | "Monitor consumer lag, queue depth, and processing latency. Set thresholds for warning, critical, and emergency levels. Alert on-call when thresholds exceeded." |
| "What if you can't drop any messages?" | "Then we need buffering (spill to disk/S3), scaling (add consumers), and rate limiting (slow producers). We accept higher latency during peaks rather than losing data." |
| "How do you prioritize messages?" | "Add a priority field. Critical operations (payments, orders) get highest priority and are never shed. Analytics and logs are low priority and shed first during backpressure." |
| "What about cascading backpressure?" | "Use circuit breakers between services. If downstream is slow, fail fast rather than blocking. Each service should have its own buffering and rate limiting." |
Chapter 10: Practice Problems
Problem 1: Notification System Under Load
Setup: You're designing a notification service. Marketing can trigger campaigns sending to 10 million users. The email provider allows 1,000 emails/second.
Requirements:
- Marketing campaigns can't block transactional emails (password reset)
- Provider rate limit must not be exceeded
- All notifications must eventually be sent
Questions:
- How do you prevent marketing campaigns from blocking critical emails?
- How do you handle the 10M notification burst?
- What backpressure mechanisms would you use?
Hints:
- Priority queues: critical vs bulk
- Rate limiting per category
- Backlog is expected, plan for it
- Consider email batching for efficiency
Architecture:
Notifications → Priority Router → [Critical Queue] → Rate Limiter → Email Provider
→ [Bulk Queue] ↗
Critical queue: Always processed first
Bulk queue: Processed only when critical queue empty
Rate limiter: 1000/sec total
- Critical: guaranteed 200/sec minimum
- Bulk: up to 800/sec when no critical
Backpressure handling:
- Priority separation protects critical emails
- Rate limiting prevents provider overload
- Bulk queue absorbs campaigns (10M / 800 = ~3.5 hours)
- Monitor queue depth, alert if backlog > 24 hours
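A sketch of the priority router and shared rate limit described above. The send_email() call is a placeholder for the provider API; the 1,000/sec total and the 200/sec critical reservation come from the answer.

# Priority router with a shared per-second budget (sketch)
import asyncio

PROVIDER_LIMIT = 1000    # provider-wide emails/sec
BULK_CAP = 800           # keeps 200/sec always reserved for critical sends

critical_q: asyncio.Queue = asyncio.Queue()
bulk_q: asyncio.Queue = asyncio.Queue()

async def send_email(notification: dict) -> None:
    ...   # placeholder for the provider call

async def dispatch_loop() -> None:
    while True:
        critical_sent = 0
        # Critical notifications always go first, up to the full provider limit
        while critical_sent < PROVIDER_LIMIT and not critical_q.empty():
            await send_email(critical_q.get_nowait())
            critical_sent += 1

        # Bulk fills whatever budget remains, but never beyond its own cap
        bulk_budget = min(BULK_CAP, PROVIDER_LIMIT - critical_sent)
        bulk_sent = 0
        while bulk_sent < bulk_budget and not bulk_q.empty():
            await send_email(bulk_q.get_nowait())
            bulk_sent += 1

        await asyncio.sleep(1)   # one rate-limit window per second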
Problem 2: Real-Time Analytics Pipeline
Setup: You're building a real-time analytics system. Events come from user activity (clicks, views). Dashboard needs < 5 second freshness.
Requirements:
- 100K events/second peak
- < 5 second event-to-dashboard latency
- Some data loss acceptable during extreme peaks
Questions:
- How do you maintain low latency under normal load?
- What happens when events exceed processing capacity?
- How do you balance completeness vs latency?
Hints:
- Sampling is acceptable for analytics
- Consider aggregation at ingestion
- Track lag and shed when it exceeds the latency budget
Backpressure strategy:
1. Normal mode (lag < 5s)
   - Process all events
   - Full accuracy
2. Sampling mode (lag 5-30s)
   - Sample 1-in-10 events
   - Extrapolate in aggregation
   - Log sampling rate for accuracy context
3. Survival mode (lag > 30s)
   - Sample 1-in-100 events
   - Alert on-call
   - Scale up consumers
Implementation:
if lag < 5:
sample_rate = 1.0 # 100%
elif lag < 30:
sample_rate = 0.1 # 10%
else:
sample_rate = 0.01 # 1%
alert_oncall()
if random() < sample_rate:
process(event, weight=1/sample_rate)
Problem 3: Order Processing with Downstream Limits
Setup: Your order service processes orders, then calls:
- Inventory service (500 req/sec limit)
- Payment service (1000 req/sec limit)
- Notification service (200 req/sec limit)
You receive 2000 orders/second during flash sale.
Questions:
- Which downstream service is the bottleneck?
- How do you prevent overwhelming each service?
- How do you handle the order backlog?
Hints:
- Slowest downstream = system throughput limit
- Bulkhead pattern for isolation
- Consider which calls can be async
Analysis:
- Bottleneck: Notification (200/sec)
- But: Notification can be async!
Redesign:
Order → Inventory (sync) → Payment (sync) → Return to user
↓
Notification (async via queue)
Rate limiting per downstream:
inventory_limiter = RateLimiter(rate_per_sec=500)    # illustrative limiter
payment_limiter = RateLimiter(rate_per_sec=1000)

async def process_order(order):
    # These awaits cap our sync throughput at 500/sec (the inventory ceiling)
    await inventory_limiter.acquire()
    await reserve_inventory(order)

    await payment_limiter.acquire()
    await charge_payment(order)

    # Notification goes through a queue asynchronously, so it doesn't block the order path
    await notification_queue.send(order)
Handling 2000/sec:
- Max sync throughput: 500/sec (inventory limit)
- Backlog: 1500/sec accumulates in order queue
- Flash sale duration × 1500 = backlog size
- Clear backlog after sale ends
Chapter 11: Mock Interview Dialogue
Scenario: Design a Tweet Processing Pipeline
Interviewer: "We need to design the system that processes tweets for the home timeline. When a user tweets, their followers should see it in their timeline. How would you handle backpressure?"
You: "Great question. Let me first understand the scale. How many tweets per second, and how many followers on average?"
Interviewer: "Let's say 10,000 tweets per second, average 500 followers each. Some celebrity accounts have 50 million followers."
You: "So that's 10K tweets/sec × 500 = 5 million timeline updates per second on average, but with huge variance due to celebrities. Let me think through the backpressure scenarios.
The main challenge is fan-out to followers. When a celebrity with 50M followers tweets, that's a massive burst. Here's how I'd handle it:
First, I'd separate celebrities from regular users. Celebrity tweets go to a dedicated high-throughput pipeline with more resources. Regular tweets go through the standard pipeline.
Second, for celebrity tweets, I'd use a pull-on-read model instead of push-on-write. Instead of updating 50M timelines immediately, I'd:
- Store the tweet in a 'celebrity tweets' cache
- When a follower loads their timeline, merge in recent celebrity tweets at read time
This converts a write amplification problem into a read-time merge."
Interviewer: "What about regular users with normal follower counts?"
You: "For regular users, push-on-write works. But I'd still add backpressure handling:
Detection: Monitor:
- Queue depth of timeline update events
- Consumer lag
- Timeline write latency
Response layers:
1. Rate limiting at tweet ingestion: If the timeline update queue is backing up, slow down tweet acceptance. Return 429 to clients and let them retry.
2. Priority-based processing: High-engagement users (mutual follows, frequent interactions) get higher priority. Updates to inactive users can be delayed.
3. Batch processing: Instead of one write per follower, batch updates. Update 100 timelines in one operation.
4. Load shedding for inactive users: If a user hasn't logged in for 30 days, skip the timeline update. They'll get a full rebuild when they return."
Interviewer: "What happens if the queue grows very large?"
You: "If lag exceeds thresholds, I'd escalate:
Warning (lag > 1 minute):
- Alert on-call
- Scale up consumers
- Tighten rate limit on tweets
Critical (lag > 5 minutes):
- Drop updates for users inactive > 7 days
- Switch more users to pull-on-read
- Consider pausing low-priority features (like retweet notifications)
Emergency (lag > 30 minutes):
- Aggressive shedding—only update highly active users
- Full timeline rebuilds for affected users later
The key is that timeline staleness of a few minutes is acceptable—users won't notice if a tweet appears 2 minutes late. But they will notice if the whole system crashes."
Interviewer: "How would you prevent one celebrity from impacting other users?"
You: "Bulkhead isolation. I'd have separate consumer groups:
- Celebrity timeline updates (high resource allocation)
- Regular timeline updates (normal resources)
If celebrity processing falls behind, it doesn't affect regular users. Each pool has its own backpressure handling.
Also, I'd use partition isolation. Partition by follower ID, so one hot celebrity's updates spread across all partitions rather than overwhelming one."
Summary
DAY 3 KEY TAKEAWAYS
CORE CONCEPT:
• Backpressure = producers outpacing consumers
• Without handling: unbounded queues → memory exhaustion → crash
• Three phases: absorption → exhaustion → collapse
DETECTION:
• Consumer lag: how far behind
• Queue depth: how much waiting
• Processing latency: how slow
• Set thresholds: warning, critical, emergency
RESPONSE STRATEGIES:
• Load shedding: drop low-priority messages
• Rate limiting: slow down producers
• Scaling: add more consumers
• Spillover: buffer overflow to disk/S3
IMPLEMENTATION:
• Priority queues protect critical messages
• Adaptive rate limits respond to lag
• Bounded queues prevent memory exhaustion
• Gradual recovery prevents thundering herd
INTERVIEW TIPS:
• Mention backpressure when designing async systems
• Always have a plan for "what if queue grows unbounded"
• Priority + shedding is common pattern
• Recovery needs to be gradual, not instant
DEFAULT CHOICE:
• Start with rate limiting (least data loss)
• Add load shedding for non-critical data
• Auto-scale for sustained increases
📚 Further Reading
Official Documentation
- Kafka Consumer Configuration: https://kafka.apache.org/documentation/#consumerconfigs
- RabbitMQ Flow Control: https://www.rabbitmq.com/flow-control.html
- AWS SQS Throttling: https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-throughput-horizontal-scaling-and-batching.html
Engineering Blogs
- Netflix: "Zuul 2: The Netflix Journey to Async, Non-Blocking Systems" — https://netflixtechblog.com/zuul-2-the-netflix-journey-to-asynchronous-non-blocking-systems-45947377fb5c
- LinkedIn: "Kafka Quotas" — https://www.linkedin.com/blog/engineering/data-management/kafka-ecosystem-at-linkedin
- Uber: "Handling Peak Traffic" — https://eng.uber.com/
Books
- "Release It!" by Michael Nygard — Chapters on stability patterns
- "Designing Data-Intensive Applications" by Martin Kleppmann — Chapter 11: Stream Processing
Papers
- "Congestion Control for High Bandwidth-Delay Product Networks" — TCP congestion control concepts apply
End of Day 3: Backpressure and Flow Control
Tomorrow: Day 4 — Dead Letters and Poison Pills. We've learned how to handle backpressure, but what about messages that simply can't be processed? Tomorrow we'll dive into dead letter queues—how to handle, debug, and replay failed messages without losing data or crashing consumers.