Week 6 — Day 3: Advanced Features
System Design Mastery Series — Practical Application Week
Introduction
Yesterday we built the core notification flow — ingestion, routing, delivery, and status tracking. Today we add the advanced features that separate a basic notification system from a production-grade platform.
Today's Theme: "Features that separate good from great"
We'll build:
- Sophisticated template system with A/B testing
- Batching and digest notifications
- Scheduled and delayed delivery
- Multi-channel orchestration with fallbacks
- Real-time in-app notification center
Part I: Advanced Template System
Chapter 1: Template Versioning and A/B Testing
Real notification systems don't just render templates — they experiment, measure, and optimize.
1.1 Template with Variants
# models/template.py (extended)
from dataclasses import dataclass, field
from typing import Optional
from datetime import datetime
from enum import Enum
class VariantStatus(Enum):
DRAFT = "draft"
ACTIVE = "active"
PAUSED = "paused"
ARCHIVED = "archived"
@dataclass
class TemplateVariant:
"""A variant of a template for A/B testing."""
variant_id: str
template_id: str
name: str # e.g., "control", "variant_a", "variant_b"
# Content
title: Optional[str] = None
subject: Optional[str] = None
body: str = ""
html_body: Optional[str] = None
# A/B test config
weight: int = 100 # Traffic percentage (0-100)
status: VariantStatus = VariantStatus.DRAFT
# Metrics
sent_count: int = 0
delivered_count: int = 0
opened_count: int = 0
clicked_count: int = 0
created_at: datetime = field(default_factory=datetime.utcnow)
@property
def delivery_rate(self) -> float:
if self.sent_count == 0:
return 0.0
return self.delivered_count / self.sent_count
@property
def open_rate(self) -> float:
if self.delivered_count == 0:
return 0.0
return self.opened_count / self.delivered_count
@property
def click_rate(self) -> float:
if self.opened_count == 0:
return 0.0
return self.clicked_count / self.opened_count
@dataclass
class ABTest:
"""An A/B test configuration."""
test_id: str
template_id: str
name: str
variants: list[TemplateVariant] = field(default_factory=list)
# Test parameters
start_date: Optional[datetime] = None
end_date: Optional[datetime] = None
min_sample_size: int = 1000
confidence_level: float = 0.95
# Status
is_active: bool = False
winner_variant_id: Optional[str] = None
def is_running(self) -> bool:
if not self.is_active:
return False
now = datetime.utcnow()
if self.start_date and now < self.start_date:
return False
if self.end_date and now > self.end_date:
return False
return True
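Before wiring up selection, here is a minimal sketch of what a configured test looks like. IDs and copy are illustrative; note that variant weights should sum to 100, since the selection logic below buckets users into [0, 100).
# Illustrative only: a 50/50 test between a control and one challenger.
control = TemplateVariant(
    variant_id="var_control",
    template_id="tmpl_welcome",
    name="control",
    subject="Welcome aboard!",
    body="Thanks for joining us, {{first_name}}.",
    weight=50,
    status=VariantStatus.ACTIVE,
)
challenger = TemplateVariant(
    variant_id="var_a",
    template_id="tmpl_welcome",
    name="variant_a",
    subject="Your account is ready, {{first_name}}",
    body="Thanks for joining us, {{first_name}}.",
    weight=50,
    status=VariantStatus.ACTIVE,
)
test = ABTest(
    test_id="test_welcome_subject",
    template_id="tmpl_welcome",
    name="Welcome subject line test",
    variants=[control, challenger],
    is_active=True,
)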
1.2 Variant Selection Service
# services/ab_testing.py
import hashlib
from typing import Optional
from dataclasses import dataclass
import logging
from models.template import TemplateVariant, ABTest, VariantStatus
from repositories.ab_test import ABTestRepository
from services.cache import CacheService
logger = logging.getLogger(__name__)
@dataclass
class VariantSelection:
variant: TemplateVariant
test_id: Optional[str] = None
is_control: bool = False
class ABTestingService:
"""
A/B testing service for template variants.
Features:
- Consistent user assignment (same user always sees same variant)
- Weighted random selection
- Automatic winner detection
"""
def __init__(
self,
ab_test_repo: ABTestRepository,
cache: CacheService
):
self.tests = ab_test_repo
self.cache = cache
async def select_variant(
self,
template_id: str,
user_id: str,
channel: str
) -> VariantSelection:
"""
Select a template variant for a user.
Uses consistent hashing so the same user always
gets the same variant within a test.
"""
# Check for active A/B test
test = await self._get_active_test(template_id, channel)
if not test or not test.is_running():
# No active test - return default variant
default = await self._get_default_variant(template_id, channel)
return VariantSelection(variant=default, is_control=True)
# Get active variants
active_variants = [
v for v in test.variants
if v.status == VariantStatus.ACTIVE and v.weight > 0
]
if not active_variants:
default = await self._get_default_variant(template_id, channel)
return VariantSelection(variant=default, is_control=True)
# Consistent selection based on user_id
selected = self._select_by_weight(user_id, test.test_id, active_variants)
is_control = selected.name == "control"
return VariantSelection(
variant=selected,
test_id=test.test_id,
is_control=is_control
)
def _select_by_weight(
self,
user_id: str,
test_id: str,
variants: list[TemplateVariant]
) -> TemplateVariant:
"""
Select variant using weighted consistent hashing.
Same user + test always returns same variant.
"""
# Create deterministic hash from user_id + test_id
hash_input = f"{user_id}:{test_id}"
hash_value = int(hashlib.md5(hash_input.encode()).hexdigest(), 16)
        # Map the hash into a bucket in [0, 100)
bucket = hash_value % 100
# Select based on cumulative weights
cumulative = 0
for variant in variants:
cumulative += variant.weight
if bucket < cumulative:
return variant
# Fallback to last variant
return variants[-1]
async def _get_active_test(
self,
template_id: str,
channel: str
) -> Optional[ABTest]:
"""Get active A/B test for template."""
cache_key = f"ab_test:{template_id}:{channel}"
cached = await self.cache.get(cache_key)
if cached:
return ABTest.from_dict(cached)
test = await self.tests.get_active_for_template(template_id, channel)
if test:
await self.cache.set(cache_key, test.to_dict(), ttl=60)
return test
async def _get_default_variant(
self,
template_id: str,
channel: str
) -> TemplateVariant:
"""Get the default (control) variant."""
# Return the primary template as a variant
pass
async def record_event(
self,
variant_id: str,
event_type: str # sent, delivered, opened, clicked
):
"""Record an event for A/B test metrics."""
field_map = {
"sent": "sent_count",
"delivered": "delivered_count",
"opened": "opened_count",
"clicked": "clicked_count"
}
field = field_map.get(event_type)
if field:
await self.tests.increment_variant_metric(variant_id, field)
async def check_for_winner(self, test_id: str) -> Optional[str]:
"""
Check if a test has a statistically significant winner.
Uses chi-squared test for significance.
"""
test = await self.tests.get(test_id)
if not test or len(test.variants) < 2:
return None
# Need minimum sample size
total_sent = sum(v.sent_count for v in test.variants)
if total_sent < test.min_sample_size:
return None
# Find best performing variant
best_variant = max(test.variants, key=lambda v: v.click_rate)
# Check statistical significance
# (Simplified - real implementation would use proper statistics)
if self._is_significant(test.variants, best_variant, test.confidence_level):
return best_variant.variant_id
return None
def _is_significant(
self,
variants: list[TemplateVariant],
best: TemplateVariant,
confidence: float
) -> bool:
"""Check if the best variant is statistically significant."""
# Simplified significance check
# Real implementation would use scipy.stats
if best.sent_count < 100:
return False
# Check if best is significantly better than all others
for variant in variants:
if variant.variant_id == best.variant_id:
continue
# Require at least 10% improvement with enough data
if variant.sent_count < 100:
continue
improvement = (best.click_rate - variant.click_rate) / max(variant.click_rate, 0.001)
if improvement < 0.1: # Less than 10% improvement
return False
return True
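The placeholder above alludes to "proper statistics". As a sketch of what that could look like with scipy.stats (an added dependency this series hasn't introduced), here is a chi-squared test on a 2x2 table of clicks vs. non-clicks, with alpha = 1 - confidence_level:
# Hedged sketch: a chi-squared significance check using scipy.stats.
# best/other are the TemplateVariant instances defined in section 1.1.
from scipy.stats import chi2_contingency

def is_click_rate_significant(best, other, alpha: float = 0.05) -> bool:
    """True if best's click rate beats other's at significance level alpha."""
    table = [
        [best.clicked_count, best.opened_count - best.clicked_count],
        [other.clicked_count, other.opened_count - other.clicked_count],
    ]
    try:
        _, p_value, _, _ = chi2_contingency(table)
    except ValueError:
        return False  # degenerate table: a row or column sums to zero
    return p_value < alpha and best.click_rate > other.click_rate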
1.3 Enhanced Template Service
# services/template.py (enhanced)
import re

from repositories.template import TemplateRepository  # assumed path, mirroring the other repos
from services.ab_testing import ABTestingService
from services.cache import CacheService
class EnhancedTemplateService:
"""
Template service with A/B testing support.
"""
def __init__(
self,
template_repo: TemplateRepository,
ab_testing: ABTestingService,
cache: CacheService
):
self.templates = template_repo
self.ab_testing = ab_testing
self.cache = cache
self.var_pattern = re.compile(r'\{\{(\w+)\}\}')
async def render(
self,
template_name: str,
channel: str,
variables: dict,
user_id: str,
locale: str = "en-US"
) -> dict:
"""
Render a template with A/B testing.
Returns rendered content and variant info for tracking.
"""
# Select variant (may be from A/B test)
selection = await self.ab_testing.select_variant(
template_name,
user_id,
channel
)
variant = selection.variant
# Render the variant
result = {
"template_id": variant.template_id,
"variant_id": variant.variant_id,
"test_id": selection.test_id,
"channel": channel,
}
if variant.title:
result["title"] = self._substitute(variant.title, variables)
if variant.subject:
result["subject"] = self._substitute(variant.subject, variables)
if variant.body:
result["body"] = self._substitute(variant.body, variables)
if variant.html_body:
result["html_body"] = self._substitute(variant.html_body, variables)
# Record that we sent this variant
if selection.test_id:
await self.ab_testing.record_event(variant.variant_id, "sent")
return result
def _substitute(self, text: str, variables: dict) -> str:
"""Substitute variables in text."""
def replace(match):
var_name = match.group(1)
return str(variables.get(var_name, ""))
return self.var_pattern.sub(replace, text)
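A quick illustrative call (the template_service instance name is assumed): render resolves the user's variant, records a "sent" event when a test is active, and returns the tracking IDs that delivery should carry along.
rendered = await template_service.render(
    template_name="tmpl_welcome",
    channel="email",
    variables={"first_name": "Ada"},
    user_id="user_123",
)
# rendered contains subject/body plus template_id, variant_id, and test_id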
Part II: Batching and Digests
Chapter 2: Notification Batching
Instead of sending every notification immediately, some should be batched for better user experience.
2.1 When to Batch
BATCHING USE CASES
1. TRADING NOTIFICATIONS
User makes 50 trades in a day
Instead of: 50 separate "Trade executed" notifications
Send: "You made 50 trades today. View summary →"
2. SOCIAL ACTIVITY
User gets 20 likes on a post
Instead of: 20 "X liked your post" notifications
Send: "20 people liked your post"
3. TRANSACTION SUMMARIES
User has 10 transactions
Instead of: 10 separate notifications
Send: Daily digest at 6pm
4. MARKETING
Multiple promotional messages
Instead of: Separate emails
Send: Weekly newsletter
2.2 Batching Service
# services/batching.py
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Optional
from enum import Enum
import asyncio
import json
import logging
logger = logging.getLogger(__name__)
class BatchStrategy(Enum):
COUNT = "count" # Batch after N notifications
TIME = "time" # Batch after T seconds
HYBRID = "hybrid" # Whichever comes first
@dataclass
class BatchConfig:
"""Configuration for batching notifications."""
category: str
strategy: BatchStrategy
# For COUNT strategy
max_count: int = 10
# For TIME strategy
window_seconds: int = 300 # 5 minutes
# Template for batched notification
batch_template: str = ""
# Which fields to aggregate
aggregate_fields: list[str] = field(default_factory=list)
@dataclass
class PendingBatch:
"""A batch being accumulated."""
batch_id: str
user_id: str
category: str
notifications: list[dict] = field(default_factory=list)
created_at: datetime = field(default_factory=datetime.utcnow)
def add(self, notification: dict):
self.notifications.append(notification)
@property
def count(self) -> int:
return len(self.notifications)
def age_seconds(self) -> float:
return (datetime.utcnow() - self.created_at).total_seconds()
class BatchingService:
"""
Service for batching notifications.
Accumulates notifications and sends them together.
"""
# Default batch configs
DEFAULT_CONFIGS = {
"social_like": BatchConfig(
category="social_like",
strategy=BatchStrategy.HYBRID,
max_count=10,
window_seconds=300,
batch_template="social_likes_batch",
aggregate_fields=["liker_name", "post_id"]
),
"trade_executed": BatchConfig(
category="trade_executed",
strategy=BatchStrategy.TIME,
window_seconds=60,
batch_template="trades_batch",
aggregate_fields=["symbol", "amount", "price"]
),
}
def __init__(
self,
redis_client,
notification_service,
        configs: Optional[dict[str, BatchConfig]] = None
):
self.redis = redis_client
self.notifications = notification_service
self.configs = configs or self.DEFAULT_CONFIGS
self.running = False
async def should_batch(self, notification: dict) -> bool:
"""Check if this notification should be batched."""
category = notification.get("category")
return category in self.configs
async def add_to_batch(self, notification: dict) -> Optional[str]:
"""
Add notification to batch.
Returns batch_id if added, None if batch was flushed.
"""
user_id = notification["user_id"]
category = notification["category"]
config = self.configs[category]
batch_key = f"batch:{user_id}:{category}"
# Add to batch in Redis
await self.redis.rpush(batch_key, json.dumps(notification))
# Set expiry if new batch
if await self.redis.llen(batch_key) == 1:
await self.redis.expire(batch_key, config.window_seconds + 60)
# Check if batch should be flushed
count = await self.redis.llen(batch_key)
should_flush = False
if config.strategy == BatchStrategy.COUNT:
should_flush = count >= config.max_count
elif config.strategy == BatchStrategy.TIME:
# Time-based batches are flushed by background job
pass
elif config.strategy == BatchStrategy.HYBRID:
should_flush = count >= config.max_count
if should_flush:
await self.flush_batch(user_id, category)
return None
return batch_key
async def flush_batch(self, user_id: str, category: str):
"""Flush a batch and send the aggregated notification."""
batch_key = f"batch:{user_id}:{category}"
config = self.configs[category]
# Get all notifications atomically
pipe = self.redis.pipeline()
pipe.lrange(batch_key, 0, -1)
pipe.delete(batch_key)
results = await pipe.execute()
notifications_data = results[0]
if not notifications_data:
return
notifications = [json.loads(n) for n in notifications_data]
# Create batched notification
batched = self._create_batch_notification(
user_id,
category,
notifications,
config
)
# Send the batched notification
await self.notifications.send(batched)
logger.info(
f"Flushed batch for {user_id}/{category}: "
f"{len(notifications)} notifications"
)
def _create_batch_notification(
self,
user_id: str,
category: str,
notifications: list[dict],
config: BatchConfig
) -> dict:
"""Create a single notification from a batch."""
# Aggregate specified fields
aggregated = {}
        for field_name in config.aggregate_fields:  # avoid shadowing dataclasses.field
            aggregated[field_name] = [n.get("variables", {}).get(field_name) for n in notifications]
# Create variables for batch template
variables = {
"count": len(notifications),
"items": aggregated,
"first_item": notifications[0].get("variables", {}),
"last_item": notifications[-1].get("variables", {}),
}
return {
"user_id": user_id,
"type": "batched",
"category": category,
"template": config.batch_template,
"variables": variables,
"metadata": {
"batched_count": len(notifications),
"original_notification_ids": [n.get("notification_id") for n in notifications]
}
}
async def start_flush_worker(self):
"""Background worker to flush time-based batches."""
self.running = True
while self.running:
try:
await self._flush_expired_batches()
except Exception as e:
logger.error(f"Batch flush error: {e}")
await asyncio.sleep(10) # Check every 10 seconds
async def _flush_expired_batches(self):
"""Find and flush batches that have exceeded their time window."""
# Scan for batch keys
cursor = 0
while True:
cursor, keys = await self.redis.scan(cursor, match="batch:*")
for key in keys:
# Parse key: batch:{user_id}:{category}
parts = key.decode().split(":")
if len(parts) != 3:
continue
_, user_id, category = parts
if category not in self.configs:
continue
config = self.configs[category]
# Check batch age
# (In production, store created_at in batch metadata)
                ttl = await self.redis.ttl(key)
                if ttl < 0:
                    continue  # no TTL info; skip rather than mis-compute the age
                age = config.window_seconds + 60 - ttl
                if age >= config.window_seconds:
                    await self.flush_batch(user_id, category)
if cursor == 0:
break
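Where this hooks into the pipeline: a sketch of the ingestion-side branch, assuming the notification_service from Day 2.
async def ingest(batching: BatchingService, notification_service, notification: dict) -> None:
    """Route a notification through batching, or send it immediately."""
    if await batching.should_batch(notification):
        # Parked in Redis; flushed later by count or by the flush worker
        await batching.add_to_batch(notification)
    else:
        await notification_service.send(notification)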
Chapter 3: Digest Notifications
Digests are scheduled summaries sent at specific times.
3.1 Digest Configuration
# models/digest.py
from dataclasses import dataclass, field
from datetime import datetime, time
from typing import Optional
from enum import Enum
class DigestFrequency(Enum):
DAILY = "daily"
WEEKLY = "weekly"
MONTHLY = "monthly"
@dataclass
class DigestConfig:
"""User's digest preferences."""
user_id: str
# Enabled digests
transaction_digest: bool = True
activity_digest: bool = True
marketing_digest: bool = False
# Timing
frequency: DigestFrequency = DigestFrequency.DAILY
preferred_time: time = time(18, 0) # 6 PM
timezone: str = "UTC"
# Day of week for weekly (0=Monday)
weekly_day: int = 0
# Day of month for monthly
monthly_day: int = 1
@dataclass
class DigestContent:
"""Content for a digest notification."""
user_id: str
digest_type: str
period_start: datetime
period_end: datetime
# Summary data
items: list[dict] = field(default_factory=list)
summary: dict = field(default_factory=dict)
@property
def item_count(self) -> int:
return len(self.items)
3.2 Digest Service
# services/digest.py
from datetime import datetime, timedelta
import pytz
import logging
from models.digest import DigestConfig, DigestContent, DigestFrequency
from repositories.digest_config import DigestConfigRepository
from repositories.notification import NotificationRepository
from services.notification import NotificationService
logger = logging.getLogger(__name__)
class DigestService:
"""
Service for generating and sending digest notifications.
Runs on a schedule to send daily/weekly/monthly summaries.
"""
def __init__(
self,
config_repo: DigestConfigRepository,
notification_repo: NotificationRepository,
notification_service: NotificationService
):
self.configs = config_repo
self.notifications = notification_repo
self.send = notification_service
async def process_digests(self, frequency: DigestFrequency):
"""
Process digests for all users with this frequency.
Called by scheduled job at appropriate time.
"""
# Get users who should receive digest now
users = await self._get_users_for_digest(frequency)
logger.info(f"Processing {frequency.value} digests for {len(users)} users")
for config in users:
try:
await self._generate_and_send_digest(config)
except Exception as e:
logger.error(f"Failed to send digest to {config.user_id}: {e}")
async def _get_users_for_digest(
self,
frequency: DigestFrequency
) -> list[DigestConfig]:
"""Get users who should receive digest now."""
        now = datetime.now(pytz.UTC)  # aware, so astimezone() below converts correctly
# Get all configs with this frequency
configs = await self.configs.get_by_frequency(frequency)
# Filter by preferred time (with 30-minute window)
eligible = []
for config in configs:
user_tz = pytz.timezone(config.timezone)
user_now = now.astimezone(user_tz)
# Check if current time is within window of preferred time
pref_hour = config.preferred_time.hour
pref_minute = config.preferred_time.minute
if (user_now.hour == pref_hour and
abs(user_now.minute - pref_minute) < 30):
# For weekly, check day of week
if frequency == DigestFrequency.WEEKLY:
if user_now.weekday() != config.weekly_day:
continue
# For monthly, check day of month
if frequency == DigestFrequency.MONTHLY:
if user_now.day != config.monthly_day:
continue
eligible.append(config)
return eligible
async def _generate_and_send_digest(self, config: DigestConfig):
"""Generate and send digest for a user."""
# Determine period
period_end = datetime.utcnow()
if config.frequency == DigestFrequency.DAILY:
period_start = period_end - timedelta(days=1)
elif config.frequency == DigestFrequency.WEEKLY:
period_start = period_end - timedelta(weeks=1)
else:
period_start = period_end - timedelta(days=30)
# Generate each enabled digest type
if config.transaction_digest:
await self._send_transaction_digest(config, period_start, period_end)
if config.activity_digest:
await self._send_activity_digest(config, period_start, period_end)
async def _send_transaction_digest(
self,
config: DigestConfig,
period_start: datetime,
period_end: datetime
):
"""Generate and send transaction digest."""
# Get transactions for period
transactions = await self.notifications.get_by_type_and_period(
user_id=config.user_id,
notification_type="transaction",
start=period_start,
end=period_end
)
if not transactions:
return # No transactions, no digest
# Calculate summary
summary = {
"total_count": len(transactions),
"total_received": sum(
t.variables.get("amount", 0)
for t in transactions
if t.variables.get("direction") == "received"
),
"total_sent": sum(
t.variables.get("amount", 0)
for t in transactions
if t.variables.get("direction") == "sent"
),
}
# Create digest content
content = DigestContent(
user_id=config.user_id,
digest_type="transaction",
period_start=period_start,
period_end=period_end,
items=[t.to_dict() for t in transactions[:10]], # Top 10
summary=summary
)
# Send digest notification
await self.send.send({
"user_id": config.user_id,
"type": "digest",
"template": "transaction_digest",
"variables": {
"period": config.frequency.value,
"summary": summary,
"items": content.items,
"item_count": content.item_count,
},
"channels": ["email"], # Digests typically go to email
})
logger.info(f"Sent transaction digest to {config.user_id}")
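How process_digests gets invoked is left open above; one minimal sketch is a periodic task that wakes every 30 minutes to match the eligibility window. The loop shape here is an assumption, not part of the system built so far.
import asyncio

async def digest_scheduler_loop(digest_service: DigestService):
    # With a 30-minute period, at most one tick lands inside each user's
    # 30-minute eligibility window; production code would also persist a
    # "last sent" marker for true idempotency.
    while True:
        for frequency in DigestFrequency:
            try:
                await digest_service.process_digests(frequency)
            except Exception as e:
                logger.error(f"Digest run failed for {frequency.value}: {e}")
        await asyncio.sleep(30 * 60)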
Part III: Scheduling and Delayed Delivery
Chapter 4: Scheduled Notifications
Users and systems need to schedule notifications for future delivery.
4.1 Scheduler Service
# services/scheduler.py
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional
import asyncio
import uuid
import logging
from models.notification import Notification, NotificationStatus
from repositories.scheduled_notification import ScheduledNotificationRepository
from services.notification import NotificationService
logger = logging.getLogger(__name__)
@dataclass
class ScheduledNotification:
"""A notification scheduled for future delivery."""
schedule_id: str
notification_id: str
user_id: str
# Schedule details
scheduled_at: datetime
timezone: str = "UTC"
# Recurrence (optional)
is_recurring: bool = False
recurrence_pattern: Optional[str] = None # cron expression
recurrence_end: Optional[datetime] = None
# Status
status: str = "pending" # pending, sent, cancelled
last_sent_at: Optional[datetime] = None
# The notification to send
    notification_payload: Optional[dict] = None
class SchedulerService:
"""
Service for scheduled notification delivery.
Uses database polling with leader election for reliability.
Week 5 Concept: Leader Election
"""
def __init__(
self,
scheduled_repo: ScheduledNotificationRepository,
notification_service: NotificationService,
leader_election,
poll_interval_seconds: int = 10
):
self.scheduled = scheduled_repo
self.notifications = notification_service
self.leader = leader_election
self.poll_interval = poll_interval_seconds
self.running = False
async def schedule(
self,
notification: dict,
scheduled_at: datetime,
timezone: str = "UTC",
recurrence: Optional[str] = None
) -> str:
"""
Schedule a notification for future delivery.
Returns schedule_id.
"""
# Convert to UTC if timezone specified
if timezone != "UTC":
scheduled_at = self._to_utc(scheduled_at, timezone)
# Validate scheduled time is in future
if scheduled_at <= datetime.utcnow():
raise ValueError("scheduled_at must be in the future")
# Create scheduled notification
scheduled = ScheduledNotification(
schedule_id=str(uuid.uuid4()),
notification_id=notification.get("notification_id") or str(uuid.uuid4()),
user_id=notification["user_id"],
scheduled_at=scheduled_at,
timezone=timezone,
is_recurring=recurrence is not None,
recurrence_pattern=recurrence,
notification_payload=notification
)
await self.scheduled.save(scheduled)
logger.info(
f"Scheduled notification {scheduled.schedule_id} "
f"for {scheduled_at}"
)
return scheduled.schedule_id
async def cancel(self, schedule_id: str) -> bool:
"""Cancel a scheduled notification."""
scheduled = await self.scheduled.get(schedule_id)
if not scheduled:
return False
if scheduled.status == "sent":
return False # Already sent
await self.scheduled.update_status(schedule_id, "cancelled")
logger.info(f"Cancelled scheduled notification {schedule_id}")
return True
async def start_processor(self):
"""
Start the scheduled notification processor.
Only runs on the leader node.
"""
self.running = True
while self.running:
try:
# Only process if we're the leader
if await self.leader.is_leader():
await self._process_due_notifications()
except Exception as e:
logger.error(f"Scheduler error: {e}")
await asyncio.sleep(self.poll_interval)
async def _process_due_notifications(self):
"""Process notifications that are due for delivery."""
now = datetime.utcnow()
# Get due notifications (with some buffer for processing time)
due = await self.scheduled.get_due(
before=now + timedelta(seconds=5),
limit=100
)
for scheduled in due:
try:
await self._send_scheduled(scheduled)
except Exception as e:
logger.error(
f"Failed to send scheduled {scheduled.schedule_id}: {e}"
)
async def _send_scheduled(self, scheduled: ScheduledNotification):
"""Send a scheduled notification."""
# Send the notification
await self.notifications.send(scheduled.notification_payload)
# Update status
if scheduled.is_recurring:
# Calculate next occurrence
next_time = self._calculate_next_occurrence(
scheduled.scheduled_at,
scheduled.recurrence_pattern
)
if next_time and (not scheduled.recurrence_end or
next_time <= scheduled.recurrence_end):
# Schedule next occurrence
await self.scheduled.update(
scheduled.schedule_id,
{
"scheduled_at": next_time,
"last_sent_at": datetime.utcnow()
}
)
else:
# Recurrence ended
await self.scheduled.update_status(scheduled.schedule_id, "completed")
else:
# One-time notification - mark as sent
await self.scheduled.update_status(scheduled.schedule_id, "sent")
logger.info(f"Sent scheduled notification {scheduled.schedule_id}")
def _to_utc(self, dt: datetime, timezone: str) -> datetime:
"""Convert datetime to UTC."""
import pytz
tz = pytz.timezone(timezone)
local_dt = tz.localize(dt)
return local_dt.astimezone(pytz.UTC).replace(tzinfo=None)
def _calculate_next_occurrence(
self,
current: datetime,
pattern: str
) -> Optional[datetime]:
"""Calculate next occurrence from cron pattern."""
from croniter import croniter
try:
cron = croniter(pattern, current)
return cron.get_next(datetime)
except Exception:
return None
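Illustrative usage, assuming scheduler is a SchedulerService instance (the cron pattern "0 9 * * 1" means 09:00 every Monday):
schedule_id = await scheduler.schedule(
    notification={
        "user_id": "user_123",
        "template": "portfolio_summary",
        "variables": {"period": "weekly"},
        "channels": ["email"],
    },
    scheduled_at=datetime(2025, 1, 6, 9, 0),  # interpreted in the given timezone
    timezone="America/New_York",
    recurrence="0 9 * * 1",
)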
4.2 "Best Time to Send" Feature
# services/send_time_optimizer.py
from datetime import datetime, time, timedelta
from typing import Optional
import pytz
from repositories.user_engagement import UserEngagementRepository
class SendTimeOptimizer:
"""
Optimizes notification delivery time based on user engagement patterns.
Analyzes when users typically engage with notifications
and suggests optimal send times.
"""
def __init__(self, engagement_repo: UserEngagementRepository):
self.engagement = engagement_repo
async def get_optimal_time(
self,
user_id: str,
channel: str,
default_time: Optional[time] = None
) -> time:
"""
Get optimal send time for a user.
Based on historical engagement patterns.
"""
# Get user's engagement history
history = await self.engagement.get_hourly_engagement(
user_id,
channel,
days=30
)
if not history or sum(history.values()) < 10:
# Not enough data - use default or fallback
return default_time or time(10, 0) # 10 AM default
# Find hour with highest engagement
best_hour = max(history, key=history.get)
# Add some randomization to avoid thundering herd
import random
minute = random.randint(0, 59)
return time(best_hour, minute)
async def record_engagement(
self,
user_id: str,
channel: str,
engaged_at: datetime
):
"""Record when a user engaged with a notification."""
hour = engaged_at.hour
await self.engagement.increment(user_id, channel, hour)
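Combining the optimizer with the scheduler, as a simplified sketch: the "tomorrow at the best hour" logic and the push channel are assumptions made for the example.
from datetime import datetime, timedelta

async def send_at_best_time(optimizer: SendTimeOptimizer, scheduler, notification: dict):
    best = await optimizer.get_optimal_time(notification["user_id"], channel="push")
    # Schedule for tomorrow at the user's historically best hour
    target = datetime.combine(datetime.utcnow().date() + timedelta(days=1), best)
    return await scheduler.schedule(notification, scheduled_at=target)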
Part IV: Multi-Channel Orchestration
Chapter 5: Saga Pattern for Multi-Channel Delivery
Critical notifications need to reach the user through multiple channels with proper coordination.
5.1 Multi-Channel Saga
# services/multichannel_saga.py
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
from typing import Optional
import asyncio
import uuid
import logging
logger = logging.getLogger(__name__)
class ChannelStatus(Enum):
PENDING = "pending"
SENDING = "sending"
SENT = "sent"
DELIVERED = "delivered"
FAILED = "failed"
SKIPPED = "skipped"
@dataclass
class ChannelAttempt:
"""Track delivery attempt for one channel."""
channel: str
status: ChannelStatus = ChannelStatus.PENDING
started_at: Optional[datetime] = None
completed_at: Optional[datetime] = None
provider_message_id: Optional[str] = None
error: Optional[str] = None
@dataclass
class MultiChannelDelivery:
"""
Orchestrates delivery across multiple channels.
Week 5 Concept: Saga Pattern
"""
delivery_id: str
notification_id: str
user_id: str
# Channel configuration
primary_channels: list[str] = field(default_factory=list) # Send simultaneously
fallback_channels: list[str] = field(default_factory=list) # Try if primary fails
# Timing
fallback_delay_seconds: int = 60 # Wait before trying fallback
# State
channel_attempts: dict[str, ChannelAttempt] = field(default_factory=dict)
started_at: Optional[datetime] = None
completed_at: Optional[datetime] = None
# Result
any_delivered: bool = False
all_failed: bool = False
class MultiChannelSagaService:
"""
Saga orchestrator for multi-channel notification delivery.
Handles:
- Simultaneous delivery to multiple primary channels
- Fallback to secondary channels on failure
- Tracking and status aggregation
"""
def __init__(
self,
channel_workers: dict, # channel -> worker
delivery_repo,
notification_service
):
self.workers = channel_workers
self.deliveries = delivery_repo
self.notifications = notification_service
async def execute(
self,
notification: dict,
primary_channels: list[str],
        fallback_channels: Optional[list[str]] = None,
fallback_delay: int = 60
) -> MultiChannelDelivery:
"""
Execute multi-channel delivery saga.
Strategy:
1. Send to all primary channels simultaneously
2. If any primary succeeds, mark as delivered
3. If all primary fail, wait and try fallbacks
4. Track all attempts for debugging
"""
delivery = MultiChannelDelivery(
delivery_id=str(uuid.uuid4()),
notification_id=notification["notification_id"],
user_id=notification["user_id"],
primary_channels=primary_channels,
fallback_channels=fallback_channels or [],
fallback_delay_seconds=fallback_delay,
started_at=datetime.utcnow()
)
# Initialize channel attempts
for channel in primary_channels + (fallback_channels or []):
delivery.channel_attempts[channel] = ChannelAttempt(channel=channel)
# Save initial state
await self.deliveries.save(delivery)
# Step 1: Send to primary channels simultaneously
primary_results = await self._send_to_channels(
notification,
primary_channels,
delivery
)
# Check if any primary succeeded
primary_success = any(
r.status in [ChannelStatus.SENT, ChannelStatus.DELIVERED]
for r in primary_results
)
if primary_success:
delivery.any_delivered = True
delivery.completed_at = datetime.utcnow()
await self.deliveries.update(delivery)
return delivery
# Step 2: All primary failed - try fallbacks
if fallback_channels:
logger.info(
f"Primary channels failed for {delivery.delivery_id}, "
f"trying fallbacks in {fallback_delay}s"
)
# Wait before fallback (give primary channels time to recover)
await asyncio.sleep(fallback_delay)
# Check again if any primary succeeded (via webhook callback)
delivery = await self.deliveries.get(delivery.delivery_id)
if delivery.any_delivered:
return delivery
# Try fallback channels
fallback_results = await self._send_to_channels(
notification,
fallback_channels,
delivery
)
fallback_success = any(
r.status in [ChannelStatus.SENT, ChannelStatus.DELIVERED]
for r in fallback_results
)
if fallback_success:
delivery.any_delivered = True
# All channels failed
if not delivery.any_delivered:
delivery.all_failed = True
logger.error(
f"All channels failed for notification {notification['notification_id']}"
)
delivery.completed_at = datetime.utcnow()
await self.deliveries.update(delivery)
return delivery
async def _send_to_channels(
self,
notification: dict,
channels: list[str],
delivery: MultiChannelDelivery
) -> list[ChannelAttempt]:
"""Send to multiple channels simultaneously."""
tasks = []
for channel in channels:
task = self._send_to_channel(notification, channel, delivery)
tasks.append(task)
results = await asyncio.gather(*tasks, return_exceptions=True)
return [
r if isinstance(r, ChannelAttempt)
else ChannelAttempt(channel=channels[i], status=ChannelStatus.FAILED, error=str(r))
for i, r in enumerate(results)
]
async def _send_to_channel(
self,
notification: dict,
channel: str,
delivery: MultiChannelDelivery
) -> ChannelAttempt:
"""Send to a single channel."""
attempt = delivery.channel_attempts[channel]
attempt.status = ChannelStatus.SENDING
attempt.started_at = datetime.utcnow()
try:
worker = self.workers.get(channel)
if not worker:
raise ValueError(f"No worker for channel: {channel}")
result = await worker.deliver(notification)
if result.success:
attempt.status = ChannelStatus.SENT
attempt.provider_message_id = result.provider_message_id
else:
attempt.status = ChannelStatus.FAILED
attempt.error = result.error_message
except Exception as e:
attempt.status = ChannelStatus.FAILED
attempt.error = str(e)
attempt.completed_at = datetime.utcnow()
# Update delivery state
await self.deliveries.update_channel_attempt(
delivery.delivery_id,
channel,
attempt
)
return attempt
async def handle_delivery_callback(
self,
notification_id: str,
channel: str,
status: ChannelStatus
):
"""
Handle delivery confirmation callback from provider.
Updates saga state when delivery is confirmed.
"""
delivery = await self.deliveries.get_by_notification(notification_id)
if not delivery:
return
if channel in delivery.channel_attempts:
delivery.channel_attempts[channel].status = status
if status == ChannelStatus.DELIVERED:
delivery.any_delivered = True
await self.deliveries.update(delivery)
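An illustrative saga invocation for a critical alert (the IDs and the 30-second fallback delay are made up for the example):
delivery = await saga.execute(
    notification={
        "notification_id": "notif_789",
        "user_id": "user_123",
        "template": "security_alert",
        "variables": {"device": "iPhone 15", "location": "Berlin"},
    },
    primary_channels=["push", "sms"],
    fallback_channels=["email"],
    fallback_delay=30,
)
if delivery.all_failed:
    logger.error("critical alert undeliverable; consider paging on-call")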
5.2 Escalation Workflows
# services/escalation.py
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional
import logging
logger = logging.getLogger(__name__)
@dataclass
class EscalationRule:
"""Rule for notification escalation."""
name: str
trigger_condition: str # no_response, no_open, no_click
wait_duration: timedelta
escalation_channel: str
max_escalations: int = 3
class EscalationService:
"""
Handles notification escalation for critical messages.
If user doesn't respond to notification within time window,
escalate to more intrusive channel.
Example: Push → SMS → Phone call (for fraud alerts)
"""
ESCALATION_RULES = {
"security_alert": [
EscalationRule(
name="push_to_sms",
trigger_condition="no_open",
wait_duration=timedelta(minutes=5),
escalation_channel="sms"
),
EscalationRule(
name="sms_to_call",
trigger_condition="no_response",
wait_duration=timedelta(minutes=15),
escalation_channel="voice_call"
),
],
"payment_confirmation": [
EscalationRule(
name="push_to_email",
trigger_condition="no_open",
wait_duration=timedelta(hours=1),
escalation_channel="email"
),
],
}
def __init__(
self,
notification_repo,
notification_service,
scheduler
):
self.notifications = notification_repo
self.send = notification_service
self.scheduler = scheduler
    async def setup_escalation(
        self,
        notification_id: str,
        user_id: str,
        category: str
    ):
        """Set up escalation workflow for a notification."""
        rules = self.ESCALATION_RULES.get(category, [])
        for i, rule in enumerate(rules):
            # Schedule escalation check
            check_time = datetime.utcnow() + rule.wait_duration
            await self.scheduler.schedule(
                {
                    "user_id": user_id,  # required by SchedulerService.schedule
                    "type": "escalation_check",
                    "notification_id": notification_id,
                    "rule_index": i,
                    "category": category,
                },
                scheduled_at=check_time
            )
async def check_escalation(
self,
notification_id: str,
rule_index: int,
category: str
):
"""Check if escalation is needed and execute if so."""
rules = self.ESCALATION_RULES.get(category, [])
if rule_index >= len(rules):
return
rule = rules[rule_index]
# Get notification status
notification = await self.notifications.get(notification_id)
if not notification:
return
# Check trigger condition
should_escalate = False
if rule.trigger_condition == "no_open":
should_escalate = notification.read_at is None
elif rule.trigger_condition == "no_response":
# Check if user took any action
should_escalate = not notification.metadata.get("user_responded", False)
if should_escalate:
# Send via escalation channel
await self.send.send({
"user_id": notification.user_id,
"type": notification.type,
"template": f"{notification.template}_escalation",
"variables": notification.variables,
"channels": [rule.escalation_channel],
"metadata": {
"escalation_level": rule_index + 1,
"original_notification_id": notification_id,
}
})
logger.info(
f"Escalated notification {notification_id} to {rule.escalation_channel}"
)
Part V: Real-Time In-App Notifications
Chapter 6: Notification Center
6.1 In-App Notification Model
# models/in_app.py
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional, List
from enum import Enum
class InAppNotificationState(Enum):
UNREAD = "unread"
READ = "read"
ARCHIVED = "archived"
@dataclass
class InAppNotification:
"""In-app notification for notification center."""
notification_id: str
user_id: str
# Content
title: str
body: str
icon: Optional[str] = None
image_url: Optional[str] = None
action_url: Optional[str] = None
# Categorization
category: str = "general"
priority: str = "medium"
# State
state: InAppNotificationState = InAppNotificationState.UNREAD
# Timestamps
created_at: datetime = field(default_factory=datetime.utcnow)
read_at: Optional[datetime] = None
archived_at: Optional[datetime] = None
# Grouping
group_key: Optional[str] = None # For notification grouping
# Metadata
metadata: dict = field(default_factory=dict)
@dataclass
class NotificationGroup:
"""Grouped notifications in notification center."""
group_key: str
title: str
notifications: List[InAppNotification]
unread_count: int
latest_at: datetime
6.2 Notification Center Service
# services/notification_center.py
from datetime import datetime
from typing import Optional, List
import json
import logging
from models.in_app import InAppNotification, InAppNotificationState, NotificationGroup
logger = logging.getLogger(__name__)
class NotificationCenterService:
"""
Service for managing in-app notification center.
Features:
- Store notifications for offline users
- Track read/unread state
- Group similar notifications
- Sync across devices
"""
def __init__(
self,
redis_client,
websocket_gateway,
max_notifications: int = 100
):
self.redis = redis_client
self.ws = websocket_gateway
self.max_notifications = max_notifications
async def add_notification(
self,
notification: InAppNotification
) -> bool:
"""
Add notification to user's notification center.
Also pushes via WebSocket if user is online.
"""
user_id = notification.user_id
# Store in Redis list
key = f"inbox:{user_id}"
await self.redis.lpush(key, json.dumps(notification.to_dict()))
# Trim to max size
await self.redis.ltrim(key, 0, self.max_notifications - 1)
# Increment unread count
await self.redis.incr(f"unread:{user_id}")
# Push via WebSocket if online
if await self._is_user_online(user_id):
await self.ws.send_to_user(
user_id,
{
"type": "notification",
"payload": notification.to_dict()
}
)
return True
async def get_notifications(
self,
user_id: str,
limit: int = 50,
offset: int = 0,
state: Optional[InAppNotificationState] = None
) -> List[InAppNotification]:
"""Get notifications for user's notification center."""
key = f"inbox:{user_id}"
# Get from Redis
data = await self.redis.lrange(key, offset, offset + limit - 1)
notifications = []
for item in data:
notif = InAppNotification.from_dict(json.loads(item))
# Filter by state if specified
if state and notif.state != state:
continue
notifications.append(notif)
return notifications
async def get_grouped_notifications(
self,
user_id: str,
limit: int = 50
) -> List[NotificationGroup]:
"""Get notifications grouped by category/type."""
notifications = await self.get_notifications(user_id, limit=limit)
# Group by group_key or category
groups = {}
for notif in notifications:
key = notif.group_key or notif.category
if key not in groups:
groups[key] = NotificationGroup(
group_key=key,
title=self._get_group_title(key),
notifications=[],
unread_count=0,
latest_at=notif.created_at
)
groups[key].notifications.append(notif)
if notif.state == InAppNotificationState.UNREAD:
groups[key].unread_count += 1
if notif.created_at > groups[key].latest_at:
groups[key].latest_at = notif.created_at
# Sort by latest
sorted_groups = sorted(
groups.values(),
key=lambda g: g.latest_at,
reverse=True
)
return sorted_groups
async def mark_as_read(
self,
user_id: str,
notification_ids: List[str]
) -> int:
"""Mark notifications as read."""
key = f"inbox:{user_id}"
marked_count = 0
# Get all notifications
data = await self.redis.lrange(key, 0, -1)
updated = []
for item in data:
notif = json.loads(item)
if notif["notification_id"] in notification_ids:
if notif["state"] == InAppNotificationState.UNREAD.value:
notif["state"] = InAppNotificationState.READ.value
notif["read_at"] = datetime.utcnow().isoformat()
marked_count += 1
updated.append(json.dumps(notif))
# Update list
if updated:
pipe = self.redis.pipeline()
pipe.delete(key)
pipe.rpush(key, *updated)
await pipe.execute()
# Decrement unread count
if marked_count > 0:
await self.redis.decrby(f"unread:{user_id}", marked_count)
return marked_count
async def mark_all_as_read(self, user_id: str) -> int:
"""Mark all notifications as read."""
notifications = await self.get_notifications(user_id, limit=self.max_notifications)
unread_ids = [
n.notification_id
for n in notifications
if n.state == InAppNotificationState.UNREAD
]
if unread_ids:
return await self.mark_as_read(user_id, unread_ids)
return 0
async def get_unread_count(self, user_id: str) -> int:
"""Get unread notification count."""
count = await self.redis.get(f"unread:{user_id}")
return int(count) if count else 0
async def delete_notification(
self,
user_id: str,
notification_id: str
) -> bool:
"""Delete a notification."""
key = f"inbox:{user_id}"
# Find and remove
data = await self.redis.lrange(key, 0, -1)
for item in data:
notif = json.loads(item)
if notif["notification_id"] == notification_id:
await self.redis.lrem(key, 1, item)
# Update unread count if was unread
if notif["state"] == InAppNotificationState.UNREAD.value:
await self.redis.decr(f"unread:{user_id}")
return True
return False
async def _is_user_online(self, user_id: str) -> bool:
"""Check if user has active WebSocket connection."""
return await self.redis.exists(f"online:{user_id}")
def _get_group_title(self, group_key: str) -> str:
"""Get display title for notification group."""
titles = {
"transaction": "Transactions",
"social": "Social",
"marketing": "Promotions",
"security": "Security",
}
return titles.get(group_key, group_key.replace("_", " ").title())
6.3 WebSocket Gateway
# services/websocket_gateway.py
import asyncio
import json
from datetime import datetime
from typing import Dict, Optional
import logging
from fastapi import WebSocket, WebSocketDisconnect
logger = logging.getLogger(__name__)
class WebSocketGateway:
"""
WebSocket gateway for real-time notifications.
Manages WebSocket connections and message delivery.
"""
def __init__(self, redis_client):
self.redis = redis_client
self.connections: Dict[str, WebSocket] = {} # user_id -> websocket
self.heartbeat_interval = 30 # seconds
async def connect(self, websocket: WebSocket, user_id: str):
"""Handle new WebSocket connection."""
await websocket.accept()
self.connections[user_id] = websocket
# Mark user as online in Redis (for distributed setup)
await self.redis.setex(
f"online:{user_id}",
self.heartbeat_interval + 10,
self._get_server_id()
)
logger.info(f"User {user_id} connected via WebSocket")
# Send initial state
await self._send_initial_state(websocket, user_id)
# Start heartbeat
asyncio.create_task(self._heartbeat_loop(user_id))
async def disconnect(self, user_id: str):
"""Handle WebSocket disconnection."""
if user_id in self.connections:
del self.connections[user_id]
await self.redis.delete(f"online:{user_id}")
logger.info(f"User {user_id} disconnected")
async def send_to_user(
self,
user_id: str,
message: dict
) -> bool:
"""Send message to a specific user."""
# Check local connections first
if user_id in self.connections:
websocket = self.connections[user_id]
try:
await websocket.send_json(message)
return True
except Exception as e:
logger.error(f"Failed to send to {user_id}: {e}")
await self.disconnect(user_id)
return False
# User might be connected to different server
# Publish to Redis pub/sub for distributed delivery
await self.redis.publish(
f"ws:{user_id}",
json.dumps(message)
)
return True
async def broadcast(
self,
user_ids: list[str],
message: dict
):
"""Broadcast message to multiple users."""
tasks = [
self.send_to_user(user_id, message)
for user_id in user_ids
]
await asyncio.gather(*tasks, return_exceptions=True)
async def handle_message(
self,
user_id: str,
message: dict
):
"""Handle incoming WebSocket message from client."""
msg_type = message.get("type")
if msg_type == "ping":
await self.send_to_user(user_id, {"type": "pong"})
elif msg_type == "mark_read":
notification_ids = message.get("notification_ids", [])
            # Would delegate to NotificationCenterService.mark_as_read(...)
elif msg_type == "subscribe":
# Handle subscription to specific channels
pass
async def _send_initial_state(self, websocket: WebSocket, user_id: str):
"""Send initial state when user connects."""
# Get unread count
unread_count = await self.redis.get(f"unread:{user_id}")
await websocket.send_json({
"type": "init",
"payload": {
"unread_count": int(unread_count) if unread_count else 0,
"connected_at": datetime.utcnow().isoformat(),
}
})
async def _heartbeat_loop(self, user_id: str):
"""Send periodic heartbeats to keep connection alive."""
while user_id in self.connections:
await asyncio.sleep(self.heartbeat_interval)
if user_id not in self.connections:
break
try:
await self.connections[user_id].send_json({"type": "ping"})
# Refresh online status in Redis
await self.redis.setex(
f"online:{user_id}",
self.heartbeat_interval + 10,
self._get_server_id()
)
except Exception:
await self.disconnect(user_id)
break
def _get_server_id(self) -> str:
"""Get unique identifier for this server instance."""
import socket
return socket.gethostname()
# FastAPI WebSocket endpoint
from fastapi import APIRouter
router = APIRouter()
@router.websocket("/ws/{user_id}")
async def websocket_endpoint(
websocket: WebSocket,
user_id: str,
gateway: WebSocketGateway
):
await gateway.connect(websocket, user_id)
try:
while True:
data = await websocket.receive_json()
await gateway.handle_message(user_id, data)
except WebSocketDisconnect:
await gateway.disconnect(user_id)
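One gap worth noting: send_to_user() publishes to ws:{user_id} when the user is connected to another server, but nothing shown so far consumes that channel. A sketch of the subscriber each gateway instance would run, assuming redis.asyncio with binary (undecoded) responses:
async def pubsub_listener(gateway: WebSocketGateway):
    """Deliver cross-server messages to locally connected users."""
    pubsub = gateway.redis.pubsub()
    await pubsub.psubscribe("ws:*")
    async for msg in pubsub.listen():
        if msg["type"] != "pmessage":
            continue
        # channel arrives as bytes when decode_responses=False
        user_id = msg["channel"].decode().split(":", 1)[1]
        ws = gateway.connections.get(user_id)
        if ws:
            await ws.send_json(json.loads(msg["data"]))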
Part VI: User Preference Management
Chapter 7: Preference Service
7.1 Hierarchical Preferences
# services/preferences.py
from datetime import time
from typing import Optional, Dict, Any
import logging

from models.preferences import UserPreferences, ChannelPreference, CategoryPreference
from repositories.preferences import PreferencesRepository
from services.cache import CacheService
from services.errors import ConflictError, NotFoundError  # assumed shared error types
logger = logging.getLogger(__name__)
class PreferenceService:
"""
Manages user notification preferences.
Features:
- Hierarchical preferences (global → category → channel)
- Optimistic locking for concurrent updates
- Cache with invalidation
Week 5 Concept: Consistency (Optimistic Locking)
"""
def __init__(
self,
preference_repo: PreferencesRepository,
cache: CacheService
):
self.preferences = preference_repo
self.cache = cache
async def get_preferences(self, user_id: str) -> UserPreferences:
"""Get user preferences with caching."""
# Check cache
cache_key = f"prefs:{user_id}"
cached = await self.cache.get(cache_key)
if cached:
return UserPreferences.from_dict(cached)
# Get from database
prefs = await self.preferences.get(user_id)
if not prefs:
# Create default preferences
prefs = UserPreferences(user_id=user_id)
await self.preferences.save(prefs)
# Cache
await self.cache.set(cache_key, prefs.to_dict(), ttl=300)
return prefs
async def update_preferences(
self,
user_id: str,
updates: Dict[str, Any],
expected_version: int
) -> UserPreferences:
"""
Update user preferences with optimistic locking.
Raises ConflictError if version mismatch.
"""
# Get current preferences
current = await self.preferences.get(user_id)
if not current:
raise NotFoundError(f"Preferences not found for {user_id}")
# Check version for optimistic locking
if current.version != expected_version:
raise ConflictError(
f"Version mismatch: expected {expected_version}, got {current.version}"
)
# Apply updates
for key, value in updates.items():
if hasattr(current, key):
setattr(current, key, value)
# Increment version
current.version += 1
# Save
await self.preferences.update(current)
# Invalidate cache
await self.cache.delete(f"prefs:{user_id}")
logger.info(f"Updated preferences for {user_id} to version {current.version}")
return current
async def set_channel_preference(
self,
user_id: str,
channel: str,
enabled: bool,
expected_version: int
) -> UserPreferences:
"""Set preference for a specific channel."""
prefs = await self.get_preferences(user_id)
if channel not in prefs.channels:
prefs.channels[channel] = ChannelPreference()
prefs.channels[channel].enabled = enabled
return await self.update_preferences(
user_id,
{"channels": prefs.channels},
expected_version
)
async def set_category_preference(
self,
user_id: str,
category: str,
enabled: bool,
channels: list[str],
expected_version: int
) -> UserPreferences:
"""Set preference for a notification category."""
prefs = await self.get_preferences(user_id)
prefs.categories[category] = CategoryPreference(
enabled=enabled,
channels=channels
)
return await self.update_preferences(
user_id,
{"categories": prefs.categories},
expected_version
)
async def set_quiet_hours(
self,
user_id: str,
enabled: bool,
start: Optional[str] = None, # "22:00"
end: Optional[str] = None, # "08:00"
        expected_version: Optional[int] = None
) -> UserPreferences:
"""Set quiet hours configuration."""
prefs = await self.get_preferences(user_id)
prefs.quiet_hours.enabled = enabled
if start:
prefs.quiet_hours.start = time.fromisoformat(start)
if end:
prefs.quiet_hours.end = time.fromisoformat(end)
return await self.update_preferences(
user_id,
{"quiet_hours": prefs.quiet_hours},
expected_version or prefs.version
)
async def unsubscribe(
self,
user_id: str,
category: str,
channel: Optional[str] = None
):
"""
Handle unsubscribe request.
If channel specified, unsubscribe from that channel for category.
Otherwise, unsubscribe from category entirely.
"""
prefs = await self.get_preferences(user_id)
if category not in prefs.categories:
prefs.categories[category] = CategoryPreference()
if channel:
# Remove channel from category
if channel in prefs.categories[category].channels:
prefs.categories[category].channels.remove(channel)
else:
# Disable entire category
prefs.categories[category].enabled = False
await self.update_preferences(
user_id,
{"categories": prefs.categories},
prefs.version
)
logger.info(f"User {user_id} unsubscribed from {category}/{channel or 'all'}")
def is_notification_allowed(
self,
prefs: UserPreferences,
category: str,
channel: str
) -> bool:
"""Check if notification is allowed based on preferences."""
# Global check
if not prefs.global_enabled:
return False
# Channel check
channel_pref = prefs.channels.get(channel)
if channel_pref and not channel_pref.enabled:
return False
# Category check
cat_pref = prefs.categories.get(category)
if cat_pref:
if not cat_pref.enabled:
return False
if channel not in cat_pref.channels:
return False
return True
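Callers of update_preferences should expect conflicts under concurrent edits; here is a sketch of the standard optimistic-locking retry loop, bounded so contention can't livelock a request:
async def update_with_retry(
    service: PreferenceService,
    user_id: str,
    updates: dict,
    max_retries: int = 3,
) -> UserPreferences:
    for _ in range(max_retries):
        prefs = await service.get_preferences(user_id)
        try:
            return await service.update_preferences(
                user_id, updates, expected_version=prefs.version
            )
        except ConflictError:
            continue  # lost the race; re-read the new version and retry
    raise ConflictError(f"preferences update for {user_id} kept conflicting")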
Summary
What We Built Today
DAY 3 SUMMARY: ADVANCED FEATURES
TEMPLATE SYSTEM
├── A/B testing with variants
├── Weighted random selection
├── Consistent user assignment
└── Statistical significance checking
BATCHING
├── Count-based batching
├── Time-window batching
├── Hybrid strategy
└── Automatic flush worker
DIGESTS
├── Daily/weekly/monthly summaries
├── Timezone-aware scheduling
├── Configurable per user
└── Category-specific digests
SCHEDULING
├── Future delivery scheduling
├── Recurring notifications (cron)
├── Timezone conversion
└── Cancellation support
MULTI-CHANNEL SAGA
├── Simultaneous primary channels
├── Fallback on failure
├── Delivery tracking per channel
└── Escalation workflows
REAL-TIME IN-APP
├── Notification center
├── Read/unread state
├── Notification grouping
└── WebSocket delivery
PREFERENCES
├── Hierarchical (global → category → channel)
├── Quiet hours
├── Optimistic locking
└── Unsubscribe handling
Week 1-5 Concepts Applied
| Concept | Where Applied |
|---|---|
| Leader Election (Week 5) | Scheduler processor runs only on leader |
| Saga Pattern (Week 5) | Multi-channel delivery orchestration |
| Optimistic Locking (Week 5) | Preference updates with version check |
| Caching (Week 4) | Template and preference caching |
| Background Workers (Week 3) | Batch flush, digest generation |
What's Coming Tomorrow
Day 4: Scale, Reliability & Edge Cases — "What breaks at scale? Everything."
- Scaling challenges (campaigns, hot users)
- Circuit breakers and provider failover
- Every edge case you'll face in production
- Failure scenarios and recovery
- Cost optimization strategies
Interview Tip of the Day
INTERVIEW TIP: SHOW DEPTH ON ADVANCED FEATURES
When asked about a notification system, mention advanced features:
"Beyond basic delivery, a production notification system needs:
1. A/B testing for optimizing message engagement
2. Batching to avoid overwhelming users
3. Scheduled delivery with timezone awareness
4. Multi-channel orchestration with fallbacks
For example, a security alert might go to push AND SMS
simultaneously. If neither gets a response in 5 minutes,
we escalate to a phone call. This is a saga pattern where
we track each channel's state independently."
This shows:
- You've built real systems
- You think about user experience
- You understand operational complexity
End of Week 6, Day 3
Tomorrow: Day 4 — Scale, Reliability & Edge Cases