
Week 2 — Day 4: Webhook Delivery System

System Design Mastery Series


Preface: From Synchronous to Asynchronous

For the past three days, we've been working in a synchronous world:

User clicks "Pay" → Your server → Bank API → Response → User sees result

Everything happens in a single request-response cycle. The user waits while you call downstream services. We added:

  • Day 1: Timeouts (don't wait forever)
  • Day 2: Idempotency (safe to retry)
  • Day 3: Circuit breakers (fail fast when broken)

But what happens when you need to notify EXTERNAL systems about events?

Order completed in your system
  → Need to notify: Warehouse, Accounting, Analytics, Partner systems
  → These systems are outside your control
  → They might be slow, down, or misconfigured
  → You can't make the user wait for all of them

This is where webhooks come in.

What Are Webhooks?

Webhooks are HTTP callbacks. When an event occurs in your system, you make an HTTP request to a URL that someone else has registered:

Traditional API (Pull):
  Partner: "Do you have any new orders?"
  You: "Yes, here's order #123"
  Partner: "Do you have any new orders?"
  You: "No"
  Partner: "Do you have any new orders?"
  You: "Yes, here's order #124"
  
  Problem: Wasteful polling, delayed updates

Webhooks (Push):
  Partner: "Call me at https://partner.com/webhook when you have new orders"
  You: [Order #123 created]
  You: POST https://partner.com/webhook {"event": "order.created", "order": {...}}
  Partner: "Got it, thanks!"
  
  Benefit: Real-time, efficient

Why Webhooks Are Hard

Webhooks seem simple, but they're deceptively difficult:

  1. Receivers are unreliable: They go down, timeout, return errors
  2. Networks are unreliable: Requests get lost, connections drop
  3. Scale is challenging: Sending millions of webhooks quickly
  4. Ordering is tricky: Events might arrive out of order
  5. Security is critical: Anyone could send fake webhooks
  6. Observability is hard: Did the webhook arrive? Did they process it?

Today, we'll design a production webhook delivery system that handles all of these challenges.


Part I: Foundations

Chapter 1: Delivery Guarantees

1.1 The Three Guarantees

In distributed systems, there are three delivery guarantees:

┌─────────────────────────────────────────────────────────────────────────┐
│                        Delivery Guarantees                               │
│                                                                          │
│  ┌─────────────────┐  ┌─────────────────┐  ┌─────────────────┐         │
│  │                 │  │                 │  │                 │         │
│  │  AT-MOST-ONCE   │  │  AT-LEAST-ONCE  │  │  EXACTLY-ONCE   │         │
│  │                 │  │                 │  │                 │         │
│  │  Send once,     │  │  Retry until    │  │  Delivered      │         │
│  │  don't retry    │  │  acknowledged   │  │  exactly once   │         │
│  │                 │  │                 │  │                 │         │
│  │  May lose       │  │  May duplicate  │  │  No loss, no    │         │
│  │  messages       │  │  messages       │  │  duplicates     │         │
│  │                 │  │                 │  │                 │         │
│  └─────────────────┘  └─────────────────┘  └─────────────────┘         │
│                                                                          │
│  Easiest             Most Common           Impossible*                  │
│                                                                          │
│  * True exactly-once is impossible in distributed systems.              │
│    We approximate it with at-least-once + idempotent receivers.         │
│                                                                          │
└─────────────────────────────────────────────────────────────────────────┘

Let's understand each guarantee in depth:

At-Most-Once

The simplest approach: try once and forget.

import requests

def send_webhook_at_most_once(url, payload):
    try:
        response = requests.post(url, json=payload, timeout=5)
        return response.status_code == 200
    except requests.RequestException:
        return False  # Don't retry, accept the loss

When to use at-most-once:

  • Analytics and tracking events (losing a few is acceptable)
  • Logging and debugging notifications
  • Non-critical status updates

When NOT to use:

  • Payment notifications
  • Order confirmations
  • Any event where loss causes business impact

At-Least-Once

The standard for webhooks: retry until acknowledged.

import time

import requests

def send_webhook_at_least_once(url, payload, max_retries=5):
    for attempt in range(max_retries):
        try:
            response = requests.post(url, json=payload, timeout=5)
            if response.status_code == 200:
                return True  # Success!
        except requests.RequestException:
            pass  # Network error - fall through to retry
        
        # Exponential backoff before retry
        time.sleep(2 ** attempt)
    
    return False  # Move to dead letter queue

The duplicate problem:

Timeline:
  T=0:    You send webhook
  T=29s:  Receiver processes it, sends "200 OK"
  T=30s:  Your timeout fires (before 200 arrives)
  T=30s:  You think it failed, schedule retry
  T=31s:  You receive the 200, but already scheduled retry
  T=60s:  You retry - duplicate!

This is unavoidable. The receiver MUST handle duplicates.

Exactly-Once (The Myth)

True exactly-once is impossible in distributed systems. Here's the fundamental problem:

The Two Generals Problem:

General A wants to coordinate attack with General B.
- A sends messenger: "Attack at dawn"
- B receives message
- B sends messenger back: "Confirmed"
- A receives confirmation

But what if B's messenger is captured?
- B thinks A knows
- A doesn't know B confirmed
- A sends another messenger
- Now B has two messages

No amount of acknowledgments solves this.
After N confirmations, you still don't know if the Nth was received.

The practical solution: At-least-once delivery + idempotent receivers.

This is exactly what we learned in Day 2 — idempotency keys make operations safe to repeat. The webhook system delivers at-least-once, and the receiver uses the event ID for idempotency.

1.2 Why At-Least-Once Is the Standard

For business-critical webhooks, at-least-once is almost always correct:

Failure Mode     At-Most-Once Impact   At-Least-Once Impact
Network blip     Lost forever          Retried, delivered
Receiver slow    Lost forever          Retried, delivered
Receiver crash   Lost forever          Retried when back
Your crash       Lost forever          Recovered from queue
The cost of handling a duplicate is almost always lower than the cost of losing an event.

Example: Order confirmation webhook

At-most-once: Customer doesn't get email, thinks order failed, contacts support, places duplicate order. Cost: Support time + customer frustration + inventory issues.

At-least-once: Receiver sees same order twice, uses order_id for idempotency, sends one email. Cost: One database lookup.

1.3 The Receiver's Contract

When providing at-least-once delivery, document the receiver's responsibilities:

# Webhook Receiver Requirements

1. **Idempotency**: Your endpoint must handle duplicate deliveries.
   Use the `event_id` field to deduplicate.

2. **Response**: Return 200 status code to acknowledge receipt.
   Any other status or timeout will trigger retry.

3. **Speed**: Respond within 30 seconds.
   Do heavy processing asynchronously.

4. **Verification**: Verify webhook signatures.
   Check the X-Webhook-Signature header.

Example receiver implementation:

from flask import Flask, request, jsonify, abort

app = Flask(__name__)

@app.route('/webhook', methods=['POST'])
def handle_webhook():
    # Verify signature first
    if not verify_signature(request):
        abort(401)
    
    event = request.json
    event_id = event['id']
    
    # Idempotency check
    if db.exists(f"processed:{event_id}"):
        return jsonify({"status": "already_processed"}), 200
    
    # Mark as processing (prevent concurrent duplicates)
    # SET with NX + TTL acts as a short-lived lock
    # (redis-py's setnx does not take an expiry argument)
    if not db.set(f"processing:{event_id}", "1", nx=True, ex=300):
        return jsonify({"status": "processing"}), 200
    
    try:
        # Process the event
        process_event(event)
        
        # Mark as complete
        db.set(f"processed:{event_id}", "1", ex=86400*7)
        db.delete(f"processing:{event_id}")
        
        return jsonify({"status": "processed"}), 200
    
    except Exception as e:
        db.delete(f"processing:{event_id}")
        raise

Chapter 2: System Architecture

2.1 High-Level Components

A production webhook system needs these components:

┌─────────────────────────────────────────────────────────────────────────┐
│                      Webhook System Architecture                         │
│                                                                          │
│   ┌──────────┐    ┌──────────┐    ┌──────────────────────┐              │
│   │          │    │          │    │                      │              │
│   │  Event   │───▶│  Event   │───▶│  Webhook Dispatcher  │              │
│   │  Sources │    │  Bus     │    │  (Fan-out)           │              │
│   │          │    │ (Kafka)  │    │                      │              │
│   └──────────┘    └──────────┘    └──────────┬───────────┘              │
│                                              │                          │
│                    ┌─────────────────────────┼───────────┐              │
│                    │                         │           │              │
│                    ▼                         ▼           ▼              │
│            ┌──────────────┐          ┌──────────────┐                  │
│            │   Delivery   │          │   Delivery   │    ...           │
│            │   Queue P1   │          │   Queue P2   │                  │
│            └──────┬───────┘          └──────┬───────┘                  │
│                   │                         │                          │
│                   ▼                         ▼                          │
│            ┌──────────────┐          ┌──────────────┐                  │
│            │   Workers    │          │   Workers    │                  │
│            │   (Pool)     │          │   (Pool)     │                  │
│            └──────┬───────┘          └──────┬───────┘                  │
│                   │                         │                          │
│                   └────────────┬────────────┘                          │
│                                │                                        │
│                                ▼                                        │
│                    ┌──────────────────────┐                            │
│                    │   External Receivers │                            │
│                    └──────────────────────┘                            │
│                                                                          │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐                  │
│  │  Retry       │  │  Dead Letter │  │  Delivery    │                  │
│  │  Scheduler   │  │  Queue       │  │  Logs        │                  │
│  └──────────────┘  └──────────────┘  └──────────────┘                  │
│                                                                          │
└─────────────────────────────────────────────────────────────────────────┘

Let's understand each component:

Event Sources

These are the services in your system that generate events:

class OrderService:
    def create_order(self, order_data):
        order = self.db.create(order_data)
        
        # Emit event for webhook system
        self.event_bus.publish(
            event_type="order.created",
            data=order.to_dict()
        )
        
        return order

The key insight: event production should be decoupled from webhook delivery. The OrderService doesn't know about webhooks — it just publishes events.

Event Bus (Kafka/SQS/RabbitMQ)

The event bus provides:

  • Durability: Events survive crashes
  • Decoupling: Producers and consumers are independent
  • Replay: Can reprocess events if needed
  • Ordering: Within partitions, events stay ordered

Why Kafka is popular for this:

  • High throughput (millions/second)
  • Strong durability guarantees
  • Consumer groups for parallel processing
  • Retention for replay
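
To make the publish side concrete, here is a minimal sketch using the confluent-kafka client. The library choice, broker address, and topic name are illustrative assumptions, not requirements of the design:

import json
import uuid
from datetime import datetime, timezone

from confluent_kafka import Producer

producer = Producer({"bootstrap.servers": "localhost:9092"})

def publish_event(event_type: str, data: dict) -> None:
    event = {
        "id": f"evt_{uuid.uuid4().hex}",  # unique event ID (receivers dedupe on this)
        "type": event_type,
        "created_at": datetime.now(timezone.utc).isoformat(),
        "data": data,
    }
    # Keying by event type mirrors the partitioning in the data flow example below
    producer.produce("events", key=event_type, value=json.dumps(event))
    producer.flush()  # a real service would flush periodically, not per event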

Webhook Dispatcher (Fan-out)

This is the brain of the system. It:

  1. Consumes events from the bus
  2. Looks up subscriptions for each event type
  3. Creates delivery tasks for each subscription

class WebhookDispatcher:
    async def handle_event(self, event):
        # Find all subscriptions for this event type
        subscriptions = await self.db.find_subscriptions(
            event_type=event['type']
        )
        
        # Create delivery task for each
        for sub in subscriptions:
            if sub.active:
                await self.queue.enqueue({
                    'subscription_id': sub.id,
                    'event': event
                })

This is the "fan-out" — one event becomes N delivery tasks.

Delivery Queues (Partitioned)

Partitioning is key for:

  • Parallelism: Different partitions processed simultaneously
  • Ordering: Events for same subscription stay ordered
  • Isolation: Slow subscriber doesn't block others

import hashlib

def get_partition(subscription_id: str) -> int:
    """Stable hash for partition assignment (built-in hash() is randomized per process)."""
    return int(hashlib.md5(subscription_id.encode()).hexdigest(), 16) % NUM_PARTITIONS

Delivery Workers

Workers pull tasks and make HTTP requests:

class DeliveryWorker:
    async def process_task(self, task):
        subscription = await self.db.get_subscription(task['subscription_id'])
        
        try:
            response = await self.http.post(
                subscription.url,
                json=self.build_payload(task['event']),
                headers=self.sign_headers(subscription, task['event']),
                timeout=30
            )
            
            if response.status == 200:
                await self.log_success(task)
            else:
                await self.schedule_retry(task)
        
        except Exception as e:
            await self.schedule_retry(task)

Dead Letter Queue (DLQ)

When all retries are exhausted:

async def move_to_dlq(self, task, error):
    await self.dlq.push({
        'task': task,
        'error': error,
        'failed_at': datetime.utcnow(),
        'attempts': task['attempt_count']
    })
    
    # Alert operations
    await self.alert_service.notify(
        f"Webhook failed after {task['attempt_count']} attempts",
        subscription_id=task['subscription_id']
    )

2.2 Data Flow Example

Let's trace an order through the system:

1. Order Created
   └── OrderService.create_order()
       └── event_bus.publish("order.created", {...})

2. Event Published to Kafka
   Topic: events
   Partition: hash("order.created") % 10 = 3
   Message: {"id": "evt_123", "type": "order.created", "data": {...}}

3. Dispatcher Consumes Event
   └── find_subscriptions("order.created")
       └── Returns: [sub_A, sub_B, sub_C]

4. Delivery Tasks Created
   └── Queue partition 7: {"subscription_id": "sub_A", "event": {...}}
   └── Queue partition 2: {"subscription_id": "sub_B", "event": {...}}
   └── Queue partition 5: {"subscription_id": "sub_C", "event": {...}}

5. Workers Process Tasks (in parallel)
   └── Worker-1: POST https://a.com/webhook {...}
   └── Worker-2: POST https://b.com/webhook {...}
   └── Worker-3: POST https://c.com/webhook {...}

6. Results
   └── sub_A: 200 OK → logged as success
   └── sub_B: timeout → scheduled retry at T+60s
   └── sub_C: 500 Error → scheduled retry at T+60s

2.3 Payload Design

A well-designed webhook payload:

{
    "id": "evt_1234567890abcdef",
    "type": "order.created",
    "api_version": "2024-01-15",
    "created_at": "2024-01-15T10:30:00.123Z",
    "data": {
        "object": "order",
        "id": "ord_abc123",
        "amount": 9900,
        "currency": "usd",
        "status": "pending",
        "customer": {
            "id": "cus_xyz",
            "email": "customer@example.com"
        },
        "items": [...]
    },
    "request_id": "req_original_request_id"
}

Key design decisions:

Field         Purpose            Why It Matters
id            Unique event ID    Idempotency key for receivers
type          Event type         Routing and filtering
api_version   Payload version    Backward compatibility
created_at    Event timestamp    Not delivery time!
data.object   Object type        Helps parsing
request_id    Original request   Correlation/debugging
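
Receivers lean on api_version to evolve payloads safely. A sketch of version-aware parsing (the handler names are hypothetical):

def parse_v2024_01_15(payload: dict) -> dict:
    # Hypothetical: normalize the 2024-01-15 shape into internal fields
    return {"event_id": payload["id"], "order_id": payload["data"]["id"]}

PARSERS = {"2024-01-15": parse_v2024_01_15}

def parse_event(payload: dict) -> dict:
    parser = PARSERS.get(payload.get("api_version"))
    if parser is None:
        raise ValueError(f"Unsupported api_version: {payload.get('api_version')}")
    return parser(payload)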

Chapter 3: Retry Strategies

3.1 The Retry Schedule

A typical exponential backoff schedule:

Attempt 1: Immediate
Attempt 2: 1 minute later
Attempt 3: 5 minutes later
Attempt 4: 15 minutes later
Attempt 5: 1 hour later
Attempt 6: 2 hours later
Attempt 7: 4 hours later
Attempt 8: 8 hours later
Attempt 9: 24 hours later (final)

Total window: ~39 hours

Why this schedule works:

  • Early retries catch transient failures (network blips, deploys)
  • Longer delays give time for manual intervention
  • 24-hour final catches overnight outages

3.2 What To Retry

Not all failures should be retried:

def should_retry(response_or_error) -> bool:
    if isinstance(response_or_error, Exception):
        # Network errors - always retry
        if isinstance(response_or_error, (Timeout, ConnectionError)):
            return True
        # Unknown errors - retry cautiously
        return True
    
    status = response_or_error.status_code
    
    # Success - don't retry
    if status == 200:
        return False
    
    # Redirects - don't retry
    if 300 <= status < 400:
        return False
    
    # Client errors - don't retry (except rate limit)
    if 400 <= status < 500:
        return status == 429  # Retry rate limits
    
    # Server errors - retry
    if status >= 500:
        return True
    
    return False

The 4xx rule: If the receiver returns a 4xx error, your request is malformed. Retrying won't help. Log it and move on (or to DLQ for investigation).

Exception: 429 Too Many Requests

Rate limiting is special — it's a temporary condition:

def handle_rate_limit(response, task):
    retry_after = response.headers.get('Retry-After')
    
    try:
        # Retry-After may also be an HTTP-date; fall back if it isn't an integer
        delay = int(retry_after)
    except (TypeError, ValueError):
        delay = 60  # Default backoff
    
    schedule_retry(task, delay=delay)

3.3 Retry Implementation

from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional
import random

@dataclass
class RetryConfig:
    max_attempts: int = 9
    base_delay: int = 60  # 1 minute
    max_delay: int = 86400  # 24 hours
    exponential_base: float = 2.0
    jitter: float = 0.1  # ±10%

class RetryScheduler:
    def __init__(self, config: RetryConfig, queue):
        self.config = config
        self.queue = queue
    
    def calculate_delay(self, attempt: int) -> int:
        """Calculate delay with exponential backoff and jitter."""
        
        # Exponential backoff
        delay = self.config.base_delay * (self.config.exponential_base ** attempt)
        
        # Cap at max delay
        delay = min(delay, self.config.max_delay)
        
        # Add jitter to prevent thundering herd
        jitter_range = delay * self.config.jitter
        delay += random.uniform(-jitter_range, jitter_range)
        
        return int(delay)
    
    async def schedule_retry(
        self,
        task: dict,
        error: str,
        force_delay: Optional[int] = None
    ) -> bool:
        """
        Schedule a retry for failed delivery.
        Returns True if retry scheduled, False if moved to DLQ.
        """
        
        attempt = task.get('attempt', 0)
        
        if attempt >= self.config.max_attempts:
            await self.move_to_dlq(task, error)
            return False
        
        delay = force_delay or self.calculate_delay(attempt)
        
        task['attempt'] = attempt + 1
        task['last_error'] = error
        task['retry_at'] = (datetime.utcnow() + timedelta(seconds=delay)).isoformat()
        
        await self.queue.enqueue_delayed(task, delay)
        
        return True
    
    async def move_to_dlq(self, task: dict, error: str):
        """Move to dead letter queue after exhausting retries."""
        
        dlq_entry = {
            'task': task,
            'final_error': error,
            'attempts': task.get('attempt', 0),
            'first_attempt': task.get('first_attempt'),
            'last_attempt': datetime.utcnow().isoformat(),
            'status': 'pending_review'
        }
        
        await self.dlq.push(dlq_entry)
        await self.metrics.increment('webhooks_dlq_total')
        await self.alert_ops(task, error)

3.4 Handling Long Outages

When an endpoint is down for hours:

Scenario:
  Endpoint goes down at 2:00 PM
  100 events/hour accumulate
  Endpoint recovers at 4:00 PM
  200 events need delivery

Problem:
  Each event is at different retry stage
  Some scheduled for 5:00 PM, some for 6:00 PM, etc.
  Recovery is slow even though endpoint is healthy

Solution: Health-based retry acceleration

import asyncio

import aiohttp

class AdaptiveRetryScheduler:
    def __init__(self):
        self.endpoint_health = {}  # endpoint -> health status
    
    async def check_endpoint_health(self, endpoint: str) -> bool:
        """Periodic health check for failing endpoints."""
        
        try:
            async with aiohttp.ClientSession() as session:
                timeout = aiohttp.ClientTimeout(total=5)
                async with session.head(endpoint, timeout=timeout) as resp:
                    return resp.status < 500
        except (aiohttp.ClientError, asyncio.TimeoutError):
            return False
    
    async def on_endpoint_recovery(self, endpoint: str):
        """When endpoint recovers, accelerate pending retries."""
        
        # Find all pending retries for this endpoint
        pending = await self.queue.get_pending_for_endpoint(endpoint)
        
        # Move them to immediate queue
        for task in pending:
            await self.queue.move_to_immediate(task)
        
        logger.info(f"Accelerated {len(pending)} retries for {endpoint}")
    
    async def run_health_checker(self):
        """Background task to check unhealthy endpoints."""
        
        while True:
            for endpoint, status in self.endpoint_health.items():
                if not status.healthy:
                    is_healthy = await self.check_endpoint_health(endpoint)
                    
                    if is_healthy:
                        logger.info(f"Endpoint recovered: {endpoint}")
                        await self.on_endpoint_recovery(endpoint)
                        status.healthy = True
            
            await asyncio.sleep(30)  # Check every 30 seconds

Chapter 4: Security

Webhook security is critical. Without proper security:

  • Attackers can send fake webhooks to your customers
  • Man-in-the-middle attacks can intercept sensitive data
  • Replay attacks can cause duplicate processing

4.1 Signature Verification

The standard approach: HMAC-SHA256 signatures.

Sender side:

import hmac
import hashlib
import time
import json

class WebhookSigner:
    def sign_payload(self, payload: dict, secret: str) -> dict:
        """Generate signature headers for webhook."""
        
        timestamp = int(time.time())
        
        # Create signed content: timestamp.payload
        payload_str = json.dumps(payload, separators=(',', ':'), sort_keys=True)
        signed_content = f"{timestamp}.{payload_str}"
        
        # Generate HMAC-SHA256 signature
        signature = hmac.new(
            secret.encode('utf-8'),
            signed_content.encode('utf-8'),
            hashlib.sha256
        ).hexdigest()
        
        return {
            'X-Webhook-Timestamp': str(timestamp),
            'X-Webhook-Signature': f"sha256={signature}",
            'Content-Type': 'application/json'
        }

Receiver side:

class WebhookVerifier:
    def verify(
        self,
        payload_body: str,
        signature_header: str,
        timestamp_header: str,
        secret: str,
        tolerance: int = 300  # 5 minutes
    ) -> bool:
        """Verify webhook signature."""
        
        # Check timestamp is recent (prevent replay attacks)
        try:
            timestamp = int(timestamp_header)
        except (ValueError, TypeError):
            return False
        
        if abs(time.time() - timestamp) > tolerance:
            return False  # Too old or too far in future
        
        # Verify signature
        if not signature_header.startswith('sha256='):
            return False
        
        received_sig = signature_header[7:]
        
        signed_content = f"{timestamp}.{payload_body}"
        expected_sig = hmac.new(
            secret.encode('utf-8'),
            signed_content.encode('utf-8'),
            hashlib.sha256
        ).hexdigest()
        
        # Constant-time comparison (prevent timing attacks)
        return hmac.compare_digest(received_sig, expected_sig)
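
A quick round-trip check of the two classes above (the secret and payload are made up). Note that a receiver must verify against the exact raw body bytes it received, since re-serializing JSON may not reproduce the sender's byte sequence:

import json

signer = WebhookSigner()
verifier = WebhookVerifier()

secret = "whsec_test_secret"  # hypothetical secret
payload = {"id": "evt_123", "type": "order.created"}

headers = signer.sign_payload(payload, secret)
# The body on the wire: the same canonical JSON that was signed
body = json.dumps(payload, separators=(',', ':'), sort_keys=True)

assert verifier.verify(
    body,
    headers['X-Webhook-Signature'],
    headers['X-Webhook-Timestamp'],
    secret,
)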

4.2 Why Timestamp Matters

Without timestamp verification, attackers can replay old webhooks:

Attack scenario:
  1. Attacker intercepts webhook for "order.refunded"
  2. Attacker waits 6 months
  3. Attacker replays same webhook
  4. Receiver processes refund again!

With timestamp verification:
  1. Webhook has timestamp from 6 months ago
  2. Receiver checks: |now - timestamp| > 5 minutes
  3. Receiver rejects: "Timestamp too old"

4.3 Secret Rotation

Secrets should be rotated periodically:

import secrets
from datetime import datetime, timedelta

class SecretManager:
    def rotate_secret(self, subscription_id: str) -> dict:
        """Rotate webhook secret with grace period."""
        
        subscription = self.db.get(subscription_id)
        new_secret = secrets.token_hex(32)
        
        # Keep old secret valid for 24 hours
        self.db.update(subscription_id, {
            'secret': new_secret,
            'previous_secret': subscription.secret,
            'previous_secret_expires': datetime.utcnow() + timedelta(hours=24)
        })
        
        return {
            'new_secret': new_secret,
            'old_secret_valid_until': (datetime.utcnow() + timedelta(hours=24)).isoformat()
        }
    
    def verify_with_rotation(
        self,
        subscription_id: str,
        payload: str,
        signature: str,
        timestamp: str
    ) -> bool:
        """Verify allowing both current and previous secrets."""
        
        subscription = self.db.get(subscription_id)
        
        # Try current secret
        if self.verifier.verify(payload, signature, timestamp, subscription.secret):
            return True
        
        # Try previous secret if still valid
        if (subscription.previous_secret and 
            subscription.previous_secret_expires > datetime.utcnow()):
            if self.verifier.verify(payload, signature, timestamp, subscription.previous_secret):
                return True
        
        return False

4.4 Additional Security Measures

import ipaddress
import socket
import urllib.parse
import uuid
from datetime import datetime

import requests

class WebhookSecurityManager:
    def validate_endpoint(self, url: str) -> bool:
        """Validate webhook URL before registration."""
        
        parsed = urllib.parse.urlparse(url)
        
        # Require HTTPS
        if parsed.scheme != 'https':
            raise ValueError("Webhook URL must use HTTPS")
        
        # Block private IPs (SSRF prevention)
        try:
            ip = socket.gethostbyname(parsed.hostname)
            if ipaddress.ip_address(ip).is_private:
                raise ValueError("Webhook URL cannot point to private IP")
        except socket.gaierror:
            raise ValueError("Cannot resolve webhook hostname")
        
        # Block localhost
        if parsed.hostname in ['localhost', '127.0.0.1', '0.0.0.0']:
            raise ValueError("Webhook URL cannot be localhost")
        
        return True
    
    def test_endpoint(self, url: str, secret: str) -> bool:
        """Send test webhook to verify endpoint works."""
        
        test_event = {
            'id': f'evt_test_{uuid.uuid4().hex[:8]}',
            'type': 'webhook.test',
            'created_at': datetime.utcnow().isoformat()
        }
        
        try:
            response = requests.post(
                url,
                json=test_event,
                headers=self.signer.sign_payload(test_event, secret),
                timeout=10
            )
            return response.status_code == 200
        except requests.RequestException:
            return False

Chapter 5: Scaling to 1M Webhooks/Hour

5.1 Understanding the Numbers

1,000,000 webhooks/hour
= 16,667 webhooks/minute
= 278 webhooks/second

With average 3 subscriptions per event:
= 833 delivery attempts/second

With 50% retry rate:
= ~1,250 total HTTP requests/second

Peak (2x average):
= 2,500 requests/second

5.2 Worker Pool Sizing

# Assumptions:
# - Average HTTP request takes 500ms
# - We want to handle 2,500 requests/second at peak
# - Each worker can handle 1/0.5 = 2 requests/second

# Required workers = 2,500 / 2 = 1,250 workers

# With async workers (10 concurrent requests each):
# Required workers = 2,500 / 20 = 125 workers

# With safety margin (2x):
# Deploy 250 async workers

5.3 Connection Pooling

class WebhookHTTPClient:
    def __init__(self):
        # Connection pool settings
        self.connector = aiohttp.TCPConnector(
            limit=1000,           # Total connections
            limit_per_host=10,    # Per-endpoint limit (fairness)
            ttl_dns_cache=300,    # DNS cache TTL
            keepalive_timeout=30  # Keep connections alive
        )
        
        self.timeout = aiohttp.ClientTimeout(
            total=30,      # Total request timeout
            connect=5,     # Connection timeout
            sock_read=25   # Read timeout
        )
        
        self.session = aiohttp.ClientSession(
            connector=self.connector,
            timeout=self.timeout
        )

5.4 Rate Limiting Per Endpoint

Don't overwhelm receivers:

class EndpointRateLimiter:
    def __init__(self, redis_client):
        self.redis = redis_client
        self.default_limit = 100  # requests per minute
    
    async def acquire(self, endpoint: str, limit: int = None) -> bool:
        """Try to acquire a rate limit slot."""
        
        limit = limit or self.default_limit
        key = f"ratelimit:{endpoint}"
        
        current = await self.redis.incr(key)
        
        if current == 1:
            await self.redis.expire(key, 60)
        
        if current > limit:
            return False
        
        return True
    
    async def wait_for_slot(self, endpoint: str, timeout: float = 30) -> bool:
        """Wait until rate limit slot available."""
        
        start = time.time()
        
        while time.time() - start < timeout:
            if await self.acquire(endpoint):
                return True
            await asyncio.sleep(0.1)
        
        return False
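
Wiring the limiter into a delivery worker might look like this (a sketch; process_task and schedule_retry are the worker methods from Chapter 2):

async def deliver_with_rate_limit(limiter, worker, task, endpoint):
    # Wait briefly for a slot; if the endpoint stays saturated,
    # requeue the task instead of blocking the worker
    if not await limiter.wait_for_slot(endpoint, timeout=5):
        await worker.schedule_retry(task)
        return
    await worker.process_task(task)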

5.5 Queue Partitioning

class PartitionedQueue:
    """
    Partition by subscription for:
    - Parallel processing of different subscriptions
    - Ordered delivery per subscription
    - Isolation (slow subscriber doesn't block others)
    """
    
    NUM_PARTITIONS = 64
    
    def get_partition(self, subscription_id: str) -> int:
        """Stable hash to partition (same subscription always lands on the same partition)."""
        hash_val = int(hashlib.md5(subscription_id.encode()).hexdigest(), 16)
        return hash_val % self.NUM_PARTITIONS
    
    async def enqueue(self, task: dict):
        """Add task to appropriate partition."""
        partition = self.get_partition(task['subscription_id'])
        stream = f"webhooks:p{partition}"
        await self.redis.xadd(stream, task)
    
    async def start_workers(self):
        """Start workers for all partitions."""
        tasks = []
        
        for partition in range(self.NUM_PARTITIONS):
            # Multiple workers per partition for throughput
            for worker_num in range(4):
                task = asyncio.create_task(
                    self.partition_worker(partition, worker_num)
                )
                tasks.append(task)
        
        await asyncio.gather(*tasks)
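
partition_worker is referenced above but not shown. A minimal sketch with redis-py's async client, using plain XREAD for brevity (a production system would use consumer groups with XREADGROUP/XACK so a crashed worker doesn't drop tasks; self.deliver is assumed to be the HTTP delivery from Chapter 2):

    async def partition_worker(self, partition: int, worker_num: int):
        """Consume one partition's stream and deliver each task."""
        stream = f"webhooks:p{partition}"
        last_id = "$"  # start with new messages only
        
        while True:
            # Block up to 5 seconds waiting for new entries
            entries = await self.redis.xread({stream: last_id}, count=10, block=5000)
            for _stream, messages in entries or []:
                for msg_id, task in messages:
                    await self.deliver(task)
                    last_id = msg_id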

Part II: The Design Problem

Chapter 6: Design Challenge - Receiver Down for 2 Hours

6.1 The Scenario

Timeline:
  2:00 PM - Partner's webhook endpoint goes down
  2:00 PM to 4:00 PM - You generate 10,000 events for them
  4:00 PM - Endpoint comes back up

Questions:
  1. What happens to those 10,000 events?
  2. How long until they're all delivered?
  3. What's the customer experience?
  4. How do you optimize recovery?

6.2 Default Behavior (Without Optimization)

2:00 PM - Event 1 fails, scheduled retry at 2:01 PM
2:01 PM - Retry fails, scheduled at 2:06 PM
2:06 PM - Retry fails, scheduled at 2:21 PM
2:21 PM - Retry fails, scheduled at 3:21 PM
3:21 PM - Retry fails, scheduled at 5:21 PM

4:00 PM - Endpoint recovers
          But Event 1 won't retry until 5:21 PM!

Meanwhile:
  Event 1000: Scheduled retry at 5:30 PM
  Event 5000: Scheduled retry at 7:00 PM
  Event 10000: Scheduled retry at 9:00 PM

Full recovery takes ~7+ hours after endpoint returns!

6.3 Solution 1: Health Monitoring + Accelerated Retry

class SmartRetryManager:
    async def run_health_monitor(self):
        """Monitor unhealthy endpoints and trigger recovery."""
        
        while True:
            # Get all endpoints with recent failures
            unhealthy = await self.db.get_unhealthy_endpoints()
            
            for endpoint in unhealthy:
                # Check if recovered
                if await self.check_health(endpoint.url):
                    logger.info(f"Endpoint recovered: {endpoint.url}")
                    
                    # Accelerate all pending retries
                    await self.accelerate_retries(endpoint.subscription_id)
                    
                    # Update status
                    endpoint.healthy = True
                    endpoint.consecutive_failures = 0
                    await self.db.update(endpoint)
            
            await asyncio.sleep(30)  # Check every 30 seconds
    
    async def accelerate_retries(self, subscription_id: str):
        """Move all pending retries to immediate queue."""
        
        # Find all scheduled retries for this subscription
        pending = await self.queue.get_scheduled_for_subscription(subscription_id)
        
        logger.info(f"Accelerating {len(pending)} retries for {subscription_id}")
        
        for task in pending:
            # Move from delayed queue to immediate queue
            await self.queue.cancel_delayed(task['id'])
            await self.queue.enqueue_immediate(task)

Result:

4:00 PM - Health check detects recovery
4:00 PM - 10,000 pending retries moved to immediate queue
4:01 PM - Workers start delivering (rate-limited per endpoint, ~13/second)
4:13 PM - All 10,000 events delivered!

Recovery time: ~13 minutes (vs 7+ hours)

6.4 Solution 2: Gradual Ramp-Up

Don't overwhelm a recovering endpoint:

class GradualRecoveryManager:
    async def deliver_with_rampup(
        self,
        subscription_id: str,
        pending_tasks: List[dict]
    ):
        """Deliver pending webhooks with gradual ramp-up."""
        
        # Start slow, increase over time
        initial_rate = 1    # per second
        max_rate = 100      # per second
        ramp_duration = 300  # 5 minutes to reach max
        
        start_time = time.time()
        delivered = 0
        
        for task in pending_tasks:
            # Calculate current rate based on time
            elapsed = time.time() - start_time
            progress = min(1.0, elapsed / ramp_duration)
            current_rate = initial_rate + (max_rate - initial_rate) * progress
            
            # Deliver
            await self.deliver(task)
            delivered += 1
            
            # Rate limit
            await asyncio.sleep(1.0 / current_rate)
            
            # Log progress
            if delivered % 100 == 0:
                logger.info(f"Recovery: {delivered}/{len(pending_tasks)} at {current_rate:.1f}/sec")

6.5 Solution 3: Dead Letter Queue + Manual Recovery

For events that exhaust all retries:

class DLQManager:
    async def get_failed_webhooks(
        self,
        subscription_id: str = None,
        limit: int = 100
    ) -> List[dict]:
        """List failed webhooks for review."""
        
        query = """
            SELECT * FROM webhook_dlq 
            WHERE status = 'pending'
        """
        
        if subscription_id:
            query += f" AND subscription_id = '{subscription_id}'"
        
        query += f" ORDER BY created_at DESC LIMIT {limit}"
        
        return await self.db.fetch(query)
    
    async def retry_dlq_entry(self, entry_id: str, user_id: str) -> dict:
        """Manually retry a DLQ entry."""
        
        entry = await self.db.get_dlq_entry(entry_id)
        
        if not entry:
            raise NotFoundError("DLQ entry not found")
        
        # Mark as retrying
        await self.db.update_dlq_status(entry_id, 'retrying')
        
        # Attempt delivery
        result = await self.delivery_service.deliver(
            entry['subscription_id'],
            entry['event']
        )
        
        if result.success:
            await self.db.update_dlq_status(
                entry_id, 
                'resolved',
                resolved_by=user_id
            )
            return {'status': 'delivered'}
        else:
            await self.db.update_dlq_status(entry_id, 'pending')
            return {'status': 'failed', 'error': result.error}
    
    async def retry_all_for_subscription(self, subscription_id: str, user_id: str):
        """Retry all DLQ entries for a subscription."""
        
        entries = await self.get_failed_webhooks(subscription_id, limit=10000)
        
        results = {'success': 0, 'failed': 0}
        
        for entry in entries:
            result = await self.retry_dlq_entry(entry['id'], user_id)
            
            if result['status'] == 'delivered':
                results['success'] += 1
            else:
                results['failed'] += 1
        
        return results

6.6 Admin Dashboard

from fastapi import Depends, FastAPI, HTTPException

app = FastAPI()

@app.get("/admin/webhooks/subscriptions/{subscription_id}/health")
async def get_subscription_health(subscription_id: str):
    """Get health status for a subscription."""
    
    subscription = await db.get_subscription(subscription_id)
    
    return {
        "subscription_id": subscription_id,
        "endpoint": subscription.url,
        "healthy": subscription.healthy,
        "consecutive_failures": subscription.consecutive_failures,
        "last_success": subscription.last_success,
        "last_failure": subscription.last_failure,
        "pending_retries": await queue.count_pending(subscription_id),
        "dlq_count": await dlq.count_for_subscription(subscription_id)
    }

@app.get("/admin/webhooks/dlq")
async def list_dlq(subscription_id: str = None, limit: int = 100):
    """List DLQ entries."""
    
    entries = await dlq_manager.get_failed_webhooks(subscription_id, limit)
    
    return {
        "entries": entries,
        "total": len(entries)
    }

@app.post("/admin/webhooks/dlq/{entry_id}/retry")
async def retry_dlq_entry(entry_id: str, current_user: User):
    """Retry a single DLQ entry."""
    
    result = await dlq_manager.retry_dlq_entry(entry_id, current_user.id)
    return result

@app.post("/admin/webhooks/subscriptions/{subscription_id}/retry-all-dlq")
async def retry_all_dlq(subscription_id: str, current_user: User):
    """Retry all DLQ entries for a subscription."""
    
    result = await dlq_manager.retry_all_for_subscription(
        subscription_id, 
        current_user.id
    )
    return result

@app.get("/admin/webhooks/stats")
async def get_webhook_stats():
    """Get overall webhook system stats."""
    
    return {
        "queue_depth": await queue.total_size(),
        "retry_queue_depth": await retry_queue.size(),
        "dlq_depth": await dlq.size(),
        "deliveries_last_hour": await metrics.get_deliveries_count(hours=1),
        "success_rate_last_hour": await metrics.get_success_rate(hours=1),
        "unhealthy_endpoints": await db.count_unhealthy_endpoints()
    }

Chapter 7: Monitoring and Observability

7.1 Key Metrics

from prometheus_client import Counter, Histogram, Gauge

# Delivery metrics
webhook_deliveries_total = Counter(
    'webhook_deliveries_total',
    'Total webhook deliveries',
    ['status', 'event_type']  # success, failed, retrying
)

webhook_delivery_duration = Histogram(
    'webhook_delivery_duration_seconds',
    'Webhook delivery duration',
    buckets=[0.1, 0.25, 0.5, 1, 2.5, 5, 10, 30]
)

# Queue metrics
webhook_queue_depth = Gauge(
    'webhook_queue_depth',
    'Webhooks pending delivery',
    ['partition']
)

webhook_retry_queue_depth = Gauge(
    'webhook_retry_queue_depth',
    'Webhooks pending retry'
)

webhook_dlq_depth = Gauge(
    'webhook_dlq_depth',
    'Webhooks in dead letter queue'
)

# Endpoint health
webhook_endpoint_healthy = Gauge(
    'webhook_endpoint_healthy',
    'Endpoint health status',
    ['subscription_id']
)

webhook_endpoint_consecutive_failures = Gauge(
    'webhook_endpoint_consecutive_failures',
    'Consecutive failures per endpoint',
    ['subscription_id']
)

7.2 Alert Rules

groups:
  - name: webhook_alerts
    rules:
      # High failure rate
      - alert: WebhookHighFailureRate
        expr: |
          sum(rate(webhook_deliveries_total{status="failed"}[5m])) /
          sum(rate(webhook_deliveries_total[5m])) > 0.1
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Webhook failure rate above 10%"
          description: "{{ $value | humanizePercentage }} of webhooks failing"
      
      # Queue backlog
      - alert: WebhookQueueBacklog
        expr: sum(webhook_queue_depth) > 10000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Webhook queue backlog: {{ $value }}"
      
      # DLQ growing
      - alert: WebhookDLQGrowing
        expr: webhook_dlq_depth > 100
        for: 15m
        labels:
          severity: warning
        annotations:
          summary: "Webhook DLQ has {{ $value }} entries"
      
      # Endpoint down
      - alert: WebhookEndpointDown
        expr: webhook_endpoint_consecutive_failures > 10
        for: 10m
        labels:
          severity: critical
        annotations:
          summary: "Webhook endpoint has {{ $value }} consecutive failures"

7.3 Structured Logging

import structlog

logger = structlog.get_logger()

class WebhookLogger:
    def log_delivery_attempt(
        self,
        subscription_id: str,
        event_id: str,
        event_type: str,
        attempt: int,
        result: DeliveryResult
    ):
        logger.info(
            "webhook_delivery_attempt",
            subscription_id=subscription_id,
            event_id=event_id,
            event_type=event_type,
            attempt=attempt,
            success=result.success,
            status_code=result.status_code,
            duration_ms=result.duration_ms,
            error=result.error
        )
    
    def log_moved_to_dlq(
        self,
        subscription_id: str,
        event_id: str,
        error: str,
        attempts: int
    ):
        logger.error(
            "webhook_moved_to_dlq",
            subscription_id=subscription_id,
            event_id=event_id,
            error=error,
            total_attempts=attempts
        )
    
    def log_endpoint_status(
        self,
        subscription_id: str,
        healthy: bool,
        consecutive_failures: int
    ):
        logger.info(
            "webhook_endpoint_status",
            subscription_id=subscription_id,
            healthy=healthy,
            consecutive_failures=consecutive_failures
        )

Part III: Interview Questions

Chapter 8: The Hard Questions

8.1 "Design a webhook system for 1M events/hour. Receiver is down for 2 hours."

Strong Answer:

"Let me walk through the architecture and then address the failure scenario.

Architecture for 1M events/hour:

The math: 1M events/hour = 278/second. With average 3 subscriptions per event, that's 834 webhooks/second. At peak (2x), 1,668/second.

Components:

  • Event bus: Kafka for durability and replay capability
  • Dispatcher: Consumes events, fans out to delivery queues
  • Partitioned queues: By subscription ID for parallel processing
  • Async workers: 100+ workers with connection pooling
  • Per-endpoint rate limiting: Prevent overwhelming receivers

The 2-hour outage scenario:

Default behavior is exponential backoff: 1min, 5min, 15min, 1hr, 2hr... After 2 hours of failures, webhooks are scattered across the retry schedule.

My optimization: Active health monitoring

Background job pings unhealthy endpoints every 30 seconds. When one recovers, we immediately accelerate all pending retries instead of waiting for their scheduled time.

Without this: recovery takes 7+ hours after the endpoint returns.
With this: recovery takes ~15 minutes after the endpoint returns.

I'd also implement:

  • Gradual ramp-up to avoid overwhelming recovering endpoints
  • DLQ with admin dashboard for manual retry
  • Alerting when endpoints are persistently unhealthy

Connection to previous days:

This ties to Day 2 (idempotency) because receivers must handle duplicates. We're doing at-least-once delivery, so they need to use the event_id for deduplication."

8.2 "How do you ensure webhook security?"

Strong Answer:

"Multiple layers of security:

1. HMAC-SHA256 signatures

Every webhook is signed with a shared secret:

  • Include timestamp in signature (prevent replay attacks)
  • Use constant-time comparison (prevent timing attacks)
  • Document verification clearly for receivers

2. Timestamp verification

Receivers should reject webhooks with timestamps more than 5 minutes old. This prevents attackers from replaying intercepted webhooks.

3. HTTPS only

We only allow HTTPS webhook URLs. We validate this during registration and refuse HTTP endpoints.

4. Secret rotation

Support rotating secrets with a grace period:

  • Generate new secret
  • Keep old secret valid for 24 hours
  • Both secrets accepted during transition
  • Old secret expires automatically

5. SSRF prevention

When users register webhook URLs, validate they're not pointing to:

  • Private IPs (10.x, 192.168.x, etc.)
  • Localhost
  • Internal hostnames

6. IP whitelisting option

For security-conscious receivers, publish our webhook source IPs so they can whitelist."

8.3 "How do you handle ordering of webhooks?"

Strong Answer:

"Ordering is tricky in distributed systems. Let me explain the challenge and solutions.

The problem:

Events occur: A → B → C
But due to retries and parallelism, the receiver might see: A → C → B

Solution 1: Per-entity ordering (recommended)

Partition by entity ID (order_id, customer_id):

  • All events for same entity go to same partition
  • Single worker per partition ensures order
  • Different entities processed in parallel

This gives you ordering where it matters most.

Solution 2: Sequence numbers

Include sequence in payload:

{"sequence": 42, "entity_id": "order_123", ...}

Receiver tracks last processed sequence and handles gaps.
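
A minimal sketch of receiver-side sequence tracking (in-memory for illustration; production would persist last-seen sequences):

last_seen: dict[str, int] = {}  # entity_id -> highest sequence processed

def accept_event(event: dict) -> bool:
    """Return True if the event should be processed, False if stale."""
    entity_id = event["entity_id"]
    seq = event["sequence"]
    prev = last_seen.get(entity_id, 0)
    
    if seq <= prev:
        return False  # duplicate or older delivery: drop it
    
    if seq > prev + 1:
        pass  # gap detected: buffer, or trigger reconciliation via the API
    
    last_seen[entity_id] = seq
    return True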

Solution 3: Accept eventual consistency

Document that strict global ordering isn't guaranteed. Many systems don't actually need it — they need idempotency and eventual consistency.

My recommendation:

Per-entity partitioning plus sequence numbers in payloads. This handles 95% of use cases. For the rare case needing strict global ordering, I'd push back on the requirement or use a different mechanism."


Chapter 9: Session Summary

What You've Learned

Topic                  Key Concepts
Delivery Guarantees    At-least-once is standard; exactly-once is impossible
Architecture           Event bus → Dispatcher → Queues → Workers → DLQ
Retry Strategy         Exponential backoff with health-based acceleration
Security               HMAC signatures, timestamp validation, HTTPS
Scale                  Partitioning, connection pooling, rate limiting
Recovery               Health monitoring, gradual ramp-up, DLQ

Connection to Previous Days

Day                       Connection
Day 1 (Timeouts)          Webhook requests need timeouts
Day 2 (Idempotency)       Receivers must be idempotent
Day 3 (Circuit Breakers)  Per-endpoint circuit breakers
Day 4 (Webhooks)          Complete async delivery system

Key Trade-offs

Decision                       Trade-off
At-least-once vs at-most-once  Reliability vs simplicity
Long retry window              Higher delivery rate vs older events
Health monitoring              Faster recovery vs more complexity
Per-endpoint rate limiting     Fairness vs throughput

Part IV: Further Learning

Resources

Books

  • "Designing Data-Intensive Applications" by Martin Kleppmann - Chapter 11 on streaming
  • "Building Microservices" by Sam Newman - Event-driven architecture


Exercises

Exercise 1: Implement Webhook Signature Verification

Build complete signature generation and verification with:

  • HMAC-SHA256 signing
  • Timestamp validation
  • Replay attack prevention
  • Secret rotation support

Exercise 2: Build Adaptive Retry System

Create a retry manager that:

  • Uses exponential backoff
  • Monitors endpoint health
  • Accelerates retries on recovery
  • Implements gradual ramp-up

Exercise 3: Design DLQ Dashboard

Create admin interface with:

  • List failed webhooks by subscription
  • Single and batch retry functionality
  • Failure analytics and trends
  • Alerting configuration

Appendix: Production Implementation

Complete Webhook Delivery Service

"""
Production webhook delivery system.
Building on Days 1-4: Timeouts, Idempotency, Circuit Breakers, Webhooks.
"""

import asyncio
import hashlib
import hmac
import json
import random
import time
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import List, Optional, Dict, Any
from enum import Enum
import aiohttp
import structlog
from prometheus_client import Counter, Histogram, Gauge

logger = structlog.get_logger()

# =============================================================================
# Configuration
# =============================================================================

@dataclass
class WebhookConfig:
    # Delivery
    delivery_timeout: int = 30
    max_retries: int = 9
    retry_delays: List[int] = field(default_factory=lambda: [
        60, 300, 900, 3600, 7200, 14400, 28800, 86400
    ])
    
    # Rate limiting
    max_concurrent_per_endpoint: int = 10
    max_requests_per_minute: int = 100
    
    # Health monitoring
    health_check_interval: int = 30
    circuit_breaker_threshold: int = 10
    
    # Security
    signature_tolerance: int = 300

# =============================================================================
# Models
# =============================================================================

class DeliveryStatus(Enum):
    PENDING = "pending"
    SUCCESS = "success" 
    FAILED = "failed"
    RETRYING = "retrying"
    DLQ = "dlq"

@dataclass
class WebhookSubscription:
    id: str
    user_id: str
    url: str
    secret: str
    events: List[str]
    active: bool = True
    healthy: bool = True
    consecutive_failures: int = 0
    last_success: Optional[datetime] = None
    last_failure: Optional[datetime] = None
    created_at: datetime = field(default_factory=datetime.utcnow)

@dataclass
class WebhookEvent:
    id: str
    type: str
    data: Dict[str, Any]
    timestamp: datetime = field(default_factory=datetime.utcnow)

@dataclass 
class DeliveryResult:
    success: bool
    status_code: Optional[int] = None
    duration_ms: Optional[int] = None
    error: Optional[str] = None
    should_retry: bool = False

# =============================================================================
# Metrics
# =============================================================================

deliveries_total = Counter(
    'webhook_deliveries_total',
    'Total deliveries',
    ['status', 'event_type']
)

delivery_duration = Histogram(
    'webhook_delivery_duration_seconds', 
    'Delivery duration',
    buckets=[0.1, 0.5, 1, 2, 5, 10, 30]
)

queue_depth = Gauge('webhook_queue_depth', 'Queue depth')
dlq_depth = Gauge('webhook_dlq_depth', 'DLQ depth')

# =============================================================================
# Webhook Signer
# =============================================================================

class WebhookSigner:
    @staticmethod
    def sign(payload: dict, secret: str, timestamp: int = None) -> Dict[str, str]:
        timestamp = timestamp or int(time.time())
        payload_str = json.dumps(payload, separators=(',', ':'), sort_keys=True)
        signed_content = f"{timestamp}.{payload_str}"
        
        signature = hmac.new(
            secret.encode(),
            signed_content.encode(),
            hashlib.sha256
        ).hexdigest()
        
        return {
            'X-Webhook-Timestamp': str(timestamp),
            'X-Webhook-Signature': f"sha256={signature}",
            'Content-Type': 'application/json'
        }
    
    @staticmethod
    def verify(payload: str, signature: str, timestamp: str, 
               secret: str, tolerance: int = 300) -> bool:
        try:
            ts = int(timestamp)
            if abs(time.time() - ts) > tolerance:
                return False
            
            if not signature.startswith('sha256='):
                return False
            
            expected = hmac.new(
                secret.encode(),
                f"{ts}.{payload}".encode(),
                hashlib.sha256
            ).hexdigest()
            
            return hmac.compare_digest(signature[7:], expected)
        except Exception:  # malformed headers
            return False

# =============================================================================
# Retry Manager
# =============================================================================

class RetryManager:
    def __init__(self, config: WebhookConfig, queue, dlq):
        self.config = config
        self.queue = queue
        self.dlq = dlq
    
    async def schedule_retry(self, task: dict, error: str) -> bool:
        attempt = task.get('attempt', 0)
        
        if attempt >= self.config.max_retries:
            await self.move_to_dlq(task, error)
            return False
        
        delay = self.config.retry_delays[min(attempt, len(self.config.retry_delays) - 1)]
        # Add up to ±10% random jitter to avoid synchronized retries
        delay += int(delay * 0.1 * random.uniform(-1, 1))
        
        task['attempt'] = attempt + 1
        task['last_error'] = error
        task['retry_at'] = (datetime.utcnow() + timedelta(seconds=delay)).isoformat()
        
        await self.queue.enqueue_delayed(task, delay)
        
        logger.info("retry_scheduled",
            subscription_id=task['subscription_id'],
            event_id=task['event']['id'],
            attempt=task['attempt'],
            delay=delay
        )
        
        return True
    
    async def move_to_dlq(self, task: dict, error: str):
        dlq_entry = {
            'id': str(uuid.uuid4()),
            'task': task,
            'error': error,
            'attempts': task.get('attempt', 0),
            'failed_at': datetime.utcnow().isoformat(),
            'status': 'pending'
        }
        
        await self.dlq.push(dlq_entry)
        dlq_depth.inc()
        
        logger.error("moved_to_dlq",
            subscription_id=task['subscription_id'],
            event_id=task['event']['id'],
            error=error,
            attempts=task.get('attempt', 0)
        )

# =============================================================================
# Delivery Service
# =============================================================================

class WebhookDeliveryService:
    def __init__(self, config: WebhookConfig, subscription_store, 
                 retry_manager: RetryManager):
        self.config = config
        self.subscriptions = subscription_store
        self.retry_manager = retry_manager
        self.signer = WebhookSigner()
        self.session: Optional[aiohttp.ClientSession] = None
        self.endpoint_semaphores: Dict[str, asyncio.Semaphore] = {}
    
    async def start(self):
        connector = aiohttp.TCPConnector(
            limit=1000,
            limit_per_host=self.config.max_concurrent_per_endpoint,
            ttl_dns_cache=300
        )
        timeout = aiohttp.ClientTimeout(total=self.config.delivery_timeout)
        self.session = aiohttp.ClientSession(connector=connector, timeout=timeout)
    
    async def stop(self):
        if self.session:
            await self.session.close()
    
    async def deliver(self, task: dict) -> DeliveryResult:
        subscription_id = task['subscription_id']
        event = task['event']
        attempt = task.get('attempt', 0)
        
        subscription = await self.subscriptions.get(subscription_id)
        if not subscription or not subscription.active:
            return DeliveryResult(success=False, error="Subscription inactive")
        
        # Build signed payload
        payload = {
            'id': event['id'],
            'type': event['type'],
            'data': event['data'],
            'created_at': event['timestamp']
        }
        headers = self.signer.sign(payload, subscription.secret)
        
        # Cap concurrent in-flight requests to each endpoint
        # (a concurrency limit rather than a true rate limit)
        semaphore = self.endpoint_semaphores.setdefault(
            subscription.url,
            asyncio.Semaphore(self.config.max_concurrent_per_endpoint)
        )
        
        start = time.time()
        
        async with semaphore:
            try:
                async with self.session.post(
                    subscription.url,
                    json=payload,
                    headers=headers
                ) as response:
                    duration_ms = int((time.time() - start) * 1000)
                    
                    delivery_duration.observe(duration_ms / 1000)
                    
                    # Any 2xx counts as delivered: receivers often return 204 No Content
                    if 200 <= response.status < 300:
                        deliveries_total.labels(status='success', event_type=event['type']).inc()
                        await self._record_success(subscription)
                        return DeliveryResult(
                            success=True,
                            status_code=response.status,
                            duration_ms=duration_ms
                        )
                    
                    # 4xx (except 429) means the request itself was rejected; retrying won't help
                    elif 400 <= response.status < 500 and response.status != 429:
                        deliveries_total.labels(status='client_error', event_type=event['type']).inc()
                        return DeliveryResult(
                            success=False,
                            status_code=response.status,
                            error=f"Client error: {response.status}",
                            should_retry=False
                        )
                    
                    # 5xx and 429 are treated as transient; retry with backoff
                    else:
                        deliveries_total.labels(status='server_error', event_type=event['type']).inc()
                        await self._record_failure(subscription)
                        return DeliveryResult(
                            success=False,
                            status_code=response.status,
                            error=f"Server error: {response.status}",
                            should_retry=True,
                            duration_ms=duration_ms
                        )
            
            except asyncio.TimeoutError:
                deliveries_total.labels(status='timeout', event_type=event['type']).inc()
                await self._record_failure(subscription)
                return DeliveryResult(
                    success=False,
                    error="Timeout",
                    should_retry=True
                )
            
            except Exception as e:
                deliveries_total.labels(status='error', event_type=event['type']).inc()
                await self._record_failure(subscription)
                return DeliveryResult(
                    success=False,
                    error=str(e),
                    should_retry=True
                )
    
    async def _record_success(self, subscription: WebhookSubscription):
        subscription.healthy = True
        subscription.consecutive_failures = 0
        subscription.last_success = datetime.utcnow()
        await self.subscriptions.update(subscription)
    
    async def _record_failure(self, subscription: WebhookSubscription):
        subscription.consecutive_failures += 1
        subscription.last_failure = datetime.utcnow()
        
        if subscription.consecutive_failures >= self.config.circuit_breaker_threshold:
            subscription.healthy = False
        
        await self.subscriptions.update(subscription)

# =============================================================================
# Worker
# =============================================================================

class WebhookWorker:
    def __init__(self, worker_id: str, delivery_service: WebhookDeliveryService,
                 retry_manager: RetryManager, queue):
        self.worker_id = worker_id
        self.delivery = delivery_service
        self.retry_manager = retry_manager
        self.queue = queue
        self.running = False
    
    async def run(self):
        self.running = True
        logger.info("worker_started", worker_id=self.worker_id)
        
        while self.running:
            try:
                tasks = await self.queue.dequeue(count=10, timeout=5)
                
                if tasks:
                    await asyncio.gather(*[
                        self.process_task(task) for task in tasks
                    ])
            except Exception as e:
                logger.error("worker_error", error=str(e))
                await asyncio.sleep(1)
    
    async def process_task(self, task: dict):
        result = await self.delivery.deliver(task)
        
        if result.success:
            logger.info("delivery_success",
                subscription_id=task['subscription_id'],
                event_id=task['event']['id'],
                duration_ms=result.duration_ms
            )
        elif result.should_retry:
            await self.retry_manager.schedule_retry(task, result.error)
        else:
            logger.warning("delivery_permanent_failure",
                subscription_id=task['subscription_id'],
                event_id=task['event']['id'],
                error=result.error
            )
    
    async def stop(self):
        self.running = False

# =============================================================================
# Health Checker
# =============================================================================

class HealthChecker:
    def __init__(self, subscription_store, queue, check_interval: int = 30):
        self.subscriptions = subscription_store
        self.queue = queue
        self.check_interval = check_interval
        self.running = False
    
    async def run(self):
        self.running = True
        
        while self.running:
            try:
                unhealthy = await self.subscriptions.find_unhealthy()
                
                for sub in unhealthy:
                    if await self._check_health(sub.url):
                        logger.info("endpoint_recovered", 
                            subscription_id=sub.id, url=sub.url)
                        
                        sub.healthy = True
                        sub.consecutive_failures = 0
                        await self.subscriptions.update(sub)
                        
                        await self._accelerate_retries(sub.id)
                
                await asyncio.sleep(self.check_interval)
            except Exception as e:
                logger.error("health_check_error", error=str(e))
                await asyncio.sleep(self.check_interval)
    
    async def _check_health(self, url: str) -> bool:
        try:
            # Lightweight HEAD probe; a throwaway session is acceptable at this frequency
            async with aiohttp.ClientSession() as session:
                async with session.head(url, timeout=aiohttp.ClientTimeout(total=5)) as resp:
                    return resp.status < 500
        except Exception:
            # Connection errors and timeouts both mean "still unhealthy"
            return False
    
    async def _accelerate_retries(self, subscription_id: str):
        pending = await self.queue.get_pending_for_subscription(subscription_id)
        
        for task in pending:
            await self.queue.move_to_immediate(task)
        
        if pending:
            logger.info("retries_accelerated",
                subscription_id=subscription_id,
                count=len(pending)
            )
    
    async def stop(self):
        self.running = False

# =============================================================================
# DLQ Manager
# =============================================================================

class DLQManager:
    def __init__(self, dlq, delivery_service: WebhookDeliveryService):
        self.dlq = dlq
        self.delivery = delivery_service
    
    async def list_entries(self, subscription_id: Optional[str] = None,
                           status: str = 'pending', limit: int = 100) -> List[dict]:
        return await self.dlq.find(
            subscription_id=subscription_id,
            status=status,
            limit=limit
        )
    
    async def retry_entry(self, entry_id: str, user_id: str) -> dict:
        entry = await self.dlq.get(entry_id)
        if not entry:
            raise ValueError("Entry not found")
        
        await self.dlq.update_status(entry_id, 'retrying')
        
        result = await self.delivery.deliver(entry['task'])
        
        if result.success:
            await self.dlq.update_status(entry_id, 'resolved', resolved_by=user_id)
            dlq_depth.dec()
            return {'status': 'delivered'}
        else:
            await self.dlq.update_status(entry_id, 'pending')
            return {'status': 'failed', 'error': result.error}
    
    async def retry_all(self, subscription_id: str, user_id: str) -> dict:
        entries = await self.list_entries(subscription_id, limit=10000)
        
        results = {'success': 0, 'failed': 0}
        
        for entry in entries:
            result = await self.retry_entry(entry['id'], user_id)
            if result['status'] == 'delivered':
                results['success'] += 1
            else:
                results['failed'] += 1
        
        return results
    
    async def discard_entry(self, entry_id: str, user_id: str, reason: str):
        await self.dlq.update_status(entry_id, 'discarded', 
            resolved_by=user_id, resolution_note=reason)
        dlq_depth.dec()
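
# -----------------------------------------------------------------------------
# Wiring it together: a minimal bootstrap sketch. The queue, DLQ, and
# subscription-store constructors (RedisQueue, RedisDLQ, SubscriptionStore) and
# their parameters are hypothetical stand-ins; only the classes defined above
# are real, and WebhookConfig is assumed to have usable defaults.
# -----------------------------------------------------------------------------

async def main():
    config = WebhookConfig()                      # defaults from the config section
    queue = RedisQueue(url="redis://localhost")   # hypothetical queue implementation
    dlq = RedisDLQ(url="redis://localhost")       # hypothetical DLQ store
    subscriptions = SubscriptionStore()           # hypothetical subscription store

    retry_manager = RetryManager(config, queue, dlq)
    delivery = WebhookDeliveryService(config, subscriptions, retry_manager)
    await delivery.start()

    workers = [WebhookWorker(f"worker-{i}", delivery, retry_manager, queue)
               for i in range(4)]
    health = HealthChecker(subscriptions, queue)

    try:
        # Run the delivery workers and the health checker side by side
        await asyncio.gather(health.run(), *(w.run() for w in workers))
    finally:
        await asyncio.gather(health.stop(), *(w.stop() for w in workers))
        await delivery.stop()

if __name__ == "__main__":
    asyncio.run(main())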

End of Day 4: Webhook Delivery System

Tomorrow: Day 5 — Distributed Cron. How do you ensure a scheduled job runs exactly once across multiple servers? We'll cover leader election, fencing tokens, and why ZooKeeper/etcd exist.