Week 2 — Day 4: Webhook Delivery System
System Design Mastery Series
Preface: From Synchronous to Asynchronous
For the past three days, we've been working in a synchronous world:
User clicks "Pay" → Your server → Bank API → Response → User sees result
Everything happens in a single request-response cycle. The user waits while you call downstream services. We added:
- Day 1: Timeouts (don't wait forever)
- Day 2: Idempotency (safe to retry)
- Day 3: Circuit breakers (fail fast when broken)
But what happens when you need to notify EXTERNAL systems about events?
Order completed in your system
→ Need to notify: Warehouse, Accounting, Analytics, Partner systems
→ These systems are outside your control
→ They might be slow, down, or misconfigured
→ You can't make the user wait for all of them
This is where webhooks come in.
What Are Webhooks?
Webhooks are HTTP callbacks. When an event occurs in your system, you make an HTTP request to a URL that someone else has registered:
Traditional API (Pull):
Partner: "Do you have any new orders?"
You: "Yes, here's order #123"
Partner: "Do you have any new orders?"
You: "No"
Partner: "Do you have any new orders?"
You: "Yes, here's order #124"
Problem: Wasteful polling, delayed updates
Webhooks (Push):
Partner: "Call me at https://partner.com/webhook when you have new orders"
You: [Order #123 created]
You: POST https://partner.com/webhook {"event": "order.created", "order": {...}}
Partner: "Got it, thanks!"
Benefit: Real-time, efficient
Why Webhooks Are Hard
Webhooks seem simple, but they're deceptively difficult:
- Receivers are unreliable: They go down, timeout, return errors
- Networks are unreliable: Requests get lost, connections drop
- Scale is challenging: Sending millions of webhooks quickly
- Ordering is tricky: Events might arrive out of order
- Security is critical: Anyone could send fake webhooks
- Observability is hard: Did the webhook arrive? Did they process it?
Today, we'll design a production webhook delivery system that handles all of these challenges.
Part I: Foundations
Chapter 1: Delivery Guarantees
1.1 The Three Guarantees
In distributed systems, there are three delivery guarantees:
┌─────────────────────────────────────────────────────────────────────────┐
│ Delivery Guarantees │
│ │
│ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │
│ │ │ │ │ │ │ │
│ │ AT-MOST-ONCE │ │ AT-LEAST-ONCE │ │ EXACTLY-ONCE │ │
│ │ │ │ │ │ │ │
│ │ Send once, │ │ Retry until │ │ Delivered │ │
│ │ don't retry │ │ acknowledged │ │ exactly once │ │
│ │ │ │ │ │ │ │
│ │ May lose │ │ May duplicate │ │ No loss, no │ │
│ │ messages │ │ messages │ │ duplicates │ │
│ │ │ │ │ │ │ │
│ └─────────────────┘ └─────────────────┘ └─────────────────┘ │
│ │
│ Easiest Most Common Impossible* │
│ │
│ * True exactly-once is impossible in distributed systems. │
│ We approximate it with at-least-once + idempotent receivers. │
│ │
└─────────────────────────────────────────────────────────────────────────┘
Let's understand each guarantee in depth:
At-Most-Once
The simplest approach: try once and forget.
import requests

def send_webhook_at_most_once(url, payload):
try:
response = requests.post(url, json=payload, timeout=5)
return response.status_code == 200
    except requests.RequestException:
return False # Don't retry, accept the loss
When to use at-most-once:
- Analytics and tracking events (losing a few is acceptable)
- Logging and debugging notifications
- Non-critical status updates
When NOT to use:
- Payment notifications
- Order confirmations
- Any event where loss causes business impact
At-Least-Once
The standard for webhooks: retry until acknowledged.
import time

import requests

def send_webhook_at_least_once(url, payload, max_retries=5):
for attempt in range(max_retries):
try:
response = requests.post(url, json=payload, timeout=5)
if response.status_code == 200:
return True # Success!
        except requests.RequestException:
pass
# Exponential backoff before retry
time.sleep(2 ** attempt)
return False # Move to dead letter queue
The duplicate problem:
Timeline:
T=0: You send webhook
T=29s: Receiver processes it, sends "200 OK"
T=30s: Your timeout fires (before 200 arrives)
T=30s: You think it failed, schedule retry
T=31s: You receive the 200, but already scheduled retry
T=60s: You retry - duplicate!
This is unavoidable. The receiver MUST handle duplicates.
Exactly-Once (The Myth)
True exactly-once is impossible in distributed systems. Here's the fundamental problem:
The Two Generals Problem:
General A wants to coordinate attack with General B.
- A sends messenger: "Attack at dawn"
- B receives message
- B sends messenger back: "Confirmed"
- A receives confirmation
But what if B's messenger is captured?
- B thinks A knows
- A doesn't know B confirmed
- A sends another messenger
- Now B has two messages
No amount of acknowledgments solves this.
After N confirmations, you still don't know if the Nth was received.
The practical solution: At-least-once delivery + idempotent receivers.
This is exactly what we learned in Day 2 — idempotency keys make operations safe to repeat. The webhook system delivers at-least-once, and the receiver uses the event ID for idempotency.
1.2 Why At-Least-Once Is the Standard
For business-critical webhooks, at-least-once is almost always correct:
| Failure Mode | At-Most-Once Impact | At-Least-Once Impact |
|---|---|---|
| Network blip | Lost forever | Retried, delivered |
| Receiver slow | Lost forever | Retried, delivered |
| Receiver crash | Lost forever | Retried when back |
| Your crash | Lost forever | Recovered from queue |
The cost of handling a duplicate is almost always lower than the cost of losing an event.
Example: Order confirmation webhook
At-most-once: Customer doesn't get email, thinks order failed, contacts support, places duplicate order. Cost: Support time + customer frustration + inventory issues.
At-least-once: Receiver sees same order twice, uses order_id for idempotency, sends one email. Cost: One database lookup.
1.3 The Receiver's Contract
When providing at-least-once delivery, document the receiver's responsibilities:
# Webhook Receiver Requirements
1. **Idempotency**: Your endpoint must handle duplicate deliveries.
Use the event's `id` field to deduplicate.
2. **Response**: Return 200 status code to acknowledge receipt.
Any other status or timeout will trigger retry.
3. **Speed**: Respond within 30 seconds.
Do heavy processing asynchronously.
4. **Verification**: Verify webhook signatures.
Check the X-Webhook-Signature header.
Example receiver implementation:
@app.route('/webhook', methods=['POST'])
def handle_webhook():
# Verify signature first
if not verify_signature(request):
abort(401)
event = request.json
event_id = event['id']
# Idempotency check
if db.exists(f"processed:{event_id}"):
return jsonify({"status": "already_processed"}), 200
# Mark as processing (prevent concurrent duplicates)
    if not db.set(f"processing:{event_id}", "1", nx=True, ex=300):  # NX + TTL in one call (redis-py style)
return jsonify({"status": "processing"}), 200
try:
# Process the event
process_event(event)
# Mark as complete
db.set(f"processed:{event_id}", "1", ex=86400*7)
db.delete(f"processing:{event_id}")
return jsonify({"status": "processed"}), 200
except Exception as e:
db.delete(f"processing:{event_id}")
raise
Chapter 2: System Architecture
2.1 High-Level Components
A production webhook system needs these components:
┌─────────────────────────────────────────────────────────────────────────┐
│ Webhook System Architecture │
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────────────────────┐ │
│ │ │ │ │ │ │ │
│ │ Event │───▶│ Event │───▶│ Webhook Dispatcher │ │
│ │ Sources │ │ Bus │ │ (Fan-out) │ │
│ │ │ │ (Kafka) │ │ │ │
│ └──────────┘ └──────────┘ └──────────┬───────────┘ │
│ │ │
│ ┌─────────────────────────┼───────────┐ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌──────────────┐ ┌──────────────┐ │
│ │ Delivery │ │ Delivery │ ... │
│ │ Queue P1 │ │ Queue P2 │ │
│ └──────┬───────┘ └──────┬───────┘ │
│ │ │ │
│ ▼ ▼ │
│ ┌──────────────┐ ┌──────────────┐ │
│ │ Workers │ │ Workers │ │
│ │ (Pool) │ │ (Pool) │ │
│ └──────┬───────┘ └──────┬───────┘ │
│ │ │ │
│ └────────────┬────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────┐ │
│ │ External Receivers │ │
│ └──────────────────────┘ │
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Retry │ │ Dead Letter │ │ Delivery │ │
│ │ Scheduler │ │ Queue │ │ Logs │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────┘
Let's understand each component:
Event Sources
These are the services in your system that generate events:
class OrderService:
def create_order(self, order_data):
order = self.db.create(order_data)
# Emit event for webhook system
self.event_bus.publish(
event_type="order.created",
data=order.to_dict()
)
return order
The key insight: event production should be decoupled from webhook delivery. The OrderService doesn't know about webhooks — it just publishes events.
Event Bus (Kafka/SQS/RabbitMQ)
The event bus provides:
- Durability: Events survive crashes
- Decoupling: Producers and consumers are independent
- Replay: Can reprocess events if needed
- Ordering: Within partitions, events stay ordered
Why Kafka is popular for this:
- High throughput (millions/second)
- Strong durability guarantees
- Consumer groups for parallel processing
- Retention for replay
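To make the producer side concrete, here is a minimal publishing sketch. It assumes aiokafka, a topic named events, and JSON-serialized values; the topic name, key choice, and helper names are illustrative rather than part of the design above.
import asyncio
import json
import uuid
from datetime import datetime, timezone

from aiokafka import AIOKafkaProducer


async def publish_event(producer: AIOKafkaProducer, event_type: str, data: dict) -> dict:
    """Publish one event to the bus; the envelope id doubles as the idempotency key."""
    event = {
        "id": f"evt_{uuid.uuid4().hex}",
        "type": event_type,
        "created_at": datetime.now(timezone.utc).isoformat(),
        "data": data,
    }
    # Keyed by event type here; keying by order_id/customer_id instead gives
    # per-entity ordering within a partition.
    await producer.send_and_wait("events", value=event, key=event_type)
    return event


async def main():
    producer = AIOKafkaProducer(
        bootstrap_servers="localhost:9092",
        value_serializer=lambda v: json.dumps(v).encode(),
        key_serializer=lambda k: k.encode(),
    )
    await producer.start()
    try:
        await publish_event(producer, "order.created", {"order_id": "ord_abc123"})
    finally:
        await producer.stop()


if __name__ == "__main__":
    asyncio.run(main())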
Webhook Dispatcher (Fan-out)
This is the brain of the system. It:
- Consumes events from the bus
- Looks up subscriptions for each event type
- Creates delivery tasks for each subscription
class WebhookDispatcher:
async def handle_event(self, event):
# Find all subscriptions for this event type
subscriptions = await self.db.find_subscriptions(
event_type=event['type']
)
# Create delivery task for each
for sub in subscriptions:
if sub.active:
await self.queue.enqueue({
'subscription_id': sub.id,
'event': event
})
This is the "fan-out" — one event becomes N delivery tasks.
Delivery Queues (Partitioned)
Partitioning is key for:
- Parallelism: Different partitions processed simultaneously
- Ordering: Events for same subscription stay ordered
- Isolation: Slow subscriber doesn't block others
import hashlib

def get_partition(subscription_id: str) -> int:
    """Stable hash-mod partition assignment (md5 is deterministic across processes,
    unlike Python's built-in hash(), which is randomized for strings)."""
    return int(hashlib.md5(subscription_id.encode()).hexdigest(), 16) % NUM_PARTITIONS
Delivery Workers
Workers pull tasks and make HTTP requests:
class DeliveryWorker:
async def process_task(self, task):
subscription = await self.db.get_subscription(task['subscription_id'])
try:
response = await self.http.post(
subscription.url,
json=self.build_payload(task['event']),
headers=self.sign_headers(subscription, task['event']),
timeout=30
)
if response.status == 200:
await self.log_success(task)
else:
await self.schedule_retry(task)
except Exception as e:
await self.schedule_retry(task)
Dead Letter Queue (DLQ)
When all retries are exhausted:
async def move_to_dlq(self, task, error):
await self.dlq.push({
'task': task,
'error': error,
'failed_at': datetime.utcnow(),
'attempts': task['attempt_count']
})
# Alert operations
await self.alert_service.notify(
f"Webhook failed after {task['attempt_count']} attempts",
subscription_id=task['subscription_id']
)
2.2 Data Flow Example
Let's trace an order through the system:
1. Order Created
└── OrderService.create_order()
└── event_bus.publish("order.created", {...})
2. Event Published to Kafka
Topic: events
Partition: hash("order.created") % 10 = 3
Message: {"id": "evt_123", "type": "order.created", "data": {...}}
3. Dispatcher Consumes Event
└── find_subscriptions("order.created")
└── Returns: [sub_A, sub_B, sub_C]
4. Delivery Tasks Created
└── Queue partition 7: {"subscription_id": "sub_A", "event": {...}}
└── Queue partition 2: {"subscription_id": "sub_B", "event": {...}}
└── Queue partition 5: {"subscription_id": "sub_C", "event": {...}}
5. Workers Process Tasks (in parallel)
└── Worker-1: POST https://a.com/webhook {...}
└── Worker-2: POST https://b.com/webhook {...}
└── Worker-3: POST https://c.com/webhook {...}
6. Results
└── sub_A: 200 OK → logged as success
└── sub_B: timeout → scheduled retry at T+60s
└── sub_C: 500 Error → scheduled retry at T+60s
2.3 Payload Design
A well-designed webhook payload:
{
"id": "evt_1234567890abcdef",
"type": "order.created",
"api_version": "2024-01-15",
"created_at": "2024-01-15T10:30:00.123Z",
"data": {
"object": "order",
"id": "ord_abc123",
"amount": 9900,
"currency": "usd",
"status": "pending",
"customer": {
"id": "cus_xyz",
"email": "customer@example.com"
},
"items": [...]
},
"request_id": "req_original_request_id"
}
Key design decisions:
| Field | Purpose | Why It Matters |
|---|---|---|
| `id` | Unique event ID | Idempotency key for receivers |
| `type` | Event type | Routing and filtering |
| `api_version` | Payload version | Backward compatibility |
| `created_at` | Event timestamp | Not delivery time! |
| `data.object` | Object type | Helps parsing |
| `request_id` | Original request | Correlation/debugging |
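As a sketch, building that envelope at event-creation time might look like this (field names follow the example above; the helper name and version string are illustrative):
import uuid
from datetime import datetime, timezone

API_VERSION = "2024-01-15"  # illustrative version string

def build_event_envelope(event_type: str, data: dict, request_id: str) -> dict:
    """Build the envelope once, when the event occurs, so every delivery attempt
    carries the same id and created_at (retries must not regenerate them)."""
    return {
        "id": f"evt_{uuid.uuid4().hex}",
        "type": event_type,
        "api_version": API_VERSION,
        "created_at": datetime.now(timezone.utc).isoformat(timespec="milliseconds"),
        "data": data,  # include "object" plus the resource fields, as in the example
        "request_id": request_id,
    }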
Chapter 3: Retry Strategies
3.1 The Retry Schedule
A typical exponential backoff schedule:
Attempt 1: Immediate
Attempt 2: 1 minute later
Attempt 3: 5 minutes later
Attempt 4: 15 minutes later
Attempt 5: 1 hour later
Attempt 6: 2 hours later
Attempt 7: 4 hours later
Attempt 8: 8 hours later
Attempt 9: 24 hours later (final)
Total window: ~39 hours
Why this schedule works:
- Early retries catch transient failures (network blips, deploys)
- Longer delays give time for manual intervention
- 24-hour final catches overnight outages
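The same schedule expressed as code, as a small sketch (the delay list mirrors the attempts above; the names are illustrative):
from datetime import datetime, timedelta, timezone
from typing import Optional

# Delay between attempt N and attempt N+1; attempt 1 is immediate, attempt 9 is final.
RETRY_DELAYS_SECONDS = [60, 300, 900, 3600, 7200, 14400, 28800, 86400]

def next_retry_at(attempts_made: int, now: Optional[datetime] = None) -> Optional[datetime]:
    """Given how many attempts have already been made (>= 1), return when the
    next attempt should run, or None once all 9 attempts are exhausted."""
    if attempts_made >= len(RETRY_DELAYS_SECONDS) + 1:
        return None
    now = now or datetime.now(timezone.utc)
    return now + timedelta(seconds=RETRY_DELAYS_SECONDS[attempts_made - 1])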
3.2 What To Retry
Not all failures should be retried:
def should_retry(response_or_error) -> bool:
if isinstance(response_or_error, Exception):
# Network errors - always retry
if isinstance(response_or_error, (Timeout, ConnectionError)):
return True
# Unknown errors - retry cautiously
return True
status = response_or_error.status_code
# Success - don't retry
if status == 200:
return False
# Client errors - don't retry (except rate limit)
if 400 <= status < 500:
return status == 429 # Retry rate limits
# Server errors - retry
if status >= 500:
return True
# Redirects - don't retry
if 300 <= status < 400:
return False
return False
The 4xx rule: If the receiver returns a 4xx error, your request is malformed. Retrying won't help. Log it and move on (or to DLQ for investigation).
Exception: 429 Too Many Requests
Rate limiting is special — it's a temporary condition:
def handle_rate_limit(response, task):
    retry_after = response.headers.get('Retry-After')
    try:
        # Retry-After is usually an integer number of seconds; it can also be
        # an HTTP-date, which we fold into the default backoff here for brevity.
        delay = int(retry_after)
    except (TypeError, ValueError):
        delay = 60  # Default backoff
    schedule_retry(task, delay=delay)
3.3 Retry Implementation
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional
import random
@dataclass
class RetryConfig:
max_attempts: int = 9
base_delay: int = 60 # 1 minute
max_delay: int = 86400 # 24 hours
exponential_base: float = 2.0
jitter: float = 0.1 # ±10%
class RetryScheduler:
    def __init__(self, config: RetryConfig, queue, dlq, metrics, alert_ops):
        self.config = config
        self.queue = queue
        self.dlq = dlq              # dead letter queue used by move_to_dlq()
        self.metrics = metrics
        self.alert_ops = alert_ops  # async callable that pages operations
def calculate_delay(self, attempt: int) -> int:
"""Calculate delay with exponential backoff and jitter."""
# Exponential backoff
delay = self.config.base_delay * (self.config.exponential_base ** attempt)
# Cap at max delay
delay = min(delay, self.config.max_delay)
# Add jitter to prevent thundering herd
jitter_range = delay * self.config.jitter
delay += random.uniform(-jitter_range, jitter_range)
return int(delay)
async def schedule_retry(
self,
task: dict,
error: str,
force_delay: Optional[int] = None
) -> bool:
"""
Schedule a retry for failed delivery.
Returns True if retry scheduled, False if moved to DLQ.
"""
attempt = task.get('attempt', 0)
if attempt >= self.config.max_attempts:
await self.move_to_dlq(task, error)
return False
delay = force_delay or self.calculate_delay(attempt)
task['attempt'] = attempt + 1
task['last_error'] = error
task['retry_at'] = (datetime.utcnow() + timedelta(seconds=delay)).isoformat()
await self.queue.enqueue_delayed(task, delay)
return True
async def move_to_dlq(self, task: dict, error: str):
"""Move to dead letter queue after exhausting retries."""
dlq_entry = {
'task': task,
'final_error': error,
'attempts': task.get('attempt', 0),
'first_attempt': task.get('first_attempt'),
'last_attempt': datetime.utcnow().isoformat(),
'status': 'pending_review'
}
await self.dlq.push(dlq_entry)
await self.metrics.increment('webhooks_dlq_total')
await self.alert_ops(task, error)
3.4 Handling Long Outages
When an endpoint is down for hours:
Scenario:
Endpoint goes down at 2:00 PM
100 events/hour accumulate
Endpoint recovers at 4:00 PM
200 events need delivery
Problem:
Each event is at different retry stage
Some scheduled for 5:00 PM, some for 6:00 PM, etc.
Recovery is slow even though endpoint is healthy
Solution: Health-based retry acceleration
class AdaptiveRetryScheduler:
def __init__(self):
self.endpoint_health = {} # endpoint -> health status
async def check_endpoint_health(self, endpoint: str) -> bool:
"""Periodic health check for failing endpoints."""
try:
async with aiohttp.ClientSession() as session:
async with session.head(endpoint, timeout=5) as resp:
return resp.status < 500
        except (aiohttp.ClientError, asyncio.TimeoutError):
return False
async def on_endpoint_recovery(self, endpoint: str):
"""When endpoint recovers, accelerate pending retries."""
# Find all pending retries for this endpoint
pending = await self.queue.get_pending_for_endpoint(endpoint)
# Move them to immediate queue
for task in pending:
await self.queue.move_to_immediate(task)
logger.info(f"Accelerated {len(pending)} retries for {endpoint}")
async def run_health_checker(self):
"""Background task to check unhealthy endpoints."""
while True:
for endpoint, status in self.endpoint_health.items():
if not status.healthy:
is_healthy = await self.check_endpoint_health(endpoint)
if is_healthy:
logger.info(f"Endpoint recovered: {endpoint}")
await self.on_endpoint_recovery(endpoint)
status.healthy = True
await asyncio.sleep(30) # Check every 30 seconds
Chapter 4: Security
Webhook security is critical. Without proper security:
- Attackers can send fake webhooks to your customers
- Man-in-the-middle attacks can intercept sensitive data
- Replay attacks can cause duplicate processing
4.1 Signature Verification
The standard approach: HMAC-SHA256 signatures.
Sender side:
import hmac
import hashlib
import time
import json
class WebhookSigner:
def sign_payload(self, payload: dict, secret: str) -> dict:
"""Generate signature headers for webhook."""
timestamp = int(time.time())
        # Create signed content: timestamp.payload
        # NOTE: send exactly this serialization as the request body; if the HTTP client
        # re-serializes the JSON differently, the receiver's signature check will fail.
        payload_str = json.dumps(payload, separators=(',', ':'), sort_keys=True)
        signed_content = f"{timestamp}.{payload_str}"
# Generate HMAC-SHA256 signature
signature = hmac.new(
secret.encode('utf-8'),
signed_content.encode('utf-8'),
hashlib.sha256
).hexdigest()
return {
'X-Webhook-Timestamp': str(timestamp),
'X-Webhook-Signature': f"sha256={signature}",
'Content-Type': 'application/json'
}
Receiver side:
class WebhookVerifier:
def verify(
self,
payload_body: str,
signature_header: str,
timestamp_header: str,
secret: str,
tolerance: int = 300 # 5 minutes
) -> bool:
"""Verify webhook signature."""
# Check timestamp is recent (prevent replay attacks)
try:
timestamp = int(timestamp_header)
except (ValueError, TypeError):
return False
if abs(time.time() - timestamp) > tolerance:
return False # Too old or too far in future
# Verify signature
if not signature_header.startswith('sha256='):
return False
received_sig = signature_header[7:]
signed_content = f"{timestamp}.{payload_body}"
expected_sig = hmac.new(
secret.encode('utf-8'),
signed_content.encode('utf-8'),
hashlib.sha256
).hexdigest()
# Constant-time comparison (prevent timing attacks)
return hmac.compare_digest(received_sig, expected_sig)
4.2 Why Timestamp Matters
Without timestamp verification, attackers can replay old webhooks:
Attack scenario:
1. Attacker intercepts webhook for "order.refunded"
2. Attacker waits 6 months
3. Attacker replays same webhook
4. Receiver processes refund again!
With timestamp verification:
1. Webhook has timestamp from 6 months ago
2. Receiver checks: |now - timestamp| > 5 minutes
3. Receiver rejects: "Timestamp too old"
4.3 Secret Rotation
Secrets should be rotated periodically:
class SecretManager:
def rotate_secret(self, subscription_id: str) -> dict:
"""Rotate webhook secret with grace period."""
subscription = self.db.get(subscription_id)
new_secret = secrets.token_hex(32)
# Keep old secret valid for 24 hours
self.db.update(subscription_id, {
'secret': new_secret,
'previous_secret': subscription.secret,
'previous_secret_expires': datetime.utcnow() + timedelta(hours=24)
})
return {
'new_secret': new_secret,
'old_secret_valid_until': (datetime.utcnow() + timedelta(hours=24)).isoformat()
}
def verify_with_rotation(
self,
subscription_id: str,
payload: str,
signature: str,
timestamp: str
) -> bool:
"""Verify allowing both current and previous secrets."""
subscription = self.db.get(subscription_id)
# Try current secret
if self.verifier.verify(payload, signature, timestamp, subscription.secret):
return True
# Try previous secret if still valid
if (subscription.previous_secret and
subscription.previous_secret_expires > datetime.utcnow()):
if self.verifier.verify(payload, signature, timestamp, subscription.previous_secret):
return True
return False
4.4 Additional Security Measures
class WebhookSecurityManager:
def validate_endpoint(self, url: str) -> bool:
"""Validate webhook URL before registration."""
parsed = urllib.parse.urlparse(url)
# Require HTTPS
if parsed.scheme != 'https':
raise ValueError("Webhook URL must use HTTPS")
# Block private IPs (SSRF prevention)
try:
ip = socket.gethostbyname(parsed.hostname)
if ipaddress.ip_address(ip).is_private:
raise ValueError("Webhook URL cannot point to private IP")
except socket.gaierror:
raise ValueError("Cannot resolve webhook hostname")
# Block localhost
if parsed.hostname in ['localhost', '127.0.0.1', '0.0.0.0']:
raise ValueError("Webhook URL cannot be localhost")
return True
def test_endpoint(self, url: str, secret: str) -> bool:
"""Send test webhook to verify endpoint works."""
test_event = {
'id': f'evt_test_{uuid.uuid4().hex[:8]}',
'type': 'webhook.test',
'created_at': datetime.utcnow().isoformat()
}
        try:
            # Serialize once and send those exact bytes; the signature in the headers
            # is computed over this canonical form, so the body must match it byte-for-byte.
            body = json.dumps(test_event, separators=(',', ':'), sort_keys=True)
            response = requests.post(
                url,
                data=body,
                headers=self.signer.sign_payload(test_event, secret),
                timeout=10
            )
            return response.status_code == 200
        except requests.RequestException:
            return False
Chapter 5: Scaling to 1M Webhooks/Hour
5.1 Understanding the Numbers
1,000,000 webhooks/hour
= 16,667 webhooks/minute
= 278 webhooks/second
With average 3 subscriptions per event:
= 833 delivery attempts/second
With 50% retry rate:
= ~1,250 total HTTP requests/second
Peak (2x average):
= 2,500 requests/second
5.2 Worker Pool Sizing
# Assumptions:
# - Average HTTP request takes 500ms
# - We want to handle 2,500 requests/second at peak
# - Each worker can handle 1/0.5 = 2 requests/second
# Required workers = 2,500 / 2 = 1,250 workers
# With async workers (10 concurrent requests each):
# Required workers = 2,500 / 20 = 125 workers
# With safety margin (2x):
# Deploy 250 async workers
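The same arithmetic as a small helper (the inputs are the assumptions stated above):
import math

def required_workers(peak_rps: float, avg_latency_s: float,
                     concurrency_per_worker: int, safety_factor: float = 2.0) -> int:
    """Back-of-the-envelope sizing for an async delivery fleet."""
    per_worker_rps = concurrency_per_worker / avg_latency_s
    return math.ceil(safety_factor * peak_rps / per_worker_rps)

# 2,500 req/s peak, 500 ms average latency, 10 in-flight requests per worker, 2x margin
print(required_workers(2500, 0.5, 10))  # -> 250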
5.3 Connection Pooling
class WebhookHTTPClient:
def __init__(self):
# Connection pool settings
self.connector = aiohttp.TCPConnector(
limit=1000, # Total connections
limit_per_host=10, # Per-endpoint limit (fairness)
ttl_dns_cache=300, # DNS cache TTL
keepalive_timeout=30 # Keep connections alive
)
self.timeout = aiohttp.ClientTimeout(
total=30, # Total request timeout
connect=5, # Connection timeout
sock_read=25 # Read timeout
)
self.session = aiohttp.ClientSession(
connector=self.connector,
timeout=self.timeout
)
5.4 Rate Limiting Per Endpoint
Don't overwhelm receivers:
class EndpointRateLimiter:
def __init__(self, redis_client):
self.redis = redis_client
self.default_limit = 100 # requests per minute
async def acquire(self, endpoint: str, limit: int = None) -> bool:
"""Try to acquire a rate limit slot."""
limit = limit or self.default_limit
key = f"ratelimit:{endpoint}"
current = await self.redis.incr(key)
if current == 1:
await self.redis.expire(key, 60)
if current > limit:
return False
return True
async def wait_for_slot(self, endpoint: str, timeout: float = 30) -> bool:
"""Wait until rate limit slot available."""
start = time.time()
while time.time() - start < timeout:
if await self.acquire(endpoint):
return True
await asyncio.sleep(0.1)
return False
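A sketch of wiring the limiter into a delivery worker; `enqueue_delayed` and the task shape follow the code used elsewhere in this chapter and are assumptions here, not a fixed API:
class RateLimitedWorker:
    def __init__(self, limiter: EndpointRateLimiter, delivery_service, queue):
        self.limiter = limiter
        self.delivery = delivery_service
        self.queue = queue

    async def process_task(self, task: dict, endpoint_url: str):
        # Wait briefly for a slot; if this endpoint's budget is exhausted,
        # push the task back with a short delay rather than blocking the worker.
        if not await self.limiter.wait_for_slot(endpoint_url, timeout=2):
            await self.queue.enqueue_delayed(task, delay=30)
            return
        await self.delivery.deliver(task)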
5.5 Queue Partitioning
class PartitionedQueue:
"""
Partition by subscription for:
- Parallel processing of different subscriptions
- Ordered delivery per subscription
- Isolation (slow subscriber doesn't block others)
"""
NUM_PARTITIONS = 64
def get_partition(self, subscription_id: str) -> int:
"""Consistent hash to partition."""
hash_val = int(hashlib.md5(subscription_id.encode()).hexdigest(), 16)
return hash_val % self.NUM_PARTITIONS
async def enqueue(self, task: dict):
"""Add task to appropriate partition."""
partition = self.get_partition(task['subscription_id'])
stream = f"webhooks:p{partition}"
await self.redis.xadd(stream, task)
async def start_workers(self):
"""Start workers for all partitions."""
tasks = []
for partition in range(self.NUM_PARTITIONS):
# Multiple workers per partition for throughput
for worker_num in range(4):
task = asyncio.create_task(
self.partition_worker(partition, worker_num)
)
tasks.append(task)
await asyncio.gather(*tasks)
Part II: The Design Problem
Chapter 6: Design Challenge - Receiver Down for 2 Hours
6.1 The Scenario
Timeline:
2:00 PM - Partner's webhook endpoint goes down
2:00 PM to 4:00 PM - You generate 10,000 events for them
4:00 PM - Endpoint comes back up
Questions:
1. What happens to those 10,000 events?
2. How long until they're all delivered?
3. What's the customer experience?
4. How do you optimize recovery?
6.2 Default Behavior (Without Optimization)
2:00 PM - Event 1 fails, scheduled retry at 2:01 PM
2:01 PM - Retry fails, scheduled at 2:06 PM
2:06 PM - Retry fails, scheduled at 2:21 PM
2:21 PM - Retry fails, scheduled at 3:21 PM
3:21 PM - Retry fails, scheduled at 5:21 PM
4:00 PM - Endpoint recovers
But Event 1 won't retry until 5:21 PM!
Meanwhile:
Events from early in the outage sit on 2-hour delays, scattered between 5:21 PM and 6:00 PM
Events from later in the outage retry sooner, but each on its own schedule
Stragglers keep trickling in for roughly 2 hours after the endpoint is already healthy,
and a longer outage pushes events into the 4-, 8-, and 24-hour retry buckets
6.3 Solution 1: Health Monitoring + Accelerated Retry
class SmartRetryManager:
async def run_health_monitor(self):
"""Monitor unhealthy endpoints and trigger recovery."""
while True:
# Get all endpoints with recent failures
unhealthy = await self.db.get_unhealthy_endpoints()
for endpoint in unhealthy:
# Check if recovered
if await self.check_health(endpoint.url):
logger.info(f"Endpoint recovered: {endpoint.url}")
# Accelerate all pending retries
await self.accelerate_retries(endpoint.subscription_id)
# Update status
endpoint.healthy = True
endpoint.consecutive_failures = 0
await self.db.update(endpoint)
await asyncio.sleep(30) # Check every 30 seconds
async def accelerate_retries(self, subscription_id: str):
"""Move all pending retries to immediate queue."""
# Find all scheduled retries for this subscription
pending = await self.queue.get_scheduled_for_subscription(subscription_id)
logger.info(f"Accelerating {len(pending)} retries for {subscription_id}")
for task in pending:
# Move from delayed queue to immediate queue
await self.queue.cancel_delayed(task['id'])
await self.queue.enqueue_immediate(task)
Result:
4:00 PM - Health check detects recovery
4:00 PM - 10,000 pending retries moved to immediate queue
4:01 PM - Workers start draining the backlog, paced by the per-endpoint concurrency limit
4:13 PM - All 10,000 events delivered!
Recovery time: ~13 minutes (vs hours of stragglers)
6.4 Solution 2: Gradual Ramp-Up
Don't overwhelm a recovering endpoint:
class GradualRecoveryManager:
async def deliver_with_rampup(
self,
subscription_id: str,
pending_tasks: List[dict]
):
"""Deliver pending webhooks with gradual ramp-up."""
# Start slow, increase over time
initial_rate = 1 # per second
max_rate = 100 # per second
ramp_duration = 300 # 5 minutes to reach max
start_time = time.time()
delivered = 0
for task in pending_tasks:
# Calculate current rate based on time
elapsed = time.time() - start_time
progress = min(1.0, elapsed / ramp_duration)
current_rate = initial_rate + (max_rate - initial_rate) * progress
# Deliver
await self.deliver(task)
delivered += 1
# Rate limit
await asyncio.sleep(1.0 / current_rate)
# Log progress
if delivered % 100 == 0:
logger.info(f"Recovery: {delivered}/{len(pending_tasks)} at {current_rate:.1f}/sec")
6.5 Solution 3: Dead Letter Queue + Manual Recovery
For events that exhaust all retries:
class DLQManager:
async def get_failed_webhooks(
self,
subscription_id: str = None,
limit: int = 100
) -> List[dict]:
"""List failed webhooks for review."""
query = """
SELECT * FROM webhook_dlq
WHERE status = 'pending'
"""
if subscription_id:
query += f" AND subscription_id = '{subscription_id}'"
query += f" ORDER BY created_at DESC LIMIT {limit}"
return await self.db.fetch(query)
async def retry_dlq_entry(self, entry_id: str, user_id: str) -> dict:
"""Manually retry a DLQ entry."""
entry = await self.db.get_dlq_entry(entry_id)
if not entry:
raise NotFoundError("DLQ entry not found")
# Mark as retrying
await self.db.update_dlq_status(entry_id, 'retrying')
# Attempt delivery
result = await self.delivery_service.deliver(
entry['subscription_id'],
entry['event']
)
if result.success:
await self.db.update_dlq_status(
entry_id,
'resolved',
resolved_by=user_id
)
return {'status': 'delivered'}
else:
await self.db.update_dlq_status(entry_id, 'pending')
return {'status': 'failed', 'error': result.error}
async def retry_all_for_subscription(self, subscription_id: str, user_id: str):
"""Retry all DLQ entries for a subscription."""
entries = await self.get_failed_webhooks(subscription_id, limit=10000)
results = {'success': 0, 'failed': 0}
for entry in entries:
result = await self.retry_dlq_entry(entry['id'], user_id)
if result['status'] == 'delivered':
results['success'] += 1
else:
results['failed'] += 1
return results
6.6 Admin Dashboard
from fastapi import Depends, FastAPI, HTTPException
app = FastAPI()
@app.get("/admin/webhooks/subscriptions/{subscription_id}/health")
async def get_subscription_health(subscription_id: str):
"""Get health status for a subscription."""
subscription = await db.get_subscription(subscription_id)
return {
"subscription_id": subscription_id,
"endpoint": subscription.url,
"healthy": subscription.healthy,
"consecutive_failures": subscription.consecutive_failures,
"last_success": subscription.last_success,
"last_failure": subscription.last_failure,
"pending_retries": await queue.count_pending(subscription_id),
"dlq_count": await dlq.count_for_subscription(subscription_id)
}
@app.get("/admin/webhooks/dlq")
async def list_dlq(subscription_id: str = None, limit: int = 100):
"""List DLQ entries."""
entries = await dlq_manager.get_failed_webhooks(subscription_id, limit)
return {
"entries": entries,
"total": len(entries)
}
@app.post("/admin/webhooks/dlq/{entry_id}/retry")
async def retry_dlq_entry(entry_id: str, current_user: User = Depends(get_current_user)):  # get_current_user: your auth dependency
"""Retry a single DLQ entry."""
result = await dlq_manager.retry_dlq_entry(entry_id, current_user.id)
return result
@app.post("/admin/webhooks/subscriptions/{subscription_id}/retry-all-dlq")
async def retry_all_dlq(subscription_id: str, current_user: User = Depends(get_current_user)):  # get_current_user: your auth dependency
"""Retry all DLQ entries for a subscription."""
result = await dlq_manager.retry_all_for_subscription(
subscription_id,
current_user.id
)
return result
@app.get("/admin/webhooks/stats")
async def get_webhook_stats():
"""Get overall webhook system stats."""
return {
"queue_depth": await queue.total_size(),
"retry_queue_depth": await retry_queue.size(),
"dlq_depth": await dlq.size(),
"deliveries_last_hour": await metrics.get_deliveries_count(hours=1),
"success_rate_last_hour": await metrics.get_success_rate(hours=1),
"unhealthy_endpoints": await db.count_unhealthy_endpoints()
}
Chapter 7: Monitoring and Observability
7.1 Key Metrics
from prometheus_client import Counter, Histogram, Gauge
# Delivery metrics
webhook_deliveries_total = Counter(
'webhook_deliveries_total',
'Total webhook deliveries',
['status', 'event_type'] # success, failed, retrying
)
webhook_delivery_duration = Histogram(
'webhook_delivery_duration_seconds',
'Webhook delivery duration',
buckets=[0.1, 0.25, 0.5, 1, 2.5, 5, 10, 30]
)
# Queue metrics
webhook_queue_depth = Gauge(
'webhook_queue_depth',
'Webhooks pending delivery',
['partition']
)
webhook_retry_queue_depth = Gauge(
'webhook_retry_queue_depth',
'Webhooks pending retry'
)
webhook_dlq_depth = Gauge(
'webhook_dlq_depth',
'Webhooks in dead letter queue'
)
# Endpoint health
webhook_endpoint_healthy = Gauge(
'webhook_endpoint_healthy',
'Endpoint health status',
['subscription_id']
)
webhook_endpoint_consecutive_failures = Gauge(
'webhook_endpoint_consecutive_failures',
'Consecutive failures per endpoint',
['subscription_id']
)
7.2 Alert Rules
groups:
- name: webhook_alerts
rules:
# High failure rate
- alert: WebhookHighFailureRate
expr: |
sum(rate(webhook_deliveries_total{status="failed"}[5m])) /
sum(rate(webhook_deliveries_total[5m])) > 0.1
for: 5m
labels:
severity: warning
annotations:
summary: "Webhook failure rate above 10%"
description: "{{ $value | humanizePercentage }} of webhooks failing"
# Queue backlog
- alert: WebhookQueueBacklog
expr: sum(webhook_queue_depth) > 10000
for: 5m
labels:
severity: warning
annotations:
summary: "Webhook queue backlog: {{ $value }}"
# DLQ growing
- alert: WebhookDLQGrowing
expr: webhook_dlq_depth > 100
for: 15m
labels:
severity: warning
annotations:
summary: "Webhook DLQ has {{ $value }} entries"
# Endpoint down
- alert: WebhookEndpointDown
expr: webhook_endpoint_consecutive_failures > 10
for: 10m
labels:
severity: critical
annotations:
summary: "Webhook endpoint has {{ $value }} consecutive failures"
7.3 Structured Logging
import structlog
logger = structlog.get_logger()
class WebhookLogger:
def log_delivery_attempt(
self,
subscription_id: str,
event_id: str,
event_type: str,
attempt: int,
result: DeliveryResult
):
logger.info(
"webhook_delivery_attempt",
subscription_id=subscription_id,
event_id=event_id,
event_type=event_type,
attempt=attempt,
success=result.success,
status_code=result.status_code,
duration_ms=result.duration_ms,
error=result.error
)
def log_moved_to_dlq(
self,
subscription_id: str,
event_id: str,
error: str,
attempts: int
):
logger.error(
"webhook_moved_to_dlq",
subscription_id=subscription_id,
event_id=event_id,
error=error,
total_attempts=attempts
)
def log_endpoint_status(
self,
subscription_id: str,
healthy: bool,
consecutive_failures: int
):
logger.info(
"webhook_endpoint_status",
subscription_id=subscription_id,
healthy=healthy,
consecutive_failures=consecutive_failures
)
Part III: Interview Questions
Chapter 8: The Hard Questions
8.1 "Design a webhook system for 1M events/hour. Receiver is down for 2 hours."
Strong Answer:
"Let me walk through the architecture and then address the failure scenario.
Architecture for 1M events/hour:
The math: 1M events/hour = 278/second. With average 3 subscriptions per event, that's 834 webhooks/second. At peak (2x), 1,668/second.
Components:
- Event bus: Kafka for durability and replay capability
- Dispatcher: Consumes events, fans out to delivery queues
- Partitioned queues: By subscription ID for parallel processing
- Async workers: 100+ workers with connection pooling
- Per-endpoint rate limiting: Prevent overwhelming receivers
The 2-hour outage scenario:
Default behavior is exponential backoff: 1min, 5min, 15min, 1hr, 2hr... After 2 hours of failures, webhooks are scattered across the retry schedule.
My optimization: Active health monitoring
Background job pings unhealthy endpoints every 30 seconds. When one recovers, we immediately accelerate all pending retries instead of waiting for their scheduled time.
Without this, deliveries straggle in for hours after the endpoint returns. With this, recovery completes in roughly 15 minutes.
I'd also implement:
- Gradual ramp-up to avoid overwhelming recovering endpoints
- DLQ with admin dashboard for manual retry
- Alerting when endpoints are persistently unhealthy
Connection to previous days:
This ties to Day 2 (idempotency) because receivers must handle duplicates. We're doing at-least-once delivery, so they need to use the event_id for deduplication."
8.2 "How do you ensure webhook security?"
Strong Answer:
"Multiple layers of security:
1. HMAC-SHA256 signatures
Every webhook is signed with a shared secret:
- Include timestamp in signature (prevent replay attacks)
- Use constant-time comparison (prevent timing attacks)
- Document verification clearly for receivers
2. Timestamp verification
Receivers should reject webhooks with timestamps more than 5 minutes old. This prevents attackers from replaying intercepted webhooks.
3. HTTPS only
We only allow HTTPS webhook URLs. We validate this during registration and refuse HTTP endpoints.
4. Secret rotation
Support rotating secrets with a grace period:
- Generate new secret
- Keep old secret valid for 24 hours
- Both secrets accepted during transition
- Old secret expires automatically
5. SSRF prevention
When users register webhook URLs, validate they're not pointing to:
- Private IPs (10.x, 192.168.x, etc.)
- Localhost
- Internal hostnames
6. IP whitelisting option
For security-conscious receivers, publish our webhook source IPs so they can whitelist."
8.3 "How do you handle ordering of webhooks?"
Strong Answer:
"Ordering is tricky in distributed systems. Let me explain the challenge and solutions.
The problem:
Events occur: A → B → C But due to retries and parallelism, receiver might see: A → C → B
Solution 1: Per-entity ordering (recommended)
Partition by entity ID (order_id, customer_id):
- All events for same entity go to same partition
- Single worker per partition ensures order
- Different entities processed in parallel
This gives you ordering where it matters most.
Solution 2: Sequence numbers
Include sequence in payload:
{"sequence": 42, "entity_id": "order_123", ...}
Receiver tracks last processed sequence and handles gaps.
Solution 3: Accept eventual consistency
Document that strict global ordering isn't guaranteed. Many systems don't actually need it — they need idempotency and eventual consistency.
My recommendation:
Per-entity partitioning plus sequence numbers in payloads. This handles 95% of use cases. For the rare case needing strict global ordering, I'd push back on the requirement or use a different mechanism."
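To make Solution 2 concrete, a receiver-side sketch of sequence handling (names and thresholds are illustrative; the receiver still returns 200 for events it drops):
def classify_event(event: dict, last_seen: dict) -> str:
    """Decide what to do with an event using per-entity sequence numbers.
    last_seen maps entity_id -> highest sequence already applied."""
    entity_id, seq = event["entity_id"], event["sequence"]
    current = last_seen.get(entity_id, 0)
    if seq <= current:
        return "drop"    # duplicate or superseded: acknowledge and ignore
    if seq == current + 1:
        return "apply"   # next in order: process, then set last_seen[entity_id] = seq
    return "buffer"      # gap: hold it, or re-fetch current state from the API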
Chapter 9: Session Summary
What You've Learned
| Topic | Key Concepts |
|---|---|
| Delivery Guarantees | At-least-once is standard; exactly-once is impossible |
| Architecture | Event bus → Dispatcher → Queues → Workers → DLQ |
| Retry Strategy | Exponential backoff with health-based acceleration |
| Security | HMAC signatures, timestamp validation, HTTPS |
| Scale | Partitioning, connection pooling, rate limiting |
| Recovery | Health monitoring, gradual ramp-up, DLQ |
Connection to Previous Days
| Day | Connection |
|---|---|
| Day 1 (Timeouts) | Webhook requests need timeouts |
| Day 2 (Idempotency) | Receivers must be idempotent |
| Day 3 (Circuit Breakers) | Per-endpoint circuit breakers |
| Day 4 (Webhooks) | Complete async delivery system |
Key Trade-offs
| Decision | Trade-off |
|---|---|
| At-least-once vs at-most-once | Reliability vs simplicity |
| Long retry window | Higher delivery rate vs older events |
| Health monitoring | Faster recovery vs more complexity |
| Per-endpoint rate limiting | Fairness vs throughput |
Part IV: Further Learning
Resources
Documentation
- Stripe Webhooks: https://stripe.com/docs/webhooks - Industry standard
- GitHub Webhooks: https://docs.github.com/en/webhooks - Good security practices
- Twilio Webhooks: https://www.twilio.com/docs/usage/webhooks - Signature verification
Open Source Projects
- Svix: https://github.com/svix/svix-webhooks - Open-source webhook service
- Convoy: https://github.com/frain-dev/convoy - Webhook gateway
- Hookdeck: https://hookdeck.com - Webhook infrastructure
Books
- "Designing Data-Intensive Applications" by Martin Kleppmann - Chapter 11 on streaming
- "Building Microservices" by Sam Newman - Event-driven architecture
Standards
- CloudEvents: https://cloudevents.io - Standard event format
- Webhook.site: https://webhook.site - Testing webhooks
Exercises
Exercise 1: Implement Webhook Signature Verification
Build complete signature generation and verification with:
- HMAC-SHA256 signing
- Timestamp validation
- Replay attack prevention
- Secret rotation support
Exercise 2: Build Adaptive Retry System
Create a retry manager that:
- Uses exponential backoff
- Monitors endpoint health
- Accelerates retries on recovery
- Implements gradual ramp-up
Exercise 3: Design DLQ Dashboard
Create admin interface with:
- List failed webhooks by subscription
- Single and batch retry functionality
- Failure analytics and trends
- Alerting configuration
Appendix: Production Implementation
Complete Webhook Delivery Service
"""
Production webhook delivery system.
Building on Days 1-4: Timeouts, Idempotency, Circuit Breakers, Webhooks.
"""
import asyncio
import hashlib
import hmac
import json
import random
import time
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import List, Optional, Dict, Any
from enum import Enum
import aiohttp
import structlog
from prometheus_client import Counter, Histogram, Gauge
logger = structlog.get_logger()
# =============================================================================
# Configuration
# =============================================================================
@dataclass
class WebhookConfig:
# Delivery
delivery_timeout: int = 30
max_retries: int = 9
retry_delays: List[int] = field(default_factory=lambda: [
60, 300, 900, 3600, 7200, 14400, 28800, 86400
])
# Rate limiting
max_concurrent_per_endpoint: int = 10
max_requests_per_minute: int = 100
# Health monitoring
health_check_interval: int = 30
circuit_breaker_threshold: int = 10
# Security
signature_tolerance: int = 300
# =============================================================================
# Models
# =============================================================================
class DeliveryStatus(Enum):
PENDING = "pending"
SUCCESS = "success"
FAILED = "failed"
RETRYING = "retrying"
DLQ = "dlq"
@dataclass
class WebhookSubscription:
id: str
user_id: str
url: str
secret: str
events: List[str]
active: bool = True
healthy: bool = True
consecutive_failures: int = 0
last_success: Optional[datetime] = None
last_failure: Optional[datetime] = None
created_at: datetime = field(default_factory=datetime.utcnow)
@dataclass
class WebhookEvent:
id: str
type: str
data: Dict[str, Any]
timestamp: datetime = field(default_factory=datetime.utcnow)
@dataclass
class DeliveryResult:
success: bool
status_code: Optional[int] = None
duration_ms: Optional[int] = None
error: Optional[str] = None
should_retry: bool = False
# =============================================================================
# Metrics
# =============================================================================
deliveries_total = Counter(
'webhook_deliveries_total',
'Total deliveries',
['status', 'event_type']
)
delivery_duration = Histogram(
'webhook_delivery_duration_seconds',
'Delivery duration',
buckets=[0.1, 0.5, 1, 2, 5, 10, 30]
)
queue_depth = Gauge('webhook_queue_depth', 'Queue depth')
dlq_depth = Gauge('webhook_dlq_depth', 'DLQ depth')
# =============================================================================
# Webhook Signer
# =============================================================================
class WebhookSigner:
@staticmethod
def sign(payload: dict, secret: str, timestamp: int = None) -> Dict[str, str]:
timestamp = timestamp or int(time.time())
payload_str = json.dumps(payload, separators=(',', ':'), sort_keys=True)
signed_content = f"{timestamp}.{payload_str}"
signature = hmac.new(
secret.encode(),
signed_content.encode(),
hashlib.sha256
).hexdigest()
return {
'X-Webhook-Timestamp': str(timestamp),
'X-Webhook-Signature': f"sha256={signature}",
'Content-Type': 'application/json'
}
@staticmethod
def verify(payload: str, signature: str, timestamp: str,
secret: str, tolerance: int = 300) -> bool:
try:
ts = int(timestamp)
if abs(time.time() - ts) > tolerance:
return False
if not signature.startswith('sha256='):
return False
expected = hmac.new(
secret.encode(),
f"{ts}.{payload}".encode(),
hashlib.sha256
).hexdigest()
return hmac.compare_digest(signature[7:], expected)
        except Exception:
return False
# =============================================================================
# Retry Manager
# =============================================================================
class RetryManager:
def __init__(self, config: WebhookConfig, queue, dlq):
self.config = config
self.queue = queue
self.dlq = dlq
async def schedule_retry(self, task: dict, error: str) -> bool:
attempt = task.get('attempt', 0)
if attempt >= self.config.max_retries:
await self.move_to_dlq(task, error)
return False
        delay = self.config.retry_delays[min(attempt, len(self.config.retry_delays) - 1)]
        # Add up to ±10% jitter so a burst of failures doesn't retry in lockstep
        delay += int(delay * 0.1 * random.uniform(-1, 1))
task['attempt'] = attempt + 1
task['last_error'] = error
task['retry_at'] = (datetime.utcnow() + timedelta(seconds=delay)).isoformat()
await self.queue.enqueue_delayed(task, delay)
logger.info("retry_scheduled",
subscription_id=task['subscription_id'],
event_id=task['event']['id'],
attempt=task['attempt'],
delay=delay
)
return True
async def move_to_dlq(self, task: dict, error: str):
dlq_entry = {
'id': str(uuid.uuid4()),
'task': task,
'error': error,
'attempts': task.get('attempt', 0),
'failed_at': datetime.utcnow().isoformat(),
'status': 'pending'
}
await self.dlq.push(dlq_entry)
dlq_depth.inc()
logger.error("moved_to_dlq",
subscription_id=task['subscription_id'],
event_id=task['event']['id'],
error=error,
attempts=task.get('attempt', 0)
)
# =============================================================================
# Delivery Service
# =============================================================================
class WebhookDeliveryService:
def __init__(self, config: WebhookConfig, subscription_store,
retry_manager: RetryManager):
self.config = config
self.subscriptions = subscription_store
self.retry_manager = retry_manager
self.signer = WebhookSigner()
self.session: Optional[aiohttp.ClientSession] = None
self.endpoint_semaphores: Dict[str, asyncio.Semaphore] = {}
async def start(self):
connector = aiohttp.TCPConnector(
limit=1000,
limit_per_host=self.config.max_concurrent_per_endpoint,
ttl_dns_cache=300
)
timeout = aiohttp.ClientTimeout(total=self.config.delivery_timeout)
self.session = aiohttp.ClientSession(connector=connector, timeout=timeout)
async def stop(self):
if self.session:
await self.session.close()
async def deliver(self, task: dict) -> DeliveryResult:
subscription_id = task['subscription_id']
event = task['event']
attempt = task.get('attempt', 0)
subscription = await self.subscriptions.get(subscription_id)
if not subscription or not subscription.active:
return DeliveryResult(success=False, error="Subscription inactive")
        # Build and sign the payload. Serialize it once so the bytes we sign and the
        # bytes we send are identical; otherwise receivers cannot verify the signature.
        payload = {
            'id': event['id'],
            'type': event['type'],
            'data': event['data'],
            'created_at': event['timestamp']
        }
        body = json.dumps(payload, separators=(',', ':'), sort_keys=True)
        headers = self.signer.sign(payload, subscription.secret)
# Rate limit per endpoint
semaphore = self.endpoint_semaphores.setdefault(
subscription.url,
asyncio.Semaphore(self.config.max_concurrent_per_endpoint)
)
start = time.time()
async with semaphore:
try:
                async with self.session.post(
                    subscription.url,
                    data=body,  # the exact serialization that was signed
                    headers=headers
                ) as response:
duration_ms = int((time.time() - start) * 1000)
delivery_duration.observe(duration_ms / 1000)
if response.status == 200:
deliveries_total.labels(status='success', event_type=event['type']).inc()
await self._record_success(subscription)
return DeliveryResult(
success=True,
status_code=200,
duration_ms=duration_ms
)
elif 400 <= response.status < 500 and response.status != 429:
deliveries_total.labels(status='client_error', event_type=event['type']).inc()
return DeliveryResult(
success=False,
status_code=response.status,
error=f"Client error: {response.status}",
should_retry=False
)
else:
deliveries_total.labels(status='server_error', event_type=event['type']).inc()
await self._record_failure(subscription)
return DeliveryResult(
success=False,
status_code=response.status,
error=f"Server error: {response.status}",
should_retry=True,
duration_ms=duration_ms
)
except asyncio.TimeoutError:
deliveries_total.labels(status='timeout', event_type=event['type']).inc()
await self._record_failure(subscription)
return DeliveryResult(
success=False,
error="Timeout",
should_retry=True
)
except Exception as e:
deliveries_total.labels(status='error', event_type=event['type']).inc()
await self._record_failure(subscription)
return DeliveryResult(
success=False,
error=str(e),
should_retry=True
)
async def _record_success(self, subscription: WebhookSubscription):
subscription.healthy = True
subscription.consecutive_failures = 0
subscription.last_success = datetime.utcnow()
await self.subscriptions.update(subscription)
async def _record_failure(self, subscription: WebhookSubscription):
subscription.consecutive_failures += 1
subscription.last_failure = datetime.utcnow()
if subscription.consecutive_failures >= self.config.circuit_breaker_threshold:
subscription.healthy = False
await self.subscriptions.update(subscription)
# =============================================================================
# Worker
# =============================================================================
class WebhookWorker:
def __init__(self, worker_id: str, delivery_service: WebhookDeliveryService,
retry_manager: RetryManager, queue):
self.worker_id = worker_id
self.delivery = delivery_service
self.retry_manager = retry_manager
self.queue = queue
self.running = False
async def run(self):
self.running = True
logger.info("worker_started", worker_id=self.worker_id)
while self.running:
try:
tasks = await self.queue.dequeue(count=10, timeout=5)
if tasks:
await asyncio.gather(*[
self.process_task(task) for task in tasks
])
except Exception as e:
logger.error("worker_error", error=str(e))
await asyncio.sleep(1)
async def process_task(self, task: dict):
result = await self.delivery.deliver(task)
if result.success:
logger.info("delivery_success",
subscription_id=task['subscription_id'],
event_id=task['event']['id'],
duration_ms=result.duration_ms
)
elif result.should_retry:
await self.retry_manager.schedule_retry(task, result.error)
else:
logger.warning("delivery_permanent_failure",
subscription_id=task['subscription_id'],
event_id=task['event']['id'],
error=result.error
)
async def stop(self):
self.running = False
# =============================================================================
# Health Checker
# =============================================================================
class HealthChecker:
def __init__(self, subscription_store, queue, check_interval: int = 30):
self.subscriptions = subscription_store
self.queue = queue
self.check_interval = check_interval
self.running = False
async def run(self):
self.running = True
while self.running:
try:
unhealthy = await self.subscriptions.find_unhealthy()
for sub in unhealthy:
if await self._check_health(sub.url):
logger.info("endpoint_recovered",
subscription_id=sub.id, url=sub.url)
sub.healthy = True
sub.consecutive_failures = 0
await self.subscriptions.update(sub)
await self._accelerate_retries(sub.id)
await asyncio.sleep(self.check_interval)
except Exception as e:
logger.error("health_check_error", error=str(e))
await asyncio.sleep(self.check_interval)
async def _check_health(self, url: str) -> bool:
try:
async with aiohttp.ClientSession() as session:
async with session.head(url, timeout=aiohttp.ClientTimeout(total=5)) as resp:
return resp.status < 500
        except (aiohttp.ClientError, asyncio.TimeoutError):
return False
async def _accelerate_retries(self, subscription_id: str):
pending = await self.queue.get_pending_for_subscription(subscription_id)
for task in pending:
await self.queue.move_to_immediate(task)
if pending:
logger.info("retries_accelerated",
subscription_id=subscription_id,
count=len(pending)
)
async def stop(self):
self.running = False
# =============================================================================
# DLQ Manager
# =============================================================================
class DLQManager:
def __init__(self, dlq, delivery_service: WebhookDeliveryService):
self.dlq = dlq
self.delivery = delivery_service
async def list_entries(self, subscription_id: str = None,
status: str = 'pending', limit: int = 100) -> List[dict]:
return await self.dlq.find(
subscription_id=subscription_id,
status=status,
limit=limit
)
async def retry_entry(self, entry_id: str, user_id: str) -> dict:
entry = await self.dlq.get(entry_id)
if not entry:
raise ValueError("Entry not found")
await self.dlq.update_status(entry_id, 'retrying')
result = await self.delivery.deliver(entry['task'])
if result.success:
await self.dlq.update_status(entry_id, 'resolved', resolved_by=user_id)
dlq_depth.dec()
return {'status': 'delivered'}
else:
await self.dlq.update_status(entry_id, 'pending')
return {'status': 'failed', 'error': result.error}
async def retry_all(self, subscription_id: str, user_id: str) -> dict:
entries = await self.list_entries(subscription_id, limit=10000)
results = {'success': 0, 'failed': 0}
for entry in entries:
result = await self.retry_entry(entry['id'], user_id)
if result['status'] == 'delivered':
results['success'] += 1
else:
results['failed'] += 1
return results
async def discard_entry(self, entry_id: str, user_id: str, reason: str):
await self.dlq.update_status(entry_id, 'discarded',
resolved_by=user_id, resolution_note=reason)
dlq_depth.dec()
End of Day 4: Webhook Delivery System
Tomorrow: Day 5 — Distributed Cron. How do you ensure a scheduled job runs exactly once across multiple servers? We'll cover leader election, fencing tokens, and why ZooKeeper/etcd exist.