Himanshu Kukreja

Week 6 — Day 2: Core Notification Flow

System Design Mastery Series — Practical Application Week


Introduction

Yesterday we understood the problem and created the high-level design. Today we build the core notification flow — the path a notification takes from creation to delivery.

Today's Theme: "The happy path must be bulletproof"

By the end of today, you'll have production-ready code for:

  • Notification ingestion with validation
  • Preference-based routing
  • Provider integration (FCM, APNs, SendGrid, Twilio)
  • Delivery tracking with status management

Part I: Notification Ingestion

Chapter 1: The Send API

When an internal service wants to send a notification, it calls our Send API. This is the entry point to our system.

1.1 API Design Principles

INGESTION DESIGN PRINCIPLES

1. FAST RESPONSE
   - Return immediately after validation
   - Don't wait for delivery
   - Async processing via queue

2. IDEMPOTENT
   - Same request = same result
   - Use idempotency keys
   - Safe to retry

3. VALIDATED
   - Reject bad requests early
   - Clear error messages
   - Don't let garbage into the queue

4. TRACEABLE
   - Assign notification ID immediately
   - Return ID to caller
   - Enable status tracking
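To see the four principles working together, here is a deliberately tiny in-memory sketch. `InMemoryIngestion` and `accept()` are hypothetical names, a `deque` stands in for the real queue, and there is no persistence or concurrency control. The production version follows in 1.3.

```python
import uuid
from collections import deque
from typing import Optional


class InMemoryIngestion:
    """Toy front door: validate, assign an ID, enqueue, return immediately."""

    def __init__(self) -> None:
        self.queue: deque = deque()          # stands in for the outbox/Kafka
        self.seen: dict[str, str] = {}       # idempotency_key -> notification_id

    def accept(self, user_id: str, template: str,
               idempotency_key: Optional[str] = None) -> str:
        # IDEMPOTENT: a retry carrying the same key gets the same ID back
        if idempotency_key and idempotency_key in self.seen:
            return self.seen[idempotency_key]

        # VALIDATED: reject bad input before it reaches the queue
        if not user_id or not template:
            raise ValueError("user_id and template are required")

        # TRACEABLE: assign the ID up front so the caller can track status
        notification_id = str(uuid.uuid4())
        if idempotency_key:
            self.seen[idempotency_key] = notification_id

        # FAST RESPONSE: enqueue and return; delivery happens elsewhere
        self.queue.append({"notification_id": notification_id,
                           "user_id": user_id, "template": template})
        return notification_id


ingest = InMemoryIngestion()
first = ingest.accept("user-1", "payment_received", idempotency_key="req-42")
retry = ingest.accept("user-1", "payment_received", idempotency_key="req-42")
```

The retry returns the same ID and the queue holds a single entry.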

1.2 Request/Response Models

# models/notification.py

from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Optional
import uuid


class NotificationType(Enum):
    TRANSACTION = "transaction"
    SECURITY = "security"
    MARKETING = "marketing"
    REMINDER = "reminder"
    SOCIAL = "social"


class NotificationPriority(Enum):
    CRITICAL = "critical"  # Security alerts, fraud
    HIGH = "high"          # Transactions, payments
    MEDIUM = "medium"      # Social, reminders
    LOW = "low"            # Marketing, newsletters


class NotificationStatus(Enum):
    PENDING = "pending"
    QUEUED = "queued"
    ROUTING = "routing"
    SENDING = "sending"
    SENT = "sent"
    DELIVERED = "delivered"
    FAILED = "failed"
    BOUNCED = "bounced"


class Channel(Enum):
    PUSH = "push"
    EMAIL = "email"
    SMS = "sms"
    IN_APP = "in_app"
    WHATSAPP = "whatsapp"


@dataclass
class SendNotificationRequest:
    """Request to send a notification."""
    user_id: str
    type: NotificationType
    template: str
    variables: dict = field(default_factory=dict)
    
    # Optional overrides
    channels: Optional[list[Channel]] = None
    priority: Optional[NotificationPriority] = None
    scheduled_at: Optional[datetime] = None
    
    # Metadata for tracking
    metadata: dict = field(default_factory=dict)
    idempotency_key: Optional[str] = None


@dataclass
class Notification:
    """Internal notification model."""
    notification_id: str
    user_id: str
    type: NotificationType
    priority: NotificationPriority
    template: str
    variables: dict
    
    status: NotificationStatus = NotificationStatus.PENDING
    channel: Optional[Channel] = None
    
    # Rendered content
    title: Optional[str] = None
    body: Optional[str] = None
    
    # Tracking
    idempotency_key: Optional[str] = None
    scheduled_at: Optional[datetime] = None
    created_at: datetime = field(default_factory=datetime.utcnow)
    
    # Delivery info
    provider_message_id: Optional[str] = None
    sent_at: Optional[datetime] = None
    delivered_at: Optional[datetime] = None
    
    # Error tracking
    error_code: Optional[str] = None
    error_message: Optional[str] = None
    retry_count: int = 0
    
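    metadata: dict = field(default_factory=dict)

    def to_dict(self) -> dict:
        """JSON-safe dict: enums become their values, datetimes ISO-8601 strings."""
        data = dict(vars(self))
        for key, value in data.items():
            if isinstance(value, Enum):
                data[key] = value.value
            elif isinstance(value, datetime):
                data[key] = value.isoformat()
        return data

    @classmethod
    def from_dict(cls, data: dict) -> "Notification":
        """Inverse of to_dict(); keys that are not fields are ignored."""
        data = {k: v for k, v in data.items() if k in cls.__dataclass_fields__}
        data["type"] = NotificationType(data["type"])
        data["priority"] = NotificationPriority(data["priority"])
        if data.get("status"):
            data["status"] = NotificationStatus(data["status"])
        if data.get("channel"):
            data["channel"] = Channel(data["channel"])
        for key in ("scheduled_at", "created_at", "sent_at", "delivered_at"):
            if data.get(key):
                data[key] = datetime.fromisoformat(data[key])
        return cls(**data)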


@dataclass
class SendNotificationResponse:
    """Response after accepting a notification."""
    notification_id: str
    status: NotificationStatus
    created_at: datetime
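These models mix enums and datetimes, which `json.dumps` cannot encode by itself, yet the outbox later serializes `notification.to_dict()` as JSON. One common approach, sketched here under the assumption that enums travel as their string values and datetimes as ISO-8601 strings, is a `default=` hook. `to_json_safe` and the one-member `Priority` enum are hypothetical stand-ins.

```python
import json
from datetime import datetime, timezone
from enum import Enum


class Priority(Enum):
    HIGH = "high"


def to_json_safe(value):
    """json.dumps `default=` hook: enums -> their value, datetimes -> ISO-8601."""
    if isinstance(value, Enum):
        return value.value
    if isinstance(value, datetime):
        return value.isoformat()
    raise TypeError(f"Not JSON serializable: {type(value).__name__}")


payload = {"priority": Priority.HIGH,
           "created_at": datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)}
encoded = json.dumps(payload, default=to_json_safe)
# encoded == '{"priority": "high", "created_at": "2024-01-01T12:00:00+00:00"}'

# Consumers reverse the mapping when they rebuild the model
decoded = json.loads(encoded)
restored_priority = Priority(decoded["priority"])
restored_created = datetime.fromisoformat(decoded["created_at"])
```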

1.3 The Send API Implementation

# api/send.py

from fastapi import APIRouter, HTTPException, Header, Depends
from typing import Optional
import uuid

from models.notification import (
    SendNotificationRequest, SendNotificationResponse,
    Notification, NotificationStatus, NotificationPriority,
    NotificationType  # used by _determine_priority
)
from services.validation import ValidationService
from services.outbox import OutboxService
from repositories.notification import NotificationRepository
from repositories.idempotency import IdempotencyRepository

router = APIRouter()


class NotificationSendAPI:
    """
    Notification Send API.
    
    Handles incoming notification requests, validates them,
    and queues them for processing.
    
    Week 2 Concept: Idempotency
    Week 3 Concept: Transactional Outbox
    """
    
    def __init__(
        self,
        validator: ValidationService,
        notification_repo: NotificationRepository,
        idempotency_repo: IdempotencyRepository,
        outbox: OutboxService
    ):
        self.validator = validator
        self.notifications = notification_repo
        self.idempotency = idempotency_repo
        self.outbox = outbox
    
    async def send(
        self,
        request: SendNotificationRequest,
        request_id: Optional[str] = None
    ) -> SendNotificationResponse:
        """
        Accept a notification request.
        
        Steps:
        1. Check idempotency (have we seen this before?)
        2. Validate the request
        3. Create notification record
        4. Write to outbox (same transaction)
        5. Return notification ID
        """
        
        # Step 1: Idempotency check
        idempotency_key = request.idempotency_key or request_id
        if idempotency_key:
            existing = await self.idempotency.get(idempotency_key)
            if existing:
                # Return the existing notification (don't create duplicate)
                return SendNotificationResponse(
                    notification_id=existing.notification_id,
                    status=existing.status,
                    created_at=existing.created_at
                )
        
        # Step 2: Validate request
        validation_result = await self.validator.validate(request)
        if not validation_result.is_valid:
            raise HTTPException(
                status_code=400,
                detail={
                    "error": "VALIDATION_ERROR",
                    "message": validation_result.error_message,
                    "field": validation_result.error_field
                }
            )
        
        # Step 3: Create notification
        notification = Notification(
            notification_id=str(uuid.uuid4()),
            user_id=request.user_id,
            type=request.type,
            priority=self._determine_priority(request),
            template=request.template,
            variables=request.variables,
            idempotency_key=idempotency_key,
            scheduled_at=request.scheduled_at,
            metadata=request.metadata
        )
        
        # Step 4: Save notification AND write to outbox atomically
        # This is the transactional outbox pattern from Week 3
        async with self.notifications.transaction() as txn:
            await self.notifications.save(notification, txn)
            
            # Determine which Kafka topic based on priority
            topic = self._get_topic_for_priority(notification.priority)
            
            await self.outbox.write(
                topic=topic,
                key=notification.user_id,  # Partition by user for ordering
                payload=notification.to_dict(),
                transaction=txn
            )
            
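            # Save idempotency mapping. The Step 1 check is not atomic with this
            # insert, so the key column needs a UNIQUE constraint to stop two
            # concurrent requests with the same key from both being accepted.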
            if idempotency_key:
                await self.idempotency.save(
                    idempotency_key, 
                    notification,
                    transaction=txn
                )
        
        # Step 5: Return response
        return SendNotificationResponse(
            notification_id=notification.notification_id,
            status=NotificationStatus.QUEUED,
            created_at=notification.created_at
        )
    
    def _determine_priority(self, request: SendNotificationRequest) -> NotificationPriority:
        """Determine notification priority based on type or explicit override."""
        if request.priority:
            return request.priority
        
        # Default priorities by type
        PRIORITY_MAP = {
            NotificationType.SECURITY: NotificationPriority.CRITICAL,
            NotificationType.TRANSACTION: NotificationPriority.HIGH,
            NotificationType.SOCIAL: NotificationPriority.MEDIUM,
            NotificationType.REMINDER: NotificationPriority.MEDIUM,
            NotificationType.MARKETING: NotificationPriority.LOW,
        }
        return PRIORITY_MAP.get(request.type, NotificationPriority.MEDIUM)
    
    def _get_topic_for_priority(self, priority: NotificationPriority) -> str:
        """Map priority to Kafka topic."""
        TOPIC_MAP = {
            NotificationPriority.CRITICAL: "notifications.critical",
            NotificationPriority.HIGH: "notifications.high",
            NotificationPriority.MEDIUM: "notifications.medium",
            NotificationPriority.LOW: "notifications.low",
        }
        return TOPIC_MAP[priority]


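def get_send_api() -> NotificationSendAPI:
    """Dependency provider (placeholder): build NotificationSendAPI from the
    application's ValidationService, repositories and OutboxService here.
    A bare Depends() would make FastAPI try to resolve the constructor's
    service arguments from the request, which does not work for these types."""
    raise NotImplementedError("wire up NotificationSendAPI dependencies")


# FastAPI route
@router.post("/v1/notifications", response_model=SendNotificationResponse)
async def send_notification(
    request: SendNotificationRequest,
    x_request_id: Optional[str] = Header(None),
    api: NotificationSendAPI = Depends(get_send_api)
):
    return await api.send(request, request_id=x_request_id)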
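The check in Step 1 and the insert in Step 4 are separate operations, so two concurrent requests carrying the same key can both miss in `idempotency.get()` and both create a notification. A usual remedy is to let a unique constraint on the key decide the race. The sketch below uses the standard-library `sqlite3` module, where `INSERT OR IGNORE` plays the role PostgreSQL's `INSERT ... ON CONFLICT DO NOTHING` would; the table and the `accept_once` helper are hypothetical.

```python
import sqlite3
import uuid


def accept_once(conn: sqlite3.Connection, idempotency_key: str) -> str:
    """Return the notification ID for this key, creating it at most once.

    The PRIMARY KEY on idempotency_key makes the database, not an earlier
    SELECT, the arbiter when two requests with the same key race.
    """
    candidate_id = str(uuid.uuid4())
    with conn:  # one transaction: commit on success, rollback on exception
        conn.execute(
            "INSERT OR IGNORE INTO idempotency (idempotency_key, notification_id) "
            "VALUES (?, ?)",
            (idempotency_key, candidate_id),
        )
        row = conn.execute(
            "SELECT notification_id FROM idempotency WHERE idempotency_key = ?",
            (idempotency_key,),
        ).fetchone()
    return row[0]  # the winner's ID, whether or not this call inserted it


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE idempotency ("
             "idempotency_key TEXT PRIMARY KEY, notification_id TEXT NOT NULL)")
first = accept_once(conn, "req-42")
second = accept_once(conn, "req-42")
```

In the full flow, the notification and outbox rows would be written only when this insert actually wins, inside the same transaction.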

1.4 Validation Service

# services/validation.py

from dataclasses import dataclass
from typing import Optional

from models.notification import SendNotificationRequest
from repositories.user import UserRepository
from repositories.template import TemplateRepository


@dataclass
class ValidationResult:
    is_valid: bool
    error_message: Optional[str] = None
    error_field: Optional[str] = None


class ValidationService:
    """
    Validates notification requests before processing.
    
    Catches errors early to avoid wasting resources on invalid requests.
    """
    
    def __init__(
        self,
        user_repo: UserRepository,
        template_repo: TemplateRepository
    ):
        self.users = user_repo
        self.templates = template_repo
    
    async def validate(self, request: SendNotificationRequest) -> ValidationResult:
        """Validate a notification request."""
        
        # Check user exists
        user = await self.users.get(request.user_id)
        if not user:
            return ValidationResult(
                is_valid=False,
                error_message=f"User not found: {request.user_id}",
                error_field="user_id"
            )
        
        # Check user hasn't been deleted/banned
        if user.status != "active":
            return ValidationResult(
                is_valid=False,
                error_message=f"User is not active: {user.status}",
                error_field="user_id"
            )
        
        # Check template exists
        template = await self.templates.get(request.template)
        if not template:
            return ValidationResult(
                is_valid=False,
                error_message=f"Template not found: {request.template}",
                error_field="template"
            )
        
        # Validate required variables are provided
        missing_vars = self._check_required_variables(
            template.variables_schema,
            request.variables
        )
        if missing_vars:
            return ValidationResult(
                is_valid=False,
                error_message=f"Missing required variables: {missing_vars}",
                error_field="variables"
            )
        
        # Validate scheduled_at is in future
        if request.scheduled_at:
            from datetime import datetime, timedelta
            min_schedule = datetime.utcnow() + timedelta(minutes=1)
            if request.scheduled_at < min_schedule:
                return ValidationResult(
                    is_valid=False,
                    error_message="scheduled_at must be at least 1 minute in the future",
                    error_field="scheduled_at"
                )
        
        return ValidationResult(is_valid=True)
    
    def _check_required_variables(
        self,
        schema: dict,
        provided: dict
    ) -> list[str]:
        """Check which required variables are missing."""
        if not schema:
            return []
        
        required = schema.get("required", [])
        return [var for var in required if var not in provided]
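`validate()` compares `request.scheduled_at` with the naive `datetime.utcnow()`. If the API parses a timestamp carrying an offset (for example `2024-01-01T14:00:00+02:00`) into a timezone-aware datetime, Python refuses that comparison with a `TypeError`. A sketch of a tolerant check follows, assuming naive values mean UTC; `schedule_error` and `MIN_LEAD` are hypothetical names.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

MIN_LEAD = timedelta(minutes=1)


def schedule_error(scheduled_at: datetime,
                   now: Optional[datetime] = None) -> Optional[str]:
    """Return an error message, or None if scheduled_at is far enough ahead.

    Assumption: naive datetimes are UTC. Converting them to aware values
    avoids the TypeError raised when comparing naive and aware datetimes.
    """
    now = now or datetime.now(timezone.utc)
    if scheduled_at.tzinfo is None:
        scheduled_at = scheduled_at.replace(tzinfo=timezone.utc)
    if scheduled_at < now + MIN_LEAD:
        return "scheduled_at must be at least 1 minute in the future"
    return None


now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
plus_two = timezone(timedelta(hours=2))
```

Passing `now` explicitly keeps the check deterministic in tests.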

Chapter 2: Transactional Outbox

The transactional outbox pattern ensures we never lose notifications, even if Kafka is temporarily unavailable.

2.1 Why Outbox?

THE PROBLEM WITHOUT OUTBOX

Naive approach:
  1. Save notification to database
  2. Publish to Kafka

What can go wrong:

Scenario A: Database succeeds, Kafka fails
  → Notification saved but never processed
  → USER NEVER GETS NOTIFIED

Scenario B: Kafka succeeds, database fails
  → Notification processed but not saved
  → Can't track status, might retry and DUPLICATE


THE OUTBOX SOLUTION

Instead:
  1. Save notification to database  ─┐
  2. Write to outbox table          ─┴─ SAME TRANSACTION
  3. Background process reads outbox and publishes to Kafka
  
If step 3 fails:
  → Outbox entry still exists
  → Retry until successful
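  → NEVER LOSE A NOTIFICATION
  → Trade-off: delivery is at-least-once. A crash after publishing
    but before marking the entry published re-publishes it, so
    consumers must tolerate duplicates.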

2.2 Outbox Implementation

# services/outbox.py

import json
import asyncio
from datetime import datetime
from typing import Optional
import logging

logger = logging.getLogger(__name__)


class OutboxService:
    """
    Transactional outbox for reliable event publishing.
    
    Week 3 Concept: Transactional Outbox Pattern
    """
    
    def __init__(self, db_pool, kafka_producer):
        self.db = db_pool
        self.kafka = kafka_producer
    
    async def write(
        self,
        topic: str,
        key: str,
        payload: dict,
        transaction=None
    ):
        """
        Write an event to the outbox table.
        
        Must be called within the same transaction as the business logic.
        """
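        # Acquiring a fresh connection here would leak it (it is never released)
        # and break atomicity, so require the caller's transaction instead.
        if transaction is None:
            raise ValueError("OutboxService.write() must run inside the caller's transaction")

        await transaction.execute("""
            INSERT INTO notification_outbox (topic, partition_key, payload)
            VALUES ($1, $2, $3)
        """, topic, key, json.dumps(payload))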


class OutboxProcessor:
    """
    Background processor that publishes outbox events to Kafka.
    
    Runs continuously, polling for unpublished events.
    """
    
    def __init__(
        self,
        db_pool,
        kafka_producer,
        batch_size: int = 100,
        poll_interval_ms: int = 100
    ):
        self.db = db_pool
        self.kafka = kafka_producer
        self.batch_size = batch_size
        self.poll_interval = poll_interval_ms / 1000
        self.running = False
    
    async def start(self):
        """Start the outbox processor."""
        self.running = True
        logger.info("Outbox processor started")
        
        while self.running:
            try:
                processed = await self._process_batch()
                
                if processed == 0:
                    # No events to process, wait before polling again
                    await asyncio.sleep(self.poll_interval)
                    
            except Exception as e:
                logger.error(f"Outbox processor error: {e}")
                await asyncio.sleep(1)  # Back off on error
    
    async def stop(self):
        """Stop the outbox processor."""
        self.running = False
        logger.info("Outbox processor stopped")
    
    async def _process_batch(self) -> int:
        """Process a batch of outbox events."""
        
        async with self.db.acquire() as conn:
            # FOR UPDATE SKIP LOCKED lets concurrent processors share the table,
            # but row locks only last for the enclosing transaction. Without an
            # explicit transaction each statement autocommits, the locks are
            # released immediately, and two processors can publish the same rows.
            async with conn.transaction():
                events = await conn.fetch("""
                    SELECT outbox_id, topic, partition_key, payload
                    FROM notification_outbox
                    WHERE published = false
                    ORDER BY outbox_id
                    LIMIT $1
                    FOR UPDATE SKIP LOCKED
                """, self.batch_size)

                if not events:
                    return 0

                # Publish each event to Kafka
                published_ids = []

                for event in events:
                    try:
                        # send_and_wait() returns only after the broker acks;
                        # awaiting send() merely enqueues into the producer buffer
                        await self.kafka.send_and_wait(
                            topic=event['topic'],
                            key=event['partition_key'].encode(),
                            value=event['payload'].encode()
                        )
                        published_ids.append(event['outbox_id'])

                    except Exception as e:
                        logger.error(f"Failed to publish event {event['outbox_id']}: {e}")
                        # Continue with other events. Note that this can reorder
                        # events that share a partition key.

                # Mark published events (committed together with the row locks)
                if published_ids:
                    await conn.execute("""
                        UPDATE notification_outbox
                        SET published = true, published_at = NOW()
                        WHERE outbox_id = ANY($1)
                    """, published_ids)

                return len(published_ids)
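The relay's key property is that an event is marked published only after the broker has acknowledged it. The sketch below isolates that loop with `sqlite3` and a plain callable `publish` (hypothetical) that is assumed to block until acknowledged and raise on failure. Unlike `OutboxProcessor`, which continues past a failed event, it stops at the first failure so a later event cannot overtake an earlier one; that is a design choice traded against throughput.

```python
import sqlite3
from typing import Callable


def relay_batch(conn: sqlite3.Connection,
                publish: Callable[[str, str], None],
                batch_size: int = 100) -> int:
    """Publish unpublished outbox rows; mark a row only after publish() returns."""
    rows = conn.execute(
        "SELECT outbox_id, topic, payload FROM outbox "
        "WHERE published = 0 ORDER BY outbox_id LIMIT ?", (batch_size,)
    ).fetchall()
    published = 0
    for outbox_id, topic, payload in rows:
        try:
            publish(topic, payload)       # assumed to block until the broker acks
        except Exception:
            break                         # keep order: retry this row next run
        with conn:
            conn.execute("UPDATE outbox SET published = 1 WHERE outbox_id = ?",
                         (outbox_id,))
        published += 1
    return published


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE outbox (outbox_id INTEGER PRIMARY KEY, "
             "topic TEXT, payload TEXT, published INTEGER DEFAULT 0)")
with conn:
    conn.executemany("INSERT INTO outbox (topic, payload) VALUES (?, ?)",
                     [("notifications.high", p) for p in ("p1", "p2", "p3")])

attempts = []


def flaky_publish(topic: str, payload: str) -> None:
    """Fake broker call: fails the first time it sees p2."""
    attempts.append(payload)
    if payload == "p2" and attempts.count("p2") == 1:
        raise ConnectionError("broker unavailable")


first_run = relay_batch(conn, flaky_publish)    # p1 published, stops at p2
second_run = relay_batch(conn, flaky_publish)   # p2 and p3 published
```

A crash between `publish()` and the `UPDATE` would publish that row again on the next run, which is the at-least-once behavior the outbox accepts.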

Part II: Routing

Chapter 3: The Router Service

The router consumes notifications from Kafka and determines which channels to use based on user preferences.

3.1 User Preferences

# models/preferences.py

from dataclasses import dataclass, field
from typing import Optional
from datetime import time


@dataclass
class ChannelPreference:
    enabled: bool = True
    priority: int = 1  # Lower = higher priority


@dataclass
class CategoryPreference:
    enabled: bool = True
    channels: list[str] = field(default_factory=list)


@dataclass
class QuietHours:
    enabled: bool = False
    start: Optional[time] = None  # e.g., 22:00
    end: Optional[time] = None    # e.g., 08:00


@dataclass
class UserPreferences:
    user_id: str
    global_enabled: bool = True
    
    channels: dict[str, ChannelPreference] = field(default_factory=dict)
    categories: dict[str, CategoryPreference] = field(default_factory=dict)
    
    quiet_hours: QuietHours = field(default_factory=QuietHours)
    timezone: str = "UTC"
    
    # For optimistic locking
    version: int = 1
    
    def is_channel_enabled(self, channel: str) -> bool:
        """Check if a channel is enabled."""
        if not self.global_enabled:
            return False
        pref = self.channels.get(channel)
        return pref.enabled if pref else True
    
    def get_channels_for_category(self, category: str) -> list[str]:
        """Get enabled channels for a category."""
        cat_pref = self.categories.get(category)
        if not cat_pref or not cat_pref.enabled:
            return []
        
        # Filter by channel-level preferences
        return [
            ch for ch in cat_pref.channels
            if self.is_channel_enabled(ch)
        ]

3.2 Router Implementation

# services/router.py

from datetime import datetime, timezone
from typing import Optional
import pytz
import logging

from models.notification import Notification, Channel, NotificationStatus
from models.preferences import UserPreferences
from repositories.preferences import PreferencesRepository
from repositories.device_token import DeviceTokenRepository
from services.kafka_producer import KafkaProducer

logger = logging.getLogger(__name__)


class NotificationRouter:
    """
    Routes notifications to appropriate channels based on user preferences.
    
    This is the brain of the notification system.
    """
    
    def __init__(
        self,
        preferences_repo: PreferencesRepository,
        device_tokens_repo: DeviceTokenRepository,
        kafka_producer: KafkaProducer
    ):
        self.preferences = preferences_repo
        self.device_tokens = device_tokens_repo
        self.kafka = kafka_producer
    
    async def route(self, notification: Notification) -> list[str]:
        """
        Determine which channels to use for this notification.
        
        Returns list of channels that will receive the notification.
        """
        
        # Step 1: Get user preferences (from cache or DB)
        prefs = await self.preferences.get(notification.user_id)
        if not prefs:
            # No preferences = use defaults
            prefs = UserPreferences(user_id=notification.user_id)
        
        # Step 2: Check if notifications are globally disabled
        if not prefs.global_enabled:
            logger.info(f"User {notification.user_id} has notifications disabled")
            return []
        
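        # Step 3: Get channels for this notification type
        category = notification.type.value
        category_pref = prefs.categories.get(category)
        if category_pref is not None and not category_pref.enabled:
            # Explicit opt-out: don't fall back to the type defaults below
            logger.info(
                f"User {notification.user_id} opted out of {category} notifications"
            )
            return []

        channels = prefs.get_channels_for_category(category)

        if not channels:
            # No usable category preference: use the type defaults, but still
            # honor channel-level toggles (e.g. SMS switched off)
            channels = [
                ch for ch in self._get_default_channels(notification)
                if prefs.is_channel_enabled(ch)
            ]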
        
        # Step 4: Filter by quiet hours
        if self._is_quiet_hours(prefs, notification):
            channels = self._filter_for_quiet_hours(channels, notification)
        
        # Step 5: Check channel availability (e.g., has device token for push)
        available_channels = await self._filter_by_availability(
            channels, notification.user_id
        )
        
        # Step 6: Publish to channel-specific topics
        for channel in available_channels:
            await self._publish_to_channel(notification, channel)
        
        return available_channels
    
    def _get_default_channels(self, notification: Notification) -> list[str]:
        """Get default channels based on notification type."""
        
        DEFAULTS = {
            "security": ["push", "sms", "email"],   # Multi-channel for security
            "transaction": ["push", "in_app"],
            "marketing": ["email"],
            "reminder": ["push", "email"],
            "social": ["push", "in_app"],
        }
        
        return DEFAULTS.get(notification.type.value, ["push"])
    
    def _is_quiet_hours(self, prefs: UserPreferences, notification: Notification) -> bool:
        """Check if current time is in user's quiet hours."""
        
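        # Treat a half-configured window (missing start/end) as "no quiet hours"
        if (not prefs.quiet_hours.enabled
                or prefs.quiet_hours.start is None
                or prefs.quiet_hours.end is None):
            return False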
        
        # Get current time in user's timezone
        user_tz = pytz.timezone(prefs.timezone)
        user_now = datetime.now(user_tz).time()
        
        start = prefs.quiet_hours.start
        end = prefs.quiet_hours.end
        
        if start <= end:
            # Simple case: e.g., 09:00 - 17:00
            return start <= user_now <= end
        else:
            # Overnight: e.g., 22:00 - 08:00
            return user_now >= start or user_now <= end
    
    def _filter_for_quiet_hours(
        self,
        channels: list[str],
        notification: Notification
    ) -> list[str]:
        """Filter channels during quiet hours."""
        
        # Critical notifications bypass quiet hours
        if notification.priority.value == "critical":
            return channels
        
        # During quiet hours, only use non-intrusive channels
        NON_INTRUSIVE = {"email", "in_app"}
        return [ch for ch in channels if ch in NON_INTRUSIVE]
    
    async def _filter_by_availability(
        self,
        channels: list[str],
        user_id: str
    ) -> list[str]:
        """Filter channels based on what's actually available for this user."""
        
        available = []
        
        for channel in channels:
            if channel == "push":
                # Check if user has any valid device tokens
                tokens = await self.device_tokens.get_active_tokens(user_id)
                if tokens:
                    available.append(channel)
                    
            elif channel == "email":
                # Email is always "available" if user has email
                # (bounce handling happens at delivery time)
                available.append(channel)
                
            elif channel == "sms":
                # SMS is always "available" if user has phone
                available.append(channel)
                
            elif channel == "in_app":
                # In-app is always available
                available.append(channel)
        
        return available
    
    async def _publish_to_channel(self, notification: Notification, channel: str):
        """Publish notification to channel-specific Kafka topic."""
        
        topic = f"channel.{channel}"
        
        # Add channel to notification
        channel_notification = {
            **notification.to_dict(),
            "channel": channel,
            "status": NotificationStatus.ROUTING.value
        }
        
        await self.kafka.send(
            topic=topic,
            key=notification.user_id.encode(),
            value=channel_notification
        )
        
        logger.info(
            f"Routed notification {notification.notification_id} "
            f"to channel {channel}"
        )
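The quiet-hours check reduces to a small pure function, which makes the midnight wrap-around easy to test. This sketch (`in_quiet_hours` is a hypothetical name) takes a `tzinfo` directly and uses a fixed UTC offset to stay dependency-free; a named zone from `pytz`, as the router uses, or from the standard-library `zoneinfo` would also handle daylight-saving changes. It treats the window as half-open, so the end time itself is no longer quiet, whereas `_is_quiet_hours` includes both ends.

```python
from datetime import datetime, time, timedelta, timezone, tzinfo


def in_quiet_hours(now_utc: datetime, user_tz: tzinfo,
                   start: time, end: time) -> bool:
    """True if now_utc, seen in the user's time zone, falls in [start, end).

    A window with start > end wraps past midnight (e.g. 22:00-08:00).
    """
    local = now_utc.astimezone(user_tz).time()
    if start <= end:                        # same-day window, e.g. 13:00-15:00
        return start <= local < end
    return local >= start or local < end    # overnight window


est = timezone(timedelta(hours=-5))         # fixed offset standing in for a named zone
ist = timezone(timedelta(hours=5, minutes=30))
```

For example, 04:30 UTC is 23:30 at UTC-5, which falls inside a 22:00-08:00 window.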

3.3 Router Consumer (Kafka)

# workers/router_worker.py

import asyncio
import json
import logging
from datetime import datetime

from aiokafka import AIOKafkaConsumer, TopicPartition

from models.notification import Notification
from services.router import NotificationRouter

logger = logging.getLogger(__name__)


class RouterWorker:
    """
    Kafka consumer that routes notifications.
    
    Consumes from priority topics and routes to channel topics.
    """
    
    def __init__(
        self,
        kafka_brokers: str,
        router: NotificationRouter,
        consumer_group: str = "notification-router"
    ):
        self.brokers = kafka_brokers
        self.router = router
        self.group = consumer_group
        self.consumer = None
        self.running = False
    
    async def start(self):
        """Start consuming notifications."""
        
        self.consumer = AIOKafkaConsumer(
            "notifications.critical",
            "notifications.high",
            "notifications.medium",
            "notifications.low",
            bootstrap_servers=self.brokers,
            group_id=self.group,
            auto_offset_reset="earliest",
            enable_auto_commit=False  # Manual commit for reliability
        )
        
        await self.consumer.start()
        self.running = True
        
        logger.info("Router worker started")
        
        try:
            async for message in self.consumer:
                await self._process_message(message)
                
        finally:
            await self.consumer.stop()
    
    async def stop(self):
        """Stop the worker."""
        self.running = False
        if self.consumer:
            # Stopping the consumer ends the `async for` loop in start()
            await self.consumer.stop()
    
    async def _process_message(self, message):
        """Process a single notification message."""
        
        try:
            payload = json.loads(message.value.decode())
            notification = Notification.from_dict(payload)
            
            # Check if scheduled for later
            if notification.scheduled_at and notification.scheduled_at > datetime.utcnow():
                # Re-queue with delay (or use a scheduling service)
                await self._reschedule(notification)
                await self.consumer.commit()
                return
            
            # Route the notification
            channels = await self.router.route(notification)
            
            if not channels:
                logger.warning(
                    f"No channels available for notification {notification.notification_id}"
                )
            
            # Commit offset after successful processing
            await self.consumer.commit()
            
        except Exception as e:
            logger.error(f"Error processing message: {e}", exc_info=True)
            # Don't commit, and rewind to this message: the consumer's position
            # has already moved past it, so without a seek the next successful
            # commit() would record a later offset and silently skip it.
            self.consumer.seek(
                TopicPartition(message.topic, message.partition),
                message.offset
            )
            # In production, send to a dead letter queue after N retries
            # instead of retrying a poison message forever
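The worker leaves dead-letter handling for production. The decision itself can be sketched independently of Kafka: count failures per message and, once a limit is reached, park the message and let the caller commit. `RetryingHandler` is hypothetical, the key would typically combine topic, partition, and offset, the `dead_letters` list stands in for a DLQ topic, and the counts live in memory, so they reset when the worker restarts.

```python
from dataclasses import dataclass, field
from typing import Callable


@dataclass
class RetryingHandler:
    """Process a message; after max_attempts failures, park it in a DLQ.

    Returning normally (success OR dead-lettered) signals that the caller
    may commit the offset; raising means "retry this message".
    """
    handle: Callable[[dict], None]
    max_attempts: int = 3
    attempts: dict = field(default_factory=dict)      # message key -> failures
    dead_letters: list = field(default_factory=list)  # stand-in for a DLQ topic

    def process(self, key: str, message: dict) -> None:
        try:
            self.handle(message)
            self.attempts.pop(key, None)
        except Exception as exc:
            failures = self.attempts.get(key, 0) + 1
            if failures >= self.max_attempts:
                self.dead_letters.append({"message": message, "error": repr(exc)})
                self.attempts.pop(key, None)
                return                    # give up: the offset can be committed
            self.attempts[key] = failures
            raise                         # not yet: leave uncommitted, retry


def always_fails(message: dict) -> None:
    raise ValueError("template rendering failed")


handler = RetryingHandler(handle=always_fails, max_attempts=3)
for _ in range(2):
    try:
        handler.process("notifications.high-0-17", {"notification_id": "n1"})
    except ValueError:
        pass  # caller would not commit; the message is delivered again
handler.process("notifications.high-0-17", {"notification_id": "n1"})  # 3rd failure
```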

Part III: Template Rendering

Chapter 4: The Template Engine

Before delivering a notification, we need to render the template with user-specific variables.

4.1 Template Model

# models/template.py

from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Template:
    template_id: str
    name: str          # e.g., "payment_received"
    channel: str       # push, email, sms, etc.
    locale: str        # en-US, es-ES, etc.
    
    # Content
    title: Optional[str] = None    # For push/in-app
    subject: Optional[str] = None  # For email
    body: str = ""
    html_body: Optional[str] = None  # For email HTML
    
    # Rich content
    image_url: Optional[str] = None
    action_url: Optional[str] = None
    action_buttons: list[dict] = field(default_factory=list)
    
    # Variable schema for validation
    variables_schema: dict = field(default_factory=dict)
    
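    version: int = 1
    is_active: bool = True

    def to_dict(self) -> dict:
        """Plain dict for caching (all fields are already JSON-safe)."""
        return dict(vars(self))

    @classmethod
    def from_dict(cls, data: dict) -> "Template":
        """Inverse of to_dict(), used when reading a cached template."""
        return cls(**data)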
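Templates are looked up by name, channel, and locale. The fallback order that `TemplateService._get_template` uses (exact locale, then base language, then `en-US`) can be isolated as a pure function. In this sketch a dict keyed by `(name, channel, locale)` stands in for `TemplateRepository`, and `locale_candidates` / `find_template` are hypothetical helpers. The candidate list is de-duplicated so `en-US` is never tried twice.

```python
from typing import Optional


def locale_candidates(locale: str, default: str = "en-US") -> list[str]:
    """Locales to try in order: exact, base language, then the default."""
    candidates = [locale]
    if "-" in locale:
        candidates.append(locale.split("-")[0])
    if default not in candidates:
        candidates.append(default)
    return candidates


def find_template(store: dict, name: str, channel: str,
                  locale: str) -> Optional[dict]:
    """Return the first template found for (name, channel, locale) in fallback order."""
    for candidate in locale_candidates(locale):
        template = store.get((name, channel, candidate))
        if template is not None:
            return template
    return None


store = {
    ("payment_received", "push", "es"): {"body": "Pago recibido"},
    ("payment_received", "push", "en-US"): {"body": "Payment received"},
}
```

With this store, a request for `es-MX` falls back to the `es` template and `de-DE` falls back to `en-US`.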

4.2 Template Service

# services/template.py

import re
from typing import Optional
import logging

from models.template import Template
from repositories.template import TemplateRepository
from services.cache import CacheService

logger = logging.getLogger(__name__)


class TemplateService:
    """
    Template rendering service.
    
    Handles:
    - Template lookup with caching
    - Variable substitution
    - Localization
    - Fallback to default locale
    """
    
    def __init__(
        self,
        template_repo: TemplateRepository,
        cache: CacheService
    ):
        self.templates = template_repo
        self.cache = cache
        
        # Simple variable pattern: {{variable_name}}
        self.var_pattern = re.compile(r'\{\{(\w+)\}\}')
    
    async def render(
        self,
        template_name: str,
        channel: str,
        variables: dict,
        locale: str = "en-US"
    ) -> dict:
        """
        Render a template with variables.
        
        Returns dict with rendered title, body, etc.
        """
        
        # Get template (with locale fallback)
        template = await self._get_template(template_name, channel, locale)
        
        if not template:
            raise TemplateNotFoundError(
                f"Template not found: {template_name}/{channel}/{locale}"
            )
        
        # Render each field
        result = {
            "template_id": template.template_id,
            "channel": channel,
        }
        
        if template.title:
            result["title"] = self._substitute(template.title, variables)
        
        if template.subject:
            result["subject"] = self._substitute(template.subject, variables)
        
        if template.body:
            result["body"] = self._substitute(template.body, variables)
        
        if template.html_body:
            result["html_body"] = self._substitute(template.html_body, variables)
        
        if template.image_url:
            result["image_url"] = self._substitute(template.image_url, variables)
        
        if template.action_url:
            result["action_url"] = self._substitute(template.action_url, variables)
        
        if template.action_buttons:
            result["action_buttons"] = template.action_buttons
        
        return result
    
    async def _get_template(
        self,
        name: str,
        channel: str,
        locale: str
    ) -> Optional[Template]:
        """Get template with caching and locale fallback."""
        
        # Try cache first
        cache_key = f"template:{name}:{channel}:{locale}"
        cached = await self.cache.get(cache_key)
        if cached:
            return Template.from_dict(cached)
        
        # Try exact locale
        template = await self.templates.get_by_name(name, channel, locale)
        
        # Fallback to base locale (e.g., en-US -> en)
        if not template and "-" in locale:
            base_locale = locale.split("-")[0]
            template = await self.templates.get_by_name(name, channel, base_locale)
        
        # Fallback to default locale
        if not template:
            template = await self.templates.get_by_name(name, channel, "en-US")
        
        # Cache the result
        if template:
            await self.cache.set(cache_key, template.to_dict(), ttl=600)  # 10 min
        
        return template
    
    def _substitute(self, text: str, variables: dict) -> str:
        """Substitute variables in text."""
        
        def replace(match):
            var_name = match.group(1)
            value = variables.get(var_name, "")
            return str(value)
        
        return self.var_pattern.sub(replace, text)


class TemplateNotFoundError(Exception):
    pass
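The lookup order in _get_template can be pulled out into a pure function, which makes the fallback rules easy to unit-test. This is a sketch, not part of the service: locale_fallback_chain is a hypothetical helper, and it assumes the same en-US default as the method above. Unlike that method, it drops duplicates, so a request for en-US does not query en-US twice.

```python
def locale_fallback_chain(locale: str, default: str = "en-US") -> list[str]:
    """Return locales to try, most specific first, without duplicates.

    Hypothetical helper mirroring _get_template's order:
    exact locale, then base language (pt-BR -> pt), then the default.
    """
    candidates = [locale]
    if "-" in locale:
        candidates.append(locale.split("-")[0])
    candidates.append(default)

    chain: list[str] = []
    for candidate in candidates:
        if candidate not in chain:  # skip locales already in the chain
            chain.append(candidate)
    return chain


print(locale_fallback_chain("pt-BR"))  # ['pt-BR', 'pt', 'en-US']
print(locale_fallback_chain("en-US"))  # ['en-US', 'en']
print(locale_fallback_chain("fr"))     # ['fr', 'en-US']
```

_get_template could then loop over this list and stop at the first template it finds.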

Part IV: Provider Integration

Chapter 5: Provider Abstraction

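We integrate with four providers (FCM, APNs, SendGrid, Twilio), each with its own API, authentication scheme, and error codes. A common interface hides those differences: every provider exposes send(), returns a DeliveryResult that tells the worker whether to retry or fall back, and reports its health the same way.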

5.1 Provider Interface

# providers/base.py

from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Optional
from enum import Enum


class DeliveryStatus(Enum):
    SUCCESS = "success"
    FAILED = "failed"
    RATE_LIMITED = "rate_limited"
    INVALID_TOKEN = "invalid_token"
    BOUNCED = "bounced"


@dataclass
class DeliveryResult:
    """Result of a delivery attempt."""
    status: DeliveryStatus
    provider_message_id: Optional[str] = None
    error_code: Optional[str] = None
    error_message: Optional[str] = None
    should_retry: bool = False
    should_fallback: bool = False


@dataclass 
class ProviderHealth:
    """Health status of a provider."""
    is_healthy: bool
    success_rate: float  # 0.0 - 1.0
    avg_latency_ms: float
    last_check: str


class NotificationProvider(ABC):
    """
    Base class for notification providers.
    
    All providers implement this interface for consistent behavior.
    """
    
    @property
    @abstractmethod
    def name(self) -> str:
        """Provider name (e.g., 'fcm', 'sendgrid')."""
        pass
    
    @property
    @abstractmethod
    def channel(self) -> str:
        """Channel this provider serves (e.g., 'push', 'email')."""
        pass
    
    @abstractmethod
    async def send(
        self,
        recipient: str,
        title: str,
        body: str,
        **kwargs
    ) -> DeliveryResult:
        """
        Send a notification.
        
        Args:
            recipient: Channel-specific identifier (token, email, phone)
            title: Notification title
            body: Notification body
            **kwargs: Additional provider-specific options
        """
        pass
    
    @abstractmethod
    async def check_health(self) -> ProviderHealth:
        """Check provider health."""
        pass
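One payoff of a shared interface is that workers can be tested without calling real providers. Below is a hedged sketch of an in-memory provider that returns a scripted sequence of results. To stay self-contained it re-declares trimmed copies of DeliveryStatus and DeliveryResult (in the project you would import them from providers/base.py), and FakeProvider is a hypothetical test double, not part of the design above.

```python
import asyncio
from dataclasses import dataclass
from enum import Enum
from typing import Optional


# Trimmed copies of the types in providers/base.py, so this runs standalone.
class DeliveryStatus(Enum):
    SUCCESS = "success"
    FAILED = "failed"
    RATE_LIMITED = "rate_limited"
    INVALID_TOKEN = "invalid_token"


@dataclass
class DeliveryResult:
    status: DeliveryStatus
    provider_message_id: Optional[str] = None
    error_code: Optional[str] = None
    should_retry: bool = False
    should_fallback: bool = False


class FakeProvider:
    """Hypothetical test double with the same send() shape as NotificationProvider."""

    name = "fake"

    def __init__(self, channel: str, scripted: list[DeliveryResult]):
        self.channel = channel
        self._scripted = list(scripted)                 # results returned in order
        self.sent: list[tuple[str, str, str]] = []      # (recipient, title, body) log

    async def send(self, recipient: str, title: str, body: str, **kwargs) -> DeliveryResult:
        self.sent.append((recipient, title, body))
        if self._scripted:
            return self._scripted.pop(0)
        # Once the script runs out, succeed with a predictable message id.
        return DeliveryResult(DeliveryStatus.SUCCESS, provider_message_id=f"fake-{len(self.sent)}")


async def demo() -> list[str]:
    provider = FakeProvider(
        "push",
        [DeliveryResult(DeliveryStatus.RATE_LIMITED, error_code="RATE_LIMITED", should_retry=True)],
    )
    first = await provider.send("token-1", "Hi", "First try")
    second = await provider.send("token-1", "Hi", "Second try")
    return [first.status.value, second.status.value]


print(asyncio.run(demo()))  # ['rate_limited', 'success']
```

Injecting a provider like this into a worker lets a test drive the retry, fallback, and failure branches deterministically.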

5.2 FCM Provider (Push - Android)

# providers/fcm.py

import aiohttp
import logging
from datetime import datetime
from typing import Optional

from providers.base import NotificationProvider, DeliveryResult, DeliveryStatus, ProviderHealth

logger = logging.getLogger(__name__)


class FCMProvider(NotificationProvider):
    """
    Firebase Cloud Messaging provider for Android push notifications.
    
    Uses FCM HTTP v1 API.
    """
    
    def __init__(
        self,
        project_id: str,
        credentials_path: str,
        timeout_seconds: float = 10.0
    ):
        self.project_id = project_id
        self.credentials_path = credentials_path
        self.timeout = aiohttp.ClientTimeout(total=timeout_seconds)
        self.api_url = f"https://fcm.googleapis.com/v1/projects/{project_id}/messages:send"
        
        self._access_token = None
        self._token_expiry = None
    
    @property
    def name(self) -> str:
        return "fcm"
    
    @property
    def channel(self) -> str:
        return "push"
    
    async def send(
        self,
        recipient: str,  # FCM device token
        title: str,
        body: str,
        image_url: Optional[str] = None,
        action_url: Optional[str] = None,
        data: Optional[dict] = None,
        **kwargs
    ) -> DeliveryResult:
        """Send push notification via FCM."""
        
        # Build FCM message
        message = {
            "message": {
                "token": recipient,
                "notification": {
                    "title": title,
                    "body": body,
                },
                "android": {
                    "priority": "high",
                    "notification": {
                        "click_action": action_url or "OPEN_APP",
                    }
                }
            }
        }
        
        if image_url:
            message["message"]["notification"]["image"] = image_url
        
        if data:
            message["message"]["data"] = {k: str(v) for k, v in data.items()}
        
        # Get access token
        access_token = await self._get_access_token()
        
        # Send request
        try:
            async with aiohttp.ClientSession(timeout=self.timeout) as session:
                async with session.post(
                    self.api_url,
                    json=message,
                    headers={
                        "Authorization": f"Bearer {access_token}",
                        "Content-Type": "application/json"
                    }
                ) as response:
                    
                    if response.status == 200:
                        result = await response.json()
                        return DeliveryResult(
                            status=DeliveryStatus.SUCCESS,
                            provider_message_id=result.get("name")
                        )
                    
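                    # Handle errors. In the HTTP v1 API, error.code is the numeric
                    # HTTP status; the FCM-specific code (e.g. UNREGISTERED) is in
                    # error.details[].errorCode, and error.status carries the
                    # canonical status (e.g. INVALID_ARGUMENT).
                    error_body = await response.json()
                    error = error_body.get("error", {})
                    error_code = next(
                        (d["errorCode"] for d in error.get("details", []) if "errorCode" in d),
                        error.get("status")
                    )
                    error_message = error.get("message")
                    
                    return self._handle_error(response.status, error_code, error_message)
                    
        except (aiohttp.ClientError, TimeoutError) as e:
            # aiohttp reports a request timeout as asyncio.TimeoutError, which is
            # the built-in TimeoutError on Python 3.11+; on older versions catch
            # asyncio.TimeoutError instead.
            logger.error(f"FCM request failed: {e!r}")
            return DeliveryResult(
                status=DeliveryStatus.FAILED,
                error_code="CONNECTION_ERROR",
                error_message=str(e),
                should_retry=True
            )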
    
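    def _handle_error(
        self,
        status_code: int,
        error_code: Optional[str],
        error_message: Optional[str]
    ) -> DeliveryResult:
        """Handle FCM error responses."""
        
        # Invalid/expired token: don't retry; the caller marks the token invalid.
        # INVALID_ARGUMENT can also mean a malformed message rather than a bad
        # token, so a payload bug would invalidate tokens too.
        if error_code in ["UNREGISTERED", "INVALID_ARGUMENT"]: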
            return DeliveryResult(
                status=DeliveryStatus.INVALID_TOKEN,
                error_code=error_code,
                error_message=error_message,
                should_retry=False,
                should_fallback=True  # Try another channel
            )
        
        # Rate limited
        if status_code == 429:
            return DeliveryResult(
                status=DeliveryStatus.RATE_LIMITED,
                error_code="RATE_LIMITED",
                error_message=error_message,
                should_retry=True
            )
        
        # Server error - retry
        if status_code >= 500:
            return DeliveryResult(
                status=DeliveryStatus.FAILED,
                error_code=error_code,
                error_message=error_message,
                should_retry=True
            )
        
        # Other errors
        return DeliveryResult(
            status=DeliveryStatus.FAILED,
            error_code=error_code,
            error_message=error_message,
            should_retry=False
        )
    
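    async def _get_access_token(self) -> str:
        """Get an OAuth2 access token for the FCM API, cached until shortly before expiry."""
        import asyncio
        from datetime import timedelta
        from google.oauth2 import service_account
        from google.auth.transport.requests import Request
        
        # Reuse the cached token unless it expires within a minute.
        # google-auth reports expiry as a naive UTC datetime.
        if (
            self._access_token
            and self._token_expiry
            and self._token_expiry - timedelta(seconds=60) > datetime.utcnow()
        ):
            return self._access_token
        
        credentials = service_account.Credentials.from_service_account_file(
            self.credentials_path,
            scopes=["https://www.googleapis.com/auth/firebase.messaging"]
        )
        # refresh() makes a blocking HTTP call; run it in a thread so the
        # event loop keeps serving other sends.
        await asyncio.to_thread(credentials.refresh, Request())
        
        self._access_token = credentials.token
        self._token_expiry = credentials.expiry
        
        return self._access_token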
    
    async def check_health(self) -> ProviderHealth:
        """Check FCM health by validating credentials."""
        try:
            await self._get_access_token()
            return ProviderHealth(
                is_healthy=True,
                success_rate=0.99,  # Would track actual rate in production
                avg_latency_ms=150,
                last_check=datetime.utcnow().isoformat()
            )
        except Exception as e:
            logger.warning(f"FCM health check failed: {e!r}")
            return ProviderHealth(
                is_healthy=False,
                success_rate=0.0,
                avg_latency_ms=0,
                last_check=datetime.utcnow().isoformat()
            )
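Under load, several coroutines can find the token expired at the same moment and all refresh it at once. A common remedy is a cache that refreshes a little early and serializes refreshes behind an asyncio.Lock. This sketch assumes a hypothetical async fetch callable returning (token, expires_at); in the FCM provider that role would be played by the google-auth refresh. TokenCache and its 60-second margin are illustrative choices, not part of the original code.

```python
import asyncio
from datetime import datetime, timedelta, timezone
from typing import Awaitable, Callable, Optional, Tuple

Fetcher = Callable[[], Awaitable[Tuple[str, datetime]]]


class TokenCache:
    """Caches an access token and refreshes it shortly before it expires."""

    def __init__(self, fetch: Fetcher, margin: timedelta = timedelta(seconds=60)):
        self._fetch = fetch            # hypothetical: returns (token, expires_at in UTC)
        self._margin = margin          # refresh this long before the real expiry
        self._token: Optional[str] = None
        self._expires_at: Optional[datetime] = None
        self._lock = asyncio.Lock()    # only one coroutine refreshes at a time

    def _is_fresh(self) -> bool:
        return (
            self._token is not None
            and self._expires_at is not None
            and datetime.now(timezone.utc) < self._expires_at - self._margin
        )

    async def get(self) -> str:
        if self._is_fresh():
            return self._token
        async with self._lock:
            # Re-check: another coroutine may have refreshed while we waited.
            if not self._is_fresh():
                self._token, self._expires_at = await self._fetch()
            return self._token


async def demo() -> tuple:
    calls = 0

    async def fake_fetch() -> Tuple[str, datetime]:
        nonlocal calls
        calls += 1
        await asyncio.sleep(0.01)  # simulate the OAuth round trip
        return f"token-{calls}", datetime.now(timezone.utc) + timedelta(hours=1)

    cache = TokenCache(fake_fetch)
    tokens = await asyncio.gather(*(cache.get() for _ in range(20)))
    return set(tokens), calls


print(asyncio.run(demo()))  # ({'token-1'}, 1)
```

Twenty concurrent callers trigger a single fetch; the other nineteen wait on the lock and then reuse the fresh token.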

5.3 SendGrid Provider (Email)

# providers/sendgrid.py

import aiohttp
import logging
from datetime import datetime
from typing import Optional

from providers.base import NotificationProvider, DeliveryResult, DeliveryStatus, ProviderHealth

logger = logging.getLogger(__name__)


class SendGridProvider(NotificationProvider):
    """
    SendGrid provider for email notifications.
    """
    
    def __init__(
        self,
        api_key: str,
        from_email: str,
        from_name: str = "Notifications",
        timeout_seconds: float = 30.0
    ):
        self.api_key = api_key
        self.from_email = from_email
        self.from_name = from_name
        self.timeout = aiohttp.ClientTimeout(total=timeout_seconds)
        self.api_url = "https://api.sendgrid.com/v3/mail/send"
    
    @property
    def name(self) -> str:
        return "sendgrid"
    
    @property
    def channel(self) -> str:
        return "email"
    
    async def send(
        self,
        recipient: str,  # Email address
        title: str,      # Subject
        body: str,       # Plain text body
        html_body: Optional[str] = None,
        **kwargs
    ) -> DeliveryResult:
        """Send email via SendGrid."""
        
        # Build SendGrid request
        payload = {
            "personalizations": [
                {
                    "to": [{"email": recipient}]
                }
            ],
            "from": {
                "email": self.from_email,
                "name": self.from_name
            },
            "subject": title,
            "content": []
        }
        
        # Add plain text content
        payload["content"].append({
            "type": "text/plain",
            "value": body
        })
        
        # Add HTML content if provided
        if html_body:
            payload["content"].append({
                "type": "text/html",
                "value": html_body
            })
        
        # Add tracking settings
        payload["tracking_settings"] = {
            "click_tracking": {"enable": True},
            "open_tracking": {"enable": True}
        }
        
        # Send request
        try:
            async with aiohttp.ClientSession(timeout=self.timeout) as session:
                async with session.post(
                    self.api_url,
                    json=payload,
                    headers={
                        "Authorization": f"Bearer {self.api_key}",
                        "Content-Type": "application/json"
                    }
                ) as response:
                    
                    # SendGrid returns 202 Accepted on success
                    if response.status == 202:
                        message_id = response.headers.get("X-Message-Id")
                        return DeliveryResult(
                            status=DeliveryStatus.SUCCESS,
                            provider_message_id=message_id
                        )
                    
                    # Handle errors
                    error_body = await response.json()
                    return self._handle_error(response.status, error_body)
                    
        except (aiohttp.ClientError, TimeoutError) as e:
            # Timeouts surface as asyncio.TimeoutError (the built-in TimeoutError on 3.11+)
            logger.error(f"SendGrid request failed: {e!r}")
            return DeliveryResult(
                status=DeliveryStatus.FAILED,
                error_code="CONNECTION_ERROR",
                error_message=str(e),
                should_retry=True
            )
    
    def _handle_error(self, status_code: int, error_body: dict) -> DeliveryResult:
        """Handle SendGrid error responses."""
        
        errors = error_body.get("errors", [])
        error_message = errors[0].get("message") if errors else "Unknown error"
        
        # Invalid email address
        if status_code == 400 and "email" in error_message.lower():
            return DeliveryResult(
                status=DeliveryStatus.BOUNCED,
                error_code="INVALID_EMAIL",
                error_message=error_message,
                should_retry=False,
                should_fallback=True
            )
        
        # Rate limited
        if status_code == 429:
            return DeliveryResult(
                status=DeliveryStatus.RATE_LIMITED,
                error_code="RATE_LIMITED",
                error_message=error_message,
                should_retry=True
            )
        
        # Server error
        if status_code >= 500:
            return DeliveryResult(
                status=DeliveryStatus.FAILED,
                error_code="SERVER_ERROR",
                error_message=error_message,
                should_retry=True
            )
        
        return DeliveryResult(
            status=DeliveryStatus.FAILED,
            error_code="UNKNOWN_ERROR",
            error_message=error_message,
            should_retry=False
        )
    
    async def check_health(self) -> ProviderHealth:
        """Check SendGrid health."""
        # Could call SendGrid stats API
        return ProviderHealth(
            is_healthy=True,
            success_rate=0.98,
            avg_latency_ms=500,
            last_check=datetime.utcnow().isoformat()
        )

5.4 Twilio Provider (SMS)

# providers/twilio.py

import aiohttp
import base64
import logging
from datetime import datetime
from typing import Optional

from providers.base import NotificationProvider, DeliveryResult, DeliveryStatus, ProviderHealth

logger = logging.getLogger(__name__)


class TwilioProvider(NotificationProvider):
    """
    Twilio provider for SMS notifications.
    """
    
    def __init__(
        self,
        account_sid: str,
        auth_token: str,
        from_number: str,
        timeout_seconds: float = 30.0
    ):
        self.account_sid = account_sid
        self.auth_token = auth_token
        self.from_number = from_number
        self.timeout = aiohttp.ClientTimeout(total=timeout_seconds)
        self.api_url = f"https://api.twilio.com/2010-04-01/Accounts/{account_sid}/Messages.json"
    
    @property
    def name(self) -> str:
        return "twilio"
    
    @property
    def channel(self) -> str:
        return "sms"
    
    async def send(
        self,
        recipient: str,  # Phone number in E.164 format
        title: str,      # Prepended to the body (SMS has no title field)
        body: str,
        **kwargs
    ) -> DeliveryResult:
        """Send SMS via Twilio."""
        
        # SMS doesn't have titles - combine if title provided
        message = body
        if title:
            message = f"{title}: {body}"
        
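        # Keep to one 160-character segment. This assumes the GSM-7 alphabet;
        # text with other characters (e.g. emoji) is sent as UCS-2, where one
        # segment holds only 70 characters, so it may still be split and
        # billed as several segments.
        if len(message) > 160:
            message = message[:157] + "..."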
        
        # Build request
        data = {
            "To": recipient,
            "From": self.from_number,
            "Body": message
        }
        
        # Create auth header
        credentials = base64.b64encode(
            f"{self.account_sid}:{self.auth_token}".encode()
        ).decode()
        
        try:
            async with aiohttp.ClientSession(timeout=self.timeout) as session:
                async with session.post(
                    self.api_url,
                    data=data,
                    headers={
                        "Authorization": f"Basic {credentials}",
                        "Content-Type": "application/x-www-form-urlencoded"
                    }
                ) as response:
                    
                    result = await response.json()
                    
                    if response.status == 201:
                        return DeliveryResult(
                            status=DeliveryStatus.SUCCESS,
                            provider_message_id=result.get("sid")
                        )
                    
                    return self._handle_error(response.status, result)
                    
        except (aiohttp.ClientError, TimeoutError) as e:
            # Timeouts surface as asyncio.TimeoutError (the built-in TimeoutError on 3.11+)
            logger.error(f"Twilio request failed: {e!r}")
            return DeliveryResult(
                status=DeliveryStatus.FAILED,
                error_code="CONNECTION_ERROR",
                error_message=str(e),
                should_retry=True
            )
    
    def _handle_error(self, status_code: int, result: dict) -> DeliveryResult:
        """Handle Twilio error responses."""
        
        error_code = str(result.get("code", ""))
        error_message = result.get("message", "Unknown error")
        
        # Invalid phone number
        if error_code in ["21211", "21614", "21217"]:
            return DeliveryResult(
                status=DeliveryStatus.INVALID_TOKEN,
                error_code=error_code,
                error_message=error_message,
                should_retry=False,
                should_fallback=True
            )
        
        # Rate limited
        if error_code == "20429" or status_code == 429:
            return DeliveryResult(
                status=DeliveryStatus.RATE_LIMITED,
                error_code="RATE_LIMITED",
                error_message=error_message,
                should_retry=True
            )
        
        # Queue full - retry
        if error_code == "30001":
            return DeliveryResult(
                status=DeliveryStatus.FAILED,
                error_code=error_code,
                error_message=error_message,
                should_retry=True
            )
        
        return DeliveryResult(
            status=DeliveryStatus.FAILED,
            error_code=error_code,
            error_message=error_message,
            should_retry=False
        )
    
    async def check_health(self) -> ProviderHealth:
        """Check Twilio health."""
        return ProviderHealth(
            is_healthy=True,
            success_rate=0.99,
            avg_latency_ms=300,
            last_check=datetime.utcnow().isoformat()
        )
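The 160-character cut in TwilioProvider.send assumes the GSM-7 alphabet. If the text contains any character outside GSM-7 (most emoji, CJK, many accented letters), the message is encoded as UCS-2 and one segment holds only 70 UTF-16 code units; GSM-7 extension characters such as [ ] { } ~ | ^ \ and € also cost two units each. Below is a hedged sketch of segment-aware truncation. sms_length and truncate_sms are hypothetical helpers, the tables follow the GSM 03.38 basic set and extension table, and national language shift tables are ignored.

```python
# GSM 03.38 basic character set (the escape character is omitted).
GSM7_BASIC = set(
    "@£$¥èéùìòÇ\nØø\rÅåΔ_ΦΓΛΩΠΨΣΘΞÆæßÉ !\"#¤%&'()*+,-./0123456789:;<=>?"
    "¡ABCDEFGHIJKLMNOPQRSTUVWXYZÄÖÑÜ§¿abcdefghijklmnopqrstuvwxyzäöñüà"
)
# Extension table: each character is sent as escape + char, i.e. two units.
GSM7_EXTENDED = set("^{}\\[~]|€\f")

SINGLE_SEGMENT_LIMIT = {"GSM-7": 160, "UCS-2": 70}


def _units(text: str, encoding: str) -> int:
    if encoding == "GSM-7":
        return sum(2 if ch in GSM7_EXTENDED else 1 for ch in text)
    # UCS-2: characters outside the BMP (e.g. most emoji) take two UTF-16 units.
    return len(text.encode("utf-16-le")) // 2


def sms_length(text: str) -> tuple[str, int]:
    """Return (encoding, length in encoding units) for an SMS body."""
    if all(ch in GSM7_BASIC or ch in GSM7_EXTENDED for ch in text):
        return "GSM-7", _units(text, "GSM-7")
    return "UCS-2", _units(text, "UCS-2")


def truncate_sms(text: str, ellipsis: str = "...") -> str:
    """Truncate text to fit one SMS segment, appending the ellipsis if cut.

    Assumes the ellipsis itself is GSM-7 (the default "..." is).
    """
    encoding, length = sms_length(text)
    limit = SINGLE_SEGMENT_LIMIT[encoding]
    if length <= limit:
        return text
    end = len(text)
    # Drop characters from the end until text + ellipsis fits the limit.
    while end > 0 and _units(text[:end] + ellipsis, encoding) > limit:
        end -= 1
    return text[:end] + ellipsis


print(sms_length("Your code is [1234]"))       # ('GSM-7', 21)
print(sms_length(truncate_sms("你" * 100)))    # ('UCS-2', 70)
```

In send, message = truncate_sms(message) would replace the fixed 157-character slice. Twilio can also deliver longer bodies as concatenated segments billed per segment, so whether to truncate at all is a product decision.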

Part V: Delivery Workers

Chapter 6: Channel Workers

Each channel has dedicated workers that consume from channel-specific Kafka topics.

6.1 Base Worker

# workers/base.py

import asyncio
import json
import logging
from abc import ABC, abstractmethod
from datetime import datetime
from aiokafka import AIOKafkaConsumer

from models.notification import Notification, NotificationStatus
from providers.base import DeliveryResult, DeliveryStatus
from repositories.notification import NotificationRepository
from repositories.delivery_attempt import DeliveryAttemptRepository
from services.metrics import MetricsService

logger = logging.getLogger(__name__)


class BaseChannelWorker(ABC):
    """
    Base class for channel delivery workers.
    
    Handles:
    - Kafka consumption
    - Retry logic
    - Status updates
    - Metrics
    
    Week 2 Concepts: Retries, Circuit Breakers
    Week 3 Concepts: Dead Letter Queue
    """
    
    def __init__(
        self,
        kafka_brokers: str,
        topic: str,
        consumer_group: str,
        notification_repo: NotificationRepository,
        attempt_repo: DeliveryAttemptRepository,
        metrics: MetricsService,
        max_retries: int = 3
    ):
        self.brokers = kafka_brokers
        self.topic = topic
        self.group = consumer_group
        self.notifications = notification_repo
        self.attempts = attempt_repo
        self.metrics = metrics
        self.max_retries = max_retries
        
        self.consumer = None
        self.running = False
    
    @property
    @abstractmethod
    def channel(self) -> str:
        """Channel this worker handles."""
        pass
    
    @abstractmethod
    async def deliver(self, notification: Notification) -> DeliveryResult:
        """Deliver notification through the channel."""
        pass
    
    async def start(self):
        """Start the worker."""
        self.consumer = AIOKafkaConsumer(
            self.topic,
            bootstrap_servers=self.brokers,
            group_id=self.group,
            auto_offset_reset="earliest",
            enable_auto_commit=False
        )
        
        await self.consumer.start()
        self.running = True
        
        logger.info(f"{self.channel} worker started, consuming from {self.topic}")
        
        try:
            async for message in self.consumer:
                if not self.running:
                    break
                await self._process_message(message)
        finally:
            await self.consumer.stop()
    
    async def stop(self):
        """Stop the worker gracefully."""
        self.running = False
    
    async def _process_message(self, message):
        """Process a single message."""
        started_at = datetime.utcnow()
        
        try:
            payload = json.loads(message.value.decode())
            notification = Notification.from_dict(payload)
            
            # Update status to sending
            await self.notifications.update_status(
                notification.notification_id,
                NotificationStatus.SENDING
            )
            
            # Attempt delivery
            result = await self.deliver(notification)
            
            # Record attempt
            await self._record_attempt(notification, result, started_at)
            
            # Handle result
            if result.status == DeliveryStatus.SUCCESS:
                await self._handle_success(notification, result)
                
            elif result.should_retry and notification.retry_count < self.max_retries:
                await self._handle_retry(notification, result)
                
            elif result.should_fallback:
                await self._handle_fallback(notification, result)
                
            else:
                await self._handle_failure(notification, result)
            
            # Commit offset
            await self.consumer.commit()
            
            # Record metrics
            duration = (datetime.utcnow() - started_at).total_seconds() * 1000
            self.metrics.record_delivery(
                channel=self.channel,
                status=result.status.value,
                duration_ms=duration
            )
            
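        except Exception as e:
            logger.error(f"Error processing message: {e}", exc_info=True)
            # The offset is not committed, but the consumer keeps reading: the next
            # successful commit() moves the group past this message, so it is only
            # redelivered if the worker restarts before then. To guarantee a retry,
            # seek back to message.offset or publish the message to the DLQ.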
    
    async def _record_attempt(
        self,
        notification: Notification,
        result: DeliveryResult,
        started_at: datetime
    ):
        """Record delivery attempt for debugging."""
        await self.attempts.save({
            "notification_id": notification.notification_id,
            "channel": self.channel,
            "attempt_number": notification.retry_count + 1,
            "started_at": started_at,
            "completed_at": datetime.utcnow(),
            "success": result.status == DeliveryStatus.SUCCESS,
            "error_code": result.error_code,
            "error_message": result.error_message,
            "provider_message_id": result.provider_message_id
        })
    
    async def _handle_success(self, notification: Notification, result: DeliveryResult):
        """Handle successful delivery."""
        await self.notifications.update(
            notification.notification_id,
            {
                "status": NotificationStatus.SENT,
                "provider_message_id": result.provider_message_id,
                "sent_at": datetime.utcnow()
            }
        )
        logger.info(f"Delivered {notification.notification_id} via {self.channel}")
    
    async def _handle_retry(self, notification: Notification, result: DeliveryResult):
        """Handle retriable failure."""
        notification.retry_count += 1
        
        # Calculate backoff delay
        delay = self._calculate_backoff(notification.retry_count)
        
        # Re-queue with delay
        await self._requeue_with_delay(notification, delay)
        
        await self.notifications.update(
            notification.notification_id,
            {
                "retry_count": notification.retry_count,
                "error_code": result.error_code,
                "error_message": result.error_message
            }
        )
        
        logger.warning(
            f"Retrying {notification.notification_id} in {delay}s "
            f"(attempt {notification.retry_count}/{self.max_retries})"
        )
    
    async def _handle_fallback(self, notification: Notification, result: DeliveryResult):
        """Handle failure that should trigger fallback channel."""
        # Update status
        await self.notifications.update(
            notification.notification_id,
            {
                "status": NotificationStatus.FAILED,
                "error_code": result.error_code,
                "error_message": f"{self.channel} failed, trying fallback"
            }
        )
        
        # The router should have stored fallback config
        # Re-route to next channel
        # This would be handled by a fallback service
        logger.info(f"Triggering fallback for {notification.notification_id}")
    
    async def _handle_failure(self, notification: Notification, result: DeliveryResult):
        """Handle permanent failure."""
        await self.notifications.update(
            notification.notification_id,
            {
                "status": NotificationStatus.FAILED,
                "error_code": result.error_code,
                "error_message": result.error_message
            }
        )
        
        # Send to dead letter queue
        await self._send_to_dlq(notification, result)
        
        logger.error(
            f"Failed to deliver {notification.notification_id}: "
            f"{result.error_code} - {result.error_message}"
        )
    
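    def _calculate_backoff(self, retry_count: int) -> float:
        """Calculate exponential backoff with jitter, in seconds."""
        import random
        base_delay = 2 ** retry_count  # 2, 4, 8, 16... for retry_count 1, 2, 3, 4
        jitter = random.uniform(0, 1)
        # Keep the fraction: int() would truncate the jitter away, so retries
        # from many workers would line up on the same whole second again.
        return round(base_delay + jitter, 2)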
    
    async def _requeue_with_delay(self, notification: Notification, delay: int):
        """Re-queue notification with delay."""
        # In production, use a delay queue or scheduled job.
        # Sleeping here is a simplification: it stalls this partition, so every
        # message behind this one also waits for the full delay.
        await asyncio.sleep(delay)
        # Re-publish to same topic
        pass
    
    async def _send_to_dlq(self, notification: Notification, result: DeliveryResult):
        """Send to dead letter queue."""
        dlq_topic = f"dlq.{self.channel}"
        # Publish to DLQ
        pass
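Because _requeue_with_delay sleeps inside the consumer loop, one slow retry holds up every message behind it on the partition. An alternative is to stamp the message with the earliest time it may be retried and re-publish it immediately (for example to a separate retry topic), letting the consumer of that topic wait only until the stamp is due. This sketch covers the scheduling half under those assumptions: schedule_retry and the retry_at field are hypothetical, the 300-second cap is an arbitrary example, and it uses "full jitter" (a random delay between zero and the exponential bound) instead of the small additive jitter in _calculate_backoff.

```python
import random
from datetime import datetime, timedelta, timezone
from typing import Optional


def backoff_seconds(retry_count: int, base: float = 2.0, cap: float = 300.0,
                    rng: Optional[random.Random] = None) -> float:
    """Capped exponential backoff with full jitter: uniform in [0, min(cap, base**n)]."""
    rng = rng or random.Random()
    bound = min(cap, base ** retry_count)
    return rng.uniform(0, bound)


def schedule_retry(payload: dict, now: Optional[datetime] = None,
                   rng: Optional[random.Random] = None) -> dict:
    """Return a copy of the message payload stamped with its next retry time."""
    now = now or datetime.now(timezone.utc)
    retry_count = payload.get("retry_count", 0) + 1
    delay = backoff_seconds(retry_count, rng=rng)
    return {
        **payload,
        "retry_count": retry_count,
        "retry_at": (now + timedelta(seconds=delay)).isoformat(),  # hypothetical field
    }


def seconds_until_due(payload: dict, now: Optional[datetime] = None) -> float:
    """How long a retry consumer should wait before handling this message (0 if due)."""
    now = now or datetime.now(timezone.utc)
    retry_at = payload.get("retry_at")
    if retry_at is None:
        return 0.0
    return max(0.0, (datetime.fromisoformat(retry_at) - now).total_seconds())


now = datetime(2024, 1, 1, tzinfo=timezone.utc)
msg = schedule_retry({"notification_id": "n-1", "retry_count": 0}, now=now, rng=random.Random(7))
print(msg["retry_count"], 0.0 <= seconds_until_due(msg, now) <= 2.0)  # 1 True
```

Full jitter spreads retries from many workers evenly across the window, at the cost of occasionally retrying almost immediately.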

6.2 Push Worker

# workers/push_worker.py

from workers.base import BaseChannelWorker
from providers.base import DeliveryResult, DeliveryStatus
from providers.fcm import FCMProvider
from providers.apns import APNsProvider
from repositories.device_token import DeviceTokenRepository
from models.notification import Notification
from services.template import TemplateService


class PushWorker(BaseChannelWorker):
    """
    Worker for push notifications.
    
    Handles both Android (FCM) and iOS (APNs).
    """
    
    def __init__(
        self,
        fcm_provider: FCMProvider,
        apns_provider: APNsProvider,
        device_tokens: DeviceTokenRepository,
        template_service: TemplateService,
        **kwargs
    ):
        super().__init__(topic="channel.push", **kwargs)
        self.fcm = fcm_provider
        self.apns = apns_provider
        self.device_tokens = device_tokens
        self.templates = template_service
    
    @property
    def channel(self) -> str:
        return "push"
    
    async def deliver(self, notification: Notification) -> DeliveryResult:
        """Deliver push notification to all user's devices."""
        
        # Get all active device tokens for user
        tokens = await self.device_tokens.get_active_tokens(notification.user_id)
        
        if not tokens:
            return DeliveryResult(
                status=DeliveryStatus.FAILED,
                error_code="NO_DEVICES",
                error_message="User has no registered devices",
                should_fallback=True
            )
        
        # Render template if not already rendered
        if not notification.title or not notification.body:
            rendered = await self.templates.render(
                notification.template,
                "push",
                notification.variables
            )
            notification.title = rendered.get("title", "")
            notification.body = rendered.get("body", "")
        
        # Send to each device
        results = []
        for token_info in tokens:
            provider = self._get_provider(token_info["platform"])
            
            result = await provider.send(
                recipient=token_info["token"],
                title=notification.title,
                body=notification.body,
                data={"notification_id": notification.notification_id}
            )
            
            results.append((token_info, result))
            
            # Handle invalid tokens
            if result.status == DeliveryStatus.INVALID_TOKEN:
                await self.device_tokens.mark_invalid(
                    token_info["token_id"],
                    reason=result.error_message
                )
        
        # Determine overall result
        # Success if at least one device succeeded
        successes = [r for _, r in results if r.status == DeliveryStatus.SUCCESS]
        
        if successes:
            return DeliveryResult(
                status=DeliveryStatus.SUCCESS,
                provider_message_id=successes[0].provider_message_id
            )
        
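        # All devices failed (tokens is non-empty, so results is too).
        # Prefer a retryable failure such as rate limiting, so one throttled
        # device is retried instead of falling straight back to another
        # channel; otherwise return the last failure.
        failures = [r for _, r in results]
        retryable = [r for r in failures if r.should_retry]
        return retryable[0] if retryable else failures[-1]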
    
    def _get_provider(self, platform: str):
        """Get provider for platform."""
        if platform == "ios":
            return self.apns
        return self.fcm
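deliver sends to a user's devices one after another, so a user with five devices waits for five provider round trips. A hedged alternative is to fan out concurrently with asyncio.gather, bounded by a semaphore so that one user with many devices cannot flood a provider. The sketch stands alone: send_to_all and the send_one callback are hypothetical names, and the default limit of 4 is arbitrary. Results come back in the same order as the tokens, so the success/failure aggregation in deliver works unchanged.

```python
import asyncio
from typing import Awaitable, Callable, List, TypeVar

T = TypeVar("T")
R = TypeVar("R")


async def send_to_all(
    tokens: List[T],
    send_one: Callable[[T], Awaitable[R]],
    max_concurrency: int = 4,
) -> List[R]:
    """Call send_one for every token concurrently, at most max_concurrency at a time.

    Results are returned in the same order as tokens.
    """
    semaphore = asyncio.Semaphore(max_concurrency)

    async def guarded(token: T) -> R:
        async with semaphore:
            return await send_one(token)

    return await asyncio.gather(*(guarded(t) for t in tokens))


async def demo() -> tuple:
    in_flight = 0
    peak = 0

    async def fake_send(token: str) -> str:
        nonlocal in_flight, peak
        in_flight += 1
        peak = max(peak, in_flight)
        await asyncio.sleep(0.01)  # stand-in for the provider round trip
        in_flight -= 1
        return f"sent:{token}"

    results = await send_to_all([f"t{i}" for i in range(10)], fake_send, max_concurrency=4)
    return results, peak


results, peak = asyncio.run(demo())
print(results[:3], peak)  # ['sent:t0', 'sent:t1', 'sent:t2'] 4
```

Because the providers above return a DeliveryResult instead of raising for expected errors, plain gather is enough; pass return_exceptions=True if send_one may raise.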

6.3 Email Worker

# workers/email_worker.py

from workers.base import BaseChannelWorker
from providers.base import DeliveryResult, DeliveryStatus
from providers.sendgrid import SendGridProvider
from models.notification import Notification
from services.template import TemplateService
from repositories.user_email import UserEmailRepository


class EmailWorker(BaseChannelWorker):
    """Worker for email notifications."""
    
    def __init__(
        self,
        sendgrid: SendGridProvider,
        template_service: TemplateService,
        user_emails: UserEmailRepository,
        **kwargs
    ):
        super().__init__(topic="channel.email", **kwargs)
        self.provider = sendgrid
        self.templates = template_service
        self.emails = user_emails
    
    @property
    def channel(self) -> str:
        return "email"
    
    async def deliver(self, notification: Notification) -> DeliveryResult:
        """Deliver email notification."""
        
        # Get user's email
        email_info = await self.emails.get_primary(notification.user_id)
        
        if not email_info or email_info["status"] != "active":
            return DeliveryResult(
                status=DeliveryStatus.FAILED,
                error_code="NO_EMAIL",
                error_message="User has no valid email address",
                should_fallback=True
            )
        
        # Render template
        rendered = await self.templates.render(
            notification.template,
            "email",
            notification.variables
        )
        
        # Send email
        result = await self.provider.send(
            recipient=email_info["email"],
            title=rendered.get("subject", notification.title),
            body=rendered.get("body", notification.body),
            html_body=rendered.get("html_body")
        )
        
        # Track send
        if result.status == DeliveryStatus.SUCCESS:
            await self.emails.update_last_sent(email_info["email_id"])
        
        return result
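
The webhook handler in 7.3 reads notification_id from each SendGrid event. That only works if the email was tagged with the ID at send time, and the provider.send call above does not show this step. Below is a sketch of a SendGrid v3 Mail Send request body that carries notification_id as a custom_arg. build_sendgrid_payload is a hypothetical helper; how SendGridProvider actually builds its request is not shown in this section.

```python
import json
from typing import Optional


def build_sendgrid_payload(
    notification_id: str,
    to_email: str,
    from_email: str,
    subject: str,
    text_body: str,
    html_body: Optional[str] = None,
) -> dict:
    """Build a SendGrid v3 /mail/send request body (illustrative sketch)."""
    # SendGrid requires text/plain to come before text/html
    content = [{"type": "text/plain", "value": text_body}]
    if html_body:
        content.append({"type": "text/html", "value": html_body})

    return {
        "personalizations": [{
            "to": [{"email": to_email}],
            # custom_args are echoed back as top-level fields on every
            # Event Webhook event for this message
            "custom_args": {"notification_id": notification_id},
        }],
        "from": {"email": from_email},
        "subject": subject,
        "content": content,
    }


payload = build_sendgrid_payload(
    "ntf_123", "user@example.com", "alerts@example.com",
    "Payment received", "You received $50.", "<p>You received $50.</p>",
)
print(json.dumps(payload["personalizations"][0]["custom_args"]))
# {"notification_id": "ntf_123"}
```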

Part VI: Delivery Status Tracking

Chapter 7: Status Management

7.1 Status State Machine

NOTIFICATION STATUS STATE MACHINE

                    ┌─────────┐
                    │ PENDING │
                    └────┬────┘
                         │ Written to outbox
                         ▼
                    ┌─────────┐
                    │ QUEUED  │
                    └────┬────┘
                         │ Consumed by router
                         ▼
                    ┌─────────┐
                    │ ROUTING │
                    └────┬────┘
                         │ Published to channel topic
                         ▼
                    ┌─────────┐
                    │ SENDING │
                    └────┬────┘
                         │
           ┌─────────────┼─────────────┐
           │             │             │
           ▼             ▼             ▼
      ┌─────────┐   ┌─────────┐   ┌─────────┐
      │  SENT   │   │ FAILED  │   │ BOUNCED │
      └────┬────┘   └─────────┘   └─────────┘
           │
           │ Provider callback
           ▼
      ┌───────────┐
      │ DELIVERED │
      └─────┬─────┘
            │ User interaction
            ▼
       ┌─────────┐
       │  READ   │
       └─────────┘


VALID TRANSITIONS:

PENDING   → QUEUED
QUEUED    → ROUTING
ROUTING   → SENDING
SENDING   → SENT, FAILED, BOUNCED
SENT      → DELIVERED, FAILED, BOUNCED
DELIVERED → READ
FAILED    → QUEUED (retry)

BOUNCED is terminal. Email bounces usually arrive by webhook after SendGrid has already accepted the message (status SENT), so SENT → BOUNCED must be allowed in addition to SENDING → BOUNCED.

7.2 Status Update Service

# services/status.py

from datetime import datetime
from typing import Optional
import logging

from models.notification import NotificationStatus
from repositories.notification import NotificationRepository
from services.events import EventPublisher

logger = logging.getLogger(__name__)


class StatusService:
    """
    Manages notification status updates.
    
    Ensures valid state transitions and publishes events.
    """
    
    VALID_TRANSITIONS = {
        NotificationStatus.PENDING: {NotificationStatus.QUEUED},
        NotificationStatus.QUEUED: {NotificationStatus.ROUTING},
        NotificationStatus.ROUTING: {NotificationStatus.SENDING},
        NotificationStatus.SENDING: {
            NotificationStatus.SENT,
            NotificationStatus.FAILED,
            NotificationStatus.BOUNCED
        },
        NotificationStatus.SENT: {
            NotificationStatus.DELIVERED,
            NotificationStatus.FAILED,
            NotificationStatus.BOUNCED  # Async bounce reported via webhook
        },
        NotificationStatus.DELIVERED: {NotificationStatus.READ},
        NotificationStatus.FAILED: {NotificationStatus.QUEUED},  # Retry
    }
    
    def __init__(
        self,
        notification_repo: NotificationRepository,
        event_publisher: EventPublisher
    ):
        self.notifications = notification_repo
        self.events = event_publisher
    
    async def update_status(
        self,
        notification_id: str,
        new_status: NotificationStatus,
        error_code: Optional[str] = None,
        error_message: Optional[str] = None,
        provider_message_id: Optional[str] = None
    ) -> bool:
        """
        Update notification status.
        
        Returns True if update was successful.
        """
        
        # Get current notification
        notification = await self.notifications.get(notification_id)
        if not notification:
            logger.warning(f"Notification not found: {notification_id}")
            return False
        
        current_status = notification.status
        
        # Validate transition
        if not self._is_valid_transition(current_status, new_status):
            logger.warning(
                f"Invalid status transition for {notification_id}: "
                f"{current_status} → {new_status}"
            )
            return False
        
        # Build update
        update = {"status": new_status}
        
        if new_status == NotificationStatus.SENT:
            update["sent_at"] = datetime.utcnow()
        elif new_status == NotificationStatus.DELIVERED:
            update["delivered_at"] = datetime.utcnow()
        elif new_status == NotificationStatus.READ:
            update["read_at"] = datetime.utcnow()
        
        if error_code:
            update["error_code"] = error_code
        if error_message:
            update["error_message"] = error_message
        if provider_message_id:
            update["provider_message_id"] = provider_message_id
        
        # Apply update
        await self.notifications.update(notification_id, update)
        
        # Publish status change event
        await self.events.publish("notification.status_changed", {
            "notification_id": notification_id,
            "user_id": notification.user_id,
            "old_status": current_status.value,
            "new_status": new_status.value,
            "timestamp": datetime.utcnow().isoformat()
        })
        
        return True
    
    def _is_valid_transition(
        self,
        from_status: NotificationStatus,
        to_status: NotificationStatus
    ) -> bool:
        """Check if status transition is valid."""
        valid_next = self.VALID_TRANSITIONS.get(from_status, set())
        return to_status in valid_next
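
update_status reads the current status, validates it, and then writes in a separate call. If a worker and a webhook update the same notification at the same time (for example, a "delivered" callback arriving while the worker is still recording SENT), both can validate against the same stale status, and the last write wins. A common remedy is a conditional write (compare-and-set). In SQL this looks like `UPDATE notifications SET status = :new WHERE id = :id AND status = :expected`, with a retry when zero rows change. Below is a minimal sketch using a hypothetical in-memory store and a trimmed transition table. InMemoryStatusStore, update_if_status, and transition are illustrative names, not part of NotificationRepository.

```python
import asyncio
from enum import Enum
from typing import Optional


class Status(Enum):
    SENDING = "sending"
    SENT = "sent"
    DELIVERED = "delivered"
    FAILED = "failed"
    READ = "read"


# Trimmed-down copy of the transition table, for illustration only
TRANSITIONS = {
    Status.SENDING: {Status.SENT, Status.FAILED},
    Status.SENT: {Status.DELIVERED, Status.FAILED},
    Status.DELIVERED: {Status.READ},
}


class InMemoryStatusStore:
    """Hypothetical store exposing a conditional (compare-and-set) update."""

    def __init__(self) -> None:
        self._status: dict[str, Status] = {}

    def seed(self, notification_id: str, status: Status) -> None:
        self._status[notification_id] = status

    async def get_status(self, notification_id: str) -> Optional[Status]:
        return self._status.get(notification_id)

    async def update_if_status(
        self, notification_id: str, expected: Status, new: Status
    ) -> bool:
        # No await between the check and the write, so this is atomic on a
        # single event loop; a database does this with a WHERE status = :expected
        if self._status.get(notification_id) != expected:
            return False
        self._status[notification_id] = new
        return True


async def transition(
    store: InMemoryStatusStore, notification_id: str, new: Status, max_attempts: int = 3
) -> bool:
    for _ in range(max_attempts):
        current = await store.get_status(notification_id)
        if current is None or new not in TRANSITIONS.get(current, set()):
            return False  # unknown notification or invalid transition
        if await store.update_if_status(notification_id, current, new):
            return True
        # Lost a race: another writer changed the status; re-read and re-validate
    return False


async def demo():
    store = InMemoryStatusStore()
    store.seed("n1", Status.SENDING)
    sent = await transition(store, "n1", Status.SENT)
    delivered = await transition(store, "n1", Status.DELIVERED)
    backwards = await transition(store, "n1", Status.SENT)  # DELIVERED -> SENT is not allowed
    return sent, delivered, backwards, await store.get_status("n1")


print(asyncio.run(demo()))  # (True, True, False, <Status.DELIVERED: 'delivered'>)
```

With the conditional write, a stale writer loses cleanly, re-reads the status, and re-validates instead of overwriting a newer status.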

7.3 Webhook Handler (Provider Callbacks)

# api/webhooks.py

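import base64
import logging

from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives.serialization import load_der_public_key
from fastapi import APIRouter, Depends, HTTPException, Request

from services.status import StatusService
from models.notification import NotificationStatus

logger = logging.getLogger(__name__)
router = APIRouter()

# SendGrid Event Webhook "event" values mapped to our statuses.
# None = track for analytics, but don't change delivery status.
SENDGRID_STATUS_MAP = {
    "delivered": NotificationStatus.DELIVERED,
    "bounce": NotificationStatus.BOUNCED,
    "dropped": NotificationStatus.FAILED,
    "open": None,
    "click": None,
}

# Twilio "MessageStatus" values we act on; intermediate states
# (queued, sending, sent) are ignored.
TWILIO_STATUS_MAP = {
    "delivered": NotificationStatus.DELIVERED,
    "failed": NotificationStatus.FAILED,
    "undelivered": NotificationStatus.FAILED,
}


class WebhookHandler:
    """
    Handles callbacks from notification providers.
    
    Providers call our webhooks to report delivery status.
    """
    
    def __init__(
        self,
        status_service: StatusService,
        sendgrid_webhook_key: str,
        twilio_auth_token: str
    ):
        self.status = status_service
        # SendGrid signs events with ECDSA (P-256, SHA-256). This is the
        # base64 verification key from the SendGrid console: a public key,
        # not a shared secret, so it is loaded as one.
        self.sendgrid_key = load_der_public_key(
            base64.b64decode(sendgrid_webhook_key)
        )
        self.twilio_token = twilio_auth_token
    
    async def handle_sendgrid(self, events: list[dict]):
        """Handle a batch of SendGrid Event Webhook events."""
        
        for event in events:
            # notification_id is a custom_arg attached at send time;
            # SendGrid echoes custom args as top-level event fields
            notification_id = event.get("notification_id")
            event_type = event.get("event")
            new_status = SENDGRID_STATUS_MAP.get(event_type)
            
            if not notification_id or new_status is None:
                continue
            
            failed = new_status != NotificationStatus.DELIVERED
            await self.status.update_status(
                notification_id,
                new_status,
                # Bounces carry an SMTP "status" (e.g. "5.1.1");
                # bounces and drops carry a human-readable "reason"
                error_code=(event.get("status") or event_type) if failed else None,
                error_message=event.get("reason") if failed else None
            )
    
    async def handle_twilio(self, data: dict):
        """Handle a Twilio SMS status callback."""
        
        message_sid = data.get("MessageSid")
        new_status = TWILIO_STATUS_MAP.get(data.get("MessageStatus"))
        
        if not message_sid or new_status is None:
            return
        
        # Twilio identifies the message only by its SID, so production code
        # needs a provider_message_id → notification_id mapping (recorded when
        # the SMS worker stores provider_message_id) before calling
        # self.status.update_status(..., error_code=data.get("ErrorCode")).
        # That lookup is not shown here.
        logger.info(
            "Twilio status %s for %s (lookup not implemented)",
            new_status.value, message_sid
        )
    
    def verify_sendgrid_signature(self, request: Request, payload: bytes) -> bool:
        """Verify SendGrid's ECDSA signature over timestamp + raw body."""
        signature = request.headers.get("X-Twilio-Email-Event-Webhook-Signature")
        timestamp = request.headers.get("X-Twilio-Email-Event-Webhook-Timestamp")
        
        if not signature or not timestamp:
            return False
        
        try:
            self.sendgrid_key.verify(
                base64.b64decode(signature),   # DER-encoded ECDSA signature
                timestamp.encode() + payload,  # signed data: timestamp + raw body
                ec.ECDSA(hashes.SHA256())
            )
        except (InvalidSignature, ValueError):  # ValueError covers bad base64
            return False
        return True


def get_webhook_handler(request: Request) -> WebhookHandler:
    # Assumption: a WebhookHandler is built at startup and stored on app.state
    return request.app.state.webhook_handler


@router.post("/webhooks/sendgrid")
async def sendgrid_webhook(
    request: Request,
    handler: WebhookHandler = Depends(get_webhook_handler)
):
    # Verify against the raw bytes; re-serialized JSON would not match
    body = await request.body()
    
    if not handler.verify_sendgrid_signature(request, body):
        raise HTTPException(status_code=401, detail="Invalid signature")
    
    events = await request.json()
    await handler.handle_sendgrid(events)
    
    return {"status": "ok"}


@router.post("/webhooks/twilio")
async def twilio_webhook(
    request: Request,
    handler: WebhookHandler = Depends(get_webhook_handler)
):
    # Twilio signs requests with X-Twilio-Signature (keyed with
    # handler.twilio_token); verification is not shown here
    form = await request.form()
    await handler.handle_twilio(dict(form))
    return {"status": "ok"}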
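
The Twilio endpoint above does not check who sent the request, even though the handler is given twilio_auth_token. Twilio signs each request with an X-Twilio-Signature header: a base64 HMAC-SHA1, keyed with the account auth token, over the full request URL followed by each POST parameter's name and value in sorted name order. Below is a minimal sketch of that check. The function names are hypothetical, and Twilio's helper library ships a RequestValidator that does the same job. The URL must be the exact public URL Twilio called, which can differ from request.url behind a TLS-terminating proxy.

```python
import base64
import hashlib
import hmac
from typing import Optional


def compute_twilio_signature(auth_token: str, url: str, params: dict[str, str]) -> str:
    # Full URL, then each POST param name+value, sorted by name
    data = url + "".join(key + params[key] for key in sorted(params))
    digest = hmac.new(auth_token.encode(), data.encode(), hashlib.sha1).digest()
    return base64.b64encode(digest).decode()


def is_valid_twilio_request(
    auth_token: str, url: str, params: dict[str, str], header_signature: Optional[str]
) -> bool:
    expected = compute_twilio_signature(auth_token, url, params)
    # Constant-time comparison; compare bytes so odd header input can't raise
    return hmac.compare_digest(expected.encode(), (header_signature or "").encode())


token = "test-auth-token"
url = "https://example.com/webhooks/twilio"
params = {"MessageSid": "SM123", "MessageStatus": "delivered"}
sig = compute_twilio_signature(token, url, params)

print(is_valid_twilio_request(token, url, params, sig))  # True
print(is_valid_twilio_request(token, url, {**params, "MessageStatus": "failed"}, sig))  # False
```

In the route, this would run before handle_twilio, with dict(form) as params and request.headers.get("X-Twilio-Signature") as the header value, returning 403 on mismatch.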

Summary

What We Built Today

DAY 2 SUMMARY: CORE NOTIFICATION FLOW

INGESTION
├── Send API with validation
├── Idempotency handling
├── Transactional outbox pattern
└── Priority-based topic routing

ROUTING
├── Preference-based channel selection
├── Quiet hours handling
├── Channel availability checks
└── Kafka topic publishing

TEMPLATE RENDERING
├── Variable substitution
├── Locale fallback
├── Template caching
└── Multi-channel support

PROVIDER INTEGRATION
├── FCM (Android push)
├── APNs (iOS push) - interface shown
├── SendGrid (email)
├── Twilio (SMS)
└── Consistent provider interface

DELIVERY WORKERS
├── Base worker with retry logic
├── Push worker (multi-device)
├── Email worker
├── Exponential backoff
└── Dead letter queue

STATUS TRACKING
├── State machine validation
├── Status update service
├── Provider webhook handlers
└── Event publishing

Week 1-5 Concepts Applied

Concept                          Where Applied
Transactional Outbox (Week 3)    Reliable event publishing after save
Idempotency (Week 2)             Deduplication via idempotency keys
Retry with Backoff (Week 2)      Exponential backoff in workers
Dead Letter Queue (Week 3)       Failed notifications after max retries
Caching (Week 4)                 Template and preference caching

What's Coming Tomorrow

Day 3: Advanced Features

Theme: "Features that separate good from great"

  • Template system with A/B testing
  • Batching and digest notifications
  • Scheduling and delayed delivery
  • Multi-channel orchestration (saga pattern)
  • Real-time in-app notifications

Interview Tip of the Day

INTERVIEW TIP: TRACE THE HAPPY PATH FIRST

When explaining a system, trace ONE complete request:

"Let me walk you through a payment notification end-to-end:

1. Payment service calls POST /v1/notifications
2. We validate, generate ID, write to DB + outbox atomically
3. Outbox processor publishes to Kafka high-priority topic
4. Router consumes, looks up user preferences
5. User wants push + email, so we publish to both channel topics
6. Push worker sends to FCM, gets success (status SENT)
7. Email worker sends to SendGrid, gets 202 Accepted (status SENT)
8. SendGrid calls our webhook when the email is delivered (status DELIVERED)
9. User can query status via GET /v1/notifications/{id}"

This shows:
- You understand the full flow
- You can explain complex systems simply
- You think about reliability (outbox, retries)
- You know how webhooks work

THEN dive into edge cases and failure modes.

End of Week 6, Day 2

Tomorrow: Day 3 — Advanced Features