Week 6 — Day 4: Scale, Reliability & Edge Cases
System Design Mastery Series — Practical Application Week
Introduction
We've built a solid notification system. Now let's break it.
Today we explore everything that goes wrong at scale — and how to handle it. This is the knowledge that separates engineers who've built toy systems from those who've operated production platforms.
Today's Theme: "What breaks at scale? Everything."
Part I: Scaling Challenges
Chapter 1: Campaign Mode — 10M Notifications in 1 Hour
Marketing wants to send 10 million notifications in an hour. Your system handles 5,000/sec normally; the campaign alone adds roughly 2,800/sec sustained for a full hour. What breaks?
1.1 The Problem
CAMPAIGN MODE CHALLENGES
Normal load: 5,000 notifications/sec
Campaign load: 2,800/sec additional (10M in 1 hour)
Combined: 7,800/sec (56% increase)
But campaigns don't spread evenly:
- Marketing clicks "Send" at 10:00:00 AM
- All 10M notifications hit ingestion simultaneously
- Initial burst: 100,000+/sec for first few seconds
What breaks:
1. API servers get overwhelmed
2. Kafka partitions become unbalanced
3. Database connection pools exhaust
4. Provider rate limits hit immediately
5. Workers can't keep up → queue depth grows
6. Memory pressure from buffered messages
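Before reaching for architecture, it's worth sketching the arithmetic: the killer is the burst, not the sustained rate. A quick back-of-envelope (numbers from above; queue growth assumes workers drain at the normal 5,000/sec):
# Back-of-envelope: what an unthrottled campaign does to queue depth
CAMPAIGN_SIZE = 10_000_000
CAMPAIGN_WINDOW_SEC = 3600
WORKER_DRAIN_RATE = 5_000        # msgs/sec the workers can process
INGEST_BURST_RATE = 100_000      # msgs/sec hitting ingestion at "Send"

sustained = CAMPAIGN_SIZE / CAMPAIGN_WINDOW_SEC          # ~2,778/sec: manageable
backlog_growth = INGEST_BURST_RATE - WORKER_DRAIN_RATE   # 95,000/sec of queue growth

print(f"Sustained rate: {sustained:,.0f}/sec")
print(f"Backlog after 60s of unthrottled burst: {backlog_growth * 60:,}")  # 5,700,000
Sixty seconds of burst leaves a 5.7M-message backlog, which is why the first fix is rate-limited ingestion.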
1.2 Solution: Campaign Ingestion Service
# services/campaign_ingestion.py
import asyncio
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional, AsyncIterator
import logging
logger = logging.getLogger(__name__)
@dataclass
class Campaign:
campaign_id: str
name: str
template: str
variables: dict
    # Target audience
    user_ids: Optional[list[str]] = None
    segment_query: Optional[str] = None  # For dynamic segments
# Throttling
target_send_rate: int = 1000 # per second
max_duration_hours: int = 4
# Scheduling
scheduled_start: Optional[datetime] = None
# Progress
total_recipients: int = 0
sent_count: int = 0
failed_count: int = 0
status: str = "pending"
class CampaignIngestionService:
"""
Handles high-volume campaign sends with controlled ingestion.
Key strategies:
1. Rate-limited ingestion (don't flood the system)
2. Batch database reads (don't query 10M users one by one)
3. Chunked processing (resume on failure)
4. Progress tracking (know where you are)
"""
def __init__(
self,
user_repo,
notification_service,
campaign_repo,
default_rate: int = 1000
):
self.users = user_repo
self.notifications = notification_service
self.campaigns = campaign_repo
self.default_rate = default_rate
async def start_campaign(self, campaign: Campaign):
"""Start processing a campaign with controlled rate."""
campaign.status = "running"
await self.campaigns.update(campaign)
logger.info(
f"Starting campaign {campaign.campaign_id} "
f"at {campaign.target_send_rate}/sec"
)
try:
# Process in controlled batches
async for batch in self._get_recipient_batches(campaign):
await self._process_batch(campaign, batch)
# Rate limiting between batches
await self._throttle(campaign, len(batch))
# Check for pause/cancel
if await self._should_stop(campaign.campaign_id):
campaign.status = "paused"
break
if campaign.status == "running":
campaign.status = "completed"
except Exception as e:
logger.error(f"Campaign {campaign.campaign_id} failed: {e}")
campaign.status = "failed"
await self.campaigns.update(campaign)
async def _get_recipient_batches(
self,
campaign: Campaign,
batch_size: int = 1000
) -> AsyncIterator[list[str]]:
"""
Stream recipient user IDs in batches.
Uses cursor-based pagination to handle millions of users
without loading all into memory.
"""
if campaign.user_ids:
# Static list of user IDs
for i in range(0, len(campaign.user_ids), batch_size):
yield campaign.user_ids[i:i + batch_size]
elif campaign.segment_query:
# Dynamic segment - paginate through database
cursor = None
while True:
users, cursor = await self.users.query_segment(
campaign.segment_query,
limit=batch_size,
cursor=cursor
)
if not users:
break
yield [u.user_id for u in users]
if not cursor:
break
async def _process_batch(self, campaign: Campaign, user_ids: list[str]):
"""Process a batch of recipients."""
tasks = []
for user_id in user_ids:
task = self.notifications.send({
"user_id": user_id,
"type": "marketing",
"template": campaign.template,
"variables": campaign.variables,
"metadata": {
"campaign_id": campaign.campaign_id,
}
})
tasks.append(task)
results = await asyncio.gather(*tasks, return_exceptions=True)
# Update counts
for result in results:
if isinstance(result, Exception):
campaign.failed_count += 1
else:
campaign.sent_count += 1
        # Persist progress roughly every 10K processed; counts advance in
        # batch-sized steps, so test for a boundary crossing
        processed = campaign.sent_count + campaign.failed_count
        if processed % 10000 < len(results):
            await self.campaigns.update(campaign)
logger.info(
f"Campaign {campaign.campaign_id} progress: "
f"{campaign.sent_count}/{campaign.total_recipients}"
)
async def _throttle(self, campaign: Campaign, batch_size: int):
"""Rate limit to target send rate."""
# Calculate delay to achieve target rate
target_rate = campaign.target_send_rate
delay = batch_size / target_rate
await asyncio.sleep(delay)
async def _should_stop(self, campaign_id: str) -> bool:
"""Check if campaign should stop (paused or cancelled)."""
campaign = await self.campaigns.get(campaign_id)
return campaign.status in ["pausing", "cancelled"]
class CampaignRateLimiter:
"""
Global rate limiter for campaigns.
Ensures total campaign throughput doesn't overwhelm the system,
even with multiple concurrent campaigns.
"""
def __init__(
self,
redis_client,
global_limit: int = 5000, # Total campaign sends/sec
window_seconds: int = 1
):
self.redis = redis_client
self.global_limit = global_limit
self.window = window_seconds
async def acquire(self, count: int = 1) -> bool:
"""
Try to acquire capacity for sending.
Returns True if allowed, False if rate limited.
"""
key = "campaign:rate_limit"
now = datetime.utcnow().timestamp()
window_start = now - self.window
pipe = self.redis.pipeline()
# Remove old entries
pipe.zremrangebyscore(key, 0, window_start)
# Count current window
pipe.zcard(key)
results = await pipe.execute()
current_count = results[1]
        if current_count + count > self.global_limit:
            return False
        # Add one member per permit so ZCARD reflects true usage
        await self.redis.zadd(key, {f"{now}:{i}": now for i in range(count)})
        await self.redis.expire(key, self.window + 1)
        return True
async def wait_for_capacity(self, count: int = 1):
"""Wait until capacity is available."""
while not await self.acquire(count):
await asyncio.sleep(0.1)
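Wiring the two together: the ingestion loop can block on wait_for_capacity before each batch, so concurrent campaigns share the global budget. A minimal sketch using the classes above:
async def process_with_global_limit(service, limiter, campaign, batch):
    """Gate one campaign batch on the shared global campaign budget."""
    # Blocks while other campaigns are consuming the global allowance
    await limiter.wait_for_capacity(count=len(batch))
    await service._process_batch(campaign, batch)  # reuse the batch path above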
1.3 Priority Isolation
# services/priority_isolation.py
"""
Priority isolation ensures campaigns don't affect critical notifications.
Strategy:
1. Separate Kafka topics per priority
2. Separate worker pools per priority
3. Critical notifications bypass campaign rate limits
4. Dedicated resources for transaction/security notifications
"""
class PriorityIsolationConfig:
"""Configuration for priority-based resource isolation."""
KAFKA_TOPICS = {
"critical": {
"topic": "notifications.critical",
"partitions": 32,
"replication": 3,
"retention_hours": 168,
},
"high": {
"topic": "notifications.high",
"partitions": 64,
"replication": 3,
"retention_hours": 72,
},
"medium": {
"topic": "notifications.medium",
"partitions": 32,
"replication": 2,
"retention_hours": 24,
},
"low": {
"topic": "notifications.low",
"partitions": 16,
"replication": 2,
"retention_hours": 24,
},
}
WORKER_POOLS = {
"critical": {
"min_workers": 10,
"max_workers": 50,
"dedicated": True, # Never shared with other priorities
},
"high": {
"min_workers": 20,
"max_workers": 100,
"dedicated": True,
},
"low": {
"min_workers": 5,
"max_workers": 200, # Can scale for campaigns
"dedicated": False,
},
}
RATE_LIMITS = {
"critical": None, # No rate limit
"high": 10000, # 10K/sec
"medium": 5000, # 5K/sec
"low": 2000, # 2K/sec (campaigns)
}
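A small routing helper shows how this config gets consumed. A sketch assuming an aiokafka-style producer with send_and_wait; the producer wiring is illustrative, not part of the config above:
import json

class PriorityRouter:
    """Routes a notification to the Kafka topic for its priority tier."""
    def __init__(self, producer, config=PriorityIsolationConfig):
        self.producer = producer
        self.config = config

    async def route(self, notification: dict):
        priority = notification.get("priority", "medium")
        topic_cfg = self.config.KAFKA_TOPICS.get(
            priority, self.config.KAFKA_TOPICS["medium"]  # unknown tiers go to medium
        )
        # Key by recipient so fan-outs spread across partitions (see Chapter 2)
        await self.producer.send_and_wait(
            topic_cfg["topic"],
            key=notification["user_id"].encode(),
            value=json.dumps(notification).encode(),
        )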
Chapter 2: The Hot User Problem
A celebrity with 1 million followers posts something. You need to notify all followers. This is the "fan-out" problem.
2.1 The Problem
HOT USER SCENARIO
Celebrity posts at 10:00:00 AM
├── 1,000,000 followers need notification
├── All notifications created simultaneously
├── All keyed to the same source user's partition
└── All hit the same database rows for preferences
What breaks:
1. Single partition becomes hot (all fan-out to one key)
2. Database row locks (everyone reading same user)
3. Provider rate limits (1M push notifications at once)
4. Memory exhaustion (buffering 1M notifications)
Traditional approach fails:
- "Notify all followers of user X"
- Generates 1M notification requests instantly
- System drowns
2.2 Solution: Async Fan-Out with Batching
# services/fanout.py
import asyncio
import uuid
from datetime import datetime
from typing import AsyncIterator
import logging
logger = logging.getLogger(__name__)
class FanoutService:
"""
Handles fan-out for notifications to many recipients.
Strategies:
1. Async processing (don't block the triggering event)
2. Batched retrieval (paginate through followers)
3. Rate-limited sending (don't overwhelm downstream)
4. Progress tracking (resume on failure)
"""
def __init__(
self,
follower_repo,
notification_service,
fanout_repo,
batch_size: int = 1000,
rate_limit: int = 5000 # per second
):
self.followers = follower_repo
self.notifications = notification_service
self.fanouts = fanout_repo
self.batch_size = batch_size
self.rate_limit = rate_limit
async def trigger_fanout(
self,
source_user_id: str,
notification_type: str,
template: str,
variables: dict
) -> str:
"""
Trigger fan-out to all followers.
Returns fanout_id for tracking.
Actual sending happens asynchronously.
"""
# Get follower count
follower_count = await self.followers.get_count(source_user_id)
# Create fanout job
fanout = {
"fanout_id": str(uuid.uuid4()),
"source_user_id": source_user_id,
"notification_type": notification_type,
"template": template,
"variables": variables,
"total_recipients": follower_count,
"processed_count": 0,
"status": "pending",
"created_at": datetime.utcnow(),
}
await self.fanouts.save(fanout)
# Queue for async processing
await self._queue_fanout(fanout["fanout_id"])
logger.info(
f"Triggered fanout {fanout['fanout_id']} "
f"to {follower_count} recipients"
)
return fanout["fanout_id"]
async def process_fanout(self, fanout_id: str):
"""Process a fanout job (called by worker)."""
fanout = await self.fanouts.get(fanout_id)
if not fanout:
return
fanout["status"] = "processing"
await self.fanouts.update(fanout)
try:
# Process in batches with rate limiting
tokens_per_second = self.rate_limit
token_bucket = tokens_per_second
last_refill = datetime.utcnow()
async for batch in self._get_follower_batches(fanout["source_user_id"]):
# Rate limiting with token bucket
now = datetime.utcnow()
elapsed = (now - last_refill).total_seconds()
token_bucket = min(tokens_per_second, token_bucket + elapsed * tokens_per_second)
last_refill = now
if token_bucket < len(batch):
wait_time = (len(batch) - token_bucket) / tokens_per_second
await asyncio.sleep(wait_time)
token_bucket = 0
else:
token_bucket -= len(batch)
# Send batch
await self._send_batch(fanout, batch)
fanout["processed_count"] += len(batch)
                # Checkpoint progress roughly every 10K; the count advances
                # in batch-sized steps, so test for a boundary crossing
                if fanout["processed_count"] % 10000 < len(batch):
                    await self.fanouts.update(fanout)
fanout["status"] = "completed"
except Exception as e:
logger.error(f"Fanout {fanout_id} failed: {e}")
fanout["status"] = "failed"
fanout["error"] = str(e)
fanout["completed_at"] = datetime.utcnow()
await self.fanouts.update(fanout)
async def _get_follower_batches(self, user_id: str) -> AsyncIterator[list[str]]:
"""Stream follower IDs in batches."""
cursor = None
while True:
followers, cursor = await self.followers.get_followers(
user_id,
limit=self.batch_size,
cursor=cursor
)
if not followers:
break
yield [f.follower_id for f in followers]
if not cursor:
break
async def _send_batch(self, fanout: dict, follower_ids: list[str]):
"""Send notifications to a batch of followers."""
tasks = [
self.notifications.send({
"user_id": follower_id,
"type": fanout["notification_type"],
"template": fanout["template"],
"variables": fanout["variables"],
"metadata": {
"fanout_id": fanout["fanout_id"],
"source_user_id": fanout["source_user_id"],
}
})
for follower_id in follower_ids
]
await asyncio.gather(*tasks, return_exceptions=True)
    async def _queue_fanout(self, fanout_id: str):
        """Queue fanout for async processing."""
        # Publish the fanout_id to a Kafka fanout topic; a worker picks it
        # up and calls process_fanout. Left as a stub here.
        pass
2.3 Database Hot Spot Mitigation
# services/hot_spot_mitigation.py
"""
Strategies to avoid database hot spots during fan-out.
"""
class CachedPreferencesService:
"""
Cache user preferences to avoid DB hot spots.
During fan-out, we'd hit the source user's preferences
millions of times. Cache aggressively.
"""
def __init__(self, preference_repo, cache, default_ttl: int = 300):
self.preferences = preference_repo
self.cache = cache
self.default_ttl = default_ttl
async def get_preferences_batch(
self,
user_ids: list[str]
) -> dict[str, UserPreferences]:
"""
Get preferences for multiple users efficiently.
Uses batch cache lookup and single DB query for misses.
"""
# Try cache first
cache_keys = [f"prefs:{uid}" for uid in user_ids]
cached = await self.cache.mget(cache_keys)
result = {}
missing_ids = []
for i, user_id in enumerate(user_ids):
if cached[i]:
result[user_id] = UserPreferences.from_dict(cached[i])
else:
missing_ids.append(user_id)
# Batch fetch from DB
if missing_ids:
db_results = await self.preferences.get_batch(missing_ids)
# Cache and add to result
to_cache = {}
for user_id, prefs in db_results.items():
result[user_id] = prefs
to_cache[f"prefs:{user_id}"] = prefs.to_dict()
if to_cache:
await self.cache.mset(to_cache, ttl=self.default_ttl)
return result
class PartitionAwareNotificationService:
"""
Avoid Kafka partition hot spots during fan-out.
Problem: If we partition by source_user_id, all fan-out
notifications go to same partition.
Solution: Partition by recipient_user_id instead.
"""
async def send(self, notification: dict):
# Use recipient as partition key, not source
partition_key = notification["user_id"] # recipient
# NOT notification["metadata"]["source_user_id"]
await self.kafka.send(
topic=self._get_topic(notification),
key=partition_key.encode(),
value=notification
)
Chapter 3: Provider Rate Limits
Every provider has rate limits. You need to respect them while maximizing throughput.
3.1 Provider Rate Limit Configuration
# config/provider_limits.py
PROVIDER_RATE_LIMITS = {
"fcm": {
"requests_per_second": 1000, # Per project
"batch_size": 500, # Messages per batch request
"concurrent_connections": 100,
},
"apns": {
"requests_per_second": 4000, # Apple is generous
"concurrent_connections": 500,
"notification_per_connection": 1,
},
"sendgrid": {
"requests_per_second": 100, # Depends on plan
"emails_per_request": 1000, # Batch API
"daily_limit": 100000, # Plan dependent
},
"twilio": {
"messages_per_second": 100, # Per phone number
"concurrent_connections": 50,
# Different limits per number type:
"toll_free_per_second": 30,
"short_code_per_second": 100,
"10dlc_per_second": 15, # Varies by trust score
},
}
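To see how these numbers interact, a worked example: pushing 10,000 FCM messages through the batch API uses far less quota than you might expect (arithmetic only, using the config above):
import math

limits = PROVIDER_RATE_LIMITS["fcm"]
messages = 10_000
batches = math.ceil(messages / limits["batch_size"])        # 20 batch requests
quota_seconds = batches / limits["requests_per_second"]     # 0.02s of request quota
print(f"{batches} batch requests, {quota_seconds:.3f}s of rate-limit quota")
At this volume the request-rate limit isn't the bottleneck; connection concurrency and how the provider tolerates sustained bursts matter more.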
3.2 Provider Rate Limiter
# services/provider_rate_limiter.py
import asyncio
from datetime import datetime
from typing import Optional
import logging
logger = logging.getLogger(__name__)
class ProviderRateLimiter:
"""
Rate limiter for external notification providers.
Features:
- Per-provider rate limiting
- Token bucket algorithm
- Backpressure signaling
- Automatic recovery
"""
def __init__(self, redis_client, config: dict):
self.redis = redis_client
self.config = config
self.local_buckets = {} # For fast path
async def acquire(
self,
provider: str,
count: int = 1,
timeout: float = 5.0
) -> bool:
"""
Acquire rate limit tokens for a provider.
Returns True if acquired, False if timed out.
"""
limit = self.config.get(provider, {}).get("requests_per_second", 100)
start = datetime.utcnow()
while True:
if await self._try_acquire(provider, count, limit):
return True
# Check timeout
elapsed = (datetime.utcnow() - start).total_seconds()
if elapsed >= timeout:
return False
# Wait before retry
await asyncio.sleep(0.05)
async def _try_acquire(
self,
provider: str,
count: int,
limit: int
) -> bool:
"""Try to acquire tokens using Redis."""
key = f"rate_limit:provider:{provider}"
now = datetime.utcnow().timestamp()
window_start = now - 1.0 # 1 second window
# Lua script for atomic rate limiting
script = """
local key = KEYS[1]
local now = tonumber(ARGV[1])
local window_start = tonumber(ARGV[2])
local count = tonumber(ARGV[3])
local limit = tonumber(ARGV[4])
-- Remove old entries
redis.call('ZREMRANGEBYSCORE', key, 0, window_start)
-- Get current count
local current = redis.call('ZCARD', key)
-- Check if we can acquire
if current + count > limit then
return 0
end
-- Add new entries
for i = 1, count do
redis.call('ZADD', key, now, now .. ':' .. i .. ':' .. math.random())
end
redis.call('EXPIRE', key, 2)
return 1
"""
        # keys=/args= follows the aioredis-style eval signature; redis-py
        # instead takes eval(script, numkeys, *keys_and_args)
        result = await self.redis.eval(
            script,
            keys=[key],
            args=[now, window_start, count, limit]
        )
return result == 1
async def get_available_capacity(self, provider: str) -> int:
"""Get remaining capacity for a provider."""
limit = self.config.get(provider, {}).get("requests_per_second", 100)
key = f"rate_limit:provider:{provider}"
now = datetime.utcnow().timestamp()
window_start = now - 1.0
# Count current usage
current = await self.redis.zcount(key, window_start, now)
return max(0, limit - current)
async def report_rate_limit_error(self, provider: str, retry_after: int = 60):
"""
Report that provider returned rate limit error.
Temporarily reduces our rate to avoid further errors.
"""
key = f"rate_limit:backoff:{provider}"
# Set backoff flag
await self.redis.setex(key, retry_after, "1")
logger.warning(
f"Provider {provider} rate limited, backing off for {retry_after}s"
)
    async def is_backed_off(self, provider: str) -> bool:
        """Check if we're in backoff for a provider."""
        key = f"rate_limit:backoff:{provider}"
        return bool(await self.redis.exists(key))  # EXISTS returns an int
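Putting the limiter and the backoff flag together, a worker's send path might look like this. A sketch: provider_client.send_batch and ProviderRateLimitError are stand-ins for the real provider SDK's call and its 429/quota exception:
class ProviderRateLimitError(Exception):
    """Stand-in for a provider SDK's 429/quota exception."""

async def send_with_rate_limit(limiter, provider: str, provider_client, batch: list):
    """Send one batch only when the provider has capacity and isn't backed off."""
    if await limiter.is_backed_off(provider):
        return None  # In a backoff window; caller requeues the batch
    if not await limiter.acquire(provider, count=len(batch)):
        return None  # No capacity within timeout; caller requeues
    try:
        return await provider_client.send_batch(batch)
    except ProviderRateLimitError:
        # The provider disagreed with our local bookkeeping; trust the
        # provider and enter a backoff window
        await limiter.report_rate_limit_error(provider, retry_after=60)
        raise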
Part II: Reliability Patterns
Chapter 4: Circuit Breakers
When a provider is failing, stop sending to it temporarily.
4.1 Circuit Breaker Implementation
# services/circuit_breaker.py
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
from typing import Optional, Callable
import asyncio
import logging
logger = logging.getLogger(__name__)
class CircuitState(Enum):
CLOSED = "closed" # Normal operation
OPEN = "open" # Failing, reject requests
HALF_OPEN = "half_open" # Testing if recovered
@dataclass
class CircuitBreakerConfig:
failure_threshold: int = 5 # Failures before opening
success_threshold: int = 3 # Successes to close from half-open
timeout_seconds: int = 30 # Time before half-open
half_open_max_calls: int = 3 # Calls allowed in half-open
@dataclass
class CircuitStats:
state: CircuitState = CircuitState.CLOSED
failure_count: int = 0
success_count: int = 0
last_failure_time: Optional[datetime] = None
    last_state_change: Optional[datetime] = None
half_open_calls: int = 0
class CircuitBreaker:
"""
Circuit breaker for provider calls.
States:
- CLOSED: Normal operation, track failures
- OPEN: Rejecting calls, wait for timeout
- HALF_OPEN: Testing with limited calls
Week 2 Concept: Circuit Breakers
"""
def __init__(
self,
name: str,
        config: Optional[CircuitBreakerConfig] = None,
        on_state_change: Optional[Callable] = None
):
self.name = name
self.config = config or CircuitBreakerConfig()
self.stats = CircuitStats(last_state_change=datetime.utcnow())
self.on_state_change = on_state_change
self._lock = asyncio.Lock()
async def call(self, func: Callable, *args, **kwargs):
"""
Execute function with circuit breaker protection.
Raises CircuitOpenError if circuit is open.
"""
# Check if call is allowed
if not await self._can_call():
raise CircuitOpenError(
f"Circuit {self.name} is open, request rejected"
)
try:
result = await func(*args, **kwargs)
await self._record_success()
return result
except Exception as e:
await self._record_failure()
raise
async def _can_call(self) -> bool:
"""Check if a call is allowed."""
async with self._lock:
if self.stats.state == CircuitState.CLOSED:
return True
if self.stats.state == CircuitState.OPEN:
# Check if timeout elapsed
if self._timeout_elapsed():
self._transition_to(CircuitState.HALF_OPEN)
return True
return False
if self.stats.state == CircuitState.HALF_OPEN:
# Allow limited calls
if self.stats.half_open_calls < self.config.half_open_max_calls:
self.stats.half_open_calls += 1
return True
return False
return False
async def _record_success(self):
"""Record a successful call."""
async with self._lock:
if self.stats.state == CircuitState.HALF_OPEN:
self.stats.success_count += 1
if self.stats.success_count >= self.config.success_threshold:
self._transition_to(CircuitState.CLOSED)
elif self.stats.state == CircuitState.CLOSED:
# Reset failure count on success
self.stats.failure_count = 0
async def _record_failure(self):
"""Record a failed call."""
async with self._lock:
self.stats.failure_count += 1
self.stats.last_failure_time = datetime.utcnow()
if self.stats.state == CircuitState.HALF_OPEN:
# Any failure in half-open goes back to open
self._transition_to(CircuitState.OPEN)
elif self.stats.state == CircuitState.CLOSED:
if self.stats.failure_count >= self.config.failure_threshold:
self._transition_to(CircuitState.OPEN)
def _transition_to(self, new_state: CircuitState):
"""Transition to a new state."""
old_state = self.stats.state
self.stats.state = new_state
self.stats.last_state_change = datetime.utcnow()
# Reset counters
if new_state == CircuitState.CLOSED:
self.stats.failure_count = 0
self.stats.success_count = 0
elif new_state == CircuitState.HALF_OPEN:
self.stats.half_open_calls = 0
self.stats.success_count = 0
logger.warning(
f"Circuit {self.name} transitioned: {old_state.value} -> {new_state.value}"
)
if self.on_state_change:
self.on_state_change(self.name, old_state, new_state)
def _timeout_elapsed(self) -> bool:
"""Check if timeout has elapsed since last state change."""
elapsed = datetime.utcnow() - self.stats.last_state_change
return elapsed.total_seconds() >= self.config.timeout_seconds
@property
def is_open(self) -> bool:
return self.stats.state == CircuitState.OPEN
class CircuitOpenError(Exception):
pass
class CircuitBreakerRegistry:
"""Registry of circuit breakers for all providers."""
def __init__(self):
self.breakers: dict[str, CircuitBreaker] = {}
def get_or_create(
self,
name: str,
        config: Optional[CircuitBreakerConfig] = None
) -> CircuitBreaker:
"""Get or create a circuit breaker."""
if name not in self.breakers:
self.breakers[name] = CircuitBreaker(
name=name,
config=config,
on_state_change=self._on_state_change
)
return self.breakers[name]
def _on_state_change(self, name: str, old_state: CircuitState, new_state: CircuitState):
"""Handle circuit state changes."""
# Could publish metrics, send alerts, etc.
pass
def get_all_status(self) -> dict:
"""Get status of all circuit breakers."""
return {
name: {
"state": cb.stats.state.value,
"failure_count": cb.stats.failure_count,
"last_failure": cb.stats.last_failure_time.isoformat() if cb.stats.last_failure_time else None,
}
for name, cb in self.breakers.items()
}
4.2 Provider with Circuit Breaker
# providers/fcm_with_circuit_breaker.py
class FCMProviderWithCircuitBreaker:
"""FCM provider with circuit breaker protection."""
def __init__(
self,
fcm_provider: FCMProvider,
circuit_breaker: CircuitBreaker
):
self.provider = fcm_provider
self.circuit = circuit_breaker
async def send(self, *args, **kwargs) -> DeliveryResult:
"""Send with circuit breaker protection."""
try:
return await self.circuit.call(
self.provider.send,
*args,
**kwargs
)
except CircuitOpenError:
return DeliveryResult(
status=DeliveryStatus.FAILED,
error_code="CIRCUIT_OPEN",
error_message="Provider circuit breaker is open",
should_retry=True,
should_fallback=True
)
Chapter 5: Provider Failover
When one provider fails, automatically switch to backup.
5.1 Multi-Provider Manager
# services/provider_manager.py
from dataclasses import dataclass
from typing import Any, Optional
import logging
logger = logging.getLogger(__name__)
@dataclass
class ProviderConfig:
name: str
priority: int # Lower = higher priority
weight: int = 100 # For load balancing
is_fallback: bool = False
class ProviderManager:
"""
Manages multiple providers per channel with failover.
Features:
- Primary/fallback configuration
- Automatic failover on circuit break
- Health-based routing
- Load balancing across providers
"""
def __init__(
self,
providers: dict, # name -> provider instance
circuit_registry: CircuitBreakerRegistry,
health_checker
):
self.providers = providers
self.circuits = circuit_registry
self.health = health_checker
# Provider configs per channel
self.configs = {
"push": [
ProviderConfig("fcm", priority=1),
ProviderConfig("apns", priority=1),
],
"email": [
ProviderConfig("sendgrid", priority=1),
ProviderConfig("ses", priority=2, is_fallback=True),
],
"sms": [
ProviderConfig("twilio", priority=1),
ProviderConfig("sns", priority=2, is_fallback=True),
],
}
async def get_provider(
self,
channel: str,
        exclude: Optional[list[str]] = None
    ) -> Optional[tuple[str, Any]]:
"""
Get the best available provider for a channel.
Returns (provider_name, provider_instance) or None if all unavailable.
"""
exclude = exclude or []
configs = self.configs.get(channel, [])
# Sort by priority
sorted_configs = sorted(configs, key=lambda c: c.priority)
for config in sorted_configs:
if config.name in exclude:
continue
# Check circuit breaker
circuit = self.circuits.get_or_create(config.name)
if circuit.is_open:
continue
# Check health
if not await self.health.is_healthy(config.name):
continue
provider = self.providers.get(config.name)
if provider:
return (config.name, provider)
return None
async def send_with_failover(
self,
channel: str,
notification: dict,
max_attempts: int = 3
) -> DeliveryResult:
"""
Send notification with automatic failover.
Tries providers in order until one succeeds.
"""
attempted = []
last_result = None
for attempt in range(max_attempts):
provider_info = await self.get_provider(channel, exclude=attempted)
if not provider_info:
break
provider_name, provider = provider_info
attempted.append(provider_name)
try:
result = await provider.send(notification)
if result.status == DeliveryStatus.SUCCESS:
return result
if not result.should_fallback:
return result
last_result = result
logger.info(
f"Provider {provider_name} failed, trying fallback"
)
except Exception as e:
logger.error(f"Provider {provider_name} error: {e}")
last_result = DeliveryResult(
status=DeliveryStatus.FAILED,
error_code="PROVIDER_ERROR",
error_message=str(e)
)
# All providers failed
return last_result or DeliveryResult(
status=DeliveryStatus.FAILED,
error_code="NO_PROVIDERS",
error_message="All providers unavailable"
)
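In the delivery worker, the manager replaces direct provider calls. Illustrative usage (retry_queue is a stand-in for whatever requeue mechanism the worker uses):
async def deliver(provider_manager, notification: dict, retry_queue):
    result = await provider_manager.send_with_failover(
        channel=notification["channel"],
        notification=notification,
    )
    if result.status != DeliveryStatus.SUCCESS and result.should_retry:
        await retry_queue.enqueue(notification)  # hypothetical requeue hook
    return result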
Part III: Edge Cases
Chapter 6: The Edge Case Encyclopedia
Every edge case you'll encounter in production.
6.1 Device Token Issues
# edge_cases/device_tokens.py
"""
Device Token Edge Cases
1. Token Rotation
- iOS/Android periodically rotate device tokens
- Old token becomes invalid
- Solution: Handle InvalidToken, listen for new token events
2. Multiple Devices
- User has phone + tablet + web
- Each has different token
- Solution: Send to all active tokens
3. Stale Tokens
- User uninstalls app
- Token remains in our database
- Solution: Mark invalid on InvalidToken, periodic cleanup
4. Token Collision
- Same token registered for multiple users (rare but possible)
- Solution: Associate token with single user, update on conflict
"""
from datetime import datetime, timedelta
import logging
logger = logging.getLogger(__name__)
class DeviceTokenManager:
"""Handles device token lifecycle and edge cases."""
async def handle_invalid_token(
self,
token: str,
error_code: str
):
"""Handle token invalidation from provider."""
if error_code in ["UNREGISTERED", "InvalidToken", "NotRegistered"]:
# Token is permanently invalid
await self.tokens.mark_invalid(
token,
reason=error_code,
invalidated_at=datetime.utcnow()
)
logger.info(f"Marked token as invalid: {token[:20]}...")
async def handle_token_refresh(
self,
user_id: str,
old_token: str,
new_token: str,
platform: str
):
"""Handle token rotation from client."""
# Validate new token format
if not self._is_valid_token_format(new_token, platform):
raise ValueError("Invalid token format")
# Check if new token already exists for different user
existing = await self.tokens.get_by_token(new_token)
if existing and existing.user_id != user_id:
# Token collision - reassign to new user
await self.tokens.delete(existing.token_id)
# Update or create
if old_token:
await self.tokens.update_token(
user_id=user_id,
old_token=old_token,
new_token=new_token
)
else:
await self.tokens.create(
user_id=user_id,
token=new_token,
platform=platform
)
async def cleanup_stale_tokens(self, days_inactive: int = 90):
"""Remove tokens that haven't been used in a while."""
cutoff = datetime.utcnow() - timedelta(days=days_inactive)
deleted = await self.tokens.delete_inactive(before=cutoff)
logger.info(f"Cleaned up {deleted} stale device tokens")
6.2 Email Bounce Handling
# edge_cases/email_bounces.py
"""
Email Bounce Edge Cases
1. Hard Bounce
- Email address doesn't exist
- Solution: Mark as invalid, never send again
2. Soft Bounce
- Temporary issue (mailbox full, server down)
- Solution: Retry with backoff, mark invalid after N failures
3. Complaint (Spam Report)
- User marked as spam
- Solution: Immediate unsubscribe, legal requirement
4. Unsubscribe
- User clicked unsubscribe link
- Solution: Update preferences, honor immediately
"""
from datetime import datetime
import logging
logger = logging.getLogger(__name__)
class EmailBounceHandler:
"""Handles email bounces and complaints."""
HARD_BOUNCE_CODES = [
"550", # User unknown
"551", # User not local
"552", # Mailbox full (sometimes hard)
"553", # Invalid address
"554", # Transaction failed
]
async def handle_bounce(
self,
email: str,
bounce_type: str,
bounce_code: str,
diagnostic: str
):
"""Handle email bounce webhook."""
email_record = await self.emails.get_by_email(email)
if not email_record:
return
if bounce_type == "hard" or bounce_code in self.HARD_BOUNCE_CODES:
# Permanent failure - mark as invalid
await self.emails.update(
email_record.email_id,
{
"status": "bounced",
"bounce_type": "hard",
"bounce_count": email_record.bounce_count + 1,
"last_bounce_at": datetime.utcnow(),
}
)
logger.warning(f"Hard bounce for {email}: {diagnostic}")
else:
# Soft bounce - increment counter
await self.emails.update(
email_record.email_id,
{
"bounce_count": email_record.bounce_count + 1,
"last_bounce_at": datetime.utcnow(),
}
)
            # Mark as invalid after too many soft bounces (the in-memory
            # record predates this update's increment, hence the +1)
            if email_record.bounce_count + 1 >= 5:
await self.emails.update(
email_record.email_id,
{"status": "bounced", "bounce_type": "soft"}
)
async def handle_complaint(self, email: str, feedback_type: str):
"""Handle spam complaint - MUST unsubscribe immediately."""
email_record = await self.emails.get_by_email(email)
if not email_record:
return
# Mark as complained
await self.emails.update(
email_record.email_id,
{"status": "complained"}
)
# Unsubscribe from all marketing
await self.preferences.unsubscribe(
email_record.user_id,
category="marketing"
)
logger.warning(
f"Spam complaint from {email}, unsubscribed from marketing"
)
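The flip side of bounce handling is checking suppression status before every send. A minimal sketch against the same email repository assumed above:
class EmailSuppressionCheck:
    """Gates outbound email on bounce/complaint status."""
    SUPPRESSED_STATUSES = {"bounced", "complained"}

    def __init__(self, email_repo):
        self.emails = email_repo

    async def is_sendable(self, email: str) -> bool:
        record = await self.emails.get_by_email(email)
        if not record:
            return False  # Unknown address: don't send blind
        return record.status not in self.SUPPRESSED_STATUSES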
6.3 Timezone and DST Edge Cases
# edge_cases/timezones.py
"""
Timezone Edge Cases
1. DST Transitions
- 2 AM becomes 3 AM (spring forward)
- 2 AM happens twice (fall back)
- Solution: Store in UTC, convert at display/send time
2. Timezone Changes
- User travels to different timezone
- Solution: Use device timezone or explicit user preference
3. Quiet Hours Across DST
- User sets quiet hours 10 PM - 8 AM
- DST changes, quiet hours might be wrong for a day
- Solution: Re-evaluate on each notification
4. Scheduled Notifications
- User schedules for "9 AM tomorrow"
- Tomorrow has DST change
- Solution: Store timezone with scheduled time
"""
import pytz
from datetime import datetime, time, timedelta
class TimezoneAwareScheduler:
"""Handles timezone edge cases for scheduling."""
def get_next_occurrence(
self,
user_timezone: str,
target_time: time,
after: datetime = None
) -> datetime:
"""
Get next occurrence of a time in user's timezone.
Handles DST correctly.
"""
tz = pytz.timezone(user_timezone)
        after = after or datetime.now(pytz.UTC)
        if after.tzinfo is None:
            after = pytz.UTC.localize(after)  # treat naive input as UTC
        # Convert 'after' to user's timezone
        after_local = after.astimezone(tz)
# Create candidate datetime in user's timezone
candidate = tz.localize(
datetime.combine(after_local.date(), target_time)
)
# If candidate is in past, move to next day
if candidate <= after_local:
candidate = tz.localize(
datetime.combine(
after_local.date() + timedelta(days=1),
target_time
)
)
        # Handle DST by re-localizing with is_dst=None, which raises on
        # edge cases instead of silently guessing
        naive = candidate.replace(tzinfo=None)
        try:
            candidate = tz.localize(naive, is_dst=None)
        except pytz.exceptions.NonExistentTimeError:
            # Time was skipped by spring forward: shift the wall clock
            # forward an hour to the time that does exist
            candidate = tz.localize(naive + timedelta(hours=1))
        except pytz.exceptions.AmbiguousTimeError:
            # Time occurs twice (fall back): take the first occurrence,
            # the daylight-saving one (is_dst=True)
            candidate = tz.localize(naive, is_dst=True)
# Convert to UTC for storage
return candidate.astimezone(pytz.UTC).replace(tzinfo=None)
def is_quiet_hours(
self,
user_timezone: str,
quiet_start: time,
quiet_end: time,
check_time: datetime = None
) -> bool:
"""Check if current time is in quiet hours, handling DST."""
tz = pytz.timezone(user_timezone)
        check_time = check_time or datetime.now(pytz.UTC)
        if check_time.tzinfo is None:
            check_time = pytz.UTC.localize(check_time)  # treat naive input as UTC
        # Get current time in user's timezone
        local_time = check_time.astimezone(tz).time()
# Handle overnight quiet hours (e.g., 10 PM - 8 AM)
if quiet_start > quiet_end:
return local_time >= quiet_start or local_time <= quiet_end
else:
return quiet_start <= local_time <= quiet_end
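A quick check across the US spring-forward boundary (March 9, 2025, when 2:30 AM doesn't exist in America/New_York) shows the NonExistentTimeError path doing its job:
from datetime import datetime, time
import pytz

scheduler = TimezoneAwareScheduler()

# 2:30 AM on 2025-03-09 is skipped by spring forward in America/New_York
after = pytz.UTC.localize(datetime(2025, 3, 9, 5, 0))  # midnight Eastern
next_run = scheduler.get_next_occurrence(
    user_timezone="America/New_York",
    target_time=time(2, 30),
    after=after,
)
print(next_run)  # 2025-03-09 07:30:00, i.e. 3:30 AM EDT stored as naive UTC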
6.4 Race Conditions
# edge_cases/race_conditions.py
"""
Race Condition Edge Cases
1. Duplicate Sends
- Same notification queued twice
- Solution: Idempotency keys, deduplication window
2. Preference Update During Send
- User opts out while notification is in queue
- Solution: Check preferences at delivery time, not queue time
3. User Deletion During Send
- User deletes account while notification queued
- Solution: Check user exists before delivery
4. Concurrent Preference Updates
- Two devices update preferences simultaneously
- Solution: Optimistic locking with version
"""
class RaceConditionGuards:
"""Guards against common race conditions."""
async def safe_deliver(
self,
notification: dict,
preferences_service,
user_service
) -> DeliveryResult:
"""Deliver with race condition checks."""
user_id = notification["user_id"]
# Check 1: User still exists
user = await user_service.get(user_id)
if not user or user.status == "deleted":
return DeliveryResult(
status=DeliveryStatus.FAILED,
error_code="USER_DELETED",
error_message="User account has been deleted"
)
# Check 2: Re-fetch preferences (might have changed since queue time)
prefs = await preferences_service.get_preferences(user_id)
category = notification.get("category", notification["type"])
channel = notification["channel"]
if not preferences_service.is_notification_allowed(prefs, category, channel):
return DeliveryResult(
status=DeliveryStatus.SKIPPED,
error_code="USER_OPTED_OUT",
error_message="User has opted out of this notification type"
)
# Check 3: Deduplication
idempotency_key = notification.get("idempotency_key")
if idempotency_key:
if await self._is_duplicate(idempotency_key):
return DeliveryResult(
status=DeliveryStatus.SKIPPED,
error_code="DUPLICATE",
error_message="Notification already sent"
)
# All checks passed, proceed with delivery
return await self._do_deliver(notification)
async def _is_duplicate(self, idempotency_key: str) -> bool:
"""Check if this is a duplicate send."""
# Use Redis SET NX with TTL for deduplication window
key = f"dedup:{idempotency_key}"
result = await self.redis.set(
key,
"1",
nx=True, # Only set if not exists
ex=3600 # 1 hour deduplication window
)
return result is None # None means key already existed
Part IV: Failure Scenarios
Chapter 7: What Happens When X Fails?
7.1 Failure Matrix
FAILURE SCENARIO MATRIX
Component │ Impact │ Recovery
───────────────────┼──────────────────────────────────┼───────────────────────
API Server │ Requests rejected │ Auto-scale, LB removes
PostgreSQL Primary │ Writes fail │ Promote replica
PostgreSQL Replica │ Read latency increases │ Use other replicas
Redis │ Cache miss, rate limit issues │ Fall back to DB
Kafka Broker │ Some partitions unavailable │ Rebalance consumers
Kafka (all) │ Cannot queue notifications │ Buffer in memory/disk
FCM │ Push delivery fails │ Circuit break, fallback
SendGrid │ Email delivery fails │ Fallback to SES
Twilio │ SMS delivery fails │ Fallback to SNS
Worker │ Partition processing stops │ Kafka rebalances
Network (internal) │ Service communication fails │ Timeouts, retries
Network (external) │ Provider unreachable │ Circuit break
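The "Kafka (all)" row deserves a sketch, since losing the queue entirely is the scariest failure. A minimal local spill buffer, assuming notifications are JSON-serializable dicts and a worker replays the file once Kafka recovers:
import json
from pathlib import Path

class LocalSpillBuffer:
    """Append-only disk buffer used only while Kafka is unreachable."""
    def __init__(self, spill_dir: str = "/var/spool/notifications"):
        self.path = Path(spill_dir) / "spill.jsonl"
        self.path.parent.mkdir(parents=True, exist_ok=True)

    def spill(self, notification: dict):
        # One JSON object per line; a production buffer would also
        # fsync and rotate files
        with self.path.open("a") as f:
            f.write(json.dumps(notification) + "\n")

    def drain(self):
        """Yield buffered notifications for replay once Kafka recovers."""
        if not self.path.exists():
            return
        with self.path.open() as f:
            for line in f:
                yield json.loads(line)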
7.2 Graceful Degradation
# services/degradation.py
from enum import Enum
from typing import Optional
import logging
logger = logging.getLogger(__name__)
class SystemMode(Enum):
NORMAL = "normal"
DEGRADED = "degraded" # Non-critical features disabled
CRITICAL_ONLY = "critical" # Only critical notifications
class GracefulDegradationService:
"""
Manages graceful degradation during failures.
When system is overloaded or failing:
1. Shed low-priority load
2. Disable non-essential features
3. Preserve critical path
"""
def __init__(self, redis_client, config):
self.redis = redis_client
self.config = config
async def get_system_mode(self) -> SystemMode:
"""Get current system operating mode."""
mode = await self.redis.get("system:mode")
return SystemMode(mode.decode()) if mode else SystemMode.NORMAL
async def set_system_mode(self, mode: SystemMode, reason: str):
"""Set system operating mode."""
await self.redis.set("system:mode", mode.value)
logger.warning(f"System mode changed to {mode.value}: {reason}")
async def should_process(
self,
notification: dict,
channel: str
) -> tuple[bool, Optional[str]]:
"""
Determine if notification should be processed given current mode.
Returns (should_process, skip_reason).
"""
mode = await self.get_system_mode()
priority = notification.get("priority", "medium")
notification_type = notification.get("type")
if mode == SystemMode.NORMAL:
return (True, None)
if mode == SystemMode.CRITICAL_ONLY:
# Only process critical notifications
if priority == "critical" or notification_type == "security":
return (True, None)
return (False, "System in critical-only mode")
if mode == SystemMode.DEGRADED:
# Skip low priority and some features
if priority == "low":
return (False, "Low priority skipped in degraded mode")
# Disable non-essential channels
if channel == "email" and notification_type == "marketing":
return (False, "Marketing emails disabled in degraded mode")
return (True, None)
return (True, None)
async def check_and_degrade(self, metrics: dict):
"""
Check metrics and degrade if necessary.
Called periodically by health checker.
"""
current_mode = await self.get_system_mode()
# Check queue depth
queue_depth = metrics.get("kafka_lag", 0)
max_acceptable = self.config.get("max_queue_depth", 1000000)
if queue_depth > max_acceptable * 2:
if current_mode != SystemMode.CRITICAL_ONLY:
await self.set_system_mode(
SystemMode.CRITICAL_ONLY,
f"Queue depth critical: {queue_depth}"
)
return
if queue_depth > max_acceptable:
if current_mode == SystemMode.NORMAL:
await self.set_system_mode(
SystemMode.DEGRADED,
f"Queue depth high: {queue_depth}"
)
return
# Check error rate
error_rate = metrics.get("error_rate", 0)
if error_rate > 0.2: # >20% errors
if current_mode == SystemMode.NORMAL:
await self.set_system_mode(
SystemMode.DEGRADED,
f"Error rate high: {error_rate:.1%}"
)
return
# Recover if metrics are good
if current_mode != SystemMode.NORMAL:
if queue_depth < max_acceptable * 0.5 and error_rate < 0.05:
await self.set_system_mode(
SystemMode.NORMAL,
"Metrics recovered"
)
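A background task ties check_and_degrade to whatever metrics source you have. A sketch (collect_metrics is a stand-in):
import asyncio

async def degradation_loop(service, collect_metrics):
    """Periodically evaluate system health and adjust the operating mode."""
    while True:
        # collect_metrics returns e.g. {"kafka_lag": 1_200_000, "error_rate": 0.03}
        metrics = await collect_metrics()
        await service.check_and_degrade(metrics)
        await asyncio.sleep(30)  # re-evaluate twice a minute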
Part V: Cost Optimization
Chapter 8: Reducing Notification Costs
SMS is the expensive channel: at our scale it runs $3M+/month. Let's fix that.
8.1 Channel Cost Model
# services/cost_optimization.py
from dataclasses import dataclass
from datetime import datetime
from typing import Optional
import logging
logger = logging.getLogger(__name__)
@dataclass
class ChannelCost:
"""Cost per notification for each channel."""
channel: str
cost_per_unit: float # USD
unit_name: str
monthly_budget: float
CHANNEL_COSTS = {
"push": ChannelCost("push", 0.0, "notification", float("inf")),
"in_app": ChannelCost("in_app", 0.0, "notification", float("inf")),
"email": ChannelCost("email", 0.0001, "email", 15000), # $0.10/1000
"sms": ChannelCost("sms", 0.01, "message", 100000), # $0.01/msg, $100K budget
}
class CostAwareChannelSelector:
"""
Selects channels with cost awareness.
Strategies:
1. Prefer free channels (push, in-app)
2. Fall back to email before SMS
3. SMS only for critical or explicit user preference
4. Track and cap spend per channel
"""
def __init__(self, redis_client, channel_costs: dict = None):
self.redis = redis_client
self.costs = channel_costs or CHANNEL_COSTS
async def select_channel(
self,
available_channels: list[str],
notification: dict,
user_preferences: dict
) -> Optional[str]:
"""
Select the most cost-effective channel.
Prioritizes:
1. User's preferred channel (if set)
2. Cheapest available channel
3. Budget constraints
"""
priority = notification.get("priority", "medium")
notification_type = notification.get("type")
# Critical notifications: user preference or all channels
if priority == "critical":
return self._select_for_critical(available_channels, user_preferences)
# Sort by cost
sorted_channels = sorted(
available_channels,
key=lambda c: self.costs.get(c, ChannelCost(c, 1.0, "", 0)).cost_per_unit
)
for channel in sorted_channels:
# Check budget
if not await self._has_budget(channel):
continue
# Check if appropriate for notification type
if self._is_appropriate(channel, notification_type):
return channel
# Fallback to any available
return sorted_channels[0] if sorted_channels else None
def _select_for_critical(
self,
available_channels: list[str],
preferences: dict
) -> str:
"""Select channel for critical notifications."""
preferred = preferences.get("critical_channel")
if preferred and preferred in available_channels:
return preferred
# Default priority for critical: push, then SMS
for channel in ["push", "sms", "email"]:
if channel in available_channels:
return channel
return available_channels[0] if available_channels else None
def _is_appropriate(self, channel: str, notification_type: str) -> bool:
"""Check if channel is appropriate for notification type."""
APPROPRIATE_CHANNELS = {
"security": ["push", "sms", "email"],
"transaction": ["push", "in_app", "email"],
"marketing": ["email", "push"],
"reminder": ["push", "email"],
"social": ["push", "in_app"],
}
allowed = APPROPRIATE_CHANNELS.get(notification_type, ["push", "email"])
return channel in allowed
async def _has_budget(self, channel: str) -> bool:
"""Check if channel has remaining budget."""
cost = self.costs.get(channel)
if not cost or cost.monthly_budget == float("inf"):
return True
# Get current month's spend
month_key = datetime.utcnow().strftime("%Y-%m")
spend_key = f"channel_spend:{channel}:{month_key}"
current_spend = await self.redis.get(spend_key)
current_spend = float(current_spend or 0)
return current_spend < cost.monthly_budget
async def record_send(self, channel: str, count: int = 1):
"""Record a send for budget tracking."""
cost = self.costs.get(channel)
if not cost:
return
month_key = datetime.utcnow().strftime("%Y-%m")
spend_key = f"channel_spend:{channel}:{month_key}"
spend = cost.cost_per_unit * count
await self.redis.incrbyfloat(spend_key, spend)
await self.redis.expire(spend_key, 86400 * 35) # Keep 35 days
8.2 SMS Optimization Strategies
# services/sms_optimization.py
"""
SMS Cost Optimization Strategies
SMS is 100x more expensive than email. Use sparingly.
Strategies:
1. Only use for critical notifications
2. Push first, SMS as fallback only if push fails
3. Combine multiple messages into one
4. Use short codes for high volume (better rates)
5. Regional pricing (some countries are cheaper)
"""
class SMSOptimizer:
"""Optimizes SMS usage to reduce costs."""
# Countries with high SMS costs (>$0.05/msg)
EXPENSIVE_COUNTRIES = ["US", "CA", "AU", "GB", "DE", "FR"]
# Countries with low SMS costs (<$0.01/msg)
CHEAP_COUNTRIES = ["IN", "PH", "ID", "VN", "NG"]
def should_use_sms(
self,
notification: dict,
user: dict,
push_available: bool
) -> tuple[bool, str]:
"""
Determine if SMS should be used.
Returns (should_use, reason).
"""
notification_type = notification.get("type")
priority = notification.get("priority")
country = user.get("country_code")
# Always use SMS for security alerts
if notification_type == "security" and priority == "critical":
return (True, "Critical security notification")
# If push is available and working, don't use SMS
if push_available and notification_type not in ["security"]:
return (False, "Push notification available")
# Check user's explicit preference
if user.get("sms_preference") == "never":
return (False, "User opted out of SMS")
# Cost-based decision for expensive countries
if country in self.EXPENSIVE_COUNTRIES:
if notification_type in ["marketing", "social"]:
return (False, f"SMS too expensive for {notification_type} in {country}")
# Fallback: use SMS if it's the only option for important notifications
if priority in ["critical", "high"]:
return (True, "Fallback for high priority notification")
return (False, "SMS not justified for this notification")
def optimize_message(self, message: str) -> str:
"""
Optimize SMS message to fit in single segment.
Standard SMS: 160 chars (GSM-7) or 70 chars (Unicode)
Multi-segment costs more!
"""
# Check if Unicode needed
if self._needs_unicode(message):
max_length = 70
else:
max_length = 160
if len(message) <= max_length:
return message
# Truncate with ellipsis
return message[:max_length - 3] + "..."
    def _needs_unicode(self, text: str) -> bool:
        """Check if text requires Unicode encoding."""
        try:
            # 'gsm03.38' is not a built-in codec; this assumes a third-party
            # package has registered it. LookupError falls back to Unicode.
            text.encode('gsm03.38')
            return False
        except (UnicodeEncodeError, LookupError):
            return True
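Strategy 3 from the list at the top of this file, combining messages, isn't covered by the optimizer above. A minimal greedy packer (a sketch; real batching would also respect ordering and per-user expectations):
class SMSBatcher:
    """Combines multiple pending messages for one user into fewer segments."""
    SEPARATOR = " | "
    MAX_GSM7_LENGTH = 160

    def combine(self, messages: list[str]) -> list[str]:
        """Greedily pack messages into as few 160-char segments as possible."""
        segments: list[str] = []
        current = ""
        for msg in messages:
            candidate = msg if not current else current + self.SEPARATOR + msg
            if len(candidate) <= self.MAX_GSM7_LENGTH:
                current = candidate
            else:
                if current:
                    segments.append(current)
                current = msg  # an over-length single message passes through
        if current:
            segments.append(current)
        return segments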
Summary
What We Covered Today
DAY 4 SUMMARY: SCALE, RELIABILITY & EDGE CASES
SCALING CHALLENGES
├── Campaign mode (10M in 1 hour)
│ ├── Rate-limited ingestion
│ ├── Batched processing
│ └── Priority isolation
├── Hot user problem (fan-out)
│ ├── Async fan-out
│ ├── Batched follower queries
│ └── Partition-aware routing
└── Provider rate limits
├── Token bucket limiter
├── Backoff on rate limit errors
└── Per-provider configuration
RELIABILITY PATTERNS
├── Circuit breakers
│ ├── CLOSED → OPEN → HALF_OPEN cycle
│ ├── Failure threshold triggers
│ └── Automatic recovery testing
├── Provider failover
│ ├── Primary/fallback configuration
│ ├── Health-based routing
│   └── Automatic failover
└── Graceful degradation
├── System modes (normal/degraded/critical)
├── Load shedding
└── Auto-recovery
EDGE CASES
├── Device tokens
│ ├── Token rotation
│ ├── Multiple devices
│ ├── Stale token cleanup
│ └── Token collision
├── Email bounces
│ ├── Hard vs soft bounces
│ ├── Spam complaints
│ └── Unsubscribe handling
├── Timezones
│ ├── DST transitions
│ ├── Quiet hours across DST
│ └── Scheduled notification DST
└── Race conditions
├── Duplicate sends
├── Preference changes during delivery
├── User deletion during delivery
└── Concurrent updates
COST OPTIMIZATION
├── Channel cost model
├── Cost-aware channel selection
├── SMS optimization strategies
└── Budget tracking and caps
Interview Tip of the Day
INTERVIEW TIP: SHOW YOU'VE OPERATED REAL SYSTEMS
When asked "what can go wrong?", show operational experience:
"At scale, several things break:
1. Campaign spikes - Marketing sends 10M at once. We rate-limit
ingestion and use priority queues so campaigns don't affect
transaction notifications.
2. Provider failures - We have circuit breakers per provider.
If SendGrid fails, we automatically failover to SES. The
circuit opens after 5 failures, tests recovery every 30 seconds.
3. Hot users - Celebrity notifications fan out to millions.
We process these asynchronously with rate limiting to avoid
overwhelming downstream systems.
4. The worst edge case I've seen: a user changes their email
while we're mid-send. The notification was queued for old
email. Now we re-check preferences at delivery time, not
just queue time."
This shows you've dealt with real production issues.
End of Week 6, Day 4
Tomorrow: Day 5 — Operations, Monitoring & Interview Mastery