Himanshu Kukreja

Week 9 — Day 2: Noisy Neighbor Prevention

System Design Mastery Series — Multi-Tenancy, Security, and Compliance Week


Preface

Yesterday, we learned how to isolate tenant data. But there's another isolation problem:

THE NOISY NEIGHBOR DISASTER

Monday morning, 9:00 AM:

Dashboard alerts fire:
├── API latency: 200ms → 15,000ms
├── Database CPU: 30% → 98%
├── Error rate: 0.1% → 45%
└── Support tickets: Flooding in

You investigate:

┌────────────────────────────────────────────────────────────────────────┐
│                                                                        │
│  TOP QUERIES BY CPU:                                                   │
│                                                                        │
│  1. SELECT * FROM events                                               │
│     WHERE tenant_id = 'acme_corp'                                      │
│     AND created_at > '2020-01-01'                                      │
│     ORDER BY created_at                                                │
│                                                                        │
│     Duration: 847 seconds (still running)                              │
│     Rows scanned: 2.3 billion                                          │
│     Tenant: acme_corp                                                  │
│                                                                        │
│  Source: Someone at Acme Corp ran an "export all data" report          │
│                                                                        │
└────────────────────────────────────────────────────────────────────────┘

Impact:
├── 500 other tenants can't use your product
├── You're losing $50,000/hour to churning customers
├── Your SLA is violated
└── One tenant's legitimate use broke everyone

This is the "noisy neighbor" problem.

Today, we'll learn to prevent one tenant from ruining everyone else's experience through quotas, rate limiting, and fair scheduling.


Part I: Foundations

Chapter 1: Understanding the Noisy Neighbor Problem

1.1 What Is a Noisy Neighbor?

A noisy neighbor is a tenant whose resource consumption negatively impacts other tenants sharing the same infrastructure.

NOISY NEIGHBOR VISUALIZATION

Shared Resources (100% capacity):
┌───────────────────────────────────────────────────────────────────────┐
│                                                                       │
│  NORMAL STATE:                                                        │
│  ┌────────────────────────────────────────────────────────────────┐   │
│  │████████░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░  │   │
│  │ Tenant A (15%)                                                 │   │
│  └────────────────────────────────────────────────────────────────┘   │
│  ┌────────────────────────────────────────────────────────────────┐   │
│  │██████████████░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░  │   │
│  │ Tenant B (25%)                                                 │   │
│  └────────────────────────────────────────────────────────────────┘   │
│  ┌────────────────────────────────────────────────────────────────┐   │
│  │██████████░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░  │   │
│  │ Tenant C (20%)                                                 │   │
│  └────────────────────────────────────────────────────────────────┘   │
│                                                                       │
│  Available capacity: 40% — System healthy                             │
│                                                                       │
│  ─────────────────────────────────────────────────────────────────    │
│                                                                       │
│  NOISY NEIGHBOR STATE:                                                │
│  ┌────────────────────────────────────────────────────────────────┐   │
│  │████████████████████████████████████████████████████████████████│   │
│  │ Tenant A (95%) ← NOISY NEIGHBOR                                │   │
│  └────────────────────────────────────────────────────────────────┘   │
│  ┌────────────────────────────────────────────────────────────────┐   │
│  │██░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░  │   │
│  │ Tenant B (3%) ← STARVED                                        │   │
│  └────────────────────────────────────────────────────────────────┘   │
│  ┌────────────────────────────────────────────────────────────────┐   │
│  │█░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░  │   │
│  │ Tenant C (2%) ← STARVED                                        │   │
│  └────────────────────────────────────────────────────────────────┘   │
│                                                                       │
│  Available capacity: 0% — System degraded                             │
│                                                                       │
└───────────────────────────────────────────────────────────────────────┘

1.2 Types of Noisy Neighbor Scenarios

NOISY NEIGHBOR CATEGORIES

1. CPU-BOUND NOISY NEIGHBOR
   ├── Complex queries or computations
   ├── Infinite loops or bugs
   ├── ML model training on shared infra
   └── Example: Analytics query scanning billions of rows

2. MEMORY-BOUND NOISY NEIGHBOR
   ├── Large in-memory operations
   ├── Memory leaks
   ├── Massive cache fills
   └── Example: Loading entire dataset into memory

3. I/O-BOUND NOISY NEIGHBOR
   ├── Bulk data imports/exports
   ├── Large file uploads
   ├── Sequential table scans
   └── Example: Migrating 10TB of data

4. NETWORK-BOUND NOISY NEIGHBOR
   ├── High request volume
   ├── Large payload transfers
   ├── Webhook storms
   └── Example: API polling every millisecond

5. STORAGE-BOUND NOISY NEIGHBOR
   ├── Excessive data accumulation
   ├── Log explosion
   ├── Never-deleted temp files
   └── Example: Storing every event forever

6. CONNECTION-BOUND NOISY NEIGHBOR
   ├── Connection pool exhaustion
   ├── Long-held connections
   ├── Connection leaks
   └── Example: Opening 1000 database connections

1.3 Why This Is Hard

THE CHALLENGE

Unlike data isolation (clear boundaries), resource isolation is fuzzy:

QUESTIONS WITHOUT EASY ANSWERS:
├── How much CPU is "fair" for a $50/month customer vs $5000/month?
├── Should we throttle a customer doing legitimate work?
├── How do we explain limits without frustrating users?
├── What if a customer's spike is temporary?
└── How do we balance fairness with resource efficiency?

THE BUSINESS TENSION:
├── Too strict: Customers feel limited, churn increases
├── Too loose: Noisy neighbors hurt everyone, churn increases
└── Sweet spot: Hard to find and varies by customer

TECHNICAL CHALLENGES:
├── Real-time enforcement at scale
├── Fair allocation across heterogeneous workloads
├── Graceful degradation when limits are hit
├── Visibility into who's using what
└── Predictable behavior for capacity planning

Chapter 2: Resource Isolation Strategies

2.1 The Resource Isolation Spectrum

ISOLATION APPROACHES

Level 1: NO ISOLATION (Don't do this)
──────────────────────────────────────
Shared resources, first-come-first-served
└── Problem: One tenant can starve all others

Level 2: SOFT LIMITS (Monitoring only)
──────────────────────────────────────
Track usage, alert on abuse, manual intervention
└── Problem: By the time you react, damage is done

Level 3: RATE LIMITING
──────────────────────
Limit requests per second/minute
├── Protects against: Request volume
└── Doesn't protect: Expensive operations

Level 4: RESOURCE QUOTAS
────────────────────────
Hard limits on CPU, memory, connections, storage
├── Protects against: Most noisy neighbors
└── Challenge: Setting appropriate limits

Level 5: FAIR SCHEDULING
────────────────────────
Dynamic resource allocation based on demand
├── Protects against: All scenarios
└── Challenge: Complex to implement

Level 6: PHYSICAL ISOLATION
───────────────────────────
Dedicated resources per tenant
├── Protects against: Everything
└── Challenge: Cost prohibitive for most
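Level 5 is the hardest to picture in the abstract. A minimal sketch of weighted fair queuing (the class name, tenant names, and weights here are illustrative, and the virtual-time bookkeeping is simplified) serves jobs in order of virtual finish time, so a flood from one tenant cannot push everyone else to the back of the queue:

```python
import heapq
from collections import defaultdict


class WeightedFairQueue:
    """Serve jobs in order of virtual finish time: each tenant's jobs
    accumulate virtual time at cost/weight, so higher-weight tenants drain
    proportionally faster, but no tenant can starve the others."""

    def __init__(self, weights):
        self.weights = weights                 # tenant -> relative weight
        self.last_finish = defaultdict(float)  # tenant -> last virtual finish
        self.heap = []                         # (finish_time, seq, tenant, job)
        self.seq = 0                           # tie-breaker, keeps FIFO within ties

    def enqueue(self, tenant, job, cost=1.0):
        finish = self.last_finish[tenant] + cost / self.weights.get(tenant, 1.0)
        self.last_finish[tenant] = finish
        heapq.heappush(self.heap, (finish, self.seq, tenant, job))
        self.seq += 1

    def dequeue(self):
        if not self.heap:
            return None
        _, _, tenant, job = heapq.heappop(self.heap)
        return tenant, job


# Tenant A floods the queue; B and C each submit one job afterwards.
wfq = WeightedFairQueue({"A": 1, "B": 1, "C": 1})
for i in range(6):
    wfq.enqueue("A", f"a{i}")
wfq.enqueue("B", "b0")
wfq.enqueue("C", "c0")
order = [wfq.dequeue()[0] for _ in range(8)]
# B and C are served among the first three slots, not after all of A's backlog
```

With equal weights, a tenant that enqueues six jobs in a burst still only gets one of the first three slots; giving a paying tenant a higher weight drains its queue proportionally faster.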

2.2 Multi-Layer Defense

DEFENSE IN DEPTH FOR NOISY NEIGHBORS

Layer 1: API GATEWAY
├── Request rate limiting
├── Payload size limits
├── Connection limits
└── Blocks: Volume-based attacks

Layer 2: APPLICATION
├── Query complexity analysis
├── Operation quotas
├── Concurrent operation limits
└── Blocks: Expensive operations

Layer 3: DATABASE
├── Statement timeouts
├── Connection limits per tenant
├── Row limit on queries
└── Blocks: Database abuse

Layer 4: INFRASTRUCTURE
├── CPU/memory quotas (k8s)
├── I/O throttling
├── Network bandwidth limits
└── Blocks: Resource exhaustion

Each layer catches what previous layers missed.
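As a concrete taste of Layers 2-3, a per-plan operation timeout can be sketched with asyncio.wait_for (the plan names and budgets below are illustrative; in practice the budget would come from the tenant's quota, and the database would enforce its own statement timeout as well):

```python
import asyncio

# Illustrative per-plan time budgets, in seconds
TIMEOUTS = {"free": 0.05, "pro": 0.5}


async def run_with_budget(plan: str, coro):
    """Cancel the operation if it exceeds the plan's time budget."""
    try:
        return await asyncio.wait_for(coro, timeout=TIMEOUTS[plan])
    except asyncio.TimeoutError:
        return "query_cancelled"


async def slow_query():
    await asyncio.sleep(0.1)  # stand-in for an expensive scan
    return "rows"


free_result = asyncio.run(run_with_budget("free", slow_query()))  # cancelled
pro_result = asyncio.run(run_with_budget("pro", slow_query()))    # completes
```

The same 100ms query is cancelled on the free plan but completes on pro, which is exactly the asymmetry the quota tiers in Chapter 3 encode.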

Chapter 3: Quota Design Principles

3.1 Types of Quotas

QUOTA CATEGORIES

1. RATE QUOTAS (per time window)
   ├── Requests per second
   ├── API calls per minute
   ├── Events per hour
   └── Example: 1000 API calls/minute

2. CONCURRENCY QUOTAS (simultaneous)
   ├── Active connections
   ├── Running queries
   ├── Parallel jobs
   └── Example: Max 10 concurrent requests

3. VOLUME QUOTAS (total amount)
   ├── Storage used
   ├── Data transferred
   ├── Records created
   └── Example: 100GB storage limit

4. COMPUTE QUOTAS (processing)
   ├── CPU seconds
   ├── Query execution time
   ├── Batch processing time
   └── Example: 1000 CPU-seconds/hour

5. FEATURE QUOTAS (capability)
   ├── Number of users
   ├── Number of projects
   ├── Number of integrations
   └── Example: Max 50 users

3.2 Quota Tiers by Plan

EXAMPLE QUOTA STRUCTURE

┌────────────────────────────────────────────────────────────────────────┐
│                         QUOTA TIERS                                    │
│                                                                        │
│  Resource              │ Free      │ Pro       │ Enterprise            │
│  ──────────────────────┼───────────┼───────────┼─────────────────────  │
│  API calls/minute      │ 60        │ 1,000     │ 10,000                │
│  Storage (GB)          │ 1         │ 100       │ Unlimited*            │
│  Users                 │ 5         │ 50        │ Unlimited             │
│  Concurrent requests   │ 5         │ 50        │ 200                   │
│  Query timeout (sec)   │ 30        │ 120       │ 300                   │
│  Export rows           │ 1,000     │ 100,000   │ 1,000,000             │
│  Webhooks              │ 5         │ 50        │ 500                   │
│  File upload (MB)      │ 10        │ 100       │ 500                   │
│  Data retention (days) │ 30        │ 365       │ Custom                │
│                                                                        │
│  * "Unlimited" = high limit with fair use policy                       │
│                                                                        │
└────────────────────────────────────────────────────────────────────────┘

Part II: Implementation

Chapter 4: Rate Limiting Implementation

4.1 Token Bucket Algorithm

# noisy_neighbor/rate_limiter.py

"""
Rate limiting implementation using token bucket algorithm.

Token bucket provides:
- A steady sustained rate (tokens refill at a fixed rate)
- Tolerance for short bursts, up to the bucket size
- Simple implementation that is easy to reason about
"""

from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional
import asyncio
import time


@dataclass
class RateLimitConfig:
    """Configuration for rate limiting."""
    requests_per_second: float
    burst_size: int  # Maximum tokens (allows short bursts)
    
    @property
    def refill_rate(self) -> float:
        """Tokens added per second."""
        return self.requests_per_second


@dataclass
class RateLimitResult:
    """Result of a rate limit check."""
    allowed: bool
    tokens_remaining: float
    retry_after_seconds: Optional[float] = None
    limit: int = 0
    reset_at: Optional[datetime] = None


class TokenBucket:
    """
    Token bucket rate limiter.
    
    Tokens are added at a constant rate (refill_rate).
    Each request consumes one token.
    Requests are rejected when bucket is empty.
    """
    
    def __init__(self, config: RateLimitConfig):
        self.config = config
        self.tokens = float(config.burst_size)
        self.last_refill = time.monotonic()
        self._lock = asyncio.Lock()
    
    async def try_acquire(self, tokens: int = 1) -> RateLimitResult:
        """
        Try to acquire tokens from the bucket.
        
        Returns RateLimitResult indicating if request is allowed.
        """
        async with self._lock:
            self._refill()
            
            if self.tokens >= tokens:
                self.tokens -= tokens
                return RateLimitResult(
                    allowed=True,
                    tokens_remaining=self.tokens,
                    limit=self.config.burst_size
                )
            else:
                # Calculate when tokens will be available
                tokens_needed = tokens - self.tokens
                wait_seconds = tokens_needed / self.config.refill_rate
                
                return RateLimitResult(
                    allowed=False,
                    tokens_remaining=self.tokens,
                    retry_after_seconds=wait_seconds,
                    limit=self.config.burst_size,
                    reset_at=datetime.utcnow() + timedelta(seconds=wait_seconds)
                )
    
    def _refill(self):
        """Refill tokens based on elapsed time."""
        now = time.monotonic()
        elapsed = now - self.last_refill
        
        # Add tokens based on elapsed time
        tokens_to_add = elapsed * self.config.refill_rate
        self.tokens = min(
            self.config.burst_size,
            self.tokens + tokens_to_add
        )
        
        self.last_refill = now


class DistributedRateLimiter:
    """
    Distributed rate limiter using Redis.
    
    Uses Redis for coordination across multiple app instances.
    Implements sliding window log algorithm for accuracy.
    """
    
    def __init__(self, redis_client, config: RateLimitConfig):
        self.redis = redis_client
        self.config = config
    
    async def try_acquire(
        self,
        key: str,
        tokens: int = 1
    ) -> RateLimitResult:
        """
        Try to acquire tokens for a given key.
        
        Key format: "ratelimit:{tenant_id}:{resource}"
        """
        now = time.time()
        window_start = now - 1.0  # 1 second window
        
        # Lua script for atomic operation
        script = """
        local key = KEYS[1]
        local now = tonumber(ARGV[1])
        local window_start = tonumber(ARGV[2])
        local limit = tonumber(ARGV[3])
        local tokens = tonumber(ARGV[4])
        
        -- Remove old entries outside window
        redis.call('ZREMRANGEBYSCORE', key, '-inf', window_start)
        
        -- Count current entries in window
        local current = redis.call('ZCARD', key)
        
        if current + tokens <= limit then
            -- Add new entries
            for i = 1, tokens do
                redis.call('ZADD', key, now, now .. ':' .. i .. ':' .. math.random())
            end
            redis.call('EXPIRE', key, 2)  -- Expire after 2 seconds
            return {1, limit - current - tokens}  -- allowed, remaining
        else
            -- Calculate retry after from the oldest entry in the window
            local oldest = redis.call('ZRANGE', key, 0, 0, 'WITHSCORES')
            local retry_after = 0
            if #oldest > 0 then
                retry_after = oldest[2] + 1 - now
            end
            -- Return as a string: Lua numbers are truncated to integers
            -- when converted to a Redis reply, which would lose sub-second
            -- precision (and usually round the wait down to 0).
            return {0, tostring(retry_after)}  -- denied, retry_after
        end
        """
        
        result = await self.redis.eval(
            script,
            keys=[key],
            args=[now, window_start, self.config.burst_size, tokens]
        )
        
        allowed = result[0] == 1
        
        if allowed:
            return RateLimitResult(
                allowed=True,
                tokens_remaining=result[1],
                limit=self.config.burst_size
            )
        else:
            return RateLimitResult(
                allowed=False,
                tokens_remaining=0,
                retry_after_seconds=max(0.0, float(result[1])),
                limit=self.config.burst_size
            )


class TenantRateLimiter:
    """
    Rate limiter with per-tenant configuration.
    
    Different tenants can have different limits based on their plan.
    """
    
    def __init__(self, redis_client, quota_service):
        self.redis = redis_client
        self.quota_service = quota_service
        self._limiters: dict = {}
    
    async def check_rate_limit(
        self,
        tenant_id: str,
        resource: str = "api",
        tokens: int = 1
    ) -> RateLimitResult:
        """
        Check rate limit for a tenant and resource.
        """
        # Get tenant's quota configuration
        quota = await self.quota_service.get_quota(tenant_id, resource)
        
        if not quota:
            # No quota configured = unlimited (careful!)
            return RateLimitResult(allowed=True, tokens_remaining=float('inf'))
        
        # Get or create rate limiter
        limiter_key = f"ratelimit:{tenant_id}:{resource}"
        
        config = RateLimitConfig(
            requests_per_second=quota.requests_per_second,
            burst_size=quota.burst_size
        )
        
        limiter = DistributedRateLimiter(self.redis, config)
        
        result = await limiter.try_acquire(limiter_key, tokens)
        
        # Track usage for analytics
        await self._record_usage(tenant_id, resource, tokens, result.allowed)
        
        return result
    
    async def _record_usage(
        self,
        tenant_id: str,
        resource: str,
        tokens: int,
        allowed: bool
    ):
        """Record usage for monitoring and billing."""
        await self.redis.hincrby(
            f"usage:{tenant_id}:{resource}",
            "total_requests",
            tokens
        )
        
        if not allowed:
            await self.redis.hincrby(
                f"usage:{tenant_id}:{resource}",
                "throttled_requests",
                tokens
            )
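Before wiring this into middleware, the refill arithmetic is easier to see in a compact synchronous form (MiniBucket is a throwaway name for illustration, not part of the module above):

```python
import time


class MiniBucket:
    """The same math as TokenBucket above, minus the async lock."""

    def __init__(self, rate: float, burst: int):
        self.rate, self.burst = rate, burst
        self.tokens = float(burst)
        self.last = time.monotonic()

    def try_acquire(self, n: int = 1) -> bool:
        now = time.monotonic()
        # Refill proportionally to elapsed time, capped at the burst size.
        self.tokens = min(self.burst, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= n:
            self.tokens -= n
            return True
        return False


bucket = MiniBucket(rate=1.0, burst=3)
results = [bucket.try_acquire() for _ in range(5)]
# the full burst of 3 succeeds immediately; the rest fail until tokens refill
```

Note the cap in the refill step: without `min(burst, ...)`, an idle tenant would bank unlimited tokens and the "burst" would be unbounded.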

4.2 Rate Limiting Middleware

# noisy_neighbor/middleware.py

"""
Middleware for enforcing rate limits on incoming requests.
"""

from fastapi import Request
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import JSONResponse
import logging

from noisy_neighbor.rate_limiter import TenantRateLimiter

logger = logging.getLogger(__name__)


class RateLimitMiddleware(BaseHTTPMiddleware):
    """
    Middleware that enforces rate limits per tenant.
    """
    
    def __init__(self, app, rate_limiter: TenantRateLimiter):
        super().__init__(app)
        self.rate_limiter = rate_limiter
    
    async def dispatch(self, request: Request, call_next):
        # Skip rate limiting for health checks
        if request.url.path in ["/health", "/metrics"]:
            return await call_next(request)
        
        # Get tenant from context (set by auth middleware)
        tenant_id = getattr(request.state, "tenant_id", None)
        
        if not tenant_id:
            # No tenant = no rate limit (or reject, depending on policy)
            return await call_next(request)
        
        # Determine resource type based on endpoint
        resource = self._get_resource_type(request)
        
        # Check rate limit
        result = await self.rate_limiter.check_rate_limit(
            tenant_id=tenant_id,
            resource=resource
        )
        
        if not result.allowed:
            logger.warning(
                "Rate limit exceeded",
                extra={
                    "tenant_id": tenant_id,
                    "resource": resource,
                    "path": request.url.path,
                    "retry_after": result.retry_after_seconds
                }
            )
            
            return JSONResponse(
                status_code=429,
                content={
                    "error": "rate_limit_exceeded",
                    "message": "Too many requests. Please slow down.",
                    "retry_after_seconds": result.retry_after_seconds,
                    "limit": result.limit
                },
                headers={
                    # Round up so a fractional wait doesn't become "Retry-After: 0"
                    "Retry-After": str(max(1, int(result.retry_after_seconds or 1))),
                    "X-RateLimit-Limit": str(result.limit),
                    "X-RateLimit-Remaining": str(int(result.tokens_remaining)),
                }
            )
        
        # Process request
        response = await call_next(request)
        
        # Add rate limit headers to response (skipped for unlimited tenants,
        # where tokens_remaining is infinity and int() would overflow)
        if result.tokens_remaining != float("inf"):
            response.headers["X-RateLimit-Limit"] = str(result.limit)
            response.headers["X-RateLimit-Remaining"] = str(int(result.tokens_remaining))
        
        return response
    
    def _get_resource_type(self, request: Request) -> str:
        """
        Determine resource type for rate limiting.
        
        Different endpoints might have different limits.
        """
        path = request.url.path
        
        if path.startswith("/api/search"):
            return "search"  # Search might have lower limits
        elif path.startswith("/api/export"):
            return "export"  # Export might have much lower limits
        elif path.startswith("/api/bulk"):
            return "bulk"    # Bulk operations limited
        else:
            return "api"     # Default API limit

Chapter 5: Resource Quota Implementation

5.1 Quota Service

# noisy_neighbor/quota_service.py

"""
Service for managing and enforcing tenant resource quotas.
"""

from dataclasses import dataclass, field
from typing import Dict, Optional, Tuple
from datetime import datetime, timedelta
from enum import Enum
import logging

logger = logging.getLogger(__name__)


class QuotaType(Enum):
    RATE = "rate"           # Per time window
    CONCURRENT = "concurrent"  # Simultaneous
    VOLUME = "volume"       # Total amount
    COMPUTE = "compute"     # Processing time


@dataclass
class QuotaDefinition:
    """Definition of a quota."""
    name: str
    quota_type: QuotaType
    limit: float
    window_seconds: Optional[int] = None  # For rate quotas
    burst_size: Optional[int] = None      # For rate quotas
    
    # Soft limit for warnings (percentage of limit)
    warning_threshold: float = 0.8
    
    # Action when exceeded
    action: str = "reject"  # reject, throttle, warn


@dataclass
class QuotaUsage:
    """Current usage against a quota."""
    quota_name: str
    current: float
    limit: float
    percentage: float
    is_exceeded: bool
    is_warning: bool
    reset_at: Optional[datetime] = None


@dataclass
class TenantQuotas:
    """All quotas for a tenant."""
    tenant_id: str
    plan: str
    quotas: Dict[str, QuotaDefinition] = field(default_factory=dict)


# Default quotas by plan
DEFAULT_QUOTAS = {
    "free": {
        "api_rate": QuotaDefinition(
            name="api_rate",
            quota_type=QuotaType.RATE,
            limit=60,
            window_seconds=60,
            burst_size=10
        ),
        "storage_gb": QuotaDefinition(
            name="storage_gb",
            quota_type=QuotaType.VOLUME,
            limit=1
        ),
        "concurrent_requests": QuotaDefinition(
            name="concurrent_requests",
            quota_type=QuotaType.CONCURRENT,
            limit=5
        ),
        "query_timeout_seconds": QuotaDefinition(
            name="query_timeout_seconds",
            quota_type=QuotaType.COMPUTE,
            limit=30
        ),
        "users": QuotaDefinition(
            name="users",
            quota_type=QuotaType.VOLUME,
            limit=5
        ),
    },
    "pro": {
        "api_rate": QuotaDefinition(
            name="api_rate",
            quota_type=QuotaType.RATE,
            limit=1000,
            window_seconds=60,
            burst_size=100
        ),
        "storage_gb": QuotaDefinition(
            name="storage_gb",
            quota_type=QuotaType.VOLUME,
            limit=100
        ),
        "concurrent_requests": QuotaDefinition(
            name="concurrent_requests",
            quota_type=QuotaType.CONCURRENT,
            limit=50
        ),
        "query_timeout_seconds": QuotaDefinition(
            name="query_timeout_seconds",
            quota_type=QuotaType.COMPUTE,
            limit=120
        ),
        "users": QuotaDefinition(
            name="users",
            quota_type=QuotaType.VOLUME,
            limit=50
        ),
    },
    "enterprise": {
        "api_rate": QuotaDefinition(
            name="api_rate",
            quota_type=QuotaType.RATE,
            limit=10000,
            window_seconds=60,
            burst_size=1000
        ),
        "storage_gb": QuotaDefinition(
            name="storage_gb",
            quota_type=QuotaType.VOLUME,
            limit=10000  # 10TB, essentially unlimited
        ),
        "concurrent_requests": QuotaDefinition(
            name="concurrent_requests",
            quota_type=QuotaType.CONCURRENT,
            limit=200
        ),
        "query_timeout_seconds": QuotaDefinition(
            name="query_timeout_seconds",
            quota_type=QuotaType.COMPUTE,
            limit=300
        ),
        "users": QuotaDefinition(
            name="users",
            quota_type=QuotaType.VOLUME,
            limit=100000  # Essentially unlimited
        ),
    }
}


class QuotaService:
    """
    Service for managing tenant quotas.
    """
    
    def __init__(self, db, cache, metrics_client):
        self.db = db
        self.cache = cache
        self.metrics = metrics_client
    
    async def get_tenant_quotas(self, tenant_id: str) -> TenantQuotas:
        """
        Get all quotas for a tenant.
        """
        # Check cache (assumes the cache round-trips dataclass instances,
        # e.g. a pickle-based cache; a JSON cache would need explicit
        # serialization of the nested QuotaDefinition objects)
        cache_key = f"quotas:{tenant_id}"
        cached = await self.cache.get(cache_key)
        
        if cached:
            return TenantQuotas(**cached)
        
        # Load tenant plan
        tenant = await self.db.fetchone(
            "SELECT plan, custom_quotas FROM tenants WHERE id = $1",
            tenant_id
        )
        
        if not tenant:
            raise ValueError(f"Tenant not found: {tenant_id}")
        
        # Start with default quotas for plan
        plan = tenant["plan"]
        quotas = DEFAULT_QUOTAS.get(plan, DEFAULT_QUOTAS["free"]).copy()
        
        # Apply any custom overrides
        if tenant["custom_quotas"]:
            for name, override in tenant["custom_quotas"].items():
                if name in quotas:
                    quotas[name] = QuotaDefinition(**{**quotas[name].__dict__, **override})
        
        tenant_quotas = TenantQuotas(
            tenant_id=tenant_id,
            plan=plan,
            quotas=quotas
        )
        
        # Cache for 5 minutes
        await self.cache.set(cache_key, tenant_quotas.__dict__, ttl=300)
        
        return tenant_quotas
    
    async def get_quota(
        self,
        tenant_id: str,
        quota_name: str
    ) -> Optional[QuotaDefinition]:
        """Get a specific quota for a tenant."""
        tenant_quotas = await self.get_tenant_quotas(tenant_id)
        return tenant_quotas.quotas.get(quota_name)
    
    async def check_quota(
        self,
        tenant_id: str,
        quota_name: str,
        requested_amount: float = 1
    ) -> QuotaUsage:
        """
        Check if a quota allows the requested amount.
        """
        quota = await self.get_quota(tenant_id, quota_name)
        
        if not quota:
            # No quota defined = allowed
            return QuotaUsage(
                quota_name=quota_name,
                current=0,
                limit=float('inf'),
                percentage=0,
                is_exceeded=False,
                is_warning=False
            )
        
        # Get current usage based on quota type
        if quota.quota_type == QuotaType.RATE:
            current = await self._get_rate_usage(tenant_id, quota_name, quota)
        elif quota.quota_type == QuotaType.CONCURRENT:
            current = await self._get_concurrent_usage(tenant_id, quota_name)
        elif quota.quota_type == QuotaType.VOLUME:
            current = await self._get_volume_usage(tenant_id, quota_name)
        elif quota.quota_type == QuotaType.COMPUTE:
            current = await self._get_compute_usage(tenant_id, quota_name, quota)
        else:
            current = 0
        
        percentage = (current + requested_amount) / quota.limit
        is_exceeded = (current + requested_amount) > quota.limit
        is_warning = percentage >= quota.warning_threshold
        
        usage = QuotaUsage(
            quota_name=quota_name,
            current=current,
            limit=quota.limit,
            percentage=percentage,
            is_exceeded=is_exceeded,
            is_warning=is_warning
        )
        
        # Record metrics
        await self._record_quota_check(tenant_id, usage)
        
        return usage
    
    async def _get_rate_usage(
        self,
        tenant_id: str,
        quota_name: str,
        quota: QuotaDefinition
    ) -> float:
        """Get rate usage from sliding window."""
        key = f"rate:{tenant_id}:{quota_name}"
        window_start = datetime.utcnow() - timedelta(seconds=quota.window_seconds)
        
        # Count requests in window
        count = await self.cache.zcount(
            key,
            window_start.timestamp(),
            '+inf'
        )
        
        return count
    
    async def _get_concurrent_usage(
        self,
        tenant_id: str,
        quota_name: str
    ) -> float:
        """Get current concurrent operations."""
        key = f"concurrent:{tenant_id}:{quota_name}"
        return await self.cache.get(key) or 0
    
    async def _get_volume_usage(
        self,
        tenant_id: str,
        quota_name: str
    ) -> float:
        """Get total volume usage from database."""
        # Different queries based on quota type
        if quota_name == "storage_gb":
            result = await self.db.fetchone(
                """
                SELECT COALESCE(SUM(size_bytes), 0) / 1073741824.0 as usage_gb
                FROM files WHERE tenant_id = $1
                """,
                tenant_id
            )
            return result["usage_gb"]
        
        elif quota_name == "users":
            result = await self.db.fetchone(
                "SELECT COUNT(*) as count FROM users WHERE tenant_id = $1",
                tenant_id
            )
            return result["count"]
        
        return 0
    
    async def _get_compute_usage(
        self,
        tenant_id: str,
        quota_name: str,
        quota: QuotaDefinition
    ) -> float:
        """Get compute usage in current window."""
        # Query timeouts are enforced per statement at execution time rather
        # than tracked cumulatively, so there is no accumulated usage to report.
        return 0
    
    async def _record_quota_check(self, tenant_id: str, usage: QuotaUsage):
        """Record quota check for monitoring."""
        self.metrics.gauge(
            "quota_usage_percentage",
            usage.percentage * 100,
            tags={
                "tenant_id": tenant_id,
                "quota_name": usage.quota_name
            }
        )
        
        if usage.is_warning:
            logger.warning(
                "Quota warning threshold reached",
                extra={
                    "tenant_id": tenant_id,
                    "quota_name": usage.quota_name,
                    "percentage": usage.percentage
                }
            )


class QuotaEnforcer:
    """
    Enforces quotas by rejecting or throttling operations.
    """
    
    def __init__(self, quota_service: QuotaService):
        self.quota_service = quota_service
    
    async def enforce(
        self,
        tenant_id: str,
        quota_name: str,
        requested_amount: float = 1
    ) -> Tuple[bool, Optional[str]]:
        """
        Enforce a quota.
        
        Returns (allowed, error_message).
        """
        usage = await self.quota_service.check_quota(
            tenant_id,
            quota_name,
            requested_amount
        )
        
        if usage.is_exceeded:
            error_message = (
                f"Quota exceeded for {quota_name}. "
                f"Current: {usage.current:.1f}, Limit: {usage.limit:.1f}. "
                f"Please upgrade your plan or wait for quota reset."
            )
            
            return False, error_message
        
        return True, None

5.2 Concurrent Request Limiting

# noisy_neighbor/concurrency_limiter.py

"""
Concurrency limiting to prevent too many simultaneous operations.
"""

from typing import Optional
import time


class ConcurrencyLimiter:
    """
    Limits concurrent operations per tenant.
    
    Prevents a tenant from overwhelming the system with
    many parallel requests.
    """
    
    def __init__(self, redis_client, default_limit: int = 10):
        self.redis = redis_client
        self.default_limit = default_limit
    
    async def try_acquire(
        self,
        tenant_id: str,
        operation_id: str,
        limit: Optional[int] = None,
        ttl_seconds: int = 300
    ) -> bool:
        """
        Try to acquire a concurrency slot.
        
        Args:
            tenant_id: Tenant identifier
            operation_id: Unique ID for this operation
            limit: Max concurrent operations (None = use default)
            ttl_seconds: Auto-release after this time (prevents leaks)
        
        Returns:
            True if slot acquired, False if limit reached
        """
        limit = limit or self.default_limit
        key = f"concurrent:{tenant_id}"
        
        # Lua script for atomic check-and-set
        script = """
        local key = KEYS[1]
        local operation_id = ARGV[1]
        local limit = tonumber(ARGV[2])
        local ttl = tonumber(ARGV[3])
        local now = tonumber(ARGV[4])
        
        -- Remove expired entries
        redis.call('ZREMRANGEBYSCORE', key, '-inf', now)
        
        -- Check current count
        local current = redis.call('ZCARD', key)
        
        if current < limit then
            -- Add this operation with expiry timestamp
            redis.call('ZADD', key, now + ttl, operation_id)
            return 1
        else
            return 0
        end
        """
        
        import time
        now = time.time()
        
        result = await self.redis.eval(
            script,
            keys=[key],
            args=[operation_id, limit, ttl_seconds, now]
        )
        
        return result == 1
    
    async def release(self, tenant_id: str, operation_id: str):
        """
        Release a concurrency slot.
        
        Call this when operation completes.
        """
        key = f"concurrent:{tenant_id}"
        await self.redis.zrem(key, operation_id)
    
    async def get_current_count(self, tenant_id: str) -> int:
        """Get current concurrent operation count."""
        key = f"concurrent:{tenant_id}"
        
        import time
        now = time.time()
        
        # Clean up expired and count
        await self.redis.zremrangebyscore(key, '-inf', now)
        return await self.redis.zcard(key)


class ConcurrencyContext:
    """
    Context manager for concurrent operations.
    
    Usage:
        async with ConcurrencyContext(limiter, tenant_id, op_id) as acquired:
            if acquired:
                # Do operation
            else:
                # Handle limit exceeded
    """
    
    def __init__(
        self,
        limiter: ConcurrencyLimiter,
        tenant_id: str,
        operation_id: str,
        limit: Optional[int] = None
    ):
        self.limiter = limiter
        self.tenant_id = tenant_id
        self.operation_id = operation_id
        self.limit = limit
        self.acquired = False
    
    async def __aenter__(self) -> bool:
        self.acquired = await self.limiter.try_acquire(
            self.tenant_id,
            self.operation_id,
            self.limit
        )
        return self.acquired
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.acquired:
            await self.limiter.release(self.tenant_id, self.operation_id)
        return False
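
The Redis-backed limiter above works across processes. For a single-process deployment, the same idea can be sketched without Redis at all; this counter-based variant (an illustrative sketch, not part of the series' modules) relies on asyncio's single-threaded execution for atomicity:

```python
from typing import Dict


class LocalConcurrencyLimiter:
    """In-process per-tenant concurrency limiter.

    A simplified alternative to the Redis-backed ConcurrencyLimiter,
    valid only when all of a tenant's traffic is served by one event
    loop, where plain dict updates are effectively atomic.
    """

    def __init__(self, default_limit: int = 10):
        self.default_limit = default_limit
        self._counts: Dict[str, int] = {}

    def try_acquire(self, tenant_id: str) -> bool:
        """Return True and take a slot, or False if at the limit."""
        if self._counts.get(tenant_id, 0) >= self.default_limit:
            return False
        self._counts[tenant_id] = self._counts.get(tenant_id, 0) + 1
        return True

    def release(self, tenant_id: str):
        """Give a slot back; clamp at zero to survive double-release."""
        self._counts[tenant_id] = max(0, self._counts.get(tenant_id, 0) - 1)
```

The trade-off: there is no TTL-based leak protection, so every try_acquire must be paired with a release (typically in a finally block), unlike the sorted-set version, which self-heals when an operation dies without releasing.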

Chapter 6: Query Complexity and Timeout Management

6.1 Query Analyzer

# noisy_neighbor/query_analyzer.py

"""
Analyze query complexity to prevent expensive operations.
"""

from dataclasses import dataclass
from typing import List, Optional
from enum import Enum
import re


class QueryComplexity(Enum):
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    VERY_HIGH = "very_high"
    BLOCKED = "blocked"


@dataclass
class QueryAnalysis:
    """Result of query analysis."""
    complexity: QueryComplexity
    estimated_rows: int
    estimated_time_ms: int
    warnings: List[str]
    blocked_reason: Optional[str] = None


class QueryComplexityAnalyzer:
    """
    Analyzes SQL queries for complexity.
    
    Used to prevent queries that would consume too many resources.
    """
    
    # Patterns that indicate expensive queries
    EXPENSIVE_PATTERNS = [
        (r'(?i)SELECT\s+\*', 'SELECT * can be expensive, specify columns'),
        (r'(?i)CROSS\s+JOIN', 'CROSS JOIN can explode row count'),
        (r'(?i)(?<!NOT\s)LIKE\s+[\'"]%', 'Leading wildcard LIKE prevents index use'),
        (r'(?i)ORDER\s+BY.*RANDOM', 'ORDER BY RANDOM is expensive'),
        (r'(?i)GROUP\s+BY.*HAVING', 'Complex GROUP BY with HAVING'),
    ]
    
    # Patterns that should be blocked entirely
    BLOCKED_PATTERNS = [
        (r'(?i)SELECT.*FROM.*,.*,.*,.*,', 'Too many table joins'),
        # The lookahead must precede the greedy .*; written after it,
        # (?!WHERE) trivially succeeds and blocks every DELETE/UPDATE
        (r'(?is)^\s*(DELETE|UPDATE)\b(?!.*\bWHERE\b)', 'DELETE/UPDATE without WHERE'),
    ]
    
    def __init__(self, db):
        self.db = db
    
    async def analyze(
        self,
        query: str,
        tenant_id: str
    ) -> QueryAnalysis:
        """
        Analyze a query for complexity.
        """
        warnings = []
        
        # Check for blocked patterns
        for pattern, reason in self.BLOCKED_PATTERNS:
            if re.search(pattern, query):
                return QueryAnalysis(
                    complexity=QueryComplexity.BLOCKED,
                    estimated_rows=0,
                    estimated_time_ms=0,
                    warnings=[],
                    blocked_reason=reason
                )
        
        # Check for expensive patterns
        for pattern, warning in self.EXPENSIVE_PATTERNS:
            if re.search(pattern, query):
                warnings.append(warning)
        
        # Use EXPLAIN to estimate cost
        explain_result = await self._explain_query(query, tenant_id)
        
        estimated_rows = explain_result.get("rows", 0)
        estimated_cost = explain_result.get("cost", 0)
        
        # Determine complexity based on estimates
        if estimated_rows > 10_000_000 or estimated_cost > 100000:
            complexity = QueryComplexity.VERY_HIGH
        elif estimated_rows > 1_000_000 or estimated_cost > 10000:
            complexity = QueryComplexity.HIGH
        elif estimated_rows > 100_000 or estimated_cost > 1000:
            complexity = QueryComplexity.MEDIUM
        else:
            complexity = QueryComplexity.LOW
        
        # Estimate time (very rough)
        estimated_time_ms = int(estimated_cost * 0.1)  # Rough estimate
        
        return QueryAnalysis(
            complexity=complexity,
            estimated_rows=estimated_rows,
            estimated_time_ms=estimated_time_ms,
            warnings=warnings
        )
    
    async def _explain_query(self, query: str, tenant_id: str) -> dict:
        """Run EXPLAIN on the query to get the planner's estimates."""
        try:
            # NOTE: the query is interpolated directly, so callers must
            # pass already-validated SQL, never raw user input
            explain_query = f"EXPLAIN (FORMAT JSON) {query}"
            
            result = await self.db.fetchone(explain_query)
            
            if result:
                # Result shape depends on the driver; asyncpg returns the
                # JSON plan document in the first column
                plan = result[0][0]["Plan"]
                return {
                    "rows": plan.get("Plan Rows", 0),
                    "cost": plan.get("Total Cost", 0)
                }
        except Exception:
            # If EXPLAIN fails, assume worst case rather than letting
            # an unanalyzable query through cheaply
            return {"rows": 1_000_000, "cost": 10000}
        
        return {"rows": 0, "cost": 0}


class QueryGuard:
    """
    Guards query execution with complexity and timeout limits.
    """
    
    def __init__(
        self,
        analyzer: QueryComplexityAnalyzer,
        quota_service: QuotaService
    ):
        self.analyzer = analyzer
        self.quota_service = quota_service
    
    async def guard_query(
        self,
        query: str,
        tenant_id: str
    ) -> tuple[bool, Optional[str], Optional[int]]:
        """
        Check if query should be allowed.
        
        Returns:
            (allowed, error_message, timeout_seconds)
        """
        # Get tenant's query timeout quota
        timeout_quota = await self.quota_service.get_quota(
            tenant_id,
            "query_timeout_seconds"
        )
        timeout = int(timeout_quota.limit) if timeout_quota else 30
        
        # Analyze query complexity
        analysis = await self.analyzer.analyze(query, tenant_id)
        
        if analysis.blocked_reason:
            return False, f"Query blocked: {analysis.blocked_reason}", None
        
        if analysis.complexity == QueryComplexity.VERY_HIGH:
            # Check if tenant has permission for very expensive queries
            tenant_quotas = await self.quota_service.get_tenant_quotas(tenant_id)
            
            if tenant_quotas.plan != "enterprise":
                return (
                    False,
                    "Query too complex for your plan. Estimated rows: "
                    f"{analysis.estimated_rows:,}. Consider adding filters or "
                    "upgrading to Enterprise.",
                    None
                )
        
        # analysis.warnings could be attached here as response headers,
        # so callers learn about expensive patterns without being blocked
        
        return True, None, timeout
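
A pitfall worth unit-testing in this area: a negative lookahead written after a greedy `.*` (e.g. `(DELETE|UPDATE).*(?!WHERE)`) matches every DELETE/UPDATE, because the lookahead trivially succeeds at end-of-string. A standalone check that behaves as intended (an illustration, not part of the series' modules):

```python
import re

# (?is): case-insensitive, and `.` matches newlines in multi-line SQL.
# The lookahead comes BEFORE the rest of the statement is consumed.
UNBOUNDED_WRITE = re.compile(r'(?is)^\s*(DELETE|UPDATE)\b(?!.*\bWHERE\b)')


def is_unbounded_write(query: str) -> bool:
    """True if the query is a DELETE/UPDATE with no WHERE clause."""
    return UNBOUNDED_WRITE.search(query) is not None
```

Regexes over SQL are always heuristics; a production system would parse the statement or rely on database-side safeguards, but even the heuristic deserves a test suite.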

6.2 Database Connection Pool per Tenant

# noisy_neighbor/connection_pool.py

"""
Per-tenant connection pool management.

Prevents one tenant from exhausting all database connections.
"""

from dataclasses import dataclass
from typing import Dict
import asyncio
import asyncpg
import logging

logger = logging.getLogger(__name__)


@dataclass
class PoolConfig:
    """Configuration for a connection pool."""
    min_size: int
    max_size: int
    statement_timeout_ms: int


class TenantConnectionPoolManager:
    """
    Manages database connection pools per tenant.
    
    Each tenant gets their own pool with limits, preventing
    one tenant from exhausting shared connections.
    """
    
    # Pool configurations by plan
    POOL_CONFIGS = {
        "free": PoolConfig(min_size=1, max_size=5, statement_timeout_ms=30000),
        "pro": PoolConfig(min_size=2, max_size=20, statement_timeout_ms=120000),
        "enterprise": PoolConfig(min_size=5, max_size=50, statement_timeout_ms=300000),
    }
    
    def __init__(self, dsn: str, tenant_service):
        self.dsn = dsn
        self.tenant_service = tenant_service
        self._pools: Dict[str, asyncpg.Pool] = {}
        self._lock = asyncio.Lock()
    
    async def get_pool(self, tenant_id: str) -> asyncpg.Pool:
        """
        Get connection pool for a tenant.
        
        Creates pool on first access.
        """
        if tenant_id in self._pools:
            return self._pools[tenant_id]
        
        async with self._lock:
            # Double-check after acquiring lock
            if tenant_id in self._pools:
                return self._pools[tenant_id]
            
            # Get tenant's plan
            tenant = await self.tenant_service.get_tenant(tenant_id)
            plan = tenant.plan if tenant else "free"
            config = self.POOL_CONFIGS.get(plan, self.POOL_CONFIGS["free"])
            
            # Create pool with tenant-specific limits
            pool = await asyncpg.create_pool(
                self.dsn,
                min_size=config.min_size,
                max_size=config.max_size,
                command_timeout=config.statement_timeout_ms / 1000,
                setup=self._setup_connection
            )
            
            self._pools[tenant_id] = pool
            
            logger.info(
                f"Created connection pool for tenant {tenant_id}",
                extra={
                    "tenant_id": tenant_id,
                    "plan": plan,
                    "max_connections": config.max_size
                }
            )
            
            return pool
    
    async def _setup_connection(self, conn):
        """Setup each connection with statement timeout."""
        # Note: Timeout is set at pool creation via command_timeout
        pass
    
    async def get_pool_stats(self, tenant_id: str) -> dict:
        """Get statistics for a tenant's pool."""
        pool = self._pools.get(tenant_id)
        
        if not pool:
            return {"exists": False}
        
        return {
            "exists": True,
            "size": pool.get_size(),
            "free_size": pool.get_idle_size(),
            "used_size": pool.get_size() - pool.get_idle_size(),
            "min_size": pool.get_min_size(),
            "max_size": pool.get_max_size(),
        }
    
    async def close_pool(self, tenant_id: str):
        """Close a tenant's connection pool."""
        pool = self._pools.pop(tenant_id, None)
        
        if pool:
            await pool.close()
            logger.info(f"Closed connection pool for tenant {tenant_id}")
    
    async def close_all(self):
        """Close all connection pools."""
        for tenant_id, pool in list(self._pools.items()):
            await pool.close()
        
        self._pools.clear()

Chapter 7: Fair Scheduling

7.1 Weighted Fair Queue

# noisy_neighbor/fair_scheduler.py

"""
Fair scheduling to ensure all tenants get reasonable service.
"""

from dataclasses import dataclass
from typing import Dict, List, Optional, Any, Callable
from datetime import datetime
import asyncio
import heapq
import logging

logger = logging.getLogger(__name__)


@dataclass
class ScheduledTask:
    """A task scheduled for execution."""
    task_id: str
    tenant_id: str
    priority: int
    enqueued_at: datetime
    payload: Any
    callback: Callable
    
    def __lt__(self, other):
        # Lower priority number = higher priority
        # If same priority, earlier enqueue time wins
        if self.priority != other.priority:
            return self.priority < other.priority
        return self.enqueued_at < other.enqueued_at


class FairScheduler:
    """
    Fair scheduler that ensures no tenant monopolizes resources.
    
    Uses weighted fair queuing:
    - Each tenant gets a weight based on their plan
    - Tasks are scheduled based on weight and wait time
    - Prevents starvation of any tenant
    """
    
    # Weight by plan (higher = more priority)
    PLAN_WEIGHTS = {
        "free": 1,
        "pro": 5,
        "enterprise": 20,
    }
    
    def __init__(self, max_concurrent: int = 10):
        self.max_concurrent = max_concurrent
        self._queue: List[ScheduledTask] = []
        self._tenant_usage: Dict[str, int] = {}  # Running tasks per tenant
        self._tenant_weights: Dict[str, int] = {}
        self._running = 0
        self._lock = asyncio.Lock()
        self._condition = asyncio.Condition()
    
    async def submit(
        self,
        tenant_id: str,
        task_id: str,
        payload: Any,
        callback: Callable,
        plan: str = "free"
    ):
        """
        Submit a task for fair scheduling.
        """
        weight = self.PLAN_WEIGHTS.get(plan, 1)
        self._tenant_weights[tenant_id] = weight
        
        # Calculate priority based on weight and current usage
        # Lower number = higher priority
        current_usage = self._tenant_usage.get(tenant_id, 0)
        priority = (current_usage + 1) / weight  # More usage = lower priority
        
        task = ScheduledTask(
            task_id=task_id,
            tenant_id=tenant_id,
            priority=int(priority * 1000),  # Scale for heap
            enqueued_at=datetime.utcnow(),
            payload=payload,
            callback=callback
        )
        
        async with self._lock:
            heapq.heappush(self._queue, task)
        
        # Signal that new work is available
        async with self._condition:
            self._condition.notify()
    
    async def run(self):
        """
        Main scheduler loop.
        """
        while True:
            task = await self._get_next_task()
            
            if task:
                # Execute task in background
                asyncio.create_task(self._execute_task(task))
    
    async def _get_next_task(self) -> Optional[ScheduledTask]:
        """Get the next task to execute."""
        async with self._condition:
            while True:
                # Wait if at max concurrency
                while self._running >= self.max_concurrent:
                    await self._condition.wait()
                
                # Try to get a task
                async with self._lock:
                    if self._queue:
                        task = heapq.heappop(self._queue)
                        self._running += 1
                        self._tenant_usage[task.tenant_id] = \
                            self._tenant_usage.get(task.tenant_id, 0) + 1
                        return task
                
                # No tasks, wait for signal
                await self._condition.wait()
    
    async def _execute_task(self, task: ScheduledTask):
        """Execute a task and update counts."""
        try:
            await task.callback(task.payload)
        except Exception as e:
            logger.error(
                f"Task failed: {e}",
                extra={
                    "task_id": task.task_id,
                    "tenant_id": task.tenant_id
                }
            )
        finally:
            async with self._condition:
                self._running -= 1
                self._tenant_usage[task.tenant_id] = \
                    max(0, self._tenant_usage.get(task.tenant_id, 0) - 1)
                self._condition.notify()
    
    def get_stats(self) -> dict:
        """Get scheduler statistics."""
        return {
            "queue_length": len(self._queue),
            "running": self._running,
            "max_concurrent": self.max_concurrent,
            "tenant_usage": dict(self._tenant_usage)
        }


class TenantPriorityQueue:
    """
    Per-tenant queue with round-robin fairness.
    
    Ensures that a tenant with many queued tasks doesn't
    starve tenants with fewer tasks.
    """
    
    def __init__(self, max_per_tenant: int = 100):
        self.max_per_tenant = max_per_tenant
        self._queues: Dict[str, List[Any]] = {}  # Per-tenant queues
        self._round_robin_index = 0
        self._tenant_order: List[str] = []
        self._lock = asyncio.Lock()
    
    async def enqueue(
        self,
        tenant_id: str,
        item: Any
    ) -> bool:
        """
        Enqueue an item for a tenant.
        
        Returns False if tenant's queue is full.
        """
        async with self._lock:
            if tenant_id not in self._queues:
                self._queues[tenant_id] = []
                self._tenant_order.append(tenant_id)
            
            if len(self._queues[tenant_id]) >= self.max_per_tenant:
                return False
            
            self._queues[tenant_id].append(item)
            return True
    
    async def dequeue(self) -> Optional[tuple[str, Any]]:
        """
        Dequeue an item using round-robin across tenants.
        
        Returns (tenant_id, item) or None if empty.
        """
        async with self._lock:
            if not self._tenant_order:
                return None
            
            # Try each tenant in round-robin order
            attempts = len(self._tenant_order)
            
            for _ in range(attempts):
                tenant_id = self._tenant_order[self._round_robin_index]
                self._round_robin_index = \
                    (self._round_robin_index + 1) % len(self._tenant_order)
                
                if self._queues[tenant_id]:
                    item = self._queues[tenant_id].pop(0)
                    
                    # Remove tenant if queue empty
                    if not self._queues[tenant_id]:
                        del self._queues[tenant_id]
                        self._tenant_order.remove(tenant_id)
                        if self._round_robin_index >= len(self._tenant_order):
                            self._round_robin_index = 0
                    
                    return tenant_id, item
            
            return None
    
    def get_queue_depth(self, tenant_id: str) -> int:
        """Get queue depth for a tenant."""
        return len(self._queues.get(tenant_id, []))
    
    def get_total_depth(self) -> int:
        """Get total queue depth across all tenants."""
        return sum(len(q) for q in self._queues.values())
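
To make the fairness property concrete, here is a condensed synchronous version of the same round-robin idea (illustration only): a tenant with a deep backlog still yields the head of the line to every other tenant on each rotation:

```python
from collections import deque
from typing import Any, Deque, Dict, Optional, Tuple


class RoundRobinQueue:
    """Condensed synchronous sketch of per-tenant round-robin dequeue."""

    def __init__(self):
        self._queues: Dict[str, Deque[Any]] = {}
        self._rotation: Deque[str] = deque()  # tenants with pending items

    def enqueue(self, tenant_id: str, item: Any):
        if tenant_id not in self._queues:
            self._queues[tenant_id] = deque()
            self._rotation.append(tenant_id)
        self._queues[tenant_id].append(item)

    def dequeue(self) -> Optional[Tuple[str, Any]]:
        if not self._rotation:
            return None
        tenant_id = self._rotation.popleft()
        item = self._queues[tenant_id].popleft()
        if self._queues[tenant_id]:
            self._rotation.append(tenant_id)  # back of the line
        else:
            del self._queues[tenant_id]       # done; drop from rotation
        return tenant_id, item
```

Even though "big" enqueues first and three times as much work, the small tenants are served on the very first rotation, which is exactly the starvation-prevention guarantee.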

Part III: Real-World Application

Chapter 8: Case Studies

8.1 Salesforce Governor Limits

SALESFORCE GOVERNOR LIMITS

Salesforce pioneered aggressive resource limiting in multi-tenant SaaS:

PER-TRANSACTION LIMITS:
├── Total SOQL queries: 100
├── Total SOQL query rows: 50,000
├── Total DML statements: 150
├── Total DML rows: 10,000
├── CPU time: 10,000 ms
├── Heap size: 6 MB (sync) / 12 MB (async)
└── Callouts: 100

PER-DAY LIMITS (varies by edition):
├── API calls: 15,000 - 1,000,000+
├── Batch Apex: 250,000 executions
├── Email sends: 1,000 - 5,000
└── Data storage: 10 GB - unlimited

HOW THEY ENFORCE:
├── Hard limits - transaction fails immediately
├── Soft limits - warnings in logs
├── Monitoring - usage dashboards in UI
└── Governor limit exceptions with stack trace

RESULTS:
├── 150K+ customers on shared infrastructure
├── Predictable performance
├── Forces efficient code
└── Clear upgrade path (higher limits = higher tier)

LESSONS FOR US:
├── Hard limits are better than soft limits
├── Make limits visible to developers
├── Provide clear error messages
├── Design tiered limits into pricing
└── Limits become a feature, not a bug

8.2 AWS Service Quotas

AWS SERVICE QUOTA MODEL

AWS uses quotas to protect shared services:

EC2 QUOTAS (per region):
├── Running On-Demand instances: varies by type
├── EBS snapshots: 100,000
├── Elastic IPs: 5
└── Security groups per VPC: 2,500

HOW AWS HANDLES QUOTAS:
├── Default quotas per account
├── Quotas dashboard in console
├── API to check current usage
├── Request quota increase via support
├── Automatic increase for some quotas
└── Service Quotas service for management

QUOTA INCREASE PROCESS:
1. Customer requests increase via console
2. AWS reviews request (automated + manual)
3. Increase approved or denied with reason
4. Some increases require account review

KEY INSIGHTS:
├── Quotas are per-account AND per-region
├── Most quotas can be increased
├── Some hard limits exist (physics/security)
├── Transparency builds trust
└── Self-service where possible

LESSONS FOR US:
├── Make quota increases possible
├── Provide self-service for simple increases
├── Document why limits exist
├── Regional quotas for data residency
└── API for quota checking

8.3 Stripe Rate Limiting

STRIPE RATE LIMITING MODEL

Stripe uses sophisticated rate limiting:

DEFAULT LIMITS:
├── Test mode: 25 requests/second
├── Live mode: 100 requests/second (can be increased)
├── Webhooks: Not counted toward limit
└── Files API: Separate limits

RATE LIMIT HEADERS:
├── RateLimit-Limit: Max requests
├── RateLimit-Remaining: Remaining requests
└── RateLimit-Reset: Unix timestamp for reset

RESPONSE ON LIMIT:
├── HTTP 429 Too Many Requests
├── Clear retry-after header
├── Detailed error message
└── Request ID for support

INTELLIGENT FEATURES:
├── Burst allowance (short spikes OK)
├── Per-API limits (not all endpoints equal)
├── Automatic backoff in SDKs
├── Usage-based limit increases
└── Enterprise custom limits

LESSONS FOR US:
├── Include rate limit headers in ALL responses
├── Make SDK handle retries automatically
├── Different limits for different operations
├── Burst allowance improves UX
└── Clear communication when limited
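
The "headers in ALL responses" lesson is cheap to apply. A minimal helper (header names follow the list above; your API may use X-RateLimit-* instead, and this is a sketch, not Stripe's actual implementation):

```python
import math
from typing import Dict


def rate_limit_headers(limit: int, remaining: int, reset_epoch: float) -> Dict[str, str]:
    """Headers to attach to every response, not just 429s.

    Clamps remaining at zero so clients never see a negative budget,
    and rounds the reset timestamp up so retries land after the reset.
    """
    return {
        "RateLimit-Limit": str(limit),
        "RateLimit-Remaining": str(max(0, remaining)),
        "RateLimit-Reset": str(math.ceil(reset_epoch)),
    }
```

Emitting these on successful responses too is what lets well-behaved clients slow down before they ever see a 429.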

Chapter 9: Common Mistakes

9.1 Noisy Neighbor Prevention Anti-Patterns

COMMON MISTAKES

❌ MISTAKE 1: No Limits at All

Wrong:
  # Accept any request, any size, any frequency
  @app.post("/api/import")
  async def import_data(data: List[Record]):
      for record in data:  # Could be millions
          await db.insert(record)

Problem:
  One import of 10M records blocks everything

Right:
  @app.post("/api/import")
  async def import_data(data: List[Record]):
      if len(data) > 10000:
          raise HTTPException(400, "Max 10,000 records per import")
      
      # Use background job for large imports
      job_id = await queue_import_job(data)
      return {"job_id": job_id}


❌ MISTAKE 2: Global Limits Instead of Per-Tenant

Wrong:
  # One rate limiter for all tenants
  rate_limiter = RateLimiter(requests_per_second=1000)
  
  @app.middleware
  async def rate_limit(request, call_next):
      if not rate_limiter.allow():
          return Response(status_code=429)
      return await call_next(request)

Problem:
  One tenant doing 900 req/sec leaves only 100 for everyone else

Right:
  @app.middleware
  async def rate_limit(request, call_next):
      tenant_id = request.state.tenant_id
      if not await tenant_rate_limiter.allow(tenant_id):
          return Response(status_code=429)
      return await call_next(request)


❌ MISTAKE 3: No Query Timeouts

Wrong:
  @app.get("/api/reports")
  async def get_report():
      # No timeout - can run forever
      result = await db.fetch("SELECT * FROM events WHERE ...")
      return result

Problem:
  Query runs for 30 minutes, holding connections

Right:
  @app.get("/api/reports")
  async def get_report():
      try:
          async with asyncio.timeout(30):  # 30 second limit
              result = await db.fetch("SELECT * FROM events WHERE ...")
          return result
      except asyncio.TimeoutError:
          raise HTTPException(408, "Query timed out. Try adding filters.")


❌ MISTAKE 4: Silent Throttling

Wrong:
  if rate_limited:
      # Just delay the request silently
      await asyncio.sleep(10)
      return await process_request()

Problem:
  User doesn't know why things are slow
  Holding connections during sleep

Right:
  if rate_limited:
      return JSONResponse(
          status_code=429,
          content={
              "error": "rate_limit_exceeded",
              "message": "You've exceeded your rate limit of 100 req/min",
              "retry_after": 30,
              "upgrade_url": "https://example.com/pricing"
          },
          headers={"Retry-After": "30"}
      )


❌ MISTAKE 5: Same Limits for All Operations

Wrong:
  # 1000 requests/minute for everything
  API_RATE_LIMIT = 1000

Problem:
  1000 export requests = database meltdown
  1000 health checks = no big deal

Right:
  RATE_LIMITS = {
      "default": 1000,
      "search": 100,     # Search is expensive
      "export": 10,      # Export is very expensive
      "bulk_create": 5,  # Bulk operations limited
      "health": 10000,   # Health checks are cheap
  }

Part IV: Interview Preparation

Chapter 10: Interview Tips

10.1 Noisy Neighbor Discussion Framework

DISCUSSING NOISY NEIGHBORS IN INTERVIEWS

When the topic comes up:

1. IDENTIFY THE RESOURCES AT RISK
   "In this system, the main resources that could be exhausted are:
    - Database connections and CPU
    - API server capacity
    - Network bandwidth
    - Storage I/O"

2. PROPOSE MULTI-LAYER DEFENSE
   "I'd implement protection at multiple layers:
    - API Gateway: Request rate limiting
    - Application: Concurrency limits, query analysis
    - Database: Connection pools, statement timeouts
    - Infrastructure: Resource quotas per tenant"

3. EXPLAIN QUOTA DESIGN
   "Quotas would vary by pricing tier:
    - Free: 60 API calls/minute, 5 concurrent
    - Pro: 1000 API calls/minute, 50 concurrent
    - Enterprise: 10,000 API calls/minute, 200 concurrent
    This creates upgrade incentive while protecting the platform."

4. ADDRESS USER EXPERIENCE
   "When limits are hit, clear communication is crucial:
    - HTTP 429 with retry-after header
    - Dashboard showing current usage
    - Proactive alerts before hitting limits
    - Clear upgrade path"

10.2 Key Phrases

NOISY NEIGHBOR KEY PHRASES

On Rate Limiting:
"For rate limiting, I'd use a token bucket algorithm with per-tenant
buckets. This allows short bursts while maintaining overall limits.
The key is making limits visible - rate limit headers in every response."

On Resource Quotas:
"Resource quotas need to be enforced at multiple levels. API rate limits
catch volume attacks, but a single expensive query can still cause damage.
That's why we also need query timeouts, connection limits, and compute quotas."

On Fair Scheduling:
"To ensure fairness, I'd use weighted fair queuing. Each tenant gets
a weight based on their plan, and tasks are scheduled to prevent any
tenant from monopolizing resources. This is especially important for
background job processing."

On Graceful Degradation:
"When a tenant hits their limit, the system should degrade gracefully.
Rather than error immediately, we can queue requests, return cached data,
or provide reduced functionality. The goal is to never completely block
a paying customer."
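
That ladder (succeed, then degrade, then fail clearly) can be sketched in a few lines; the `limiter` and `fetch` interfaces here are assumptions for illustration, not part of the series' code:

```python
from typing import Any, Callable, Dict


class DegradingHandler:
    """Serve cached data when a tenant is over their limit.

    `limiter` is any object with allow(tenant_id) -> bool;
    `fetch` is the expensive operation being protected.
    """

    def __init__(self, limiter: Any, fetch: Callable[[str], Any]):
        self.limiter = limiter
        self.fetch = fetch
        self._cache: Dict[str, Any] = {}

    def handle(self, tenant_id: str, key: str) -> Dict[str, Any]:
        if self.limiter.allow(tenant_id):
            result = self.fetch(key)
            self._cache[key] = result          # remember for degraded mode
            return {"data": result, "stale": False}
        if key in self._cache:
            # Over limit, but we can still answer with stale data
            return {"data": self._cache[key], "stale": True}
        # Nothing cached: fail with a clear, retryable error
        return {"error": "rate_limit_exceeded", "retry_after": 30}
```

A real implementation would scope the cache per tenant and bound its size; the point is the ordering: fresh data, then stale data, then an explicit, retryable error.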

On Monitoring:
"Visibility is crucial for noisy neighbor prevention. I'd track
per-tenant resource usage in real-time, with dashboards for both
operators and customers. Alerts would fire when tenants approach
80% of their quota, allowing proactive intervention."

Chapter 11: Practice Problems

Problem 1: Database Connection Exhaustion

Scenario: Your multi-tenant SaaS has 1,000 tenants sharing a PostgreSQL database with max 500 connections. One tenant opens a connection for each API request and doesn't close them properly.

Questions:

  1. How do you detect this is happening?
  2. How do you prevent it from affecting other tenants?
  3. What's your long-term solution?

Key points to cover:

  • Monitor connections per tenant
  • Per-tenant connection pools with max size
  • Connection timeout/idle timeout
  • Circuit breaker when pool exhausted
  • Alert on connection leak patterns
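
For the detection step, one hypothetical approach: if the application sets application_name to the tenant id when opening connections (an assumption about your setup), pg_stat_activity shows exactly who is hoarding connections:

```sql
-- Connections per tenant, assuming application_name carries the tenant id
SELECT application_name AS tenant_id,
       count(*) AS connections,
       count(*) FILTER (WHERE state = 'idle') AS idle_connections
FROM pg_stat_activity
WHERE datname = current_database()
GROUP BY application_name
ORDER BY connections DESC
LIMIT 10;
```

A steadily growing idle_connections count for a single tenant is the classic leak signature.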

Problem 2: Export Feature Abuse

Scenario: You have an "Export to CSV" feature. A tenant triggers exports of their entire 10GB dataset every 5 minutes, causing database performance issues.

Questions:

  1. How do you limit exports without breaking the feature?
  2. What's the user experience when limited?
  3. How would you handle legitimate large exports?

Key points to cover:

  • Rate limit exports (e.g., 5/hour)
  • Row limits per export
  • Background job for large exports
  • Incremental/streaming exports
  • Caching of repeated exports
  • Charge for large exports
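
The incremental/streaming idea can be sketched as a generator. Here fetch_page(offset, limit) is an assumed interface returning one page of rows (empty when exhausted); paging keeps any single query small, and a hard cap bounds total work:

```python
from typing import Any, Callable, Iterator, List


def stream_export(
    fetch_page: Callable[[int, int], List[Any]],
    page_size: int = 1000,
    max_rows: int = 100_000,
) -> Iterator[Any]:
    """Yield rows page by page instead of running one giant SELECT."""
    exported = 0
    offset = 0
    while exported < max_rows:
        # Never ask for more rows than the remaining budget
        page = fetch_page(offset, min(page_size, max_rows - exported))
        if not page:
            break
        yield from page
        exported += len(page)
        offset += len(page)
```

A production version would prefer keyset pagination (WHERE id > :last_seen ORDER BY id) over OFFSET, since OFFSET itself degrades on large tables.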

Chapter 12: Sample Interview Dialogue

Interviewer: "In the system you're designing, how would you prevent one customer from affecting others?"

You: "Great question — this is the noisy neighbor problem. I'd implement protection at multiple layers.

First, at the API layer, per-tenant rate limiting using a token bucket algorithm. Different limits by plan — maybe 60/minute for free, 1000/minute for pro. Every response includes rate limit headers so clients can track their usage.

Second, at the application layer, I'd add concurrency limits. A tenant might be within their rate limit but running 50 expensive operations simultaneously. I'd limit concurrent requests to maybe 10 for free tier, 50 for pro.

Third, at the database layer, per-tenant connection pools and query timeouts. Free tier gets max 5 connections with 30-second timeout. Enterprise gets 50 connections with 5-minute timeout. This prevents both connection exhaustion and runaway queries.

Let me draw the flow..."

Request → API Gateway (rate limit check)
              ↓
        Application (concurrency check)
              ↓
        Query Guard (complexity analysis)
              ↓
        Connection Pool (per-tenant limit)
              ↓
        Database (statement timeout)
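The token bucket mentioned at the API Gateway step can be sketched in a few lines. This is a single-node illustration, one bucket per tenant; a real gateway would hold this state in something like Redis so all nodes share it. The clock parameter is only there to make the refill logic testable.

```python
import time

class TokenBucket:
    """Refills tokens continuously; a request spends one token (or more)."""

    def __init__(self, rate_per_minute, burst, clock=time.monotonic):
        self.rate = rate_per_minute / 60.0  # tokens added per second
        self.capacity = burst               # max tokens = burst allowance
        self.tokens = float(burst)
        self.clock = clock
        self.last = clock()

    def allow(self, cost=1.0):
        now = self.clock()
        # Credit tokens for the time elapsed since the last call,
        # capped at the bucket's capacity.
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= cost:
            self.tokens -= cost
            return True
        return False
```

The `cost` parameter is what lets different operations carry different weights — an export might spend 10 tokens while a simple read spends 1 — which matches the "different limits for different operations" point in the summary below.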

Interviewer: "What happens when a customer hits their limit?"

You: "The user experience is critical. When rate limited, they get HTTP 429 with a clear message: 'You've made 60 requests in the last minute. Your limit is 60/minute. Retry in 30 seconds.' The response includes a Retry-After header.

For proactive communication, the dashboard shows current usage against limits. We'd send email alerts at 80% usage: 'You're approaching your API limit. Consider upgrading or optimizing your integration.'

We'd also have a soft limit before the hard limit — at 80%, requests still succeed but include a warning header. This gives developers time to react before hitting the wall."
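The soft-limit/hard-limit behavior described here can be sketched as a small helper that maps current usage to a status code and headers. The `X-RateLimit-*` names follow a widely used convention but are not standardized; the `X-RateLimit-Warning` header in particular is a hypothetical name for this sketch.

```python
def rate_limit_headers(used, limit, window_seconds=60):
    """Return (status_code, headers) for a request given current usage."""
    remaining = max(0, limit - used)
    headers = {
        "X-RateLimit-Limit": str(limit),
        "X-RateLimit-Remaining": str(remaining),
    }
    if used >= limit:
        # Hard limit: reject and tell the client when to retry.
        headers["Retry-After"] = str(window_seconds)
        return 429, headers
    if used >= 0.8 * limit:
        # Soft limit: the request still succeeds, but warn the client
        # so developers can react before hitting the wall.
        headers["X-RateLimit-Warning"] = (
            f"{used}/{limit} requests used in the current window"
        )
    return 200, headers
```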

Interviewer: "How would you handle a sudden spike in legitimate usage?"

You: "I'd design for burst tolerance. Token bucket allows short bursts — if a tenant normally uses 50% of their limit, they've accumulated tokens and can burst to maybe 150% briefly.

For sustained increases, we could offer temporary quota increases via API. The customer calls an endpoint to request a 2x quota for the next hour, explaining why. Simple requests auto-approve; large requests queue for review.

For enterprise customers, we might offer dedicated resources — their own database or worker pool — so their spikes only affect themselves."


Summary

DAY 2 KEY TAKEAWAYS

NOISY NEIGHBOR CATEGORIES:
├── CPU-bound (expensive queries)
├── Memory-bound (large operations)
├── I/O-bound (bulk imports)
├── Network-bound (high request volume)
├── Connection-bound (pool exhaustion)
└── Storage-bound (data accumulation)

DEFENSE LAYERS:
├── API Gateway: Rate limiting, payload limits
├── Application: Concurrency, query analysis
├── Database: Timeouts, connection pools
└── Infrastructure: CPU/memory quotas

RATE LIMITING:
├── Token bucket for smooth limiting
├── Per-tenant, not global
├── Different limits for different operations
├── Include headers in every response
└── Burst allowance for UX

QUOTA DESIGN:
├── Rate quotas (per time window)
├── Concurrency quotas (simultaneous)
├── Volume quotas (total amount)
├── Compute quotas (processing time)
└── Tiered by pricing plan

FAIR SCHEDULING:
├── Weighted by plan
├── Round-robin across tenants
├── Prevent starvation
└── Priority for paying customers

USER EXPERIENCE:
├── Clear error messages
├── Retry-After headers
├── Usage dashboards
├── Proactive alerts
└── Upgrade path

DEFAULT APPROACH:
├── Start with per-tenant rate limits
├── Add query timeouts early
├── Monitor before you enforce
└── Communicate limits clearly

Further Reading

Engineering Blogs:

  • Stripe: "Scaling Stripe's Rate Limiter"
  • Figma: "How Figma's multiplayer technology works"
  • Cloudflare: "Rate Limiting at Cloudflare"

Tools:

  • Redis Cell (rate limiting module)
  • resilience4j (Java circuit breaker/rate limiter)

End of Day 2: Noisy Neighbor Prevention

Tomorrow: Day 3 — Data Residency and GDPR. We'll learn how to keep EU data in the EU while running a global platform.