Week 6 — Day 2: Core Notification Flow
System Design Mastery Series — Practical Application Week
Introduction
Yesterday we scoped the problem and created the high-level design. Today we build the core notification flow — the path a notification takes from creation to delivery.
Today's Theme: "The happy path must be bulletproof"
By the end of today, you'll have working, end-to-end code for:
- Notification ingestion with validation
- Preference-based routing
- Provider integration (FCM, APNs, SendGrid, Twilio)
- Delivery tracking with status management
Part I: Notification Ingestion
Chapter 1: The Send API
When an internal service wants to send a notification, it calls our Send API. This is the entry point to our system.
1.1 API Design Principles
INGESTION DESIGN PRINCIPLES
1. FAST RESPONSE
- Return immediately after validation
- Don't wait for delivery
- Async processing via queue
2. IDEMPOTENT
- Same request = same result
- Use idempotency keys
- Safe to retry
3. VALIDATED
- Reject bad requests early
- Clear error messages
- Don't let garbage into the queue
4. TRACEABLE
- Assign notification ID immediately
- Return ID to caller
- Enable status tracking
1.2 Request/Response Models
# models/notification.py
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Optional
import uuid
class NotificationType(Enum):
TRANSACTION = "transaction"
SECURITY = "security"
MARKETING = "marketing"
REMINDER = "reminder"
SOCIAL = "social"
class NotificationPriority(Enum):
CRITICAL = "critical" # Security alerts, fraud
HIGH = "high" # Transactions, payments
MEDIUM = "medium" # Social, reminders
LOW = "low" # Marketing, newsletters
class NotificationStatus(Enum):
PENDING = "pending"
QUEUED = "queued"
ROUTING = "routing"
SENDING = "sending"
SENT = "sent"
DELIVERED = "delivered"
FAILED = "failed"
BOUNCED = "bounced"
READ = "read"
class Channel(Enum):
PUSH = "push"
EMAIL = "email"
SMS = "sms"
IN_APP = "in_app"
WHATSAPP = "whatsapp"
@dataclass
class SendNotificationRequest:
"""Request to send a notification."""
user_id: str
type: NotificationType
template: str
variables: dict = field(default_factory=dict)
# Optional overrides
channels: Optional[list[Channel]] = None
priority: Optional[NotificationPriority] = None
scheduled_at: Optional[datetime] = None
# Metadata for tracking
metadata: dict = field(default_factory=dict)
idempotency_key: Optional[str] = None
@dataclass
class Notification:
"""Internal notification model."""
notification_id: str
user_id: str
type: NotificationType
priority: NotificationPriority
template: str
variables: dict
status: NotificationStatus = NotificationStatus.PENDING
channel: Optional[Channel] = None
# Rendered content
title: Optional[str] = None
body: Optional[str] = None
# Tracking
idempotency_key: Optional[str] = None
scheduled_at: Optional[datetime] = None
created_at: datetime = field(default_factory=datetime.utcnow)
# Delivery info
provider_message_id: Optional[str] = None
sent_at: Optional[datetime] = None
delivered_at: Optional[datetime] = None
read_at: Optional[datetime] = None
# Error tracking
error_code: Optional[str] = None
error_message: Optional[str] = None
retry_count: int = 0
metadata: dict = field(default_factory=dict)
@dataclass
class SendNotificationResponse:
"""Response after accepting a notification."""
notification_id: str
status: NotificationStatus
created_at: datetime
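The Kafka-facing code later in this article calls notification.to_dict() and Notification.from_dict() (and the template cache uses the same pair on Template). Those helpers aren't shown, so here is a minimal sketch of what they could look like, written as standalone functions; the enum/datetime handling and the tolerant defaults are assumptions.
# models/serialization.py (illustrative sketch of the to_dict/from_dict helpers assumed later)
from dataclasses import asdict
from datetime import datetime
from enum import Enum
from models.notification import (
    Channel, Notification, NotificationPriority, NotificationStatus, NotificationType
)
def notification_to_dict(n: Notification) -> dict:
    """Enums become their values, datetimes become ISO-8601 strings; safe for json.dumps."""
    def encode(value):
        if isinstance(value, Enum):
            return value.value
        if isinstance(value, datetime):
            return value.isoformat()
        return value
    return {key: encode(value) for key, value in asdict(n).items()}
def notification_from_dict(data: dict) -> Notification:
    """Inverse of notification_to_dict; tolerant of missing optional fields."""
    return Notification(
        notification_id=data["notification_id"],
        user_id=data["user_id"],
        type=NotificationType(data["type"]),
        priority=NotificationPriority(data["priority"]),
        template=data["template"],
        variables=data.get("variables", {}),
        status=NotificationStatus(data.get("status", "pending")),
        channel=Channel(data["channel"]) if data.get("channel") else None,
        scheduled_at=datetime.fromisoformat(data["scheduled_at"]) if data.get("scheduled_at") else None,
        retry_count=data.get("retry_count", 0),
        metadata=data.get("metadata", {}),
    )
Attaching these to the dataclasses as methods keeps the call sites used below (notification.to_dict(), Notification.from_dict(...)) unchanged.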
1.3 The Send API Implementation
# api/send.py
from fastapi import APIRouter, HTTPException, Header, Depends
from typing import Optional
import uuid
from models.notification import (
SendNotificationRequest, SendNotificationResponse,
Notification, NotificationStatus, NotificationPriority
)
from services.validation import ValidationService
from services.outbox import OutboxService
from repositories.notification import NotificationRepository
from repositories.idempotency import IdempotencyRepository
router = APIRouter()
class NotificationSendAPI:
"""
Notification Send API.
Handles incoming notification requests, validates them,
and queues them for processing.
Week 2 Concept: Idempotency
Week 3 Concept: Transactional Outbox
"""
def __init__(
self,
validator: ValidationService,
notification_repo: NotificationRepository,
idempotency_repo: IdempotencyRepository,
outbox: OutboxService
):
self.validator = validator
self.notifications = notification_repo
self.idempotency = idempotency_repo
self.outbox = outbox
async def send(
self,
request: SendNotificationRequest,
request_id: Optional[str] = None
) -> SendNotificationResponse:
"""
Accept a notification request.
Steps:
1. Check idempotency (have we seen this before?)
2. Validate the request
3. Create notification record
4. Write to outbox (same transaction)
5. Return notification ID
"""
# Step 1: Idempotency check
idempotency_key = request.idempotency_key or request_id
if idempotency_key:
existing = await self.idempotency.get(idempotency_key)
if existing:
# Return the existing notification (don't create duplicate)
return SendNotificationResponse(
notification_id=existing.notification_id,
status=existing.status,
created_at=existing.created_at
)
# Step 2: Validate request
validation_result = await self.validator.validate(request)
if not validation_result.is_valid:
raise HTTPException(
status_code=400,
detail={
"error": "VALIDATION_ERROR",
"message": validation_result.error_message,
"field": validation_result.error_field
}
)
# Step 3: Create notification
notification = Notification(
notification_id=str(uuid.uuid4()),
user_id=request.user_id,
type=request.type,
priority=self._determine_priority(request),
template=request.template,
variables=request.variables,
idempotency_key=idempotency_key,
scheduled_at=request.scheduled_at,
metadata=request.metadata
)
# Step 4: Save notification AND write to outbox atomically
# This is the transactional outbox pattern from Week 3
async with self.notifications.transaction() as txn:
await self.notifications.save(notification, txn)
# Determine which Kafka topic based on priority
topic = self._get_topic_for_priority(notification.priority)
await self.outbox.write(
topic=topic,
key=notification.user_id, # Partition by user for ordering
payload=notification.to_dict(),
transaction=txn
)
# Save idempotency mapping
if idempotency_key:
await self.idempotency.save(
idempotency_key,
notification,
transaction=txn
)
# Step 5: Return response
return SendNotificationResponse(
notification_id=notification.notification_id,
status=NotificationStatus.QUEUED,
created_at=notification.created_at
)
def _determine_priority(self, request: SendNotificationRequest) -> NotificationPriority:
"""Determine notification priority based on type or explicit override."""
if request.priority:
return request.priority
# Default priorities by type
PRIORITY_MAP = {
NotificationType.SECURITY: NotificationPriority.CRITICAL,
NotificationType.TRANSACTION: NotificationPriority.HIGH,
NotificationType.SOCIAL: NotificationPriority.MEDIUM,
NotificationType.REMINDER: NotificationPriority.MEDIUM,
NotificationType.MARKETING: NotificationPriority.LOW,
}
return PRIORITY_MAP.get(request.type, NotificationPriority.MEDIUM)
def _get_topic_for_priority(self, priority: NotificationPriority) -> str:
"""Map priority to Kafka topic."""
TOPIC_MAP = {
NotificationPriority.CRITICAL: "notifications.critical",
NotificationPriority.HIGH: "notifications.high",
NotificationPriority.MEDIUM: "notifications.medium",
NotificationPriority.LOW: "notifications.low",
}
return TOPIC_MAP[priority]
# FastAPI route
@router.post("/v1/notifications", response_model=SendNotificationResponse)
async def send_notification(
request: SendNotificationRequest,
x_request_id: Optional[str] = Header(None),
api: NotificationSendAPI = Depends()
):
return await api.send(request, request_id=x_request_id)
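To make the idempotency guarantee concrete, here is a hypothetical caller: a payments service posts the same request twice with the same X-Request-Id and gets the same notification_id back. The base URL, template name, and variables are made up for illustration.
# examples/send_payment_notification.py (hypothetical caller)
import asyncio
import httpx
async def main():
    body = {
        "user_id": "user_123",
        "type": "transaction",
        "template": "payment_received",
        "variables": {"amount": "49.99", "currency": "USD"},
    }
    headers = {"X-Request-Id": "payment-evt-789"}  # doubles as the idempotency key
    async with httpx.AsyncClient(base_url="http://localhost:8000") as client:
        first = await client.post("/v1/notifications", json=body, headers=headers)
        second = await client.post("/v1/notifications", json=body, headers=headers)
    # The second call hit the idempotency check, so both return the same notification
    assert first.json()["notification_id"] == second.json()["notification_id"]
asyncio.run(main())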
1.4 Validation Service
# services/validation.py
from dataclasses import dataclass
from typing import Optional
from models.notification import SendNotificationRequest
from repositories.user import UserRepository
from repositories.template import TemplateRepository
@dataclass
class ValidationResult:
is_valid: bool
error_message: Optional[str] = None
error_field: Optional[str] = None
class ValidationService:
"""
Validates notification requests before processing.
Catches errors early to avoid wasting resources on invalid requests.
"""
def __init__(
self,
user_repo: UserRepository,
template_repo: TemplateRepository
):
self.users = user_repo
self.templates = template_repo
async def validate(self, request: SendNotificationRequest) -> ValidationResult:
"""Validate a notification request."""
# Check user exists
user = await self.users.get(request.user_id)
if not user:
return ValidationResult(
is_valid=False,
error_message=f"User not found: {request.user_id}",
error_field="user_id"
)
# Check user hasn't been deleted/banned
if user.status != "active":
return ValidationResult(
is_valid=False,
error_message=f"User is not active: {user.status}",
error_field="user_id"
)
# Check template exists
template = await self.templates.get(request.template)
if not template:
return ValidationResult(
is_valid=False,
error_message=f"Template not found: {request.template}",
error_field="template"
)
# Validate required variables are provided
missing_vars = self._check_required_variables(
template.variables_schema,
request.variables
)
if missing_vars:
return ValidationResult(
is_valid=False,
error_message=f"Missing required variables: {missing_vars}",
error_field="variables"
)
# Validate scheduled_at is in future
if request.scheduled_at:
from datetime import datetime, timedelta
min_schedule = datetime.utcnow() + timedelta(minutes=1)
if request.scheduled_at < min_schedule:
return ValidationResult(
is_valid=False,
error_message="scheduled_at must be at least 1 minute in the future",
error_field="scheduled_at"
)
return ValidationResult(is_valid=True)
def _check_required_variables(
self,
schema: dict,
provided: dict
) -> list[str]:
"""Check which required variables are missing."""
if not schema:
return []
required = schema.get("required", [])
return [var for var in required if var not in provided]
Chapter 2: Transactional Outbox
The transactional outbox pattern ensures we never lose notifications, even if Kafka is temporarily unavailable.
2.1 Why Outbox?
THE PROBLEM WITHOUT OUTBOX
Naive approach:
1. Save notification to database
2. Publish to Kafka
What can go wrong:
Scenario A: Database succeeds, Kafka fails
→ Notification saved but never processed
→ USER NEVER GETS NOTIFIED
Scenario B: Kafka succeeds, database fails
→ Notification processed but not saved
→ Can't track status, might retry and DUPLICATE
THE OUTBOX SOLUTION
Instead:
1. Save notification to database ─┐
2. Write to outbox table ─┴─ SAME TRANSACTION
3. Background process reads outbox and publishes to Kafka
If step 3 fails:
→ Outbox entry still exists
→ Retry until successful
→ NEVER LOSE A NOTIFICATION
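The write path and processor below assume an outbox table shaped roughly like this. Column names are taken from the queries that follow; the exact types and the partial index are assumptions.
# db/schema.py (assumed outbox DDL, inferred from the queries below)
CREATE_NOTIFICATION_OUTBOX = """
CREATE TABLE IF NOT EXISTS notification_outbox (
    outbox_id      BIGSERIAL PRIMARY KEY,        -- monotonically increasing, preserves order
    topic          TEXT        NOT NULL,         -- Kafka topic to publish to
    partition_key  TEXT        NOT NULL,         -- Kafka message key (user_id)
    payload        JSONB       NOT NULL,         -- serialized notification
    published      BOOLEAN     NOT NULL DEFAULT FALSE,
    published_at   TIMESTAMPTZ,
    created_at     TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- Partial index keeps the unpublished scan cheap as the table grows
CREATE INDEX IF NOT EXISTS idx_outbox_unpublished
    ON notification_outbox (outbox_id) WHERE published = FALSE;
"""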
2.2 Outbox Implementation
# services/outbox.py
import json
import asyncio
from datetime import datetime
from typing import Optional
import logging
logger = logging.getLogger(__name__)
class OutboxService:
"""
Transactional outbox for reliable event publishing.
Week 3 Concept: Transactional Outbox Pattern
"""
def __init__(self, db_pool, kafka_producer):
self.db = db_pool
self.kafka = kafka_producer
async def write(
self,
topic: str,
key: str,
payload: dict,
transaction=None
):
"""
Write an event to the outbox table.
Must be called within the same transaction as the business logic.
"""
# Run on the caller's transaction so the outbox row commits atomically with the business write
conn = transaction if transaction is not None else await self.db.acquire()
try:
    await conn.execute("""
        INSERT INTO notification_outbox (topic, partition_key, payload)
        VALUES ($1, $2, $3)
    """, topic, key, json.dumps(payload))
finally:
    if transaction is None:
        await self.db.release(conn)  # we acquired this one ourselves, return it to the pool
class OutboxProcessor:
"""
Background processor that publishes outbox events to Kafka.
Runs continuously, polling for unpublished events.
"""
def __init__(
self,
db_pool,
kafka_producer,
batch_size: int = 100,
poll_interval_ms: int = 100
):
self.db = db_pool
self.kafka = kafka_producer
self.batch_size = batch_size
self.poll_interval = poll_interval_ms / 1000
self.running = False
async def start(self):
"""Start the outbox processor."""
self.running = True
logger.info("Outbox processor started")
while self.running:
try:
processed = await self._process_batch()
if processed == 0:
# No events to process, wait before polling again
await asyncio.sleep(self.poll_interval)
except Exception as e:
logger.error(f"Outbox processor error: {e}")
await asyncio.sleep(1) # Back off on error
async def stop(self):
"""Stop the outbox processor."""
self.running = False
logger.info("Outbox processor stopped")
async def _process_batch(self) -> int:
"""Process a batch of outbox events."""
# Run fetch + mark in one transaction so FOR UPDATE SKIP LOCKED actually holds its row locks
async with self.db.acquire() as conn, conn.transaction():
# Fetch unpublished events
# Use FOR UPDATE SKIP LOCKED for concurrent processors
events = await conn.fetch("""
SELECT outbox_id, topic, partition_key, payload
FROM notification_outbox
WHERE published = false
ORDER BY outbox_id
LIMIT $1
FOR UPDATE SKIP LOCKED
""", self.batch_size)
if not events:
return 0
# Publish each event to Kafka
published_ids = []
for event in events:
try:
await self.kafka.send(
topic=event['topic'],
key=event['partition_key'].encode(),
value=event['payload'].encode()
)
published_ids.append(event['outbox_id'])
except Exception as e:
logger.error(f"Failed to publish event {event['outbox_id']}: {e}")
# Continue with other events
# Mark published events
if published_ids:
await conn.execute("""
UPDATE notification_outbox
SET published = true, published_at = NOW()
WHERE outbox_id = ANY($1)
""", published_ids)
return len(published_ids)
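One way to run the processor is as a background task next to the API, or as its own deployment. A minimal wiring sketch, assuming an asyncpg pool and an aiokafka producer; the DSN and broker address are placeholders.
# outbox_main.py (illustrative wiring)
import asyncio
import asyncpg
from aiokafka import AIOKafkaProducer
from services.outbox import OutboxProcessor
async def main():
    db_pool = await asyncpg.create_pool(dsn="postgresql://app@localhost/notifications")
    producer = AIOKafkaProducer(bootstrap_servers="localhost:9092")
    await producer.start()
    processor = OutboxProcessor(db_pool, producer, batch_size=100, poll_interval_ms=100)
    try:
        await processor.start()  # loops until processor.stop() is called (e.g. on shutdown signal)
    finally:
        await processor.stop()
        await producer.stop()
        await db_pool.close()
if __name__ == "__main__":
    asyncio.run(main())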
Part II: Routing
Chapter 3: The Router Service
The router consumes notifications from Kafka and determines which channels to use based on user preferences.
3.1 User Preferences
# models/preferences.py
from dataclasses import dataclass, field
from typing import Optional
from datetime import time
@dataclass
class ChannelPreference:
enabled: bool = True
priority: int = 1 # Lower = higher priority
@dataclass
class CategoryPreference:
enabled: bool = True
channels: list[str] = field(default_factory=list)
@dataclass
class QuietHours:
enabled: bool = False
start: Optional[time] = None # e.g., 22:00
end: Optional[time] = None # e.g., 08:00
@dataclass
class UserPreferences:
user_id: str
global_enabled: bool = True
channels: dict[str, ChannelPreference] = field(default_factory=dict)
categories: dict[str, CategoryPreference] = field(default_factory=dict)
quiet_hours: QuietHours = field(default_factory=QuietHours)
timezone: str = "UTC"
# For optimistic locking
version: int = 1
def is_channel_enabled(self, channel: str) -> bool:
"""Check if a channel is enabled."""
if not self.global_enabled:
return False
pref = self.channels.get(channel)
return pref.enabled if pref else True
def get_channels_for_category(self, category: str) -> list[str]:
"""Get enabled channels for a category."""
cat_pref = self.categories.get(category)
if not cat_pref or not cat_pref.enabled:
return []
# Filter by channel-level preferences
return [
ch for ch in cat_pref.channels
if self.is_channel_enabled(ch)
]
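A quick illustration of how the two preference levels combine (the values here are made up). As the router shows next, an empty result makes it fall back to per-type defaults.
# Illustrative only: category channels filtered by channel-level preferences
from models.preferences import CategoryPreference, ChannelPreference, UserPreferences
prefs = UserPreferences(
    user_id="user_123",
    channels={"sms": ChannelPreference(enabled=False)},  # user muted SMS entirely
    categories={
        "transaction": CategoryPreference(enabled=True, channels=["push", "sms", "in_app"]),
    },
)
print(prefs.get_channels_for_category("transaction"))  # ['push', 'in_app'] (SMS filtered out)
print(prefs.get_channels_for_category("marketing"))    # [] (no preference set for this category)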
3.2 Router Implementation
# services/router.py
from datetime import datetime, timezone
from typing import Optional
import pytz
import logging
from models.notification import Notification, Channel, NotificationStatus
from models.preferences import UserPreferences
from repositories.preferences import PreferencesRepository
from repositories.device_token import DeviceTokenRepository
from services.kafka_producer import KafkaProducer
logger = logging.getLogger(__name__)
class NotificationRouter:
"""
Routes notifications to appropriate channels based on user preferences.
This is the brain of the notification system.
"""
def __init__(
self,
preferences_repo: PreferencesRepository,
device_tokens_repo: DeviceTokenRepository,
kafka_producer: KafkaProducer
):
self.preferences = preferences_repo
self.device_tokens = device_tokens_repo
self.kafka = kafka_producer
async def route(self, notification: Notification) -> list[Channel]:
"""
Determine which channels to use for this notification.
Returns list of channels that will receive the notification.
"""
# Step 1: Get user preferences (from cache or DB)
prefs = await self.preferences.get(notification.user_id)
if not prefs:
# No preferences = use defaults
prefs = UserPreferences(user_id=notification.user_id)
# Step 2: Check if notifications are globally disabled
if not prefs.global_enabled:
logger.info(f"User {notification.user_id} has notifications disabled")
return []
# Step 3: Get channels for this notification type
category = notification.type.value
channels = prefs.get_channels_for_category(category)
if not channels:
# Use default channels for this notification type
channels = self._get_default_channels(notification)
# Step 4: Filter by quiet hours
if self._is_quiet_hours(prefs, notification):
channels = self._filter_for_quiet_hours(channels, notification)
# Step 5: Check channel availability (e.g., has device token for push)
available_channels = await self._filter_by_availability(
channels, notification.user_id
)
# Step 6: Publish to channel-specific topics
for channel in available_channels:
await self._publish_to_channel(notification, channel)
return available_channels
def _get_default_channels(self, notification: Notification) -> list[str]:
"""Get default channels based on notification type."""
DEFAULTS = {
"security": ["push", "sms", "email"], # Multi-channel for security
"transaction": ["push", "in_app"],
"marketing": ["email"],
"reminder": ["push", "email"],
"social": ["push", "in_app"],
}
return DEFAULTS.get(notification.type.value, ["push"])
def _is_quiet_hours(self, prefs: UserPreferences, notification: Notification) -> bool:
"""Check if current time is in user's quiet hours."""
if not prefs.quiet_hours.enabled:
return False
# Get current time in user's timezone
user_tz = pytz.timezone(prefs.timezone)
user_now = datetime.now(user_tz).time()
start = prefs.quiet_hours.start
end = prefs.quiet_hours.end
if start <= end:
# Simple case: e.g., 09:00 - 17:00
return start <= user_now <= end
else:
# Overnight: e.g., 22:00 - 08:00
return user_now >= start or user_now <= end
def _filter_for_quiet_hours(
self,
channels: list[str],
notification: Notification
) -> list[str]:
"""Filter channels during quiet hours."""
# Critical notifications bypass quiet hours
if notification.priority.value == "critical":
return channels
# During quiet hours, only use non-intrusive channels
NON_INTRUSIVE = {"email", "in_app"}
return [ch for ch in channels if ch in NON_INTRUSIVE]
async def _filter_by_availability(
self,
channels: list[str],
user_id: str
) -> list[str]:
"""Filter channels based on what's actually available for this user."""
available = []
for channel in channels:
if channel == "push":
# Check if user has any valid device tokens
tokens = await self.device_tokens.get_active_tokens(user_id)
if tokens:
available.append(channel)
elif channel == "email":
# Email is always "available" if user has email
# (bounce handling happens at delivery time)
available.append(channel)
elif channel == "sms":
# SMS is always "available" if user has phone
available.append(channel)
elif channel == "in_app":
# In-app is always available
available.append(channel)
return available
async def _publish_to_channel(self, notification: Notification, channel: str):
"""Publish notification to channel-specific Kafka topic."""
topic = f"channel.{channel}"
# Add channel to notification
channel_notification = {
**notification.to_dict(),
"channel": channel,
"status": NotificationStatus.ROUTING.value
}
await self.kafka.send(
topic=topic,
key=notification.user_id.encode(),
value=channel_notification
)
logger.info(
f"Routed notification {notification.notification_id} "
f"to channel {channel}"
)
3.3 Router Consumer (Kafka)
# workers/router_worker.py
import asyncio
import json
import logging
from datetime import datetime
from aiokafka import AIOKafkaConsumer
from models.notification import Notification
from services.router import NotificationRouter
logger = logging.getLogger(__name__)
class RouterWorker:
"""
Kafka consumer that routes notifications.
Consumes from priority topics and routes to channel topics.
"""
def __init__(
self,
kafka_brokers: str,
router: NotificationRouter,
consumer_group: str = "notification-router"
):
self.brokers = kafka_brokers
self.router = router
self.group = consumer_group
self.consumer = None
self.running = False
async def start(self):
"""Start consuming notifications."""
self.consumer = AIOKafkaConsumer(
"notifications.critical",
"notifications.high",
"notifications.medium",
"notifications.low",
bootstrap_servers=self.brokers,
group_id=self.group,
auto_offset_reset="earliest",
enable_auto_commit=False # Manual commit for reliability
)
await self.consumer.start()
self.running = True
logger.info("Router worker started")
try:
async for message in self.consumer:
await self._process_message(message)
finally:
await self.consumer.stop()
async def stop(self):
"""Stop the worker."""
self.running = False
async def _process_message(self, message):
"""Process a single notification message."""
try:
payload = json.loads(message.value.decode())
notification = Notification.from_dict(payload)
# Check if scheduled for later
if notification.scheduled_at and notification.scheduled_at > datetime.utcnow():
# Re-queue with delay (or use a scheduling service)
await self._reschedule(notification)
await self.consumer.commit()
return
# Route the notification
channels = await self.router.route(notification)
if not channels:
logger.warning(
f"No channels available for notification {notification.notification_id}"
)
# Commit offset after successful processing
await self.consumer.commit()
except Exception as e:
logger.error(f"Error processing message: {e}", exc_info=True)
# Don't commit - message will be reprocessed
# In production, implement dead letter queue after N retries
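The worker above calls self._reschedule(...) for future-dated notifications, but the method isn't shown. One minimal approach is to park the notification on a dedicated topic that a scheduler service drains when it comes due; the helper below sketches that in standalone form, and the topic name plus the injected producer are assumptions.
# Sketch of the rescheduling step (standalone form of the _reschedule helper referenced above)
import json
from aiokafka import AIOKafkaProducer
from models.notification import Notification
async def reschedule(producer: AIOKafkaProducer, notification: Notification) -> None:
    payload = {
        **notification.to_dict(),
        "deliver_after": notification.scheduled_at.isoformat(),
    }
    await producer.send_and_wait(
        "notifications.scheduled",  # assumed topic, drained by a separate scheduler when due
        key=notification.user_id.encode(),
        value=json.dumps(payload).encode(),
    )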
Part III: Template Rendering
Chapter 4: The Template Engine
Before delivering a notification, we need to render the template with user-specific variables.
4.1 Template Model
# models/template.py
from dataclasses import dataclass, field
from typing import Optional
@dataclass
class Template:
template_id: str
name: str # e.g., "payment_received"
channel: str # push, email, sms, etc.
locale: str # en-US, es-ES, etc.
# Content
title: Optional[str] = None # For push/in-app
subject: Optional[str] = None # For email
body: str = ""
html_body: Optional[str] = None # For email HTML
# Rich content
image_url: Optional[str] = None
action_url: Optional[str] = None
action_buttons: list[dict] = field(default_factory=list)
# Variable schema for validation
variables_schema: dict = field(default_factory=dict)
version: int = 1
is_active: bool = True
4.2 Template Service
# services/template.py
import re
from typing import Optional
import logging
from models.template import Template
from repositories.template import TemplateRepository
from services.cache import CacheService
logger = logging.getLogger(__name__)
class TemplateService:
"""
Template rendering service.
Handles:
- Template lookup with caching
- Variable substitution
- Localization
- Fallback to default locale
"""
def __init__(
self,
template_repo: TemplateRepository,
cache: CacheService
):
self.templates = template_repo
self.cache = cache
# Simple variable pattern: {{variable_name}}
self.var_pattern = re.compile(r'\{\{(\w+)\}\}')
async def render(
self,
template_name: str,
channel: str,
variables: dict,
locale: str = "en-US"
) -> dict:
"""
Render a template with variables.
Returns dict with rendered title, body, etc.
"""
# Get template (with locale fallback)
template = await self._get_template(template_name, channel, locale)
if not template:
raise TemplateNotFoundError(
f"Template not found: {template_name}/{channel}/{locale}"
)
# Render each field
result = {
"template_id": template.template_id,
"channel": channel,
}
if template.title:
result["title"] = self._substitute(template.title, variables)
if template.subject:
result["subject"] = self._substitute(template.subject, variables)
if template.body:
result["body"] = self._substitute(template.body, variables)
if template.html_body:
result["html_body"] = self._substitute(template.html_body, variables)
if template.image_url:
result["image_url"] = self._substitute(template.image_url, variables)
if template.action_url:
result["action_url"] = self._substitute(template.action_url, variables)
if template.action_buttons:
result["action_buttons"] = template.action_buttons
return result
async def _get_template(
self,
name: str,
channel: str,
locale: str
) -> Optional[Template]:
"""Get template with caching and locale fallback."""
# Try cache first
cache_key = f"template:{name}:{channel}:{locale}"
cached = await self.cache.get(cache_key)
if cached:
return Template.from_dict(cached)
# Try exact locale
template = await self.templates.get_by_name(name, channel, locale)
# Fallback to base locale (e.g., en-US -> en)
if not template and "-" in locale:
base_locale = locale.split("-")[0]
template = await self.templates.get_by_name(name, channel, base_locale)
# Fallback to default locale
if not template:
template = await self.templates.get_by_name(name, channel, "en-US")
# Cache the result
if template:
await self.cache.set(cache_key, template.to_dict(), ttl=600) # 10 min
return template
def _substitute(self, text: str, variables: dict) -> str:
"""Substitute variables in text."""
def replace(match):
var_name = match.group(1)
value = variables.get(var_name, "")
return str(value)
return self.var_pattern.sub(replace, text)
class TemplateNotFoundError(Exception):
pass
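For example, with a hypothetical payment_received push template whose body is "You received {{amount}} {{currency}} from {{sender}}", rendering looks like this. Note that variables missing from the dict render as empty strings, which is why the validation step checks required variables up front.
# Illustrative usage of TemplateService.render (service construction omitted)
from services.template import TemplateService
async def example(templates: TemplateService) -> None:
    rendered = await templates.render(
        template_name="payment_received",
        channel="push",
        variables={"amount": "49.99", "currency": "USD", "sender": "Alice"},
        locale="en-US",
    )
    # e.g. {"template_id": "...", "channel": "push",
    #       "title": "Payment received",
    #       "body": "You received 49.99 USD from Alice"}
    print(rendered)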
Part IV: Provider Integration
Chapter 5: Provider Abstraction
We need to integrate with multiple providers (FCM, APNs, SendGrid, Twilio). A clean abstraction makes this manageable.
5.1 Provider Interface
# providers/base.py
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Optional
from enum import Enum
class DeliveryStatus(Enum):
SUCCESS = "success"
FAILED = "failed"
RATE_LIMITED = "rate_limited"
INVALID_TOKEN = "invalid_token"
BOUNCED = "bounced"
@dataclass
class DeliveryResult:
"""Result of a delivery attempt."""
status: DeliveryStatus
provider_message_id: Optional[str] = None
error_code: Optional[str] = None
error_message: Optional[str] = None
should_retry: bool = False
should_fallback: bool = False
@dataclass
class ProviderHealth:
"""Health status of a provider."""
is_healthy: bool
success_rate: float # 0.0 - 1.0
avg_latency_ms: float
last_check: str
class NotificationProvider(ABC):
"""
Base class for notification providers.
All providers implement this interface for consistent behavior.
"""
@property
@abstractmethod
def name(self) -> str:
"""Provider name (e.g., 'fcm', 'sendgrid')."""
pass
@property
@abstractmethod
def channel(self) -> str:
"""Channel this provider serves (e.g., 'push', 'email')."""
pass
@abstractmethod
async def send(
self,
recipient: str,
title: str,
body: str,
**kwargs
) -> DeliveryResult:
"""
Send a notification.
Args:
recipient: Channel-specific identifier (token, email, phone)
title: Notification title
body: Notification body
**kwargs: Additional provider-specific options
"""
pass
@abstractmethod
async def check_health(self) -> ProviderHealth:
"""Check provider health."""
pass
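Before the real integrations, here is a tiny fake provider that satisfies the interface. It's handy in local development and tests, and everything about it is made up.
# providers/console.py (illustrative test double, not one of the real integrations)
from datetime import datetime
from providers.base import NotificationProvider, DeliveryResult, DeliveryStatus, ProviderHealth
class ConsoleProvider(NotificationProvider):
    """Prints notifications to stdout instead of calling an external service."""
    @property
    def name(self) -> str:
        return "console"
    @property
    def channel(self) -> str:
        return "in_app"
    async def send(self, recipient: str, title: str, body: str, **kwargs) -> DeliveryResult:
        print(f"[console] to={recipient} title={title!r} body={body!r}")
        return DeliveryResult(status=DeliveryStatus.SUCCESS, provider_message_id="console-msg")
    async def check_health(self) -> ProviderHealth:
        return ProviderHealth(
            is_healthy=True,
            success_rate=1.0,
            avg_latency_ms=0.1,
            last_check=datetime.utcnow().isoformat(),
        )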
5.2 FCM Provider (Push - Android)
# providers/fcm.py
import aiohttp
import logging
from datetime import datetime
from typing import Optional
from providers.base import NotificationProvider, DeliveryResult, DeliveryStatus, ProviderHealth
logger = logging.getLogger(__name__)
class FCMProvider(NotificationProvider):
"""
Firebase Cloud Messaging provider for Android push notifications.
Uses FCM HTTP v1 API.
"""
def __init__(
self,
project_id: str,
credentials_path: str,
timeout_seconds: float = 10.0
):
self.project_id = project_id
self.credentials_path = credentials_path
self.timeout = aiohttp.ClientTimeout(total=timeout_seconds)
self.api_url = f"https://fcm.googleapis.com/v1/projects/{project_id}/messages:send"
self._access_token = None
self._token_expiry = None
@property
def name(self) -> str:
return "fcm"
@property
def channel(self) -> str:
return "push"
async def send(
self,
recipient: str, # FCM device token
title: str,
body: str,
image_url: Optional[str] = None,
action_url: Optional[str] = None,
data: Optional[dict] = None,
**kwargs
) -> DeliveryResult:
"""Send push notification via FCM."""
# Build FCM message
message = {
"message": {
"token": recipient,
"notification": {
"title": title,
"body": body,
},
"android": {
"priority": "high",
"notification": {
"click_action": action_url or "OPEN_APP",
}
}
}
}
if image_url:
message["message"]["notification"]["image"] = image_url
if data:
message["message"]["data"] = {k: str(v) for k, v in data.items()}
# Get access token
access_token = await self._get_access_token()
# Send request
try:
async with aiohttp.ClientSession(timeout=self.timeout) as session:
async with session.post(
self.api_url,
json=message,
headers={
"Authorization": f"Bearer {access_token}",
"Content-Type": "application/json"
}
) as response:
if response.status == 200:
result = await response.json()
return DeliveryResult(
status=DeliveryStatus.SUCCESS,
provider_message_id=result.get("name")
)
# Handle errors
error_body = await response.json()
error = error_body.get("error", {})
# FCM v1 reports a symbolic status (e.g. NOT_FOUND, INVALID_ARGUMENT)
# alongside the numeric code; prefer the symbolic one for classification
error_code = error.get("status") or error.get("code")
error_message = error.get("message")
return self._handle_error(response.status, error_code, error_message)
except aiohttp.ClientError as e:
logger.error(f"FCM request failed: {e}")
return DeliveryResult(
status=DeliveryStatus.FAILED,
error_code="CONNECTION_ERROR",
error_message=str(e),
should_retry=True
)
def _handle_error(
self,
status_code: int,
error_code: str,
error_message: str
) -> DeliveryResult:
"""Handle FCM error responses."""
# Invalid or expired token (FCM reports NOT_FOUND/UNREGISTERED): don't retry, mark token invalid
if error_code in ["UNREGISTERED", "NOT_FOUND", "INVALID_ARGUMENT"]:
return DeliveryResult(
status=DeliveryStatus.INVALID_TOKEN,
error_code=error_code,
error_message=error_message,
should_retry=False,
should_fallback=True # Try another channel
)
# Rate limited
if status_code == 429:
return DeliveryResult(
status=DeliveryStatus.RATE_LIMITED,
error_code="RATE_LIMITED",
error_message=error_message,
should_retry=True
)
# Server error - retry
if status_code >= 500:
return DeliveryResult(
status=DeliveryStatus.FAILED,
error_code=error_code,
error_message=error_message,
should_retry=True
)
# Other errors
return DeliveryResult(
status=DeliveryStatus.FAILED,
error_code=error_code,
error_message=error_message,
should_retry=False
)
async def _get_access_token(self) -> str:
"""Get OAuth2 access token for FCM API."""
# In production, use google-auth library
# This is simplified
from google.oauth2 import service_account
from google.auth.transport.requests import Request
if self._access_token and self._token_expiry > datetime.utcnow():
return self._access_token
credentials = service_account.Credentials.from_service_account_file(
self.credentials_path,
scopes=["https://www.googleapis.com/auth/firebase.messaging"]
)
credentials.refresh(Request())
self._access_token = credentials.token
self._token_expiry = credentials.expiry
return self._access_token
async def check_health(self) -> ProviderHealth:
"""Check FCM health by validating credentials."""
try:
await self._get_access_token()
return ProviderHealth(
is_healthy=True,
success_rate=0.99, # Would track actual rate in production
avg_latency_ms=150,
last_check=datetime.utcnow().isoformat()
)
except Exception as e:
return ProviderHealth(
is_healthy=False,
success_rate=0.0,
avg_latency_ms=0,
last_check=datetime.utcnow().isoformat()
)
5.3 SendGrid Provider (Email)
# providers/sendgrid.py
import aiohttp
import logging
from datetime import datetime
from typing import Optional
from providers.base import NotificationProvider, DeliveryResult, DeliveryStatus, ProviderHealth
logger = logging.getLogger(__name__)
class SendGridProvider(NotificationProvider):
"""
SendGrid provider for email notifications.
"""
def __init__(
self,
api_key: str,
from_email: str,
from_name: str = "Notifications",
timeout_seconds: float = 30.0
):
self.api_key = api_key
self.from_email = from_email
self.from_name = from_name
self.timeout = aiohttp.ClientTimeout(total=timeout_seconds)
self.api_url = "https://api.sendgrid.com/v3/mail/send"
@property
def name(self) -> str:
return "sendgrid"
@property
def channel(self) -> str:
return "email"
async def send(
self,
recipient: str, # Email address
title: str, # Subject
body: str, # Plain text body
html_body: Optional[str] = None,
**kwargs
) -> DeliveryResult:
"""Send email via SendGrid."""
# Build SendGrid request
payload = {
"personalizations": [
{
"to": [{"email": recipient}]
}
],
"from": {
"email": self.from_email,
"name": self.from_name
},
"subject": title,
"content": []
}
# Add plain text content
payload["content"].append({
"type": "text/plain",
"value": body
})
# Add HTML content if provided
if html_body:
payload["content"].append({
"type": "text/html",
"value": html_body
})
# Add tracking settings
payload["tracking_settings"] = {
"click_tracking": {"enable": True},
"open_tracking": {"enable": True}
}
# Send request
try:
async with aiohttp.ClientSession(timeout=self.timeout) as session:
async with session.post(
self.api_url,
json=payload,
headers={
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
}
) as response:
# SendGrid returns 202 Accepted on success
if response.status == 202:
message_id = response.headers.get("X-Message-Id")
return DeliveryResult(
status=DeliveryStatus.SUCCESS,
provider_message_id=message_id
)
# Handle errors
error_body = await response.json()
return self._handle_error(response.status, error_body)
except aiohttp.ClientError as e:
logger.error(f"SendGrid request failed: {e}")
return DeliveryResult(
status=DeliveryStatus.FAILED,
error_code="CONNECTION_ERROR",
error_message=str(e),
should_retry=True
)
def _handle_error(self, status_code: int, error_body: dict) -> DeliveryResult:
"""Handle SendGrid error responses."""
errors = error_body.get("errors", [])
error_message = errors[0].get("message") if errors else "Unknown error"
# Invalid email address
if status_code == 400 and "email" in error_message.lower():
return DeliveryResult(
status=DeliveryStatus.BOUNCED,
error_code="INVALID_EMAIL",
error_message=error_message,
should_retry=False,
should_fallback=True
)
# Rate limited
if status_code == 429:
return DeliveryResult(
status=DeliveryStatus.RATE_LIMITED,
error_code="RATE_LIMITED",
error_message=error_message,
should_retry=True
)
# Server error
if status_code >= 500:
return DeliveryResult(
status=DeliveryStatus.FAILED,
error_code="SERVER_ERROR",
error_message=error_message,
should_retry=True
)
return DeliveryResult(
status=DeliveryStatus.FAILED,
error_code="UNKNOWN_ERROR",
error_message=error_message,
should_retry=False
)
async def check_health(self) -> ProviderHealth:
"""Check SendGrid health."""
# Could call SendGrid stats API
return ProviderHealth(
is_healthy=True,
success_rate=0.98,
avg_latency_ms=500,
last_check=datetime.utcnow().isoformat()
)
5.4 Twilio Provider (SMS)
# providers/twilio.py
import aiohttp
import base64
import logging
from datetime import datetime
from typing import Optional
from providers.base import NotificationProvider, DeliveryResult, DeliveryStatus, ProviderHealth
logger = logging.getLogger(__name__)
class TwilioProvider(NotificationProvider):
"""
Twilio provider for SMS notifications.
"""
def __init__(
self,
account_sid: str,
auth_token: str,
from_number: str,
timeout_seconds: float = 30.0
):
self.account_sid = account_sid
self.auth_token = auth_token
self.from_number = from_number
self.timeout = aiohttp.ClientTimeout(total=timeout_seconds)
self.api_url = f"https://api.twilio.com/2010-04-01/Accounts/{account_sid}/Messages.json"
@property
def name(self) -> str:
return "twilio"
@property
def channel(self) -> str:
return "sms"
async def send(
self,
recipient: str, # Phone number in E.164 format
title: str, # Not used for SMS
body: str,
**kwargs
) -> DeliveryResult:
"""Send SMS via Twilio."""
# SMS doesn't have titles - combine if title provided
message = body
if title:
message = f"{title}: {body}"
# Truncate to SMS limit (160 chars for GSM, less for Unicode)
if len(message) > 160:
message = message[:157] + "..."
# Build request
data = {
"To": recipient,
"From": self.from_number,
"Body": message
}
# Create auth header
credentials = base64.b64encode(
f"{self.account_sid}:{self.auth_token}".encode()
).decode()
try:
async with aiohttp.ClientSession(timeout=self.timeout) as session:
async with session.post(
self.api_url,
data=data,
headers={
"Authorization": f"Basic {credentials}",
"Content-Type": "application/x-www-form-urlencoded"
}
) as response:
result = await response.json()
if response.status == 201:
return DeliveryResult(
status=DeliveryStatus.SUCCESS,
provider_message_id=result.get("sid")
)
return self._handle_error(response.status, result)
except aiohttp.ClientError as e:
logger.error(f"Twilio request failed: {e}")
return DeliveryResult(
status=DeliveryStatus.FAILED,
error_code="CONNECTION_ERROR",
error_message=str(e),
should_retry=True
)
def _handle_error(self, status_code: int, result: dict) -> DeliveryResult:
"""Handle Twilio error responses."""
error_code = str(result.get("code", ""))
error_message = result.get("message", "Unknown error")
# Invalid phone number
if error_code in ["21211", "21614", "21217"]:
return DeliveryResult(
status=DeliveryStatus.INVALID_TOKEN,
error_code=error_code,
error_message=error_message,
should_retry=False,
should_fallback=True
)
# Rate limited
if error_code == "20429" or status_code == 429:
return DeliveryResult(
status=DeliveryStatus.RATE_LIMITED,
error_code="RATE_LIMITED",
error_message=error_message,
should_retry=True
)
# Queue full - retry
if error_code == "30001":
return DeliveryResult(
status=DeliveryStatus.FAILED,
error_code=error_code,
error_message=error_message,
should_retry=True
)
return DeliveryResult(
status=DeliveryStatus.FAILED,
error_code=error_code,
error_message=error_message,
should_retry=False
)
async def check_health(self) -> ProviderHealth:
"""Check Twilio health."""
return ProviderHealth(
is_healthy=True,
success_rate=0.99,
avg_latency_ms=300,
last_check=datetime.utcnow().isoformat()
)
Part V: Delivery Workers
Chapter 6: Channel Workers
Each channel has dedicated workers that consume from channel-specific Kafka topics.
6.1 Base Worker
# workers/base.py
import asyncio
import json
import logging
from abc import ABC, abstractmethod
from datetime import datetime
from aiokafka import AIOKafkaConsumer
from models.notification import Notification, NotificationStatus
from providers.base import DeliveryResult, DeliveryStatus
from repositories.notification import NotificationRepository
from repositories.delivery_attempt import DeliveryAttemptRepository
from services.metrics import MetricsService
logger = logging.getLogger(__name__)
class BaseChannelWorker(ABC):
"""
Base class for channel delivery workers.
Handles:
- Kafka consumption
- Retry logic
- Status updates
- Metrics
Week 2 Concepts: Retries, Circuit Breakers
Week 3 Concepts: Dead Letter Queue
"""
def __init__(
self,
kafka_brokers: str,
topic: str,
consumer_group: str,
notification_repo: NotificationRepository,
attempt_repo: DeliveryAttemptRepository,
metrics: MetricsService,
max_retries: int = 3
):
self.brokers = kafka_brokers
self.topic = topic
self.group = consumer_group
self.notifications = notification_repo
self.attempts = attempt_repo
self.metrics = metrics
self.max_retries = max_retries
self.consumer = None
self.running = False
@property
@abstractmethod
def channel(self) -> str:
"""Channel this worker handles."""
pass
@abstractmethod
async def deliver(self, notification: Notification) -> DeliveryResult:
"""Deliver notification through the channel."""
pass
async def start(self):
"""Start the worker."""
self.consumer = AIOKafkaConsumer(
self.topic,
bootstrap_servers=self.brokers,
group_id=self.group,
auto_offset_reset="earliest",
enable_auto_commit=False
)
await self.consumer.start()
self.running = True
logger.info(f"{self.channel} worker started, consuming from {self.topic}")
try:
async for message in self.consumer:
if not self.running:
break
await self._process_message(message)
finally:
await self.consumer.stop()
async def stop(self):
"""Stop the worker gracefully."""
self.running = False
async def _process_message(self, message):
"""Process a single message."""
started_at = datetime.utcnow()
try:
payload = json.loads(message.value.decode())
notification = Notification.from_dict(payload)
# Update status to sending
await self.notifications.update_status(
notification.notification_id,
NotificationStatus.SENDING
)
# Attempt delivery
result = await self.deliver(notification)
# Record attempt
await self._record_attempt(notification, result, started_at)
# Handle result
if result.status == DeliveryStatus.SUCCESS:
await self._handle_success(notification, result)
elif result.should_retry and notification.retry_count < self.max_retries:
await self._handle_retry(notification, result)
elif result.should_fallback:
await self._handle_fallback(notification, result)
else:
await self._handle_failure(notification, result)
# Commit offset
await self.consumer.commit()
# Record metrics
duration = (datetime.utcnow() - started_at).total_seconds() * 1000
self.metrics.record_delivery(
channel=self.channel,
status=result.status.value,
duration_ms=duration
)
except Exception as e:
logger.error(f"Error processing message: {e}", exc_info=True)
# Don't commit - will be reprocessed
async def _record_attempt(
self,
notification: Notification,
result: DeliveryResult,
started_at: datetime
):
"""Record delivery attempt for debugging."""
await self.attempts.save({
"notification_id": notification.notification_id,
"channel": self.channel,
"attempt_number": notification.retry_count + 1,
"started_at": started_at,
"completed_at": datetime.utcnow(),
"success": result.status == DeliveryStatus.SUCCESS,
"error_code": result.error_code,
"error_message": result.error_message,
"provider_message_id": result.provider_message_id
})
async def _handle_success(self, notification: Notification, result: DeliveryResult):
"""Handle successful delivery."""
await self.notifications.update(
notification.notification_id,
{
"status": NotificationStatus.SENT,
"provider_message_id": result.provider_message_id,
"sent_at": datetime.utcnow()
}
)
logger.info(f"Delivered {notification.notification_id} via {self.channel}")
async def _handle_retry(self, notification: Notification, result: DeliveryResult):
"""Handle retriable failure."""
notification.retry_count += 1
# Calculate backoff delay
delay = self._calculate_backoff(notification.retry_count)
# Re-queue with delay
await self._requeue_with_delay(notification, delay)
await self.notifications.update(
notification.notification_id,
{
"retry_count": notification.retry_count,
"error_code": result.error_code,
"error_message": result.error_message
}
)
logger.warning(
f"Retrying {notification.notification_id} in {delay}s "
f"(attempt {notification.retry_count}/{self.max_retries})"
)
async def _handle_fallback(self, notification: Notification, result: DeliveryResult):
"""Handle failure that should trigger fallback channel."""
# Update status
await self.notifications.update(
notification.notification_id,
{
"status": NotificationStatus.FAILED,
"error_code": result.error_code,
"error_message": f"{self.channel} failed, trying fallback"
}
)
# The router should have stored fallback config
# Re-route to next channel
# This would be handled by a fallback service
logger.info(f"Triggering fallback for {notification.notification_id}")
async def _handle_failure(self, notification: Notification, result: DeliveryResult):
"""Handle permanent failure."""
await self.notifications.update(
notification.notification_id,
{
"status": NotificationStatus.FAILED,
"error_code": result.error_code,
"error_message": result.error_message
}
)
# Send to dead letter queue
await self._send_to_dlq(notification, result)
logger.error(
f"Failed to deliver {notification.notification_id}: "
f"{result.error_code} - {result.error_message}"
)
def _calculate_backoff(self, retry_count: int) -> int:
"""Calculate exponential backoff with jitter."""
import random
base_delay = 2 ** retry_count # 2, 4, 8, 16...
jitter = random.uniform(0, 1)
return int(base_delay + jitter)
async def _requeue_with_delay(self, notification: Notification, delay: int):
"""Re-queue notification with delay."""
# In production, use a delay queue or scheduled job
# For simplicity, we just sleep (not ideal)
await asyncio.sleep(delay)
# Re-publish to same topic
pass
async def _send_to_dlq(self, notification: Notification, result: DeliveryResult):
"""Send to dead letter queue."""
dlq_topic = f"dlq.{self.channel}"
# Publish to DLQ
pass
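_requeue_with_delay and _send_to_dlq are left as stubs above: in production the retry path would publish to a delay queue rather than sleeping in-line, and the failure path would publish to a per-channel dead-letter topic. A sketch of the DLQ half, in standalone form; it assumes the worker is also given an aiokafka producer.
# Sketch: dead-letter publishing for permanently failed notifications (assumed producer)
import json
from datetime import datetime
from aiokafka import AIOKafkaProducer
from models.notification import Notification
from providers.base import DeliveryResult
async def send_to_dlq(producer: AIOKafkaProducer, channel: str,
                      notification: Notification, result: DeliveryResult) -> None:
    payload = {
        **notification.to_dict(),
        "failed_at": datetime.utcnow().isoformat(),
        "error_code": result.error_code,
        "error_message": result.error_message,
    }
    await producer.send_and_wait(
        f"dlq.{channel}",  # e.g. dlq.push, dlq.email; inspected and replayed by an operator
        key=notification.user_id.encode(),
        value=json.dumps(payload).encode(),
    )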
6.2 Push Worker
# workers/push_worker.py
from workers.base import BaseChannelWorker
from providers.base import DeliveryResult, DeliveryStatus
from providers.fcm import FCMProvider
from providers.apns import APNsProvider
from repositories.device_token import DeviceTokenRepository
from models.notification import Notification
from services.template import TemplateService
class PushWorker(BaseChannelWorker):
"""
Worker for push notifications.
Handles both Android (FCM) and iOS (APNs).
"""
def __init__(
self,
fcm_provider: FCMProvider,
apns_provider: APNsProvider,
device_tokens: DeviceTokenRepository,
template_service: TemplateService,
**kwargs
):
super().__init__(topic="channel.push", **kwargs)
self.fcm = fcm_provider
self.apns = apns_provider
self.device_tokens = device_tokens
self.templates = template_service
@property
def channel(self) -> str:
return "push"
async def deliver(self, notification: Notification) -> DeliveryResult:
"""Deliver push notification to all user's devices."""
# Get all active device tokens for user
tokens = await self.device_tokens.get_active_tokens(notification.user_id)
if not tokens:
return DeliveryResult(
status=DeliveryStatus.FAILED,
error_code="NO_DEVICES",
error_message="User has no registered devices",
should_fallback=True
)
# Render template if not already rendered
if not notification.title or not notification.body:
rendered = await self.templates.render(
notification.template,
"push",
notification.variables
)
notification.title = rendered.get("title", "")
notification.body = rendered.get("body", "")
# Send to each device
results = []
for token_info in tokens:
provider = self._get_provider(token_info["platform"])
result = await provider.send(
recipient=token_info["token"],
title=notification.title,
body=notification.body,
data={"notification_id": notification.notification_id}
)
results.append((token_info, result))
# Handle invalid tokens
if result.status == DeliveryStatus.INVALID_TOKEN:
await self.device_tokens.mark_invalid(
token_info["token_id"],
reason=result.error_message
)
# Determine overall result
# Success if at least one device succeeded
successes = [r for _, r in results if r.status == DeliveryStatus.SUCCESS]
if successes:
return DeliveryResult(
status=DeliveryStatus.SUCCESS,
provider_message_id=successes[0].provider_message_id
)
# All failed
last_result = results[-1][1] if results else None
return last_result or DeliveryResult(
status=DeliveryStatus.FAILED,
error_code="ALL_DEVICES_FAILED",
should_fallback=True
)
def _get_provider(self, platform: str):
"""Get provider for platform."""
if platform == "ios":
return self.apns
return self.fcm
6.3 Email Worker
# workers/email_worker.py
from workers.base import BaseChannelWorker
from providers.base import DeliveryResult, DeliveryStatus
from providers.sendgrid import SendGridProvider
from models.notification import Notification
from services.template import TemplateService
from repositories.user_email import UserEmailRepository
class EmailWorker(BaseChannelWorker):
"""Worker for email notifications."""
def __init__(
self,
sendgrid: SendGridProvider,
template_service: TemplateService,
user_emails: UserEmailRepository,
**kwargs
):
super().__init__(topic="channel.email", **kwargs)
self.provider = sendgrid
self.templates = template_service
self.emails = user_emails
@property
def channel(self) -> str:
return "email"
async def deliver(self, notification: Notification) -> DeliveryResult:
"""Deliver email notification."""
# Get user's email
email_info = await self.emails.get_primary(notification.user_id)
if not email_info or email_info["status"] != "active":
return DeliveryResult(
status=DeliveryStatus.FAILED,
error_code="NO_EMAIL",
error_message="User has no valid email address",
should_fallback=True
)
# Render template
rendered = await self.templates.render(
notification.template,
"email",
notification.variables
)
# Send email
result = await self.provider.send(
recipient=email_info["email"],
title=rendered.get("subject", notification.title),
body=rendered.get("body", notification.body),
html_body=rendered.get("html_body")
)
# Track send
if result.status == DeliveryStatus.SUCCESS:
await self.emails.update_last_sent(email_info["email_id"])
return result
Part VI: Delivery Status Tracking
Chapter 7: Status Management
7.1 Status State Machine
NOTIFICATION STATUS STATE MACHINE
PENDING
│  written to outbox
▼
QUEUED
│  consumed by router
▼
ROUTING
│  published to channel topic
▼
SENDING
├─▶ SENT ──(provider callback)──▶ DELIVERED ──(user interaction)──▶ READ
├─▶ FAILED
└─▶ BOUNCED
VALID TRANSITIONS:
PENDING → QUEUED
QUEUED → ROUTING
ROUTING → SENDING
SENDING → SENT, FAILED, BOUNCED
SENT → DELIVERED, FAILED
DELIVERED → READ
FAILED → QUEUED (retry)
7.2 Status Update Service
# services/status.py
from datetime import datetime
from typing import Optional
import logging
from models.notification import NotificationStatus
from repositories.notification import NotificationRepository
from services.events import EventPublisher
logger = logging.getLogger(__name__)
class StatusService:
"""
Manages notification status updates.
Ensures valid state transitions and publishes events.
"""
VALID_TRANSITIONS = {
NotificationStatus.PENDING: {NotificationStatus.QUEUED},
NotificationStatus.QUEUED: {NotificationStatus.ROUTING},
NotificationStatus.ROUTING: {NotificationStatus.SENDING},
NotificationStatus.SENDING: {
NotificationStatus.SENT,
NotificationStatus.FAILED,
NotificationStatus.BOUNCED
},
NotificationStatus.SENT: {
NotificationStatus.DELIVERED,
NotificationStatus.FAILED
},
NotificationStatus.DELIVERED: {NotificationStatus.READ},
NotificationStatus.FAILED: {NotificationStatus.QUEUED}, # Retry
}
def __init__(
self,
notification_repo: NotificationRepository,
event_publisher: EventPublisher
):
self.notifications = notification_repo
self.events = event_publisher
async def update_status(
self,
notification_id: str,
new_status: NotificationStatus,
error_code: Optional[str] = None,
error_message: Optional[str] = None,
provider_message_id: Optional[str] = None
) -> bool:
"""
Update notification status.
Returns True if update was successful.
"""
# Get current notification
notification = await self.notifications.get(notification_id)
if not notification:
logger.warning(f"Notification not found: {notification_id}")
return False
current_status = notification.status
# Validate transition
if not self._is_valid_transition(current_status, new_status):
logger.warning(
f"Invalid status transition for {notification_id}: "
f"{current_status} → {new_status}"
)
return False
# Build update
update = {"status": new_status}
if new_status == NotificationStatus.SENT:
update["sent_at"] = datetime.utcnow()
elif new_status == NotificationStatus.DELIVERED:
update["delivered_at"] = datetime.utcnow()
elif new_status == NotificationStatus.READ:
update["read_at"] = datetime.utcnow()
if error_code:
update["error_code"] = error_code
if error_message:
update["error_message"] = error_message
if provider_message_id:
update["provider_message_id"] = provider_message_id
# Apply update
await self.notifications.update(notification_id, update)
# Publish status change event
await self.events.publish("notification.status_changed", {
"notification_id": notification_id,
"user_id": notification.user_id,
"old_status": current_status.value,
"new_status": new_status.value,
"timestamp": datetime.utcnow().isoformat()
})
return True
def _is_valid_transition(
self,
from_status: NotificationStatus,
to_status: NotificationStatus
) -> bool:
"""Check if status transition is valid."""
valid_next = self.VALID_TRANSITIONS.get(from_status, set())
return to_status in valid_next
7.3 Webhook Handler (Provider Callbacks)
# api/webhooks.py
from fastapi import APIRouter, Request, HTTPException
import base64
import hashlib
import hmac
import logging
from services.status import StatusService
from models.notification import NotificationStatus
logger = logging.getLogger(__name__)
router = APIRouter()
class WebhookHandler:
"""
Handles callbacks from notification providers.
Providers call our webhooks to report delivery status.
"""
def __init__(
self,
status_service: StatusService,
sendgrid_webhook_key: str,
twilio_auth_token: str
):
self.status = status_service
self.sendgrid_key = sendgrid_webhook_key
self.twilio_token = twilio_auth_token
async def handle_sendgrid(self, events: list[dict]):
"""Handle SendGrid webhook events."""
for event in events:
notification_id = event.get("notification_id")  # custom arg attached at send time
event_type = event.get("event")
if not notification_id:
continue
# Map SendGrid events to our statuses
status_map = {
"delivered": NotificationStatus.DELIVERED,
"bounce": NotificationStatus.BOUNCED,
"dropped": NotificationStatus.FAILED,
"open": None, # Track but don't change status
"click": None,
}
new_status = status_map.get(event_type)
if new_status:
await self.status.update_status(
notification_id,
new_status,
error_code=event.get("reason"),
error_message=event.get("response")
)
async def handle_twilio(self, data: dict):
"""Handle Twilio status callback."""
message_sid = data.get("MessageSid")
status = data.get("MessageStatus")
# Look up notification by provider message ID
# (In production, maintain a mapping)
status_map = {
"delivered": NotificationStatus.DELIVERED,
"failed": NotificationStatus.FAILED,
"undelivered": NotificationStatus.FAILED,
}
new_status = status_map.get(status)
if new_status:
# Update status
pass
def verify_sendgrid_signature(self, request: Request, payload: bytes) -> bool:
"""Verify SendGrid webhook signature."""
signature = request.headers.get("X-Twilio-Email-Event-Webhook-Signature")
timestamp = request.headers.get("X-Twilio-Email-Event-Webhook-Timestamp")
if not signature or not timestamp:
return False
# Simplified shared-secret check: HMAC(timestamp + payload) compared against the
# base64-encoded signature header. Note: SendGrid's signed Event Webhook actually
# uses an ECDSA public key, so in production verify with the helper in SendGrid's SDK.
expected = hmac.new(
    self.sendgrid_key.encode(),
    timestamp.encode() + payload,
    hashlib.sha256
).digest()
return hmac.compare_digest(base64.b64decode(signature), expected)
@router.post("/webhooks/sendgrid")
async def sendgrid_webhook(request: Request, handler: WebhookHandler):
body = await request.body()
if not handler.verify_sendgrid_signature(request, body):
raise HTTPException(status_code=401, detail="Invalid signature")
events = await request.json()
await handler.handle_sendgrid(events)
return {"status": "ok"}
@router.post("/webhooks/twilio")
async def twilio_webhook(request: Request, handler: WebhookHandler):
form = await request.form()
await handler.handle_twilio(dict(form))
return {"status": "ok"}
Summary
What We Built Today
DAY 2 SUMMARY: CORE NOTIFICATION FLOW
INGESTION
├── Send API with validation
├── Idempotency handling
├── Transactional outbox pattern
└── Priority-based topic routing
ROUTING
├── Preference-based channel selection
├── Quiet hours handling
├── Channel availability checks
└── Kafka topic publishing
TEMPLATE RENDERING
├── Variable substitution
├── Locale fallback
├── Template caching
└── Multi-channel support
PROVIDER INTEGRATION
├── FCM (Android push)
├── APNs (iOS push) - interface shown
├── SendGrid (email)
├── Twilio (SMS)
└── Consistent provider interface
DELIVERY WORKERS
├── Base worker with retry logic
├── Push worker (multi-device)
├── Email worker
├── Exponential backoff
└── Dead letter queue
STATUS TRACKING
├── State machine validation
├── Status update service
├── Provider webhook handlers
└── Event publishing
Week 1-5 Concepts Applied
| Concept | Where Applied |
|---|---|
| Transactional Outbox (Week 3) | Reliable event publishing after save |
| Idempotency (Week 2) | Deduplication via idempotency keys |
| Retry with Backoff (Week 2) | Exponential backoff in workers |
| Dead Letter Queue (Week 3) | Failed notifications after max retries |
| Caching (Week 4) | Template and preference caching |
What's Coming Tomorrow
Day 3: Advanced Features — "Features that separate good from great"
- Template system with A/B testing
- Batching and digest notifications
- Scheduling and delayed delivery
- Multi-channel orchestration (saga pattern)
- Real-time in-app notifications
Interview Tip of the Day
INTERVIEW TIP: TRACE THE HAPPY PATH FIRST
When explaining a system, trace ONE complete request:
"Let me walk you through a payment notification end-to-end:
1. Payment service calls POST /v1/notifications
2. We validate, generate ID, write to DB + outbox atomically
3. Outbox processor publishes to Kafka high-priority topic
4. Router consumes, looks up user preferences
5. User wants push + email, so we publish to both channel topics
6. Push worker sends to FCM, gets success
7. Email worker sends to SendGrid, gets queued
8. SendGrid calls our webhook when delivered
9. User can query status via GET /v1/notifications/{id}"
This shows:
- You understand the full flow
- You can explain complex systems simply
- You think about reliability (outbox, retries)
- You know how webhooks work
THEN dive into edge cases and failure modes.
End of Week 6, Day 2
Tomorrow: Day 3 — Advanced Features