Himanshu Kukreja
0%
LearnSystem DesignWeek 3Backpressure And Flow Control
Day 03

Week 3 — Day 3: Backpressure and Flow Control

System Design Mastery Series


Preface

Black Friday, 2019. An e-commerce platform's marketing team launches their biggest campaign ever: a flash sale notification to 10 million users. Within seconds, 2 million users click through to the site.

The notification service, designed for 5,000 events per second, suddenly receives 200,000 events per second. Here's what happened:

Timeline of Disaster:

09:00:00 - Flash sale notification sent to 10M users
09:00:05 - Click events start flooding in (50K/sec)
09:00:15 - Event queue depth: 500,000 messages
09:00:30 - Queue depth: 2,000,000 messages
09:00:45 - Kafka consumer lag: 3 minutes and growing
09:01:00 - Memory exhaustion on consumer nodes
09:01:15 - Consumers start crashing, rebalancing begins
09:01:30 - Rebalancing causes more lag, cascade begins
09:02:00 - All consumers down, queue depth: 5,000,000
09:05:00 - Database connection pool exhausted
09:05:30 - API servers returning 503s
09:10:00 - Complete site outage

Root cause: No backpressure handling.
The system had no way to say "slow down, I can't keep up."

Yesterday, we learned the transactional outbox pattern—how to reliably publish events. But we left a dangerous question unanswered:

What happens when producers send events faster than consumers can process them?

This is the problem of backpressure, and today we'll learn how to detect it, handle it, and design systems that gracefully degrade instead of catastrophically fail.


Part I: Foundations

Chapter 1: What Is Backpressure?

1.1 The Simple Definition

Backpressure is what happens when a downstream system can't keep up with an upstream system. It's the distributed systems equivalent of a traffic jam.

EVERYDAY ANALOGY: The Highway On-Ramp

Think of a highway during rush hour:

  Surface Street (Producer)     Highway (Consumer)
  Cars arriving: 100/minute     Capacity: 60 cars/minute
  
  What happens?
  
  Minute 1:  100 arrive, 60 merge, 40 waiting
  Minute 2:  100 arrive, 60 merge, 80 waiting
  Minute 3:  100 arrive, 60 merge, 120 waiting
  ...
  Minute 10: 100 arrive, 60 merge, 400 waiting
  
  The on-ramp backs up onto surface streets.
  Surface streets back up.
  Eventually, the entire area is gridlocked.
  
  This is backpressure without flow control.

SOLUTIONS highways use:

  1. METERING LIGHTS: Slow down the on-ramp (rate limiting)
  2. RAMP CLOSURE: Stop new cars entirely (load shedding)
  3. ALTERNATE ROUTES: Divert traffic elsewhere (spillover)
  4. MORE LANES: Increase capacity (scaling)

1.2 Why It Happens in Distributed Systems

In synchronous systems, backpressure is automatic:

SYNCHRONOUS (Self-Regulating):

  Client ──request──▶ Server ──response──▶ Client
                         │
                    Server busy?
                    Client waits.
                    
  Natural backpressure: clients can't send faster than server responds.

ASYNCHRONOUS (No Built-in Regulation):

  Producer ──msg──▶ Queue ──msg──▶ Consumer
       │              │
       │         Queue grows
       │         unbounded!
       │
  Producer doesn't wait.
  Sends as fast as it can.
  
  No natural backpressure: queue absorbs the difference... until it can't.

1.3 The Three Phases of Backpressure

PHASE 1: QUEUE ABSORPTION
─────────────────────────
Producer rate: 1000/sec
Consumer rate: 800/sec
Gap: 200/sec accumulating in queue

Queue is doing its job—buffering the difference.
This is fine... temporarily.


PHASE 2: RESOURCE EXHAUSTION  
────────────────────────────
Queue depth growing: 1M, 2M, 5M messages
Memory usage climbing
Consumer lag growing (minutes, then hours)

Warning signs appear. Action needed.


PHASE 3: SYSTEM COLLAPSE
────────────────────────
Queue hits memory limit → OOM kills
Consumers crash → rebalancing → more lag
Timeouts cascade upstream
Entire system fails

The point of no return. Recovery is painful.

1.4 Key Terminology

Term Definition
Consumer lag How far behind the consumer is (messages or time)
Queue depth Number of messages waiting to be processed
Throughput Messages processed per unit time
Saturation When a resource is at 100% capacity
Load shedding Deliberately dropping messages to survive
Rate limiting Restricting how fast producers can send
Flow control Mechanisms to match producer and consumer rates

Chapter 2: Detecting Backpressure

2.1 Key Metrics to Monitor

Before you can handle backpressure, you need to detect it. These are the critical metrics:

METRIC 1: CONSUMER LAG
──────────────────────
What: Difference between latest offset and consumer's current offset
Where: Kafka, Kinesis, any partitioned stream
Alert: Lag > 10,000 messages or > 5 minutes

Example Kafka lag query:
  kafka-consumer-groups --describe --group my-consumer
  
  TOPIC     PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
  orders    0          1000            5000            4000  ← Problem!
  orders    1          2000            2100            100   ← OK
  orders    2          1500            8000            6500  ← Problem!


METRIC 2: QUEUE DEPTH
─────────────────────
What: Number of messages waiting in queue
Where: RabbitMQ, SQS, Redis queues
Alert: Depth > 100,000 or growing for > 5 minutes

Example RabbitMQ:
  rabbitmqctl list_queues name messages
  
  orders_queue    150000  ← Growing fast


METRIC 3: PROCESSING LATENCY
────────────────────────────
What: Time from message publish to processing complete
Where: Measured in consumer, reported to metrics system
Alert: p99 > 10 seconds (depends on SLA)

Latency breakdown:
  Queue wait time: 8000ms  ← Backpressure signal!
  Processing time: 50ms
  Total latency:   8050ms


METRIC 4: CONSUMER THROUGHPUT
─────────────────────────────
What: Messages processed per second
Where: Consumer metrics
Alert: Throughput dropping while lag increasing

Healthy:   Throughput 1000/sec, Lag stable
Unhealthy: Throughput 500/sec, Lag growing  ← Consumer struggling

2.2 Building a Backpressure Detection System

# Backpressure Detection System

import asyncio
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional, List
from enum import Enum
import logging

logger = logging.getLogger(__name__)


class BackpressureLevel(Enum):
    NONE = "none"           # All clear
    WARNING = "warning"     # Early signs, monitor closely
    CRITICAL = "critical"   # Take action now
    EMERGENCY = "emergency" # System at risk of failure


@dataclass
class BackpressureConfig:
    """Thresholds for backpressure detection."""
    # Lag thresholds
    lag_warning: int = 10_000
    lag_critical: int = 100_000
    lag_emergency: int = 1_000_000
    
    # Queue depth thresholds  
    depth_warning: int = 50_000
    depth_critical: int = 200_000
    depth_emergency: int = 1_000_000
    
    # Latency thresholds (seconds)
    latency_warning: float = 5.0
    latency_critical: float = 30.0
    latency_emergency: float = 300.0
    
    # Rate of change thresholds
    lag_growth_rate_warning: float = 1000.0  # messages/sec growth


@dataclass
class BackpressureStatus:
    """Current backpressure status."""
    level: BackpressureLevel
    consumer_lag: int
    queue_depth: int
    processing_latency_p99: float
    lag_growth_rate: float
    timestamp: datetime
    details: str


class BackpressureMonitor:
    """
    Monitors system metrics and detects backpressure conditions.
    """
    
    def __init__(
        self,
        config: BackpressureConfig,
        metrics_client,  # Your metrics system (Prometheus, Datadog, etc.)
    ):
        self.config = config
        self.metrics = metrics_client
        self._lag_history: List[tuple] = []  # (timestamp, lag)
    
    async def check(self) -> BackpressureStatus:
        """Check current backpressure status."""
        # Gather metrics
        lag = await self.metrics.get_consumer_lag()
        depth = await self.metrics.get_queue_depth()
        latency = await self.metrics.get_processing_latency_p99()
        
        # Calculate lag growth rate
        growth_rate = self._calculate_lag_growth_rate(lag)
        
        # Determine level
        level = self._determine_level(lag, depth, latency, growth_rate)
        
        # Build status
        status = BackpressureStatus(
            level=level,
            consumer_lag=lag,
            queue_depth=depth,
            processing_latency_p99=latency,
            lag_growth_rate=growth_rate,
            timestamp=datetime.utcnow(),
            details=self._build_details(lag, depth, latency, growth_rate)
        )
        
        # Log if not healthy
        if level != BackpressureLevel.NONE:
            logger.warning(f"Backpressure detected: {status}")
        
        return status
    
    def _determine_level(
        self, lag: int, depth: int, latency: float, growth_rate: float
    ) -> BackpressureLevel:
        """Determine backpressure level from metrics."""
        
        # Emergency: Any metric at emergency level
        if (lag >= self.config.lag_emergency or
            depth >= self.config.depth_emergency or
            latency >= self.config.latency_emergency):
            return BackpressureLevel.EMERGENCY
        
        # Critical: Any metric at critical level
        if (lag >= self.config.lag_critical or
            depth >= self.config.depth_critical or
            latency >= self.config.latency_critical):
            return BackpressureLevel.CRITICAL
        
        # Warning: Any metric at warning level OR lag growing fast
        if (lag >= self.config.lag_warning or
            depth >= self.config.depth_warning or
            latency >= self.config.latency_warning or
            growth_rate >= self.config.lag_growth_rate_warning):
            return BackpressureLevel.WARNING
        
        return BackpressureLevel.NONE
    
    def _calculate_lag_growth_rate(self, current_lag: int) -> float:
        """Calculate how fast lag is growing (messages/sec)."""
        now = datetime.utcnow()
        self._lag_history.append((now, current_lag))
        
        # Keep only last 60 seconds
        cutoff = now - timedelta(seconds=60)
        self._lag_history = [
            (t, l) for t, l in self._lag_history if t > cutoff
        ]
        
        if len(self._lag_history) < 2:
            return 0.0
        
        # Calculate rate from oldest to newest
        oldest_time, oldest_lag = self._lag_history[0]
        newest_time, newest_lag = self._lag_history[-1]
        
        time_diff = (newest_time - oldest_time).total_seconds()
        if time_diff == 0:
            return 0.0
        
        lag_diff = newest_lag - oldest_lag
        return lag_diff / time_diff
    
    def _build_details(
        self, lag: int, depth: int, latency: float, growth_rate: float
    ) -> str:
        """Build human-readable details string."""
        parts = []
        
        if lag >= self.config.lag_warning:
            parts.append(f"lag={lag:,}")
        if depth >= self.config.depth_warning:
            parts.append(f"depth={depth:,}")
        if latency >= self.config.latency_warning:
            parts.append(f"latency_p99={latency:.1f}s")
        if growth_rate >= self.config.lag_growth_rate_warning:
            parts.append(f"lag_growth={growth_rate:.0f}/sec")
        
        return ", ".join(parts) if parts else "all metrics healthy"

Chapter 3: Backpressure Response Strategies

3.1 Strategy 1: Load Shedding

Drop excess messages to protect the system.

WHEN TO USE:
  • Data loss is acceptable (metrics, logs, analytics)
  • System survival is more important than completeness
  • Processing all messages would cause total failure

HOW IT WORKS:

  Normal:     [msg][msg][msg][msg] → Consumer → Processed
  
  Overloaded: [msg][msg][msg][msg][msg][msg][msg][msg][msg]
                   │
                   ▼
              Load Shedder
                   │
              ┌────┴────┐
              ▼         ▼
           [msg][msg]  [dropped]
              │
              ▼
           Consumer → Processed (subset)

IMPLEMENTATION OPTIONS:

  1. RANDOM SAMPLING: Keep 1 in N messages
     Simple, stateless, but loses ordering context
     
  2. PRIORITY-BASED: Drop low-priority first
     Keep critical messages, shed best-effort
     
  3. RATE-BASED: Accept up to N messages/second
     Predictable load, but bursty traffic gets dropped
     
  4. QUEUE-DEPTH BASED: Shed when queue > threshold
     Reactive, may oscillate
# Load Shedding Implementation

from enum import IntEnum
import random


class Priority(IntEnum):
    CRITICAL = 0   # Never drop (payments, orders)
    HIGH = 1       # Drop only in emergency
    MEDIUM = 2     # Drop in critical+
    LOW = 3        # Drop in warning+


class LoadShedder:
    """
    Drops messages based on priority and backpressure level.
    """
    
    # Which priorities to shed at each level
    SHED_MATRIX = {
        BackpressureLevel.NONE: set(),
        BackpressureLevel.WARNING: {Priority.LOW},
        BackpressureLevel.CRITICAL: {Priority.LOW, Priority.MEDIUM},
        BackpressureLevel.EMERGENCY: {Priority.LOW, Priority.MEDIUM, Priority.HIGH},
        # CRITICAL priority is NEVER shed
    }
    
    def __init__(self, monitor: BackpressureMonitor):
        self.monitor = monitor
        self.shed_count = 0
        self.total_count = 0
    
    async def should_process(self, message: dict) -> bool:
        """
        Decide whether to process or shed this message.
        
        Returns True if message should be processed.
        """
        self.total_count += 1
        
        # Get current backpressure level
        status = await self.monitor.check()
        
        # Get message priority
        priority = Priority(message.get('priority', Priority.MEDIUM))
        
        # Check if this priority should be shed
        priorities_to_shed = self.SHED_MATRIX[status.level]
        
        if priority in priorities_to_shed:
            self.shed_count += 1
            logger.debug(
                f"Shedding message (priority={priority.name}, "
                f"level={status.level.name})"
            )
            return False
        
        return True
    
    def get_shed_rate(self) -> float:
        """Get percentage of messages being shed."""
        if self.total_count == 0:
            return 0.0
        return self.shed_count / self.total_count * 100

3.2 Strategy 2: Rate Limiting Producers

Slow down the source of messages.

WHEN TO USE:
  • Producers can tolerate delays
  • You control the producer code
  • Better to slow down than drop

HOW IT WORKS:

  Without rate limiting:
    Producer → [msg][msg][msg][msg][msg] → Queue (overflow!)
    
  With rate limiting:
    Producer → Rate Limiter → [msg]...[msg]...[msg] → Queue (manageable)
                    │
               "Slow down!"
               "Wait 100ms"

IMPLEMENTATION APPROACHES:

  1. CLIENT-SIDE RATE LIMITING
     Producer checks with rate limiter before sending
     Blocks or backs off if limit exceeded
     
  2. API GATEWAY RATE LIMITING  
     Gateway rejects requests over limit (429 Too Many Requests)
     Producer must handle rejection and retry
     
  3. ADAPTIVE RATE LIMITING
     Rate limit adjusts based on consumer lag
     Tightens when lag grows, loosens when caught up
# Adaptive Rate Limiter

import asyncio
import time
from dataclasses import dataclass


@dataclass
class RateLimitConfig:
    """Configuration for adaptive rate limiting."""
    base_rate: float = 1000.0          # Normal rate (messages/sec)
    min_rate: float = 100.0            # Minimum rate under pressure
    max_rate: float = 5000.0           # Maximum rate when healthy
    
    # Lag thresholds for adjustment
    lag_low: int = 1000                # Below this: increase rate
    lag_high: int = 10000              # Above this: decrease rate
    
    adjustment_factor: float = 0.1     # How much to adjust (10%)


class AdaptiveRateLimiter:
    """
    Rate limiter that adjusts based on consumer lag.
    
    When lag is low: allow higher rate
    When lag is high: reduce rate to let consumer catch up
    """
    
    def __init__(
        self,
        config: RateLimitConfig,
        monitor: BackpressureMonitor
    ):
        self.config = config
        self.monitor = monitor
        self.current_rate = config.base_rate
        self.tokens = config.base_rate
        self.last_update = time.monotonic()
        self._lock = asyncio.Lock()
    
    async def acquire(self, timeout: float = 10.0) -> bool:
        """
        Acquire permission to send a message.
        
        Returns True if acquired, False if timed out.
        """
        deadline = time.monotonic() + timeout
        
        while time.monotonic() < deadline:
            async with self._lock:
                self._refill_tokens()
                
                if self.tokens >= 1:
                    self.tokens -= 1
                    return True
            
            # Wait a bit before retrying
            await asyncio.sleep(0.01)
        
        return False
    
    async def adjust_rate(self):
        """Adjust rate based on current lag. Call periodically."""
        status = await self.monitor.check()
        lag = status.consumer_lag
        
        async with self._lock:
            if lag < self.config.lag_low:
                # Lag is low, we can go faster
                self.current_rate = min(
                    self.current_rate * (1 + self.config.adjustment_factor),
                    self.config.max_rate
                )
                logger.debug(f"Lag low ({lag}), increasing rate to {self.current_rate:.0f}/sec")
                
            elif lag > self.config.lag_high:
                # Lag is high, slow down
                self.current_rate = max(
                    self.current_rate * (1 - self.config.adjustment_factor),
                    self.config.min_rate
                )
                logger.warning(f"Lag high ({lag}), decreasing rate to {self.current_rate:.0f}/sec")
    
    def _refill_tokens(self):
        """Refill tokens based on elapsed time."""
        now = time.monotonic()
        elapsed = now - self.last_update
        self.last_update = now
        
        # Add tokens based on current rate
        self.tokens = min(
            self.tokens + elapsed * self.current_rate,
            self.current_rate  # Cap at 1 second worth
        )

3.3 Strategy 3: Scaling Consumers

Add more processing capacity.

WHEN TO USE:
  • You have headroom to scale (cost, partition count)
  • Backpressure is due to insufficient consumers
  • Traffic increase is sustained, not just a spike

HOW IT WORKS:

  Before scaling (lag growing):
    Partition 0 ──▶ Consumer A (overloaded)
    Partition 1 ──▶ Consumer A (overloaded)
    Partition 2 ──▶ Consumer A (overloaded)
    
  After scaling (lag decreasing):
    Partition 0 ──▶ Consumer A
    Partition 1 ──▶ Consumer B
    Partition 2 ──▶ Consumer C

SCALING CONSTRAINTS:

  1. KAFKA: Consumers ≤ Partitions
     12 partitions = max 12 consumers
     Need more? Must increase partition count (disruptive)
     
  2. SQS: No partition limit
     Scale consumers freely
     But: No ordering guarantees
     
  3. DATABASE: Connection pool limits
     Each consumer needs DB connections
     Scale DB pool or use connection pooling
# Auto-scaling Consumer Group

from kubernetes import client, config as k8s_config


class ConsumerAutoScaler:
    """
    Auto-scales consumer deployment based on lag.
    Uses Kubernetes HPA custom metrics.
    """
    
    def __init__(
        self,
        namespace: str,
        deployment_name: str,
        min_replicas: int = 1,
        max_replicas: int = 10,
        lag_per_replica: int = 10000  # Target lag per consumer
    ):
        k8s_config.load_incluster_config()
        self.apps_v1 = client.AppsV1Api()
        
        self.namespace = namespace
        self.deployment_name = deployment_name
        self.min_replicas = min_replicas
        self.max_replicas = max_replicas
        self.lag_per_replica = lag_per_replica
    
    async def scale_if_needed(self, current_lag: int, partition_count: int):
        """
        Calculate and apply optimal replica count.
        """
        # Get current replicas
        deployment = self.apps_v1.read_namespaced_deployment(
            self.deployment_name, self.namespace
        )
        current_replicas = deployment.spec.replicas
        
        # Calculate desired replicas based on lag
        desired_by_lag = max(1, current_lag // self.lag_per_replica)
        
        # Cap at partition count (can't have more consumers than partitions)
        desired_replicas = min(
            max(self.min_replicas, desired_by_lag),
            min(self.max_replicas, partition_count)
        )
        
        if desired_replicas != current_replicas:
            logger.info(
                f"Scaling {self.deployment_name}: "
                f"{current_replicas} → {desired_replicas} "
                f"(lag={current_lag:,})"
            )
            
            # Apply scale
            deployment.spec.replicas = desired_replicas
            self.apps_v1.patch_namespaced_deployment(
                self.deployment_name,
                self.namespace,
                deployment
            )

3.4 Strategy 4: Spillover Buffering

Overflow to secondary storage when primary is full.

WHEN TO USE:
  • All messages must eventually be processed
  • Primary queue has size limits
  • You can tolerate delayed processing for overflow

HOW IT WORKS:

  Normal flow:
    Producer → Kafka → Consumer
    
  When Kafka consumer is lagging:
    Producer → Kafka → Consumer (processing)
                 │
                 └── Lag detected
                        │
                        ▼
                 Spillover kicks in
                        │
                        ▼
    New messages → S3/Disk → Process later when caught up

SPILLOVER STORAGE OPTIONS:

  1. LOCAL DISK
     Fast, simple
     Limited by disk size
     Lost if machine fails
     
  2. OBJECT STORAGE (S3)
     Unlimited capacity
     Durable
     Higher latency
     
  3. SECONDARY QUEUE
     Another Kafka cluster
     Full queue semantics
     More infrastructure

3.5 Strategy Comparison

Strategy Data Loss Latency Impact Complexity Best For
Load Shedding Yes (controlled) None Low Metrics, logs
Rate Limiting No Higher producer latency Medium API traffic
Scaling No Minimal Medium Sustained load
Spillover No Higher for overflow High All messages critical

Chapter 3: Trade-offs and Considerations

3.1 The Fundamental Trade-off

THE BACKPRESSURE TRILEMMA

You can optimize for two of three:

         Completeness
        (No data loss)
             /\
            /  \
           /    \
          /      \
         /   ??   \
        /          \
       /____________\
  Latency          Stability
  (Fast)        (No crashes)

SCENARIOS:

  1. Completeness + Latency = Unstable
     Process everything fast = overwhelm system during spikes
     
  2. Completeness + Stability = High Latency
     Keep all data, don't crash = lag grows during spikes
     
  3. Latency + Stability = Data Loss
     Stay fast and stable = must shed load during spikes

REALITY: Most systems choose Completeness + Stability
         and accept temporary latency increases.

3.2 Choosing a Strategy

DECISION FRAMEWORK

START: Is data loss acceptable?
       │
       ├── YES: Load Shedding
       │        (Simplest, most effective)
       │
       └── NO: Can you control producers?
               │
               ├── YES: Rate Limiting
               │        (Prevents overload at source)
               │
               └── NO: Can you scale consumers?
                       │
                       ├── YES: Auto-scaling
                       │        (Match capacity to demand)
                       │
                       └── NO: Spillover Buffering
                               (Store overflow, process later)

OFTEN: Combine multiple strategies
       - Rate limit producers (first line of defense)
       - Auto-scale consumers (handle sustained increases)
       - Load shed as last resort (survive spikes)

Part II: Implementation

Chapter 4: Basic Implementation

4.1 Simple Backpressure-Aware Consumer

# Basic Backpressure-Aware Consumer
# WARNING: Not production-ready - for learning only

import asyncio
from typing import Callable, Any


class SimpleBackpressureConsumer:
    """
    Consumer that monitors its own processing rate
    and signals when falling behind.
    """
    
    def __init__(
        self,
        process_fn: Callable[[dict], Any],
        target_rate: float = 100.0,  # Target messages/sec
        max_queue_size: int = 1000
    ):
        self.process_fn = process_fn
        self.target_rate = target_rate
        self.max_queue_size = max_queue_size
        
        self._queue = asyncio.Queue(maxsize=max_queue_size)
        self._processed_count = 0
        self._dropped_count = 0
    
    async def submit(self, message: dict) -> bool:
        """
        Submit a message for processing.
        Returns False if queue is full (backpressure signal).
        """
        try:
            self._queue.put_nowait(message)
            return True
        except asyncio.QueueFull:
            self._dropped_count += 1
            return False  # Backpressure signal!
    
    async def run(self):
        """Process messages from queue."""
        while True:
            message = await self._queue.get()
            
            try:
                await self.process_fn(message)
                self._processed_count += 1
            except Exception as e:
                print(f"Processing error: {e}")
            finally:
                self._queue.task_done()
    
    @property
    def queue_depth(self) -> int:
        return self._queue.qsize()
    
    @property
    def is_under_pressure(self) -> bool:
        return self._queue.qsize() > self.max_queue_size * 0.8


# Usage
async def process_message(msg):
    await asyncio.sleep(0.01)  # Simulate work
    print(f"Processed: {msg}")

async def main():
    consumer = SimpleBackpressureConsumer(process_message)
    
    # Start consumer
    consumer_task = asyncio.create_task(consumer.run())
    
    # Submit messages, handle backpressure
    for i in range(10000):
        success = await consumer.submit({"id": i})
        
        if not success:
            print(f"Backpressure! Queue full at message {i}")
            await asyncio.sleep(0.1)  # Back off

4.2 Understanding the Flow

MESSAGE FLOW WITH BACKPRESSURE HANDLING

Producer                    Consumer
   │                           │
   │  1. Try to send           │
   ├──────────────────────────▶│
   │                           │
   │                    2. Queue full?
   │                           │
   │  3a. Queue has space      │
   │◀─────── Accept ───────────│
   │                           │
   │  3b. Queue full           │
   │◀─────── Reject ───────────│
   │                           │
   │  4. Handle rejection      │
   │     - Back off            │
   │     - Try alternate queue │
   │     - Drop if allowed     │
   │                           │


CONSUMER INTERNAL FLOW

           ┌─────────────────────────────────────┐
           │           Consumer                   │
           │                                      │
Messages ──▶  [Queue]  ──▶  [Processor]  ──▶ Done
           │     │              │                 │
           │     │         Monitor rate           │
           │     │              │                 │
           │     │         Rate dropping?         │
           │     │              │                 │
           │     │         YES: Alert!            │
           │     │              │                 │
           │  Depth growing? ◀──┘                 │
           │     │                                │
           │  YES: Backpressure signal            │
           │                                      │
           └─────────────────────────────────────┘

Chapter 5: Production-Ready Implementation

5.1 Requirements for Production

  1. Multi-level backpressure detection — Warning, critical, emergency
  2. Multiple response strategies — Rate limit, shed, scale
  3. Graceful degradation — Prioritize critical messages
  4. Observability — Metrics, alerts, dashboards
  5. Recovery — Auto-heal when pressure subsides

5.2 Complete Backpressure Management System

# Production-Ready Backpressure Management System

import asyncio
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Optional, Dict, Any, List, Callable, Set
from enum import Enum
import logging
import time

logger = logging.getLogger(__name__)


# =============================================================================
# Configuration
# =============================================================================

@dataclass
class BackpressureManagerConfig:
    """Configuration for backpressure management."""
    
    # Detection thresholds
    lag_warning: int = 10_000
    lag_critical: int = 100_000
    lag_emergency: int = 500_000
    
    queue_depth_warning: int = 50_000
    queue_depth_critical: int = 200_000
    
    # Response configuration
    enable_load_shedding: bool = True
    enable_rate_limiting: bool = True
    enable_auto_scaling: bool = True
    
    # Rate limiting
    base_rate_limit: float = 10_000.0  # messages/sec
    min_rate_limit: float = 1_000.0
    
    # Scaling
    min_consumers: int = 1
    max_consumers: int = 20
    scale_up_threshold: int = 50_000    # Lag to trigger scale up
    scale_down_threshold: int = 5_000   # Lag to trigger scale down
    scale_cooldown_seconds: int = 300   # Wait between scaling actions
    
    # Priorities that can be shed at each level
    shed_on_warning: Set[int] = field(default_factory=lambda: {3})      # LOW only
    shed_on_critical: Set[int] = field(default_factory=lambda: {2, 3})  # MEDIUM, LOW
    shed_on_emergency: Set[int] = field(default_factory=lambda: {1, 2, 3})  # All except CRITICAL


class Priority(Enum):
    CRITICAL = 0   # Never shed (payments, orders)
    HIGH = 1       # Shed only in emergency
    MEDIUM = 2     # Shed in critical+
    LOW = 3        # Shed in warning+


# =============================================================================
# Core Backpressure Manager
# =============================================================================

class BackpressureManager:
    """
    Comprehensive backpressure management system.
    
    Handles:
    - Detection: Monitors lag, queue depth, latency
    - Response: Rate limiting, load shedding, scaling
    - Recovery: Auto-adjusts as pressure subsides
    """
    
    def __init__(
        self,
        config: BackpressureManagerConfig,
        metrics_client,        # Your metrics system
        scaler = None,         # Optional: Kubernetes scaler
    ):
        self.config = config
        self.metrics = metrics_client
        self.scaler = scaler
        
        # State
        self.current_level = BackpressureLevel.NONE
        self.current_rate_limit = config.base_rate_limit
        self.last_scale_time: Optional[datetime] = None
        
        # Rate limiter state
        self._tokens = config.base_rate_limit
        self._last_token_update = time.monotonic()
        self._token_lock = asyncio.Lock()
        
        # Metrics
        self.messages_accepted = 0
        self.messages_shed = 0
        self.messages_rate_limited = 0
    
    async def start(self):
        """Start background monitoring loop."""
        asyncio.create_task(self._monitor_loop())
        asyncio.create_task(self._adjust_loop())
        logger.info("Backpressure manager started")
    
    async def should_accept(self, message: Dict[str, Any]) -> bool:
        """
        Decide whether to accept or reject a message.
        
        Applies rate limiting and load shedding based on current pressure.
        
        Args:
            message: Message dict with optional 'priority' field
            
        Returns:
            True if message should be processed, False if shed/limited
        """
        priority = Priority(message.get('priority', Priority.MEDIUM.value))
        
        # 1. Check load shedding
        if self.config.enable_load_shedding:
            if self._should_shed(priority):
                self.messages_shed += 1
                logger.debug(f"Shedding message (priority={priority.name})")
                return False
        
        # 2. Check rate limiting
        if self.config.enable_rate_limiting:
            acquired = await self._acquire_token()
            if not acquired:
                self.messages_rate_limited += 1
                logger.debug("Rate limited message")
                return False
        
        self.messages_accepted += 1
        return True
    
    def _should_shed(self, priority: Priority) -> bool:
        """Check if this priority should be shed at current level."""
        if self.current_level == BackpressureLevel.NONE:
            return False
        
        if self.current_level == BackpressureLevel.WARNING:
            return priority.value in self.config.shed_on_warning
        
        if self.current_level == BackpressureLevel.CRITICAL:
            return priority.value in self.config.shed_on_critical
        
        if self.current_level == BackpressureLevel.EMERGENCY:
            return priority.value in self.config.shed_on_emergency
        
        return False
    
    async def _acquire_token(self, timeout: float = 0.1) -> bool:
        """Try to acquire a rate limit token."""
        async with self._token_lock:
            self._refill_tokens()
            
            if self._tokens >= 1:
                self._tokens -= 1
                return True
        
        return False
    
    def _refill_tokens(self):
        """Refill tokens based on elapsed time and current rate."""
        now = time.monotonic()
        elapsed = now - self._last_token_update
        self._last_token_update = now
        
        self._tokens = min(
            self._tokens + elapsed * self.current_rate_limit,
            self.current_rate_limit  # Cap at 1 second worth
        )
    
    async def _monitor_loop(self):
        """Background loop to monitor backpressure metrics."""
        while True:
            try:
                await self._update_level()
            except Exception as e:
                logger.error(f"Monitor error: {e}")
            
            await asyncio.sleep(1)  # Check every second
    
    async def _update_level(self):
        """Update current backpressure level from metrics."""
        lag = await self.metrics.get_consumer_lag()
        depth = await self.metrics.get_queue_depth()
        
        old_level = self.current_level
        
        # Determine new level
        if lag >= self.config.lag_emergency:
            self.current_level = BackpressureLevel.EMERGENCY
        elif lag >= self.config.lag_critical or depth >= self.config.queue_depth_critical:
            self.current_level = BackpressureLevel.CRITICAL
        elif lag >= self.config.lag_warning or depth >= self.config.queue_depth_warning:
            self.current_level = BackpressureLevel.WARNING
        else:
            self.current_level = BackpressureLevel.NONE
        
        # Log level changes
        if self.current_level != old_level:
            logger.warning(
                f"Backpressure level changed: {old_level.name} → {self.current_level.name} "
                f"(lag={lag:,}, depth={depth:,})"
            )
    
    async def _adjust_loop(self):
        """Background loop to adjust rate limits and scaling."""
        while True:
            try:
                await self._adjust_rate_limit()
                await self._adjust_scaling()
            except Exception as e:
                logger.error(f"Adjust error: {e}")
            
            await asyncio.sleep(10)  # Adjust every 10 seconds
    
    async def _adjust_rate_limit(self):
        """Adjust rate limit based on current level."""
        if not self.config.enable_rate_limiting:
            return
        
        old_rate = self.current_rate_limit
        
        if self.current_level == BackpressureLevel.EMERGENCY:
            # Minimum rate
            self.current_rate_limit = self.config.min_rate_limit
            
        elif self.current_level == BackpressureLevel.CRITICAL:
            # Reduce by 50%
            self.current_rate_limit = max(
                self.config.base_rate_limit * 0.5,
                self.config.min_rate_limit
            )
            
        elif self.current_level == BackpressureLevel.WARNING:
            # Reduce by 25%
            self.current_rate_limit = max(
                self.config.base_rate_limit * 0.75,
                self.config.min_rate_limit
            )
            
        else:
            # Normal rate
            self.current_rate_limit = self.config.base_rate_limit
        
        if abs(self.current_rate_limit - old_rate) > 100:
            logger.info(f"Rate limit adjusted: {old_rate:.0f} → {self.current_rate_limit:.0f}/sec")
    
    async def _adjust_scaling(self):
        """Adjust consumer count based on lag."""
        if not self.config.enable_auto_scaling or not self.scaler:
            return
        
        # Check cooldown
        if self.last_scale_time:
            cooldown = datetime.utcnow() - self.last_scale_time
            if cooldown.total_seconds() < self.config.scale_cooldown_seconds:
                return
        
        lag = await self.metrics.get_consumer_lag()
        current_count = await self.scaler.get_replica_count()
        
        desired_count = current_count
        
        if lag > self.config.scale_up_threshold and current_count < self.config.max_consumers:
            # Scale up
            desired_count = min(current_count + 2, self.config.max_consumers)
            
        elif lag < self.config.scale_down_threshold and current_count > self.config.min_consumers:
            # Scale down
            desired_count = max(current_count - 1, self.config.min_consumers)
        
        if desired_count != current_count:
            logger.info(f"Scaling consumers: {current_count} → {desired_count} (lag={lag:,})")
            await self.scaler.scale_to(desired_count)
            self.last_scale_time = datetime.utcnow()
    
    def get_stats(self) -> Dict[str, Any]:
        """Get current backpressure statistics."""
        total = self.messages_accepted + self.messages_shed + self.messages_rate_limited
        
        return {
            "level": self.current_level.name,
            "rate_limit": self.current_rate_limit,
            "accepted": self.messages_accepted,
            "shed": self.messages_shed,
            "rate_limited": self.messages_rate_limited,
            "shed_rate": (self.messages_shed / total * 100) if total > 0 else 0,
            "limited_rate": (self.messages_rate_limited / total * 100) if total > 0 else 0,
        }


# =============================================================================
# Priority-Aware Producer
# =============================================================================

class BackpressureAwareProducer:
    """
    Producer that respects backpressure signals.
    """
    
    def __init__(
        self,
        backpressure_manager: BackpressureManager,
        kafka_producer,
        topic: str
    ):
        self.bp_manager = backpressure_manager
        self.kafka = kafka_producer
        self.topic = topic
    
    async def send(
        self,
        message: Dict[str, Any],
        priority: Priority = Priority.MEDIUM,
        timeout: float = 10.0
    ) -> bool:
        """
        Send a message, respecting backpressure.
        
        Returns True if sent, False if rejected due to backpressure.
        """
        message['priority'] = priority.value
        
        # Check if we should accept this message
        if not await self.bp_manager.should_accept(message):
            return False
        
        # Send to Kafka
        try:
            await self.kafka.send_and_wait(self.topic, message)
            return True
        except Exception as e:
            logger.error(f"Send failed: {e}")
            return False
    
    async def send_critical(self, message: Dict[str, Any]) -> bool:
        """Send a critical message (never shed)."""
        return await self.send(message, Priority.CRITICAL)
    
    async def send_high(self, message: Dict[str, Any]) -> bool:
        """Send a high-priority message."""
        return await self.send(message, Priority.HIGH)
    
    async def send_low(self, message: Dict[str, Any]) -> bool:
        """Send a low-priority message (first to be shed)."""
        return await self.send(message, Priority.LOW)

5.3 Consumer with Built-in Backpressure Handling

# Consumer with Backpressure-Aware Processing

class BackpressureAwareConsumer:
    """
    Kafka consumer that handles its own backpressure.
    
    Features:
    - Monitors processing rate and queue depth
    - Pauses consumption when falling behind
    - Resumes when caught up
    - Reports metrics for external monitoring
    """
    
    def __init__(
        self,
        kafka_consumer,
        processor: Callable,
        max_batch_size: int = 100,
        max_processing_time: float = 30.0,  # seconds per batch
        pause_threshold: int = 10_000,       # Pause if internal queue > this
        resume_threshold: int = 1_000        # Resume if internal queue < this
    ):
        self.kafka = kafka_consumer
        self.processor = processor
        self.max_batch_size = max_batch_size
        self.max_processing_time = max_processing_time
        self.pause_threshold = pause_threshold
        self.resume_threshold = resume_threshold
        
        self._internal_queue: asyncio.Queue = asyncio.Queue()
        self._paused = False
        self._running = False
        
        # Metrics
        self.messages_processed = 0
        self.batches_processed = 0
        self.processing_time_total = 0.0
    
    async def start(self):
        """Start consumer loops."""
        self._running = True
        asyncio.create_task(self._fetch_loop())
        asyncio.create_task(self._process_loop())
        asyncio.create_task(self._backpressure_loop())
        logger.info("Backpressure-aware consumer started")
    
    async def stop(self):
        """Stop consumer gracefully."""
        self._running = False
        logger.info(
            f"Consumer stopped. Processed: {self.messages_processed:,}, "
            f"Batches: {self.batches_processed:,}, "
            f"Avg batch time: {self._avg_batch_time():.2f}s"
        )
    
    async def _fetch_loop(self):
        """Fetch messages from Kafka into internal queue."""
        while self._running:
            if self._paused:
                await asyncio.sleep(0.1)
                continue
            
            try:
                # Fetch batch from Kafka
                messages = await self.kafka.getmany(
                    timeout_ms=1000,
                    max_records=self.max_batch_size
                )
                
                for tp, msgs in messages.items():
                    for msg in msgs:
                        await self._internal_queue.put(msg)
                
            except Exception as e:
                logger.error(f"Fetch error: {e}")
                await asyncio.sleep(1)
    
    async def _process_loop(self):
        """Process messages from internal queue."""
        batch = []
        batch_start = time.monotonic()
        
        while self._running:
            try:
                # Get message with timeout
                try:
                    msg = await asyncio.wait_for(
                        self._internal_queue.get(),
                        timeout=0.1
                    )
                    batch.append(msg)
                except asyncio.TimeoutError:
                    pass
                
                # Process batch if full or time exceeded
                should_process = (
                    len(batch) >= self.max_batch_size or
                    (len(batch) > 0 and 
                     time.monotonic() - batch_start > self.max_processing_time)
                )
                
                if should_process and batch:
                    await self._process_batch(batch)
                    batch = []
                    batch_start = time.monotonic()
                    
            except Exception as e:
                logger.error(f"Process error: {e}")
    
    async def _process_batch(self, batch: List):
        """Process a batch of messages."""
        start = time.monotonic()
        
        try:
            await self.processor(batch)
            
            elapsed = time.monotonic() - start
            self.messages_processed += len(batch)
            self.batches_processed += 1
            self.processing_time_total += elapsed
            
            # Commit offsets
            await self.kafka.commit()
            
        except Exception as e:
            logger.error(f"Batch processing error: {e}")
            raise
    
    async def _backpressure_loop(self):
        """Monitor internal queue and pause/resume fetching."""
        while self._running:
            queue_size = self._internal_queue.qsize()
            
            if not self._paused and queue_size > self.pause_threshold:
                self._paused = True
                logger.warning(
                    f"Pausing fetch: internal queue at {queue_size:,} "
                    f"(threshold: {self.pause_threshold:,})"
                )
            
            elif self._paused and queue_size < self.resume_threshold:
                self._paused = False
                logger.info(
                    f"Resuming fetch: internal queue at {queue_size:,} "
                    f"(threshold: {self.resume_threshold:,})"
                )
            
            await asyncio.sleep(1)
    
    def _avg_batch_time(self) -> float:
        if self.batches_processed == 0:
            return 0.0
        return self.processing_time_total / self.batches_processed
    
    def get_metrics(self) -> Dict[str, Any]:
        return {
            "internal_queue_size": self._internal_queue.qsize(),
            "is_paused": self._paused,
            "messages_processed": self.messages_processed,
            "batches_processed": self.batches_processed,
            "avg_batch_time_seconds": self._avg_batch_time(),
        }

Chapter 6: Edge Cases and Error Handling

6.1 Edge Case 1: Cascading Backpressure

SCENARIO: Consumer slows down, causing upstream to back up

Service A → Kafka → Service B → Kafka → Service C
                         │
                    Slow/down
                         │
                         ▼
          Backpressure propagates upstream!

TIMELINE:
  T0: Service C slows down
  T1: Service B's output queue backs up
  T2: Service B slows down (waiting on C)
  T3: Kafka topic B fills up
  T4: Service A's output queue backs up
  T5: Service A slows down
  T6: Entire pipeline degraded

SOLUTIONS:

  1. CIRCUIT BREAKER between services
     If C is slow, B stops waiting and fails fast
     
  2. INDEPENDENT BUFFERING at each stage
     Each service can absorb its own backpressure
     
  3. BULKHEAD ISOLATION
     Separate resources per downstream dependency

6.2 Edge Case 2: Recovery Thundering Herd

SCENARIO: System recovers, causing spike from backlog

During outage:
  - 1M messages accumulated in queue
  - Rate limit reduced to 100/sec
  
After recovery:
  - Rate limit restored to 10,000/sec
  - All 1M messages processed at full speed
  - Downstream services overwhelmed!
  
SOLUTION: Gradual rate restoration

Instead of:
  rate = 100 → rate = 10,000 (instant)
  
Do:
  rate = 100
  rate = 500   (after 1 min stable)
  rate = 1,000 (after 1 min stable)
  rate = 2,500 (after 1 min stable)
  rate = 5,000 (after 1 min stable)
  rate = 10,000 (after 1 min stable)
# Gradual Recovery Rate Limiter

class GradualRecoveryRateLimiter:
    """Rate limiter that slowly restores rate after backpressure."""
    
    def __init__(
        self,
        base_rate: float,
        min_rate: float,
        recovery_steps: int = 5,
        step_duration_seconds: float = 60.0
    ):
        self.base_rate = base_rate
        self.min_rate = min_rate
        self.recovery_steps = recovery_steps
        self.step_duration = step_duration_seconds
        
        self.current_rate = base_rate
        self.recovery_start: Optional[datetime] = None
        self.in_recovery = False
    
    def reduce_rate(self, factor: float):
        """Reduce rate immediately during backpressure."""
        self.current_rate = max(self.base_rate * factor, self.min_rate)
        self.in_recovery = False
        self.recovery_start = None
    
    def start_recovery(self):
        """Start gradual rate recovery."""
        if not self.in_recovery:
            self.in_recovery = True
            self.recovery_start = datetime.utcnow()
    
    def get_current_rate(self) -> float:
        """Get current rate, accounting for gradual recovery."""
        if not self.in_recovery:
            return self.current_rate
        
        # Calculate recovery progress
        elapsed = (datetime.utcnow() - self.recovery_start).total_seconds()
        step = int(elapsed / self.step_duration)
        
        if step >= self.recovery_steps:
            # Fully recovered
            self.in_recovery = False
            self.current_rate = self.base_rate
            return self.base_rate
        
        # Interpolate between min and base
        progress = step / self.recovery_steps
        rate_range = self.base_rate - self.current_rate
        
        return self.current_rate + (rate_range * progress)

6.3 Edge Case 3: Uneven Partition Load

SCENARIO: One partition gets much more traffic than others

Partition distribution:
  P0: 100,000 messages (hot partition)
  P1: 10,000 messages
  P2: 8,000 messages
  P3: 12,000 messages
  
Consumer assignment:
  Consumer A → P0 (overloaded!)
  Consumer B → P1, P2, P3 (underloaded)
  
SYMPTOMS:
  - Consumer A lagging
  - Consumer B idle
  - Overall lag growing despite capacity

SOLUTIONS:

  1. BETTER PARTITION KEY
     Don't use hot keys (celebrity users, popular products)
     Add jitter: hash(user_id + random(0-10))
     
  2. MORE PARTITIONS
     More partitions = better distribution
     But: more complexity, rebalancing overhead
     
  3. WEIGHTED ASSIGNMENT
     Assign hot partitions to dedicated consumers
     Manual intervention required

6.4 Error Handling Matrix

Error Detection Response Recovery
Consumer lag growing Metric threshold Rate limit, scale up Auto when caught up
Queue memory exhausted OOM, health check Load shed, pause Restart, clear backlog
Processing timeout Deadline exceeded Fail message, retry Retry with backoff
Downstream unavailable Circuit breaker Pause processing Resume when healthy
Partition imbalance Per-partition lag Rebalance, add partitions Manual tuning

Part III: Real-World Application

Chapter 7: How Big Tech Does It

7.1 Case Study: Netflix — Backpressure in Video Pipeline

NETFLIX VIDEO ENCODING PIPELINE

Scale:
  - 100+ million members
  - Thousands of new titles per year
  - Each title: 100+ encoding jobs (resolutions, codecs)
  
Challenge:
  - Encoding is CPU-intensive (hours per job)
  - New releases cause job spikes
  - Can't drop encoding jobs
  
Architecture:

  Upload → Job Queue → Encoder Fleet → Output
              │
         Backpressure
         Management
              │
     ┌────────┴────────┐
     │                 │
  Priority        Auto-scaling
  Queuing            │
     │          Spot Instances
     │          On-Demand Fallback
  Critical ─────────────────────▶ Dedicated
  (New releases)                   Encoders

Backpressure Strategy:

  1. PRIORITY QUEUES
     - New releases: highest priority
     - Re-encodes: medium priority
     - Archive optimization: lowest priority
     
  2. AUTO-SCALING ENCODER FLEET
     - Scale up Spot instances for surge
     - Fall back to On-Demand if Spot unavailable
     - Scale down during quiet periods
     
  3. RATE LIMITING INGESTION
     - Limit simultaneous uploads
     - Queue uploads when encoder fleet saturated
     
  4. NO LOAD SHEDDING
     - Every encoding job must complete
     - Accept delays during peaks

Results:
  - Handle 10x traffic spikes
  - No encoding jobs lost
  - Acceptable delay during peaks (hours, not days)

7.2 Case Study: LinkedIn — Kafka Quotas

LINKEDIN KAFKA QUOTA SYSTEM

Scale:
  - 7+ trillion messages per day
  - Thousands of producers
  - Multi-tenant clusters

Challenge:
  - One bad producer can impact entire cluster
  - Need fair sharing of resources
  - Must protect cluster stability

Solution: Kafka Quotas

  Broker Configuration:
    producer_byte_rate = 10MB/sec per client
    consumer_byte_rate = 50MB/sec per client
    request_percentage = 50% of broker capacity
    
  When quota exceeded:
    Broker SLOWS DOWN client (delays response)
    Client naturally backs off
    No data loss, just delay

Implementation Details:

  1. PER-CLIENT QUOTAS
     Each producer has byte/sec limit
     Tracked at broker level
     
  2. PER-TOPIC QUOTAS  
     Critical topics get guaranteed bandwidth
     Best-effort topics get remainder
     
  3. RESPONSE THROTTLING
     Instead of rejecting, broker delays response
     Client thinks broker is slow
     Natural backpressure without errors

Metrics Tracked:
  - throttle_time_ms: How long clients are delayed
  - produce_byte_rate: Per-client production rate
  - quota_violation: Count of quota violations

Results:
  - No cluster-wide outages from runaway producers
  - Fair resource sharing
  - Graceful degradation under load

7.3 Case Study: Uber — Surge Pricing Backpressure

UBER REAL-TIME PRICING SYSTEM

Challenge:
  - Price calculation needs real-time supply/demand
  - Events: ride requests, driver locations, completions
  - Millions of events per minute in busy cities
  
Problem:
  - New Year's Eve: 10x normal traffic
  - If pricing falls behind, prices are stale
  - Stale prices = wrong driver incentives

Architecture:

  Events ─▶ Kafka ─▶ Pricing Service ─▶ Price Cache
                          │
                    Backpressure
                          │
              ┌───────────┴───────────┐
              │                       │
         Load Shedding            Sampling
         (old events)            (reduce rate)

Backpressure Strategy:

  1. EVENT EXPIRATION
     Events older than 30 seconds = dropped
     Pricing needs current data, not historical
     
  2. SAMPLING MODE
     When lag > 10 seconds:
       Process 1 in 10 location updates
       Process ALL ride requests (critical)
       
  3. GEOGRAPHIC SHARDING
     Each city = separate processing pipeline
     NYC backpressure doesn't affect LA
     
  4. GRACEFUL DEGRADATION
     If pricing unavailable: use last known price
     Always return SOMETHING to riders

Key Insight:
  "It's better to have an approximate current price
   than an exact price from 5 minutes ago."

Results:
  - Handle New Year's Eve traffic
  - Pricing lag < 5 seconds during peaks
  - No city-wide pricing outages

7.4 Summary: Industry Patterns

Company System Backpressure Strategy
Netflix Encoding Priority queues, auto-scaling, no shedding
LinkedIn Kafka Quotas, throttling, per-client limits
Uber Pricing Event expiration, sampling, sharding
Twitter Timeline Rate limiting, degraded mode, caching
Stripe Webhooks Retry queues, exponential backoff
Discord Messages Per-guild rate limits, presence sampling

Chapter 8: Common Mistakes to Avoid

8.1 Mistake 1: Unbounded Queues

❌ WRONG: Queue with no size limit

messages = []  # Grows forever!

while True:
    msg = receive_message()
    messages.append(msg)  # No limit!
    
Eventually:
  - Memory exhausted
  - OOM killer strikes
  - All messages lost


✅ CORRECT: Bounded queue with rejection

from asyncio import Queue

messages = Queue(maxsize=10000)  # Bounded!

async def receive():
    msg = get_message()
    try:
        messages.put_nowait(msg)
    except QueueFull:
        handle_backpressure(msg)  # Reject, shed, or buffer elsewhere

8.2 Mistake 2: Ignoring Consumer Lag

❌ WRONG: No lag monitoring

# Consume forever, hope for the best
while True:
    msg = consumer.poll()
    process(msg)

# No visibility into:
# - How far behind are we?
# - Is lag growing?
# - Will we ever catch up?


✅ CORRECT: Monitor and alert on lag

async def consume_with_monitoring():
    while True:
        # Check lag periodically
        lag = get_consumer_lag()
        
        if lag > CRITICAL_THRESHOLD:
            alert_oncall("Consumer lag critical!")
            trigger_scale_up()
        elif lag > WARNING_THRESHOLD:
            log_warning(f"Consumer lag warning: {lag}")
        
        # Process messages
        msg = await consumer.poll()
        await process(msg)

8.3 Mistake 3: Rate Limiting Only at Ingestion

❌ WRONG: Rate limit at API, ignore downstream

API Gateway: 1000 req/sec ✓

But internally:
  API → ServiceA → ServiceB → Database
         │            │
      No limit     No limit
      
ServiceB can still overwhelm Database!


✅ CORRECT: Rate limit at every boundary

API Gateway: 1000 req/sec
ServiceA → ServiceB: 500 req/sec per client
ServiceB → Database: Connection pool limit

Each service protects itself from overload.
Defense in depth.

8.4 Mistake 4: Instant Recovery

❌ WRONG: Restore full rate immediately after recovery

During outage:
  rate_limit = 100/sec
  
After recovery:
  rate_limit = 10000/sec  # INSTANT!
  
Result:
  - 1M queued messages flood downstream
  - Downstream overwhelmed
  - New outage!


✅ CORRECT: Gradual rate restoration

After recovery:
  rate_limit = 100/sec
  
  # Increase gradually
  wait 60 seconds, check stability
  rate_limit = 500/sec
  
  wait 60 seconds, check stability
  rate_limit = 2000/sec
  
  wait 60 seconds, check stability  
  rate_limit = 5000/sec
  
  wait 60 seconds, check stability
  rate_limit = 10000/sec  # Full rate

8.5 Mistake Checklist

Before deploying, verify:

  • Queues are bounded — Size limits prevent memory exhaustion
  • Lag is monitored — Alert before it becomes critical
  • Shedding strategy defined — Know what to drop and when
  • Priorities assigned — Critical messages protected
  • Recovery is gradual — No thundering herd after outage
  • Each boundary protected — Defense in depth
  • Scaling configured — Auto-scale before manual intervention needed
  • Runbooks written — Clear steps for each backpressure level

Part IV: Interview Preparation

Chapter 9: Interview Tips and Phrases

9.1 When to Bring Up Backpressure

Bring up backpressure when:

  • Designing async/queue-based systems
  • Discussing traffic spikes or burst handling
  • The interviewer asks about failure modes
  • System has multiple components with different throughputs
  • Scale requirements mention "peaks" or "spikes"

9.2 Key Phrases to Use

INTRODUCING BACKPRESSURE:

"One thing we need to consider is backpressure—what happens 
when producers send faster than consumers can process. 
Without handling this, queues grow unbounded and the system 
can crash under load."


DESCRIBING DETECTION:

"I'd monitor three key metrics for backpressure: consumer lag, 
queue depth, and processing latency. When any of these exceed 
thresholds, we need to respond—either by rate limiting, 
shedding load, or scaling up."


EXPLAINING LOAD SHEDDING:

"For this system, I'd implement priority-based load shedding. 
Critical messages like orders are never dropped, but 
low-priority messages like analytics events can be shed 
during peaks. It's better to lose some metrics than crash 
the whole system."


DISCUSSING RATE LIMITING:

"At the API level, we'd rate limit producers. But I'd also 
use adaptive rate limiting that tightens when consumer lag 
grows. This creates natural backpressure—producers slow 
down automatically when the system is stressed."


ADDRESSING RECOVERY:

"After an outage, we wouldn't restore full rate immediately—
that would cause a thundering herd as the backlog floods 
downstream. Instead, we'd gradually increase the rate, 
checking system stability at each step."

9.3 Questions to Ask Interviewer

  • "What's the expected traffic pattern—steady or bursty?"
  • "Is it acceptable to drop some messages during peaks, or must we process everything?"
  • "What's the latency requirement—real-time or eventual?"
  • "Can we rate limit at the source, or do we not control the producers?"

9.4 Common Follow-up Questions

Question Good Answer
"How do you detect backpressure?" "Monitor consumer lag, queue depth, and processing latency. Set thresholds for warning, critical, and emergency levels. Alert on-call when thresholds exceeded."
"What if you can't drop any messages?" "Then we need buffering (spill to disk/S3), scaling (add consumers), and rate limiting (slow producers). We accept higher latency during peaks rather than losing data."
"How do you prioritize messages?" "Add a priority field. Critical operations (payments, orders) get highest priority and are never shed. Analytics and logs are low priority and shed first during backpressure."
"What about cascading backpressure?" "Use circuit breakers between services. If downstream is slow, fail fast rather than blocking. Each service should have its own buffering and rate limiting."

Chapter 10: Practice Problems

Problem 1: Notification System Under Load

Setup: You're designing a notification service. Marketing can trigger campaigns sending to 10 million users. The email provider allows 1,000 emails/second.

Requirements:

  • Marketing campaigns can't block transactional emails (password reset)
  • Provider rate limit must not be exceeded
  • All notifications must eventually be sent

Questions:

  1. How do you prevent marketing campaigns from blocking critical emails?
  2. How do you handle the 10M notification burst?
  3. What backpressure mechanisms would you use?
  • Priority queues: critical vs bulk
  • Rate limiting per category
  • Backlog is expected, plan for it
  • Consider email batching for efficiency

Architecture:

Notifications → Priority Router → [Critical Queue] → Rate Limiter → Email Provider
                                → [Bulk Queue]    ↗
                                
Critical queue: Always processed first
Bulk queue: Processed only when critical queue empty

Rate limiter: 1000/sec total
  - Critical: guaranteed 200/sec minimum
  - Bulk: up to 800/sec when no critical

Backpressure handling:

  1. Priority separation protects critical emails
  2. Rate limiting prevents provider overload
  3. Bulk queue absorbs campaigns (10M / 800 = ~3.5 hours)
  4. Monitor queue depth, alert if backlog > 24 hours

Problem 2: Real-Time Analytics Pipeline

Setup: You're building a real-time analytics system. Events come from user activity (clicks, views). Dashboard needs < 5 second freshness.

Requirements:

  • 100K events/second peak
  • < 5 second event-to-dashboard latency
  • Some data loss acceptable during extreme peaks

Questions:

  1. How do you maintain low latency under normal load?
  2. What happens when events exceed processing capacity?
  3. How do you balance completeness vs latency?
  • Sampling is acceptable for analytics
  • Consider aggregation at ingestion
  • Track lag and shed when exceeds latency budget

Backpressure strategy:

  1. Normal mode (lag < 5s)

    • Process all events
    • Full accuracy
  2. Sampling mode (lag 5-30s)

    • Sample 1-in-10 events
    • Extrapolate in aggregation
    • Log sampling rate for accuracy context
  3. Survival mode (lag > 30s)

    • Sample 1-in-100 events
    • Alert on-call
    • Scale up consumers

Implementation:

if lag < 5:
    sample_rate = 1.0  # 100%
elif lag < 30:
    sample_rate = 0.1  # 10%
else:
    sample_rate = 0.01 # 1%
    alert_oncall()

if random() < sample_rate:
    process(event, weight=1/sample_rate)

Problem 3: Order Processing with Downstream Limits

Setup: Your order service processes orders, then calls:

  • Inventory service (500 req/sec limit)
  • Payment service (1000 req/sec limit)
  • Notification service (200 req/sec limit)

You receive 2000 orders/second during flash sale.

Questions:

  1. Which downstream service is the bottleneck?
  2. How do you prevent overwhelming each service?
  3. How do you handle the order backlog?
  • Slowest downstream = system throughput limit
  • Bulkhead pattern for isolation
  • Consider which calls can be async

Analysis:

  • Bottleneck: Notification (200/sec)
  • But: Notification can be async!

Redesign:

Order → Inventory (sync) → Payment (sync) → Return to user
                                         ↓
                               Notification (async via queue)

Rate limiting per downstream:

inventory_limiter = RateLimiter(500/sec)
payment_limiter = RateLimiter(1000/sec)

async def process_order(order):
    # These limit our throughput to 500/sec
    await inventory_limiter.acquire()
    await reserve_inventory(order)
    
    await payment_limiter.acquire()
    await charge_payment(order)
    
    # This is async, doesn't block
    await notification_queue.send(order)

Handling 2000/sec:

  • Max sync throughput: 500/sec (inventory limit)
  • Backlog: 1500/sec accumulates in order queue
  • Flash sale duration × 1500 = backlog size
  • Clear backlog after sale ends

Chapter 11: Mock Interview Dialogue

Scenario: Design a Tweet Processing Pipeline

Interviewer: "We need to design the system that processes tweets for the home timeline. When a user tweets, their followers should see it in their timeline. How would you handle backpressure?"

You: "Great question. Let me first understand the scale. How many tweets per second, and how many followers on average?"

Interviewer: "Let's say 10,000 tweets per second, average 500 followers each. Some celebrity accounts have 50 million followers."

You: "So that's 10K tweets/sec × 500 = 5 million timeline updates per second on average, but with huge variance due to celebrities. Let me think through the backpressure scenarios.

The main challenge is fan-out to followers. When a celebrity with 50M followers tweets, that's a massive burst. Here's how I'd handle it:

First, I'd separate celebrities from regular users. Celebrity tweets go to a dedicated high-throughput pipeline with more resources. Regular tweets go through the standard pipeline.

Second, for celebrity tweets, I'd use a pull-on-read model instead of push-on-write. Instead of updating 50M timelines immediately, I'd:

  1. Store the tweet in a 'celebrity tweets' cache
  2. When a follower loads their timeline, merge in recent celebrity tweets at read time

This converts a write amplification problem into a read-time merge."

Interviewer: "What about regular users with normal follower counts?"

You: "For regular users, push-on-write works. But I'd still add backpressure handling:

Detection: Monitor:

  • Queue depth of timeline update events
  • Consumer lag
  • Timeline write latency

Response layers:

  1. Rate limiting at tweet ingestion If the timeline update queue is backing up, slow down tweet acceptance. Return 429 to clients and let them retry.

  2. Priority-based processing High-engagement users (mutual follows, frequent interactions) get higher priority. Updates to inactive users can be delayed.

  3. Batch processing Instead of one write per follower, batch updates. Update 100 timelines in one operation.

  4. Load shedding for inactive users If a user hasn't logged in for 30 days, skip timeline update. They'll get a full rebuild when they return."

Interviewer: "What happens if the queue grows very large?"

You: "If lag exceeds thresholds, I'd escalate:

Warning (lag > 1 minute):

  • Alert on-call
  • Scale up consumers
  • Tighten rate limit on tweets

Critical (lag > 5 minutes):

  • Drop updates for users inactive > 7 days
  • Switch more users to pull-on-read
  • Consider pausing low-priority features (like retweet notifications)

Emergency (lag > 30 minutes):

  • Aggressive shedding—only update highly active users
  • Full timeline rebuilds for affected users later

The key is that timeline staleness of a few minutes is acceptable—users won't notice if a tweet appears 2 minutes late. But they will notice if the whole system crashes."

Interviewer: "How would you prevent one celebrity from impacting other users?"

You: "Bulkhead isolation. I'd have separate consumer groups:

  • Celebrity timeline updates (high resource allocation)
  • Regular timeline updates (normal resources)

If celebrity processing falls behind, it doesn't affect regular users. Each pool has its own backpressure handling.

Also, I'd use partition isolation. Partition by follower ID, so one hot celebrity's updates spread across all partitions rather than overwhelming one."


Summary

DAY 3 KEY TAKEAWAYS

CORE CONCEPT:
• Backpressure = producers outpacing consumers
• Without handling: unbounded queues → memory exhaustion → crash
• Three phases: absorption → exhaustion → collapse

DETECTION:
• Consumer lag: how far behind
• Queue depth: how much waiting  
• Processing latency: how slow
• Set thresholds: warning, critical, emergency

RESPONSE STRATEGIES:
• Load shedding: drop low-priority messages
• Rate limiting: slow down producers
• Scaling: add more consumers
• Spillover: buffer overflow to disk/S3

IMPLEMENTATION:
• Priority queues protect critical messages
• Adaptive rate limits respond to lag
• Bounded queues prevent memory exhaustion
• Gradual recovery prevents thundering herd

INTERVIEW TIPS:
• Mention backpressure when designing async systems
• Always have a plan for "what if queue grows unbounded"
• Priority + shedding is common pattern
• Recovery needs to be gradual, not instant

DEFAULT CHOICE:
• Start with rate limiting (least data loss)
• Add load shedding for non-critical data
• Auto-scale for sustained increases

📚 Further Reading

Official Documentation

Engineering Blogs

Books

  • "Release It!" by Michael Nygard — Chapters on stability patterns
  • "Designing Data-Intensive Applications" by Martin Kleppmann — Chapter 11: Stream Processing

Papers

  • "Congestion Control for High Bandwidth-Delay Product Networks" — TCP congestion control concepts apply

End of Day 3: Backpressure and Flow Control

Tomorrow: Day 4 — Dead Letters and Poison Pills. We've learned how to handle backpressure, but what about messages that simply can't be processed? Tomorrow we'll dive into dead letter queues—how to handle, debug, and replay failed messages without losing data or crashing consumers.