Himanshu Kukreja

Week 6 — Day 4: Scale, Reliability & Edge Cases

System Design Mastery Series — Practical Application Week


Introduction

We've built a solid notification system. Now let's break it.

Today we explore everything that goes wrong at scale — and how to handle it. This is the knowledge that separates engineers who've built toy systems from those who've operated production platforms.

Today's Theme: "What breaks at scale? Everything."


Part I: Scaling Challenges

Chapter 1: Campaign Mode — 10M Notifications in 1 Hour

Marketing wants to send 10 million notifications in an hour. Your system normally handles 5,000/sec; the campaign alone adds roughly 2,800/sec sustained for the full hour on top of that. What breaks?

1.1 The Problem

CAMPAIGN MODE CHALLENGES

Normal load:    5,000 notifications/sec
Campaign load:  2,800/sec additional (10M in 1 hour)
Combined:       7,800/sec (56% increase)

But campaigns don't spread evenly:
- Marketing clicks "Send" at 10:00:00 AM
- All 10M notifications hit ingestion simultaneously
- Initial burst: 100,000+/sec for first few seconds

What breaks:
1. API servers get overwhelmed
2. Kafka partitions become unbalanced
3. Database connection pools exhaust
4. Provider rate limits hit immediately
5. Workers can't keep up → queue depth grows
6. Memory pressure from buffered messages

1.2 Solution: Campaign Ingestion Service

# services/campaign_ingestion.py

import asyncio
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional, AsyncIterator
import logging

logger = logging.getLogger(__name__)


@dataclass
class Campaign:
    campaign_id: str
    name: str
    template: str
    variables: dict
    
    # Target audience
    user_ids: Optional[list[str]] = None
    segment_query: Optional[str] = None  # For dynamic segments
    
    # Throttling
    target_send_rate: int = 1000  # per second
    max_duration_hours: int = 4
    
    # Scheduling
    scheduled_start: Optional[datetime] = None
    
    # Progress
    total_recipients: int = 0
    sent_count: int = 0
    failed_count: int = 0
    status: str = "pending"


class CampaignIngestionService:
    """
    Handles high-volume campaign sends with controlled ingestion.
    
    Key strategies:
    1. Rate-limited ingestion (don't flood the system)
    2. Batch database reads (don't query 10M users one by one)
    3. Chunked processing (resume on failure)
    4. Progress tracking (know where you are)
    """
    
    def __init__(
        self,
        user_repo,
        notification_service,
        campaign_repo,
        default_rate: int = 1000
    ):
        self.users = user_repo
        self.notifications = notification_service
        self.campaigns = campaign_repo
        self.default_rate = default_rate
    
    async def start_campaign(self, campaign: Campaign):
        """Start processing a campaign with controlled rate."""
        
        campaign.status = "running"
        await self.campaigns.update(campaign)
        
        logger.info(
            f"Starting campaign {campaign.campaign_id} "
            f"at {campaign.target_send_rate}/sec"
        )
        
        try:
            # Process in controlled batches
            async for batch in self._get_recipient_batches(campaign):
                await self._process_batch(campaign, batch)
                
                # Rate limiting between batches
                await self._throttle(campaign, len(batch))
                
                # Check for pause/cancel
                if await self._should_stop(campaign.campaign_id):
                    campaign.status = "paused"
                    break
            
            if campaign.status == "running":
                campaign.status = "completed"
                
        except Exception as e:
            logger.error(f"Campaign {campaign.campaign_id} failed: {e}")
            campaign.status = "failed"
        
        await self.campaigns.update(campaign)
    
    async def _get_recipient_batches(
        self,
        campaign: Campaign,
        batch_size: int = 1000
    ) -> AsyncIterator[list[str]]:
        """
        Stream recipient user IDs in batches.
        
        Uses cursor-based pagination to handle millions of users
        without loading all into memory.
        """
        
        if campaign.user_ids:
            # Static list of user IDs
            for i in range(0, len(campaign.user_ids), batch_size):
                yield campaign.user_ids[i:i + batch_size]
        
        elif campaign.segment_query:
            # Dynamic segment - paginate through database
            cursor = None
            
            while True:
                users, cursor = await self.users.query_segment(
                    campaign.segment_query,
                    limit=batch_size,
                    cursor=cursor
                )
                
                if not users:
                    break
                
                yield [u.user_id for u in users]
                
                if not cursor:
                    break
    
    async def _process_batch(self, campaign: Campaign, user_ids: list[str]):
        """Process a batch of recipients."""
        
        tasks = []
        
        for user_id in user_ids:
            task = self.notifications.send({
                "user_id": user_id,
                "type": "marketing",
                "template": campaign.template,
                "variables": campaign.variables,
                "metadata": {
                    "campaign_id": campaign.campaign_id,
                }
            })
            tasks.append(task)
        
        results = await asyncio.gather(*tasks, return_exceptions=True)
        
        # Update counts
        for result in results:
            if isinstance(result, Exception):
                campaign.failed_count += 1
            else:
                campaign.sent_count += 1
        
        # Persist progress periodically
        if campaign.sent_count % 10000 == 0:
            await self.campaigns.update(campaign)
            logger.info(
                f"Campaign {campaign.campaign_id} progress: "
                f"{campaign.sent_count}/{campaign.total_recipients}"
            )
    
    async def _throttle(self, campaign: Campaign, batch_size: int):
        """Rate limit to target send rate."""
        
        # Calculate delay to achieve target rate
        target_rate = campaign.target_send_rate
        delay = batch_size / target_rate
        
        await asyncio.sleep(delay)
    
    async def _should_stop(self, campaign_id: str) -> bool:
        """Check if campaign should stop (paused or cancelled)."""
        campaign = await self.campaigns.get(campaign_id)
        return campaign.status in ["pausing", "cancelled"]


class CampaignRateLimiter:
    """
    Global rate limiter for campaigns.
    
    Ensures total campaign throughput doesn't overwhelm the system,
    even with multiple concurrent campaigns.
    """
    
    def __init__(
        self,
        redis_client,
        global_limit: int = 5000,  # Total campaign sends/sec
        window_seconds: int = 1
    ):
        self.redis = redis_client
        self.global_limit = global_limit
        self.window = window_seconds
    
    async def acquire(self, count: int = 1) -> bool:
        """
        Try to acquire capacity for sending.
        
        Returns True if allowed, False if rate limited.
        """
        
        key = "campaign:rate_limit"
        now = datetime.utcnow().timestamp()
        window_start = now - self.window
        
        pipe = self.redis.pipeline()
        
        # Remove old entries
        pipe.zremrangebyscore(key, 0, window_start)
        
        # Count current window
        pipe.zcard(key)
        
        results = await pipe.execute()
        current_count = results[1]
        
        if current_count + count > self.global_limit:
            return False
        
        # Add one member per requested unit so ZCARD reflects the true count.
        # Note: this check-then-add is not atomic; the Lua-based
        # ProviderRateLimiter later in this post shows an atomic version.
        await self.redis.zadd(key, {f"{now}:{i}": now for i in range(count)})
        await self.redis.expire(key, self.window + 1)
        
        return True
    
    async def wait_for_capacity(self, count: int = 1):
        """Wait until capacity is available."""
        
        while not await self.acquire(count):
            await asyncio.sleep(0.1)

1.3 Priority Isolation

# services/priority_isolation.py

"""
Priority isolation ensures campaigns don't affect critical notifications.

Strategy:
1. Separate Kafka topics per priority
2. Separate worker pools per priority
3. Critical notifications bypass campaign rate limits
4. Dedicated resources for transaction/security notifications
"""

class PriorityIsolationConfig:
    """Configuration for priority-based resource isolation."""
    
    KAFKA_TOPICS = {
        "critical": {
            "topic": "notifications.critical",
            "partitions": 32,
            "replication": 3,
            "retention_hours": 168,
        },
        "high": {
            "topic": "notifications.high",
            "partitions": 64,
            "replication": 3,
            "retention_hours": 72,
        },
        "medium": {
            "topic": "notifications.medium",
            "partitions": 32,
            "replication": 2,
            "retention_hours": 24,
        },
        "low": {
            "topic": "notifications.low",
            "partitions": 16,
            "replication": 2,
            "retention_hours": 24,
        },
    }
    
    WORKER_POOLS = {
        "critical": {
            "min_workers": 10,
            "max_workers": 50,
            "dedicated": True,  # Never shared with other priorities
        },
        "high": {
            "min_workers": 20,
            "max_workers": 100,
            "dedicated": True,
        },
        "low": {
            "min_workers": 5,
            "max_workers": 200,  # Can scale for campaigns
            "dedicated": False,
        },
    }
    
    RATE_LIMITS = {
        "critical": None,  # No rate limit
        "high": 10000,     # 10K/sec
        "medium": 5000,    # 5K/sec
        "low": 2000,       # 2K/sec (campaigns)
    }
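
The config above only matters if producers actually route on it. Below is a minimal sketch of priority-based routing, assuming a Kafka producer with the same send() interface used in Chapter 2.3 and one rate limiter per non-critical priority exposing wait_for_capacity() like the CampaignRateLimiter above — both assumptions, not code from earlier days.

# Sketch of producer-side routing on PriorityIsolationConfig.
# kafka_producer.send() and the per-priority limiters are assumptions.

class PriorityRouter:
    """Routes notifications to per-priority topics and rate limits."""
    
    def __init__(self, kafka_producer, rate_limiters: dict):
        self.producer = kafka_producer
        self.rate_limiters = rate_limiters  # priority -> limiter (or None)
    
    async def publish(self, notification: dict):
        priority = notification.get("priority", "medium")
        topic_config = PriorityIsolationConfig.KAFKA_TOPICS[priority]
        
        # Critical traffic has no rate limit and is never delayed
        limiter = self.rate_limiters.get(priority)
        if PriorityIsolationConfig.RATE_LIMITS[priority] is not None and limiter:
            await limiter.wait_for_capacity(1)
        
        await self.producer.send(
            topic=topic_config["topic"],
            key=notification["user_id"].encode(),
            value=notification,
        )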

Chapter 2: The Hot User Problem

A celebrity with 1 million followers posts something. You need to notify all followers. This is the "fan-out" problem.

2.1 The Problem

HOT USER SCENARIO

Celebrity posts at 10:00:00 AM
├── 1,000,000 followers need notification
├── All notifications created simultaneously
├── All keyed to the same source user's Kafka partition
└── All hit same database rows for preferences

What breaks:
1. Single partition becomes hot (all fan-out to one key)
2. Database row locks (everyone reading same user)
3. Provider rate limits (1M push notifications at once)
4. Memory exhaustion (buffering 1M notifications)

Traditional approach fails:
- "Notify all followers of user X"
- Generates 1M notification requests instantly
- System drowns

2.2 Solution: Async Fan-Out with Batching

# services/fanout.py

import asyncio
import uuid
from datetime import datetime
from typing import AsyncIterator
import logging

logger = logging.getLogger(__name__)


class FanoutService:
    """
    Handles fan-out for notifications to many recipients.
    
    Strategies:
    1. Async processing (don't block the triggering event)
    2. Batched retrieval (paginate through followers)
    3. Rate-limited sending (don't overwhelm downstream)
    4. Progress tracking (resume on failure)
    """
    
    def __init__(
        self,
        follower_repo,
        notification_service,
        fanout_repo,
        batch_size: int = 1000,
        rate_limit: int = 5000  # per second
    ):
        self.followers = follower_repo
        self.notifications = notification_service
        self.fanouts = fanout_repo
        self.batch_size = batch_size
        self.rate_limit = rate_limit
    
    async def trigger_fanout(
        self,
        source_user_id: str,
        notification_type: str,
        template: str,
        variables: dict
    ) -> str:
        """
        Trigger fan-out to all followers.
        
        Returns fanout_id for tracking.
        Actual sending happens asynchronously.
        """
        
        # Get follower count
        follower_count = await self.followers.get_count(source_user_id)
        
        # Create fanout job
        fanout = {
            "fanout_id": str(uuid.uuid4()),
            "source_user_id": source_user_id,
            "notification_type": notification_type,
            "template": template,
            "variables": variables,
            "total_recipients": follower_count,
            "processed_count": 0,
            "status": "pending",
            "created_at": datetime.utcnow(),
        }
        
        await self.fanouts.save(fanout)
        
        # Queue for async processing
        await self._queue_fanout(fanout["fanout_id"])
        
        logger.info(
            f"Triggered fanout {fanout['fanout_id']} "
            f"to {follower_count} recipients"
        )
        
        return fanout["fanout_id"]
    
    async def process_fanout(self, fanout_id: str):
        """Process a fanout job (called by worker)."""
        
        fanout = await self.fanouts.get(fanout_id)
        if not fanout:
            return
        
        fanout["status"] = "processing"
        await self.fanouts.update(fanout)
        
        try:
            # Process in batches with rate limiting
            tokens_per_second = self.rate_limit
            token_bucket = tokens_per_second
            last_refill = datetime.utcnow()
            
            async for batch in self._get_follower_batches(fanout["source_user_id"]):
                # Rate limiting with token bucket
                now = datetime.utcnow()
                elapsed = (now - last_refill).total_seconds()
                token_bucket = min(tokens_per_second, token_bucket + elapsed * tokens_per_second)
                last_refill = now
                
                if token_bucket < len(batch):
                    wait_time = (len(batch) - token_bucket) / tokens_per_second
                    await asyncio.sleep(wait_time)
                    token_bucket = 0
                else:
                    token_bucket -= len(batch)
                
                # Send batch
                await self._send_batch(fanout, batch)
                
                fanout["processed_count"] += len(batch)
                
                # Checkpoint progress
                if fanout["processed_count"] % 10000 == 0:
                    await self.fanouts.update(fanout)
            
            fanout["status"] = "completed"
            
        except Exception as e:
            logger.error(f"Fanout {fanout_id} failed: {e}")
            fanout["status"] = "failed"
            fanout["error"] = str(e)
        
        fanout["completed_at"] = datetime.utcnow()
        await self.fanouts.update(fanout)
    
    async def _get_follower_batches(self, user_id: str) -> AsyncIterator[list[str]]:
        """Stream follower IDs in batches."""
        
        cursor = None
        
        while True:
            followers, cursor = await self.followers.get_followers(
                user_id,
                limit=self.batch_size,
                cursor=cursor
            )
            
            if not followers:
                break
            
            yield [f.follower_id for f in followers]
            
            if not cursor:
                break
    
    async def _send_batch(self, fanout: dict, follower_ids: list[str]):
        """Send notifications to a batch of followers."""
        
        tasks = [
            self.notifications.send({
                "user_id": follower_id,
                "type": fanout["notification_type"],
                "template": fanout["template"],
                "variables": fanout["variables"],
                "metadata": {
                    "fanout_id": fanout["fanout_id"],
                    "source_user_id": fanout["source_user_id"],
                }
            })
            for follower_id in follower_ids
        ]
        
        await asyncio.gather(*tasks, return_exceptions=True)
    
    async def _queue_fanout(self, fanout_id: str):
        """Queue fanout for async processing."""
        # Publish to Kafka fanout topic
        pass

2.3 Database Hot Spot Mitigation

# services/hot_spot_mitigation.py

"""
Strategies to avoid database hot spots during fan-out.
"""

class CachedPreferencesService:
    """
    Cache user preferences to avoid DB hot spots.
    
    During fan-out, we'd hit the source user's preferences
    millions of times. Cache aggressively.
    """
    
    def __init__(self, preference_repo, cache, default_ttl: int = 300):
        self.preferences = preference_repo
        self.cache = cache
        self.default_ttl = default_ttl
    
    async def get_preferences_batch(
        self,
        user_ids: list[str]
    ) -> dict[str, UserPreferences]:
        """
        Get preferences for multiple users efficiently.
        
        Uses batch cache lookup and single DB query for misses.
        """
        
        # Try cache first
        cache_keys = [f"prefs:{uid}" for uid in user_ids]
        cached = await self.cache.mget(cache_keys)
        
        result = {}
        missing_ids = []
        
        for i, user_id in enumerate(user_ids):
            if cached[i]:
                result[user_id] = UserPreferences.from_dict(cached[i])
            else:
                missing_ids.append(user_id)
        
        # Batch fetch from DB
        if missing_ids:
            db_results = await self.preferences.get_batch(missing_ids)
            
            # Cache and add to result
            to_cache = {}
            for user_id, prefs in db_results.items():
                result[user_id] = prefs
                to_cache[f"prefs:{user_id}"] = prefs.to_dict()
            
            if to_cache:
                await self.cache.mset(to_cache, ttl=self.default_ttl)
        
        return result


class PartitionAwareNotificationService:
    """
    Avoid Kafka partition hot spots during fan-out.
    
    Problem: If we partition by source_user_id, all fan-out
    notifications go to same partition.
    
    Solution: Partition by recipient_user_id instead.
    """
    
    async def send(self, notification: dict):
        # Use recipient as partition key, not source
        partition_key = notification["user_id"]  # recipient
        
        # NOT notification["metadata"]["source_user_id"]
        
        await self.kafka.send(
            topic=self._get_topic(notification),
            key=partition_key.encode(),
            value=notification
        )

Chapter 3: Provider Rate Limits

Every provider has rate limits. You need to respect them while maximizing throughput.

3.1 Provider Rate Limit Configuration

# config/provider_limits.py

PROVIDER_RATE_LIMITS = {
    "fcm": {
        "requests_per_second": 1000,  # Per project
        "batch_size": 500,            # Messages per batch request
        "concurrent_connections": 100,
    },
    "apns": {
        "requests_per_second": 4000,   # Apple is generous
        "concurrent_connections": 500,
        "notification_per_connection": 1,
    },
    "sendgrid": {
        "requests_per_second": 100,    # Depends on plan
        "emails_per_request": 1000,    # Batch API
        "daily_limit": 100000,         # Plan dependent
    },
    "twilio": {
        "messages_per_second": 100,    # Per phone number
        "concurrent_connections": 50,
        # Different limits per number type:
        "toll_free_per_second": 30,
        "short_code_per_second": 100,
        "10dlc_per_second": 15,        # Varies by trust score
    },
}

3.2 Provider Rate Limiter

# services/provider_rate_limiter.py

import asyncio
from datetime import datetime
from typing import Optional
import logging

logger = logging.getLogger(__name__)


class ProviderRateLimiter:
    """
    Rate limiter for external notification providers.
    
    Features:
    - Per-provider rate limiting
    - Token bucket algorithm
    - Backpressure signaling
    - Automatic recovery
    """
    
    def __init__(self, redis_client, config: dict):
        self.redis = redis_client
        self.config = config
        self.local_buckets = {}  # For fast path
    
    async def acquire(
        self,
        provider: str,
        count: int = 1,
        timeout: float = 5.0
    ) -> bool:
        """
        Acquire rate limit tokens for a provider.
        
        Returns True if acquired, False if timed out.
        """
        
        limit = self.config.get(provider, {}).get("requests_per_second", 100)
        
        start = datetime.utcnow()
        
        while True:
            if await self._try_acquire(provider, count, limit):
                return True
            
            # Check timeout
            elapsed = (datetime.utcnow() - start).total_seconds()
            if elapsed >= timeout:
                return False
            
            # Wait before retry
            await asyncio.sleep(0.05)
    
    async def _try_acquire(
        self,
        provider: str,
        count: int,
        limit: int
    ) -> bool:
        """Try to acquire tokens using Redis."""
        
        key = f"rate_limit:provider:{provider}"
        now = datetime.utcnow().timestamp()
        window_start = now - 1.0  # 1 second window
        
        # Lua script for atomic rate limiting
        script = """
        local key = KEYS[1]
        local now = tonumber(ARGV[1])
        local window_start = tonumber(ARGV[2])
        local count = tonumber(ARGV[3])
        local limit = tonumber(ARGV[4])
        
        -- Remove old entries
        redis.call('ZREMRANGEBYSCORE', key, 0, window_start)
        
        -- Get current count
        local current = redis.call('ZCARD', key)
        
        -- Check if we can acquire
        if current + count > limit then
            return 0
        end
        
        -- Add new entries
        for i = 1, count do
            redis.call('ZADD', key, now, now .. ':' .. i .. ':' .. math.random())
        end
        
        redis.call('EXPIRE', key, 2)
        
        return 1
        """
        
        result = await self.redis.eval(
            script,
            keys=[key],
            args=[now, window_start, count, limit]
        )
        
        return result == 1
    
    async def get_available_capacity(self, provider: str) -> int:
        """Get remaining capacity for a provider."""
        
        limit = self.config.get(provider, {}).get("requests_per_second", 100)
        key = f"rate_limit:provider:{provider}"
        
        now = datetime.utcnow().timestamp()
        window_start = now - 1.0
        
        # Count current usage
        current = await self.redis.zcount(key, window_start, now)
        
        return max(0, limit - current)
    
    async def report_rate_limit_error(self, provider: str, retry_after: int = 60):
        """
        Report that provider returned rate limit error.
        
        Temporarily reduces our rate to avoid further errors.
        """
        
        key = f"rate_limit:backoff:{provider}"
        
        # Set backoff flag
        await self.redis.setex(key, retry_after, "1")
        
        logger.warning(
            f"Provider {provider} rate limited, backing off for {retry_after}s"
        )
    
    async def is_backed_off(self, provider: str) -> bool:
        """Check if we're in backoff for a provider."""
        key = f"rate_limit:backoff:{provider}"
        return await self.redis.exists(key)
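
To tie 3.1 and 3.2 together, here is a rough sketch of a dispatcher draining a batch of messages against one provider. The provider.send_batch() call and the requeue behavior are assumptions — the real provider interfaces were built on earlier days.

# Sketch of a dispatch loop built on PROVIDER_RATE_LIMITS and
# ProviderRateLimiter. provider.send_batch() is an assumed interface.

async def dispatch_to_provider(
    provider_name: str,
    provider,
    messages: list[dict],
    rate_limiter: ProviderRateLimiter,
):
    batch_size = PROVIDER_RATE_LIMITS.get(provider_name, {}).get("batch_size", 100)
    
    for i in range(0, len(messages), batch_size):
        batch = messages[i:i + batch_size]
        
        # Wait out any temporary backoff set by report_rate_limit_error()
        while await rate_limiter.is_backed_off(provider_name):
            await asyncio.sleep(1)
        
        # acquire() blocks up to its timeout; a real worker would requeue
        # the batch instead of dropping it when capacity never arrives
        if not await rate_limiter.acquire(provider_name, count=len(batch)):
            logger.warning(f"No capacity for {provider_name}, requeueing batch")
            continue
        
        try:
            await provider.send_batch(batch)
        except Exception as e:
            logger.error(f"{provider_name} batch failed: {e}")
            # If the error is a 429-style rate-limit response, the worker
            # should also call rate_limiter.report_rate_limit_error(provider_name)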

Part II: Reliability Patterns

Chapter 4: Circuit Breakers

When a provider is failing, stop sending to it temporarily.

4.1 Circuit Breaker Implementation

# services/circuit_breaker.py

from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
from typing import Optional, Callable
import asyncio
import logging

logger = logging.getLogger(__name__)


class CircuitState(Enum):
    CLOSED = "closed"      # Normal operation
    OPEN = "open"          # Failing, reject requests
    HALF_OPEN = "half_open"  # Testing if recovered


@dataclass
class CircuitBreakerConfig:
    failure_threshold: int = 5      # Failures before opening
    success_threshold: int = 3      # Successes to close from half-open
    timeout_seconds: int = 30       # Time before half-open
    half_open_max_calls: int = 3    # Calls allowed in half-open


@dataclass
class CircuitStats:
    state: CircuitState = CircuitState.CLOSED
    failure_count: int = 0
    success_count: int = 0
    last_failure_time: Optional[datetime] = None
    last_state_change: Optional[datetime] = None
    half_open_calls: int = 0


class CircuitBreaker:
    """
    Circuit breaker for provider calls.
    
    States:
    - CLOSED: Normal operation, track failures
    - OPEN: Rejecting calls, wait for timeout
    - HALF_OPEN: Testing with limited calls
    
    Week 2 Concept: Circuit Breakers
    """
    
    def __init__(
        self,
        name: str,
        config: CircuitBreakerConfig = None,
        on_state_change: Callable = None
    ):
        self.name = name
        self.config = config or CircuitBreakerConfig()
        self.stats = CircuitStats(last_state_change=datetime.utcnow())
        self.on_state_change = on_state_change
        self._lock = asyncio.Lock()
    
    async def call(self, func: Callable, *args, **kwargs):
        """
        Execute function with circuit breaker protection.
        
        Raises CircuitOpenError if circuit is open.
        """
        
        # Check if call is allowed
        if not await self._can_call():
            raise CircuitOpenError(
                f"Circuit {self.name} is open, request rejected"
            )
        
        try:
            result = await func(*args, **kwargs)
            await self._record_success()
            return result
            
        except Exception as e:
            await self._record_failure()
            raise
    
    async def _can_call(self) -> bool:
        """Check if a call is allowed."""
        
        async with self._lock:
            if self.stats.state == CircuitState.CLOSED:
                return True
            
            if self.stats.state == CircuitState.OPEN:
                # Check if timeout elapsed
                if self._timeout_elapsed():
                    self._transition_to(CircuitState.HALF_OPEN)
                    return True
                return False
            
            if self.stats.state == CircuitState.HALF_OPEN:
                # Allow limited calls
                if self.stats.half_open_calls < self.config.half_open_max_calls:
                    self.stats.half_open_calls += 1
                    return True
                return False
        
        return False
    
    async def _record_success(self):
        """Record a successful call."""
        
        async with self._lock:
            if self.stats.state == CircuitState.HALF_OPEN:
                self.stats.success_count += 1
                
                if self.stats.success_count >= self.config.success_threshold:
                    self._transition_to(CircuitState.CLOSED)
            
            elif self.stats.state == CircuitState.CLOSED:
                # Reset failure count on success
                self.stats.failure_count = 0
    
    async def _record_failure(self):
        """Record a failed call."""
        
        async with self._lock:
            self.stats.failure_count += 1
            self.stats.last_failure_time = datetime.utcnow()
            
            if self.stats.state == CircuitState.HALF_OPEN:
                # Any failure in half-open goes back to open
                self._transition_to(CircuitState.OPEN)
            
            elif self.stats.state == CircuitState.CLOSED:
                if self.stats.failure_count >= self.config.failure_threshold:
                    self._transition_to(CircuitState.OPEN)
    
    def _transition_to(self, new_state: CircuitState):
        """Transition to a new state."""
        
        old_state = self.stats.state
        self.stats.state = new_state
        self.stats.last_state_change = datetime.utcnow()
        
        # Reset counters
        if new_state == CircuitState.CLOSED:
            self.stats.failure_count = 0
            self.stats.success_count = 0
        elif new_state == CircuitState.HALF_OPEN:
            self.stats.half_open_calls = 0
            self.stats.success_count = 0
        
        logger.warning(
            f"Circuit {self.name} transitioned: {old_state.value} -> {new_state.value}"
        )
        
        if self.on_state_change:
            self.on_state_change(self.name, old_state, new_state)
    
    def _timeout_elapsed(self) -> bool:
        """Check if timeout has elapsed since last state change."""
        elapsed = datetime.utcnow() - self.stats.last_state_change
        return elapsed.total_seconds() >= self.config.timeout_seconds
    
    @property
    def is_open(self) -> bool:
        return self.stats.state == CircuitState.OPEN


class CircuitOpenError(Exception):
    pass


class CircuitBreakerRegistry:
    """Registry of circuit breakers for all providers."""
    
    def __init__(self):
        self.breakers: dict[str, CircuitBreaker] = {}
    
    def get_or_create(
        self,
        name: str,
        config: CircuitBreakerConfig = None
    ) -> CircuitBreaker:
        """Get or create a circuit breaker."""
        
        if name not in self.breakers:
            self.breakers[name] = CircuitBreaker(
                name=name,
                config=config,
                on_state_change=self._on_state_change
            )
        
        return self.breakers[name]
    
    def _on_state_change(self, name: str, old_state: CircuitState, new_state: CircuitState):
        """Handle circuit state changes."""
        # Could publish metrics, send alerts, etc.
        pass
    
    def get_all_status(self) -> dict:
        """Get status of all circuit breakers."""
        return {
            name: {
                "state": cb.stats.state.value,
                "failure_count": cb.stats.failure_count,
                "last_failure": cb.stats.last_failure_time.isoformat() if cb.stats.last_failure_time else None,
            }
            for name, cb in self.breakers.items()
        }

4.2 Provider with Circuit Breaker

# providers/fcm_with_circuit_breaker.py

class FCMProviderWithCircuitBreaker:
    """FCM provider with circuit breaker protection."""
    
    def __init__(
        self,
        fcm_provider: FCMProvider,
        circuit_breaker: CircuitBreaker
    ):
        self.provider = fcm_provider
        self.circuit = circuit_breaker
    
    async def send(self, *args, **kwargs) -> DeliveryResult:
        """Send with circuit breaker protection."""
        
        try:
            return await self.circuit.call(
                self.provider.send,
                *args,
                **kwargs
            )
            
        except CircuitOpenError:
            return DeliveryResult(
                status=DeliveryStatus.FAILED,
                error_code="CIRCUIT_OPEN",
                error_message="Provider circuit breaker is open",
                should_retry=True,
                should_fallback=True
            )

Chapter 5: Provider Failover

When one provider fails, automatically switch to backup.

5.1 Multi-Provider Manager

# services/provider_manager.py

from dataclasses import dataclass
from typing import Any, Optional
import logging

logger = logging.getLogger(__name__)


@dataclass
class ProviderConfig:
    name: str
    priority: int  # Lower = higher priority
    weight: int = 100  # For load balancing
    is_fallback: bool = False


class ProviderManager:
    """
    Manages multiple providers per channel with failover.
    
    Features:
    - Primary/fallback configuration
    - Automatic failover on circuit break
    - Health-based routing
    - Load balancing across providers
    """
    
    def __init__(
        self,
        providers: dict,  # name -> provider instance
        circuit_registry: CircuitBreakerRegistry,
        health_checker
    ):
        self.providers = providers
        self.circuits = circuit_registry
        self.health = health_checker
        
        # Provider configs per channel
        self.configs = {
            "push": [
                ProviderConfig("fcm", priority=1),
                ProviderConfig("apns", priority=1),
            ],
            "email": [
                ProviderConfig("sendgrid", priority=1),
                ProviderConfig("ses", priority=2, is_fallback=True),
            ],
            "sms": [
                ProviderConfig("twilio", priority=1),
                ProviderConfig("sns", priority=2, is_fallback=True),
            ],
        }
    
    async def get_provider(
        self,
        channel: str,
        exclude: list[str] = None
    ) -> Optional[tuple[str, Any]]:
        """
        Get the best available provider for a channel.
        
        Returns (provider_name, provider_instance) or None if all unavailable.
        """
        
        exclude = exclude or []
        configs = self.configs.get(channel, [])
        
        # Sort by priority
        sorted_configs = sorted(configs, key=lambda c: c.priority)
        
        for config in sorted_configs:
            if config.name in exclude:
                continue
            
            # Check circuit breaker
            circuit = self.circuits.get_or_create(config.name)
            if circuit.is_open:
                continue
            
            # Check health
            if not await self.health.is_healthy(config.name):
                continue
            
            provider = self.providers.get(config.name)
            if provider:
                return (config.name, provider)
        
        return None
    
    async def send_with_failover(
        self,
        channel: str,
        notification: dict,
        max_attempts: int = 3
    ) -> DeliveryResult:
        """
        Send notification with automatic failover.
        
        Tries providers in order until one succeeds.
        """
        
        attempted = []
        last_result = None
        
        for attempt in range(max_attempts):
            provider_info = await self.get_provider(channel, exclude=attempted)
            
            if not provider_info:
                break
            
            provider_name, provider = provider_info
            attempted.append(provider_name)
            
            try:
                result = await provider.send(notification)
                
                if result.status == DeliveryStatus.SUCCESS:
                    return result
                
                if not result.should_fallback:
                    return result
                
                last_result = result
                logger.info(
                    f"Provider {provider_name} failed, trying fallback"
                )
                
            except Exception as e:
                logger.error(f"Provider {provider_name} error: {e}")
                last_result = DeliveryResult(
                    status=DeliveryStatus.FAILED,
                    error_code="PROVIDER_ERROR",
                    error_message=str(e)
                )
        
        # All providers failed
        return last_result or DeliveryResult(
            status=DeliveryStatus.FAILED,
            error_code="NO_PROVIDERS",
            error_message="All providers unavailable"
        )
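
One wiring detail worth making explicit: get_provider only checks whether a circuit is open — it never records successes or failures. For failover to actually trip circuits, the provider instances passed in should already be wrapped the way Section 4.2 wraps FCM. A hypothetical setup; the raw provider objects, health_checker, and the generic ProviderWithCircuitBreaker wrapper are assumptions:

# Hypothetical wiring; ProviderWithCircuitBreaker is a generic version of
# the FCM wrapper from Section 4.2, and the raw provider objects and
# health_checker are assumed to exist.

async def send_email_with_failover(notification: dict):
    circuit_registry = CircuitBreakerRegistry()
    
    # Wrap each raw provider so its failures feed its circuit breaker
    providers = {
        "sendgrid": ProviderWithCircuitBreaker(
            sendgrid_provider, circuit_registry.get_or_create("sendgrid")
        ),
        "ses": ProviderWithCircuitBreaker(
            ses_provider, circuit_registry.get_or_create("ses")
        ),
    }
    
    manager = ProviderManager(providers, circuit_registry, health_checker)
    return await manager.send_with_failover("email", notification)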

Part III: Edge Cases

Chapter 6: The Edge Case Encyclopedia

Every edge case you'll encounter in production.

6.1 Device Token Issues

# edge_cases/device_tokens.py

"""
Device Token Edge Cases

1. Token Rotation
   - iOS/Android periodically rotate device tokens
   - Old token becomes invalid
   - Solution: Handle InvalidToken, listen for new token events

2. Multiple Devices
   - User has phone + tablet + web
   - Each has different token
   - Solution: Send to all active tokens

3. Stale Tokens
   - User uninstalls app
   - Token remains in our database
   - Solution: Mark invalid on InvalidToken, periodic cleanup

4. Token Collision
   - Same token registered for multiple users (rare but possible)
   - Solution: Associate token with single user, update on conflict
"""

class DeviceTokenManager:
    """Handles device token lifecycle and edge cases."""
    
    async def handle_invalid_token(
        self,
        token: str,
        error_code: str
    ):
        """Handle token invalidation from provider."""
        
        if error_code in ["UNREGISTERED", "InvalidToken", "NotRegistered"]:
            # Token is permanently invalid
            await self.tokens.mark_invalid(
                token,
                reason=error_code,
                invalidated_at=datetime.utcnow()
            )
            
            logger.info(f"Marked token as invalid: {token[:20]}...")
    
    async def handle_token_refresh(
        self,
        user_id: str,
        old_token: str,
        new_token: str,
        platform: str
    ):
        """Handle token rotation from client."""
        
        # Validate new token format
        if not self._is_valid_token_format(new_token, platform):
            raise ValueError("Invalid token format")
        
        # Check if new token already exists for different user
        existing = await self.tokens.get_by_token(new_token)
        if existing and existing.user_id != user_id:
            # Token collision - reassign to new user
            await self.tokens.delete(existing.token_id)
        
        # Update or create
        if old_token:
            await self.tokens.update_token(
                user_id=user_id,
                old_token=old_token,
                new_token=new_token
            )
        else:
            await self.tokens.create(
                user_id=user_id,
                token=new_token,
                platform=platform
            )
    
    async def cleanup_stale_tokens(self, days_inactive: int = 90):
        """Remove tokens that haven't been used in a while."""
        
        cutoff = datetime.utcnow() - timedelta(days=days_inactive)
        
        deleted = await self.tokens.delete_inactive(before=cutoff)
        
        logger.info(f"Cleaned up {deleted} stale device tokens")

6.2 Email Bounce Handling

# edge_cases/email_bounces.py

"""
Email Bounce Edge Cases

1. Hard Bounce
   - Email address doesn't exist
   - Solution: Mark as invalid, never send again

2. Soft Bounce
   - Temporary issue (mailbox full, server down)
   - Solution: Retry with backoff, mark invalid after N failures

3. Complaint (Spam Report)
   - User marked as spam
   - Solution: Immediate unsubscribe, legal requirement

4. Unsubscribe
   - User clicked unsubscribe link
   - Solution: Update preferences, honor immediately
"""

class EmailBounceHandler:
    """Handles email bounces and complaints."""
    
    HARD_BOUNCE_CODES = [
        "550",  # User unknown
        "551",  # User not local
        "552",  # Mailbox full (sometimes hard)
        "553",  # Invalid address
        "554",  # Transaction failed
    ]
    
    async def handle_bounce(
        self,
        email: str,
        bounce_type: str,
        bounce_code: str,
        diagnostic: str
    ):
        """Handle email bounce webhook."""
        
        email_record = await self.emails.get_by_email(email)
        if not email_record:
            return
        
        if bounce_type == "hard" or bounce_code in self.HARD_BOUNCE_CODES:
            # Permanent failure - mark as invalid
            await self.emails.update(
                email_record.email_id,
                {
                    "status": "bounced",
                    "bounce_type": "hard",
                    "bounce_count": email_record.bounce_count + 1,
                    "last_bounce_at": datetime.utcnow(),
                }
            )
            
            logger.warning(f"Hard bounce for {email}: {diagnostic}")
            
        else:
            # Soft bounce - increment counter
            await self.emails.update(
                email_record.email_id,
                {
                    "bounce_count": email_record.bounce_count + 1,
                    "last_bounce_at": datetime.utcnow(),
                }
            )
            
            # Mark as invalid after too many soft bounces
            if email_record.bounce_count >= 5:
                await self.emails.update(
                    email_record.email_id,
                    {"status": "bounced", "bounce_type": "soft"}
                )
    
    async def handle_complaint(self, email: str, feedback_type: str):
        """Handle spam complaint - MUST unsubscribe immediately."""
        
        email_record = await self.emails.get_by_email(email)
        if not email_record:
            return
        
        # Mark as complained
        await self.emails.update(
            email_record.email_id,
            {"status": "complained"}
        )
        
        # Unsubscribe from all marketing
        await self.preferences.unsubscribe(
            email_record.user_id,
            category="marketing"
        )
        
        logger.warning(
            f"Spam complaint from {email}, unsubscribed from marketing"
        )

6.3 Timezone and DST Edge Cases

# edge_cases/timezones.py

"""
Timezone Edge Cases

1. DST Transitions
   - 2 AM becomes 3 AM (spring forward)
   - 2 AM happens twice (fall back)
   - Solution: Store in UTC, convert at display/send time

2. Timezone Changes
   - User travels to different timezone
   - Solution: Use device timezone or explicit user preference

3. Quiet Hours Across DST
   - User sets quiet hours 10 PM - 8 AM
   - DST changes, quiet hours might be wrong for a day
   - Solution: Re-evaluate on each notification

4. Scheduled Notifications
   - User schedules for "9 AM tomorrow"
   - Tomorrow has DST change
   - Solution: Store timezone with scheduled time
"""

import pytz
from datetime import datetime, time, timedelta


class TimezoneAwareScheduler:
    """Handles timezone edge cases for scheduling."""
    
    def get_next_occurrence(
        self,
        user_timezone: str,
        target_time: time,
        after: datetime = None
    ) -> datetime:
        """
        Get next occurrence of a time in user's timezone.
        
        Handles DST correctly.
        """
        
        tz = pytz.timezone(user_timezone)
        after = after or datetime.now(pytz.UTC)  # aware UTC so astimezone() works correctly
        
        # Convert 'after' to user's timezone
        after_local = after.astimezone(tz)
        
        # Create candidate datetime in user's timezone
        candidate = tz.localize(
            datetime.combine(after_local.date(), target_time)
        )
        
        # If candidate is in past, move to next day
        if candidate <= after_local:
            candidate = tz.localize(
                datetime.combine(
                    after_local.date() + timedelta(days=1),
                    target_time
                )
            )
        
        # Handle DST: pytz.localize handles ambiguous times
        # For non-existent times (spring forward), it raises exception
        # We catch and adjust
        
        try:
            # Verify the time exists (isn't skipped by DST)
            tz.localize(candidate.replace(tzinfo=None), is_dst=None)
        except pytz.exceptions.NonExistentTimeError:
            # Time doesn't exist (DST spring forward)
            # Move forward by the DST offset (typically 1 hour)
            candidate = candidate + timedelta(hours=1)
        except pytz.exceptions.AmbiguousTimeError:
            # Time exists twice (DST fall back)
            # Use the earlier occurrence (is_dst=True selects daylight time)
            candidate = tz.localize(
                candidate.replace(tzinfo=None),
                is_dst=True
            )
        
        # Convert to UTC for storage
        return candidate.astimezone(pytz.UTC).replace(tzinfo=None)
    
    def is_quiet_hours(
        self,
        user_timezone: str,
        quiet_start: time,
        quiet_end: time,
        check_time: datetime = None
    ) -> bool:
        """Check if current time is in quiet hours, handling DST."""
        
        tz = pytz.timezone(user_timezone)
        check_time = check_time or datetime.now(pytz.UTC)
        
        # Get current time in user's timezone
        local_time = check_time.astimezone(tz).time()
        
        # Handle overnight quiet hours (e.g., 10 PM - 8 AM)
        if quiet_start > quiet_end:
            return local_time >= quiet_start or local_time <= quiet_end
        else:
            return quiet_start <= local_time <= quiet_end

6.4 Race Conditions

# edge_cases/race_conditions.py

"""
Race Condition Edge Cases

1. Duplicate Sends
   - Same notification queued twice
   - Solution: Idempotency keys, deduplication window

2. Preference Update During Send
   - User opts out while notification is in queue
   - Solution: Check preferences at delivery time, not queue time

3. User Deletion During Send
   - User deletes account while notification queued
   - Solution: Check user exists before delivery

4. Concurrent Preference Updates
   - Two devices update preferences simultaneously
   - Solution: Optimistic locking with version
"""

class RaceConditionGuards:
    """Guards against common race conditions."""
    
    async def safe_deliver(
        self,
        notification: dict,
        preferences_service,
        user_service
    ) -> DeliveryResult:
        """Deliver with race condition checks."""
        
        user_id = notification["user_id"]
        
        # Check 1: User still exists
        user = await user_service.get(user_id)
        if not user or user.status == "deleted":
            return DeliveryResult(
                status=DeliveryStatus.FAILED,
                error_code="USER_DELETED",
                error_message="User account has been deleted"
            )
        
        # Check 2: Re-fetch preferences (might have changed since queue time)
        prefs = await preferences_service.get_preferences(user_id)
        
        category = notification.get("category", notification["type"])
        channel = notification["channel"]
        
        if not preferences_service.is_notification_allowed(prefs, category, channel):
            return DeliveryResult(
                status=DeliveryStatus.SKIPPED,
                error_code="USER_OPTED_OUT",
                error_message="User has opted out of this notification type"
            )
        
        # Check 3: Deduplication
        idempotency_key = notification.get("idempotency_key")
        if idempotency_key:
            if await self._is_duplicate(idempotency_key):
                return DeliveryResult(
                    status=DeliveryStatus.SKIPPED,
                    error_code="DUPLICATE",
                    error_message="Notification already sent"
                )
        
        # All checks passed, proceed with delivery
        return await self._do_deliver(notification)
    
    async def _is_duplicate(self, idempotency_key: str) -> bool:
        """Check if this is a duplicate send."""
        
        # Use Redis SET NX with TTL for deduplication window
        key = f"dedup:{idempotency_key}"
        
        result = await self.redis.set(
            key,
            "1",
            nx=True,  # Only set if not exists
            ex=3600   # 1 hour deduplication window
        )
        
        return result is None  # None means key already existed

Part IV: Failure Scenarios

Chapter 7: What Happens When X Fails?

7.1 Failure Matrix

FAILURE SCENARIO MATRIX

Component          │ Impact                           │ Recovery
───────────────────┼──────────────────────────────────┼───────────────────────
API Server         │ Requests rejected                │ Auto-scale, LB removes
PostgreSQL Primary │ Writes fail                      │ Promote replica
PostgreSQL Replica │ Read latency increases           │ Use other replicas
Redis              │ Cache miss, rate limit issues    │ Fall back to DB
Kafka Broker       │ Some partitions unavailable      │ Rebalance consumers
Kafka (all)        │ Cannot queue notifications       │ Buffer in memory/disk
FCM                │ Push delivery fails              │ Circuit break, fallback
SendGrid           │ Email delivery fails             │ Fallback to SES
Twilio             │ SMS delivery fails               │ Fallback to SNS
Worker             │ Partition processing stops       │ Kafka rebalances
Network (internal) │ Service communication fails      │ Timeouts, retries
Network (external) │ Provider unreachable             │ Circuit break
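
The matrix entry for a full Kafka outage ("buffer in memory/disk") deserves a sketch, because it is the one failure where the normal queue disappears entirely. A minimal local spill buffer might look like the following; the file path, format, and the producer's send() signature are assumptions:

# Minimal sketch of the "buffer in memory/disk" fallback for a total
# Kafka outage. File format and replay trigger are assumptions.

import json
import os

class LocalSpillBuffer:
    """Appends notifications to local disk when Kafka is unreachable."""
    
    def __init__(self, path: str = "/var/spool/notifications.jsonl"):
        self.path = path
    
    def spill(self, notification: dict):
        # Append-only JSON lines; survives a process restart
        with open(self.path, "a") as f:
            f.write(json.dumps(notification, default=str) + "\n")
    
    async def replay(self, kafka_producer):
        """Re-publish spilled notifications once Kafka recovers."""
        if not os.path.exists(self.path):
            return
        with open(self.path) as f:
            for line in f:
                await kafka_producer.send(value=json.loads(line))
        os.remove(self.path)

An in-memory buffer is faster but loses everything on a crash, which is why the matrix lists both.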

7.2 Graceful Degradation

# services/degradation.py

from enum import Enum
from typing import Optional
import logging

logger = logging.getLogger(__name__)


class SystemMode(Enum):
    NORMAL = "normal"
    DEGRADED = "degraded"      # Non-critical features disabled
    CRITICAL_ONLY = "critical"  # Only critical notifications


class GracefulDegradationService:
    """
    Manages graceful degradation during failures.
    
    When system is overloaded or failing:
    1. Shed low-priority load
    2. Disable non-essential features
    3. Preserve critical path
    """
    
    def __init__(self, redis_client, config):
        self.redis = redis_client
        self.config = config
    
    async def get_system_mode(self) -> SystemMode:
        """Get current system operating mode."""
        mode = await self.redis.get("system:mode")
        return SystemMode(mode.decode()) if mode else SystemMode.NORMAL
    
    async def set_system_mode(self, mode: SystemMode, reason: str):
        """Set system operating mode."""
        await self.redis.set("system:mode", mode.value)
        logger.warning(f"System mode changed to {mode.value}: {reason}")
    
    async def should_process(
        self,
        notification: dict,
        channel: str
    ) -> tuple[bool, Optional[str]]:
        """
        Determine if notification should be processed given current mode.
        
        Returns (should_process, skip_reason).
        """
        
        mode = await self.get_system_mode()
        priority = notification.get("priority", "medium")
        notification_type = notification.get("type")
        
        if mode == SystemMode.NORMAL:
            return (True, None)
        
        if mode == SystemMode.CRITICAL_ONLY:
            # Only process critical notifications
            if priority == "critical" or notification_type == "security":
                return (True, None)
            return (False, "System in critical-only mode")
        
        if mode == SystemMode.DEGRADED:
            # Skip low priority and some features
            if priority == "low":
                return (False, "Low priority skipped in degraded mode")
            
            # Disable non-essential channels
            if channel == "email" and notification_type == "marketing":
                return (False, "Marketing emails disabled in degraded mode")
            
            return (True, None)
        
        return (True, None)
    
    async def check_and_degrade(self, metrics: dict):
        """
        Check metrics and degrade if necessary.
        
        Called periodically by health checker.
        """
        
        current_mode = await self.get_system_mode()
        
        # Check queue depth
        queue_depth = metrics.get("kafka_lag", 0)
        max_acceptable = self.config.get("max_queue_depth", 1000000)
        
        if queue_depth > max_acceptable * 2:
            if current_mode != SystemMode.CRITICAL_ONLY:
                await self.set_system_mode(
                    SystemMode.CRITICAL_ONLY,
                    f"Queue depth critical: {queue_depth}"
                )
            return
        
        if queue_depth > max_acceptable:
            if current_mode == SystemMode.NORMAL:
                await self.set_system_mode(
                    SystemMode.DEGRADED,
                    f"Queue depth high: {queue_depth}"
                )
            return
        
        # Check error rate
        error_rate = metrics.get("error_rate", 0)
        
        if error_rate > 0.2:  # >20% errors
            if current_mode == SystemMode.NORMAL:
                await self.set_system_mode(
                    SystemMode.DEGRADED,
                    f"Error rate high: {error_rate:.1%}"
                )
            return
        
        # Recover if metrics are good
        if current_mode != SystemMode.NORMAL:
            if queue_depth < max_acceptable * 0.5 and error_rate < 0.05:
                await self.set_system_mode(
                    SystemMode.NORMAL,
                    "Metrics recovered"
                )
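
On the consuming side, the check is a one-liner before delivery. A sketch of how a delivery worker might use it; the worker's metrics and delivery_service attributes are assumptions:

# Sketch of a worker honoring the current system mode before doing work.

async def process_message(worker, notification: dict):
    channel = notification["channel"]
    
    allowed, reason = await worker.degradation.should_process(notification, channel)
    if not allowed:
        # Shed load is recorded, not retried; low-priority traffic can be
        # re-sent by the caller once the system recovers
        await worker.metrics.increment("notifications.shed", tags={"reason": reason})
        return
    
    await worker.delivery_service.deliver(notification)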

Part V: Cost Optimization

Chapter 8: Reducing Notification Costs

SMS costs $3M+/month. Let's fix that.

8.1 Channel Cost Model

# services/cost_optimization.py

from dataclasses import dataclass
from datetime import datetime
from typing import Optional
import logging

logger = logging.getLogger(__name__)


@dataclass
class ChannelCost:
    """Cost per notification for each channel."""
    channel: str
    cost_per_unit: float  # USD
    unit_name: str
    monthly_budget: float


CHANNEL_COSTS = {
    "push": ChannelCost("push", 0.0, "notification", float("inf")),
    "in_app": ChannelCost("in_app", 0.0, "notification", float("inf")),
    "email": ChannelCost("email", 0.0001, "email", 15000),  # $0.10/1000
    "sms": ChannelCost("sms", 0.01, "message", 100000),     # $0.01/msg, $100K budget
}


class CostAwareChannelSelector:
    """
    Selects channels with cost awareness.
    
    Strategies:
    1. Prefer free channels (push, in-app)
    2. Fall back to email before SMS
    3. SMS only for critical or explicit user preference
    4. Track and cap spend per channel
    """
    
    def __init__(self, redis_client, channel_costs: dict = None):
        self.redis = redis_client
        self.costs = channel_costs or CHANNEL_COSTS
    
    async def select_channel(
        self,
        available_channels: list[str],
        notification: dict,
        user_preferences: dict
    ) -> Optional[str]:
        """
        Select the most cost-effective channel.
        
        Prioritizes:
        1. User's preferred channel (if set)
        2. Cheapest available channel
        3. Budget constraints
        """
        
        priority = notification.get("priority", "medium")
        notification_type = notification.get("type")
        
        # Critical notifications: user preference or all channels
        if priority == "critical":
            return self._select_for_critical(available_channels, user_preferences)
        
        # Sort by cost
        sorted_channels = sorted(
            available_channels,
            key=lambda c: self.costs.get(c, ChannelCost(c, 1.0, "", 0)).cost_per_unit
        )
        
        for channel in sorted_channels:
            # Check budget
            if not await self._has_budget(channel):
                continue
            
            # Check if appropriate for notification type
            if self._is_appropriate(channel, notification_type):
                return channel
        
        # Fallback to any available
        return sorted_channels[0] if sorted_channels else None
    
    def _select_for_critical(
        self,
        available_channels: list[str],
        preferences: dict
    ) -> str:
        """Select channel for critical notifications."""
        
        preferred = preferences.get("critical_channel")
        if preferred and preferred in available_channels:
            return preferred
        
        # Default priority for critical: push, then SMS
        for channel in ["push", "sms", "email"]:
            if channel in available_channels:
                return channel
        
        return available_channels[0] if available_channels else None
    
    def _is_appropriate(self, channel: str, notification_type: str) -> bool:
        """Check if channel is appropriate for notification type."""
        
        APPROPRIATE_CHANNELS = {
            "security": ["push", "sms", "email"],
            "transaction": ["push", "in_app", "email"],
            "marketing": ["email", "push"],
            "reminder": ["push", "email"],
            "social": ["push", "in_app"],
        }
        
        allowed = APPROPRIATE_CHANNELS.get(notification_type, ["push", "email"])
        return channel in allowed
    
    async def _has_budget(self, channel: str) -> bool:
        """Check if channel has remaining budget."""
        
        cost = self.costs.get(channel)
        if not cost or cost.monthly_budget == float("inf"):
            return True
        
        # Get current month's spend
        month_key = datetime.utcnow().strftime("%Y-%m")
        spend_key = f"channel_spend:{channel}:{month_key}"
        
        current_spend = await self.redis.get(spend_key)
        current_spend = float(current_spend or 0)
        
        return current_spend < cost.monthly_budget
    
    async def record_send(self, channel: str, count: int = 1):
        """Record a send for budget tracking."""
        
        cost = self.costs.get(channel)
        if not cost:
            return
        
        month_key = datetime.utcnow().strftime("%Y-%m")
        spend_key = f"channel_spend:{channel}:{month_key}"
        
        spend = cost.cost_per_unit * count
        
        await self.redis.incrbyfloat(spend_key, spend)
        await self.redis.expire(spend_key, 86400 * 35)  # Keep 35 days

8.2 SMS Optimization Strategies

# services/sms_optimization.py

"""
SMS Cost Optimization Strategies

SMS is 100x more expensive than email. Use sparingly.

Strategies:
1. Only use for critical notifications
2. Push first, SMS as fallback only if push fails
3. Combine multiple messages into one
4. Use short codes for high volume (better rates)
5. Regional pricing (some countries are cheaper)
"""

class SMSOptimizer:
    """Optimizes SMS usage to reduce costs."""
    
    # Countries with high SMS costs (>$0.05/msg)
    EXPENSIVE_COUNTRIES = ["US", "CA", "AU", "GB", "DE", "FR"]
    
    # Countries with low SMS costs (<$0.01/msg)
    CHEAP_COUNTRIES = ["IN", "PH", "ID", "VN", "NG"]
    
    def should_use_sms(
        self,
        notification: dict,
        user: dict,
        push_available: bool
    ) -> tuple[bool, str]:
        """
        Determine if SMS should be used.
        
        Returns (should_use, reason).
        """
        
        notification_type = notification.get("type")
        priority = notification.get("priority")
        country = user.get("country_code")
        
        # Always use SMS for security alerts
        if notification_type == "security" and priority == "critical":
            return (True, "Critical security notification")
        
        # If push is available and working, don't use SMS
        if push_available and notification_type not in ["security"]:
            return (False, "Push notification available")
        
        # Check user's explicit preference
        if user.get("sms_preference") == "never":
            return (False, "User opted out of SMS")
        
        # Cost-based decision for expensive countries
        if country in self.EXPENSIVE_COUNTRIES:
            if notification_type in ["marketing", "social"]:
                return (False, f"SMS too expensive for {notification_type} in {country}")
        
        # Fallback: use SMS if it's the only option for important notifications
        if priority in ["critical", "high"]:
            return (True, "Fallback for high priority notification")
        
        return (False, "SMS not justified for this notification")
    
    def optimize_message(self, message: str) -> str:
        """
        Optimize SMS message to fit in single segment.
        
        Standard SMS: 160 chars (GSM-7) or 70 chars (Unicode)
        Multi-segment costs more!
        """
        
        # Check if Unicode needed
        if self._needs_unicode(message):
            max_length = 70
        else:
            max_length = 160
        
        if len(message) <= max_length:
            return message
        
        # Truncate with ellipsis
        return message[:max_length - 3] + "..."
    
    def _needs_unicode(self, text: str) -> bool:
        """Check if text requires Unicode encoding."""
        try:
            # 'gsm03.38' is not a built-in Python codec; it requires a
            # third-party package. A missing codec raises LookupError below.
            text.encode('gsm03.38')
            return False
        except (UnicodeEncodeError, LookupError):
            return True
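
Putting the two pieces together, the decision plus the single-segment trim might be used like this; the user dict fields and sms_provider.send() interface are assumptions:

# Sketch of the SMS decision in a send path; "active_push_tokens",
# "phone_number", and sms_provider.send() are assumed.

async def maybe_send_sms(user: dict, notification: dict, body: str, sms_provider) -> str:
    optimizer = SMSOptimizer()
    
    push_available = bool(user.get("active_push_tokens"))  # assumed field
    use_sms, reason = optimizer.should_use_sms(notification, user, push_available)
    
    if not use_sms:
        return f"skipped: {reason}"
    
    # Trim to a single segment before handing off to the provider
    await sms_provider.send(user["phone_number"], optimizer.optimize_message(body))
    return "sent"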

Summary

What We Covered Today

DAY 4 SUMMARY: SCALE, RELIABILITY & EDGE CASES

SCALING CHALLENGES
├── Campaign mode (10M in 1 hour)
│   ├── Rate-limited ingestion
│   ├── Batched processing
│   └── Priority isolation
├── Hot user problem (fan-out)
│   ├── Async fan-out
│   ├── Batched follower queries
│   └── Partition-aware routing
└── Provider rate limits
    ├── Token bucket limiter
    ├── Backoff on rate limit errors
    └── Per-provider configuration

RELIABILITY PATTERNS
├── Circuit breakers
│   ├── CLOSED → OPEN → HALF_OPEN cycle
│   ├── Failure threshold triggers
│   └── Automatic recovery testing
├── Provider failover
│   ├── Primary/fallback configuration
│   ├── Health-based routing
│   └── Automatic failover
└── Graceful degradation
    ├── System modes (normal/degraded/critical)
    ├── Load shedding
    └── Auto-recovery

EDGE CASES
├── Device tokens
│   ├── Token rotation
│   ├── Multiple devices
│   ├── Stale token cleanup
│   └── Token collision
├── Email bounces
│   ├── Hard vs soft bounces
│   ├── Spam complaints
│   └── Unsubscribe handling
├── Timezones
│   ├── DST transitions
│   ├── Quiet hours across DST
│   └── Scheduled notification DST
└── Race conditions
    ├── Duplicate sends
    ├── Preference changes during delivery
    ├── User deletion during delivery
    └── Concurrent updates

COST OPTIMIZATION
├── Channel cost model
├── Cost-aware channel selection
├── SMS optimization strategies
└── Budget tracking and caps

Interview Tip of the Day

INTERVIEW TIP: SHOW YOU'VE OPERATED REAL SYSTEMS

When asked "what can go wrong?", show operational experience:

"At scale, several things break:

1. Campaign spikes - Marketing sends 10M at once. We rate-limit
   ingestion and use priority queues so campaigns don't affect
   transaction notifications.

2. Provider failures - We have circuit breakers per provider.
   If SendGrid fails, we automatically failover to SES. The
   circuit opens after 5 failures, tests recovery every 30 seconds.

3. Hot users - Celebrity notifications fan out to millions.
   We process these asynchronously with rate limiting to avoid
   overwhelming downstream systems.

4. The worst edge case I've seen: a user changes their email
   while we're mid-send. The notification was queued for old
   email. Now we re-check preferences at delivery time, not
   just queue time."

This shows you've dealt with real production issues.

End of Week 6, Day 4

Tomorrow: Day 5 — Operations, Monitoring & Interview Mastery