Himanshu Kukreja

Week 6 — Day 3: Advanced Features

System Design Mastery Series — Practical Application Week


Introduction

Yesterday we built the core notification flow — ingestion, routing, delivery, and status tracking. Today we add the advanced features that separate a basic notification system from a production-grade platform.

Today's Theme: "Features that separate good from great"

We'll build:

  • Sophisticated template system with A/B testing
  • Batching and digest notifications
  • Scheduled and delayed delivery
  • Multi-channel orchestration with fallbacks
  • Real-time in-app notification center

Part I: Advanced Template System

Chapter 1: Template Versioning and A/B Testing

Real notification systems don't just render templates — they experiment, measure, and optimize.

1.1 Template with Variants

# models/template.py (extended)

from dataclasses import dataclass, field
from typing import Optional
from datetime import datetime
from enum import Enum


class VariantStatus(Enum):
    DRAFT = "draft"
    ACTIVE = "active"
    PAUSED = "paused"
    ARCHIVED = "archived"


@dataclass
class TemplateVariant:
    """A variant of a template for A/B testing."""
    variant_id: str
    template_id: str
    name: str  # e.g., "control", "variant_a", "variant_b"
    
    # Content
    title: Optional[str] = None
    subject: Optional[str] = None
    body: str = ""
    html_body: Optional[str] = None
    
    # A/B test config
    weight: int = 100  # Traffic percentage (0-100)
    status: VariantStatus = VariantStatus.DRAFT
    
    # Metrics
    sent_count: int = 0
    delivered_count: int = 0
    opened_count: int = 0
    clicked_count: int = 0
    
    created_at: datetime = field(default_factory=datetime.utcnow)
    
    @property
    def delivery_rate(self) -> float:
        if self.sent_count == 0:
            return 0.0
        return self.delivered_count / self.sent_count
    
    @property
    def open_rate(self) -> float:
        if self.delivered_count == 0:
            return 0.0
        return self.opened_count / self.delivered_count
    
    @property
    def click_rate(self) -> float:
        if self.opened_count == 0:
            return 0.0
        return self.clicked_count / self.opened_count


@dataclass
class ABTest:
    """An A/B test configuration."""
    test_id: str
    template_id: str
    name: str
    
    variants: list[TemplateVariant] = field(default_factory=list)
    
    # Test parameters
    start_date: Optional[datetime] = None
    end_date: Optional[datetime] = None
    min_sample_size: int = 1000
    confidence_level: float = 0.95
    
    # Status
    is_active: bool = False
    winner_variant_id: Optional[str] = None
    
    def is_running(self) -> bool:
        if not self.is_active:
            return False
        now = datetime.utcnow()
        if self.start_date and now < self.start_date:
            return False
        if self.end_date and now > self.end_date:
            return False
        return True

1.2 Variant Selection Service

# services/ab_testing.py

import hashlib
import logging
from dataclasses import dataclass
from typing import Optional

from models.template import ABTest, TemplateVariant, VariantStatus
from repositories.ab_test import ABTestRepository
from services.cache import CacheService

logger = logging.getLogger(__name__)


@dataclass
class VariantSelection:
    variant: TemplateVariant
    test_id: Optional[str] = None
    is_control: bool = False


class ABTestingService:
    """
    A/B testing service for template variants.
    
    Features:
    - Consistent user assignment (same user always sees same variant)
    - Weighted random selection
    - Automatic winner detection
    """
    
    def __init__(
        self,
        ab_test_repo: ABTestRepository,
        cache: CacheService
    ):
        self.tests = ab_test_repo
        self.cache = cache
    
    async def select_variant(
        self,
        template_id: str,
        user_id: str,
        channel: str
    ) -> VariantSelection:
        """
        Select a template variant for a user.
        
        Uses consistent hashing so the same user always
        gets the same variant within a test.
        """
        
        # Check for active A/B test
        test = await self._get_active_test(template_id, channel)
        
        if not test or not test.is_running():
            # No active test - return default variant
            default = await self._get_default_variant(template_id, channel)
            return VariantSelection(variant=default, is_control=True)
        
        # Get active variants
        active_variants = [
            v for v in test.variants 
            if v.status == VariantStatus.ACTIVE and v.weight > 0
        ]
        
        if not active_variants:
            default = await self._get_default_variant(template_id, channel)
            return VariantSelection(variant=default, is_control=True)
        
        # Consistent selection based on user_id
        selected = self._select_by_weight(user_id, test.test_id, active_variants)
        
        is_control = selected.name == "control"
        
        return VariantSelection(
            variant=selected,
            test_id=test.test_id,
            is_control=is_control
        )
    
    def _select_by_weight(
        self,
        user_id: str,
        test_id: str,
        variants: list[TemplateVariant]
    ) -> TemplateVariant:
        """
        Select variant using weighted consistent hashing.
        
        Same user + test always returns same variant.
        """
        
        # Create deterministic hash from user_id + test_id
        hash_input = f"{user_id}:{test_id}"
        hash_value = int(hashlib.md5(hash_input.encode()).hexdigest(), 16)
        
        # Map the hash to a bucket in [0, 100)
        bucket = hash_value % 100
        
        # Select based on cumulative weights
        cumulative = 0
        for variant in variants:
            cumulative += variant.weight
            if bucket < cumulative:
                return variant
        
        # Fallback to last variant
        return variants[-1]
    
    async def _get_active_test(
        self,
        template_id: str,
        channel: str
    ) -> Optional[ABTest]:
        """Get active A/B test for template."""
        
        cache_key = f"ab_test:{template_id}:{channel}"
        cached = await self.cache.get(cache_key)
        
        if cached:
            return ABTest.from_dict(cached)
        
        test = await self.tests.get_active_for_template(template_id, channel)
        
        if test:
            await self.cache.set(cache_key, test.to_dict(), ttl=60)
        
        return test
    
    async def _get_default_variant(
        self,
        template_id: str,
        channel: str
    ) -> TemplateVariant:
        """Get the default (control) variant."""
        # Wrap the primary template as a TemplateVariant; the implementation
        # depends on the template repository from Day 2 and is omitted here.
        raise NotImplementedError
    
    async def record_event(
        self,
        variant_id: str,
        event_type: str  # sent, delivered, opened, clicked
    ):
        """Record an event for A/B test metrics."""
        
        field_map = {
            "sent": "sent_count",
            "delivered": "delivered_count",
            "opened": "opened_count",
            "clicked": "clicked_count"
        }
        
        field = field_map.get(event_type)
        if field:
            await self.tests.increment_variant_metric(variant_id, field)
    
    async def check_for_winner(self, test_id: str) -> Optional[str]:
        """
        Check if a test has a statistically significant winner.
        
        Uses chi-squared test for significance.
        """
        test = await self.tests.get(test_id)
        
        if not test or len(test.variants) < 2:
            return None
        
        # Need minimum sample size
        total_sent = sum(v.sent_count for v in test.variants)
        if total_sent < test.min_sample_size:
            return None
        
        # Find best performing variant
        best_variant = max(test.variants, key=lambda v: v.click_rate)
        
        # Check statistical significance
        # (Simplified - real implementation would use proper statistics)
        if self._is_significant(test.variants, best_variant, test.confidence_level):
            return best_variant.variant_id
        
        return None
    
    def _is_significant(
        self,
        variants: list[TemplateVariant],
        best: TemplateVariant,
        confidence: float
    ) -> bool:
        """Check if the best variant is statistically significant."""
        # Simplified significance check
        # Real implementation would use scipy.stats
        
        if best.sent_count < 100:
            return False
        
        # Check if best is significantly better than all others
        for variant in variants:
            if variant.variant_id == best.variant_id:
                continue
            
            # Require at least 10% improvement with enough data
            if variant.sent_count < 100:
                continue
            
            improvement = (best.click_rate - variant.click_rate) / max(variant.click_rate, 0.001)
            if improvement < 0.1:  # Less than 10% improvement
                return False
        
        return True
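
To see the consistent bucketing in isolation, here is a minimal standalone sketch (names are illustrative, not part of the service above) demonstrating that a given user always lands in the same variant for a given test:

# sketch: weighted consistent-hash bucketing

import hashlib

def pick_variant(user_id: str, test_id: str, weighted: list[tuple[str, int]]) -> str:
    """weighted: (name, weight) pairs whose weights sum to 100."""
    digest = hashlib.md5(f"{user_id}:{test_id}".encode()).hexdigest()
    bucket = int(digest, 16) % 100  # deterministic bucket in [0, 100)
    cumulative = 0
    for name, weight in weighted:
        cumulative += weight
        if bucket < cumulative:
            return name
    return weighted[-1][0]

variants = [("control", 50), ("variant_a", 25), ("variant_b", 25)]
# Same inputs always yield the same assignment
assert pick_variant("user_42", "t1", variants) == pick_variant("user_42", "t1", variants)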

1.3 Enhanced Template Service

# services/template.py (enhanced)

import re

from repositories.template import TemplateRepository
from services.ab_testing import ABTestingService
from services.cache import CacheService


class EnhancedTemplateService:
    """
    Template service with A/B testing support.
    """
    
    def __init__(
        self,
        template_repo: TemplateRepository,
        ab_testing: ABTestingService,
        cache: CacheService
    ):
        self.templates = template_repo
        self.ab_testing = ab_testing
        self.cache = cache
        self.var_pattern = re.compile(r'\{\{(\w+)\}\}')
    
    async def render(
        self,
        template_name: str,
        channel: str,
        variables: dict,
        user_id: str,
        locale: str = "en-US"
    ) -> dict:
        """
        Render a template with A/B testing.
        
        Returns rendered content and variant info for tracking.
        """
        
        # Select variant (may be from A/B test)
        selection = await self.ab_testing.select_variant(
            template_name,
            user_id,
            channel
        )
        
        variant = selection.variant
        
        # Render the variant
        result = {
            "template_id": variant.template_id,
            "variant_id": variant.variant_id,
            "test_id": selection.test_id,
            "channel": channel,
        }
        
        if variant.title:
            result["title"] = self._substitute(variant.title, variables)
        
        if variant.subject:
            result["subject"] = self._substitute(variant.subject, variables)
        
        if variant.body:
            result["body"] = self._substitute(variant.body, variables)
        
        if variant.html_body:
            result["html_body"] = self._substitute(variant.html_body, variables)
        
        # Record that we sent this variant
        if selection.test_id:
            await self.ab_testing.record_event(variant.variant_id, "sent")
        
        return result
    
    def _substitute(self, text: str, variables: dict) -> str:
        """Substitute variables in text."""
        def replace(match):
            var_name = match.group(1)
            return str(variables.get(var_name, ""))
        return self.var_pattern.sub(replace, text)
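
A hypothetical caller (template name and variables are illustrative) renders once and keeps the returned variant/test ids for event tracking:

# sketch: rendering with variant tracking

async def send_price_alert(template_service: EnhancedTemplateService):
    content = await template_service.render(
        template_name="price_alert",
        channel="push",
        variables={"symbol": "BTC", "price": "67,400"},
        user_id="user_42",
    )
    # content["variant_id"] and content["test_id"] travel with the notification,
    # so delivered/opened/clicked events can be attributed to the right variant.
    return content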

Part II: Batching and Digests

Chapter 2: Notification Batching

Instead of sending every notification immediately, some should be batched for a better user experience.

2.1 When to Batch

BATCHING USE CASES

1. TRADING NOTIFICATIONS
   User makes 50 trades in a day
   Instead of: 50 separate "Trade executed" notifications
   Send: "You made 50 trades today. View summary →"

2. SOCIAL ACTIVITY
   User gets 20 likes on a post
   Instead of: 20 "X liked your post" notifications
   Send: "20 people liked your post"

3. TRANSACTION SUMMARIES
   User has 10 transactions
   Instead of: 10 separate notifications
   Send: Daily digest at 6pm

4. MARKETING
   Multiple promotional messages
   Instead of: Separate emails
   Send: Weekly newsletter

2.2 Batching Service

# services/batching.py

from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Optional
from enum import Enum
import asyncio
import json
import logging

logger = logging.getLogger(__name__)


class BatchStrategy(Enum):
    COUNT = "count"       # Batch after N notifications
    TIME = "time"         # Batch after T seconds
    HYBRID = "hybrid"     # Whichever comes first


@dataclass
class BatchConfig:
    """Configuration for batching notifications."""
    category: str
    strategy: BatchStrategy
    
    # For COUNT strategy
    max_count: int = 10
    
    # For TIME strategy
    window_seconds: int = 300  # 5 minutes
    
    # Template for batched notification
    batch_template: str = ""
    
    # Which fields to aggregate
    aggregate_fields: list[str] = field(default_factory=list)


@dataclass
class PendingBatch:
    """A batch being accumulated."""
    batch_id: str
    user_id: str
    category: str
    notifications: list[dict] = field(default_factory=list)
    created_at: datetime = field(default_factory=datetime.utcnow)
    
    def add(self, notification: dict):
        self.notifications.append(notification)
    
    @property
    def count(self) -> int:
        return len(self.notifications)
    
    def age_seconds(self) -> float:
        return (datetime.utcnow() - self.created_at).total_seconds()


class BatchingService:
    """
    Service for batching notifications.
    
    Accumulates notifications and sends them together.
    """
    
    # Default batch configs
    DEFAULT_CONFIGS = {
        "social_like": BatchConfig(
            category="social_like",
            strategy=BatchStrategy.HYBRID,
            max_count=10,
            window_seconds=300,
            batch_template="social_likes_batch",
            aggregate_fields=["liker_name", "post_id"]
        ),
        "trade_executed": BatchConfig(
            category="trade_executed",
            strategy=BatchStrategy.TIME,
            window_seconds=60,
            batch_template="trades_batch",
            aggregate_fields=["symbol", "amount", "price"]
        ),
    }
    
    def __init__(
        self,
        redis_client,
        notification_service,
        configs: Optional[dict[str, BatchConfig]] = None
    ):
        self.redis = redis_client
        self.notifications = notification_service
        self.configs = configs or self.DEFAULT_CONFIGS
        self.running = False
    
    async def should_batch(self, notification: dict) -> bool:
        """Check if this notification should be batched."""
        category = notification.get("category")
        return category in self.configs
    
    async def add_to_batch(self, notification: dict) -> Optional[str]:
        """
        Add notification to batch.
        
        Returns batch_id if added, None if batch was flushed.
        """
        user_id = notification["user_id"]
        category = notification["category"]
        config = self.configs[category]
        
        batch_key = f"batch:{user_id}:{category}"
        
        # Add to batch in Redis
        await self.redis.rpush(batch_key, json.dumps(notification))
        
        # Set expiry if new batch
        if await self.redis.llen(batch_key) == 1:
            await self.redis.expire(batch_key, config.window_seconds + 60)
        
        # Check if batch should be flushed
        count = await self.redis.llen(batch_key)
        
        should_flush = False
        
        if config.strategy == BatchStrategy.COUNT:
            should_flush = count >= config.max_count
            
        elif config.strategy == BatchStrategy.TIME:
            # Time-based batches are flushed by background job
            pass
            
        elif config.strategy == BatchStrategy.HYBRID:
            should_flush = count >= config.max_count
        
        if should_flush:
            await self.flush_batch(user_id, category)
            return None
        
        return batch_key
    
    async def flush_batch(self, user_id: str, category: str):
        """Flush a batch and send the aggregated notification."""
        
        batch_key = f"batch:{user_id}:{category}"
        config = self.configs[category]
        
        # Get all notifications atomically
        pipe = self.redis.pipeline()
        pipe.lrange(batch_key, 0, -1)
        pipe.delete(batch_key)
        results = await pipe.execute()
        
        notifications_data = results[0]
        
        if not notifications_data:
            return
        
        notifications = [json.loads(n) for n in notifications_data]
        
        # Create batched notification
        batched = self._create_batch_notification(
            user_id,
            category,
            notifications,
            config
        )
        
        # Send the batched notification
        await self.notifications.send(batched)
        
        logger.info(
            f"Flushed batch for {user_id}/{category}: "
            f"{len(notifications)} notifications"
        )
    
    def _create_batch_notification(
        self,
        user_id: str,
        category: str,
        notifications: list[dict],
        config: BatchConfig
    ) -> dict:
        """Create a single notification from a batch."""
        
        # Aggregate specified fields
        aggregated = {}
        for field in config.aggregate_fields:
            aggregated[field] = [n.get("variables", {}).get(field) for n in notifications]
        
        # Create variables for batch template
        variables = {
            "count": len(notifications),
            "items": aggregated,
            "first_item": notifications[0].get("variables", {}),
            "last_item": notifications[-1].get("variables", {}),
        }
        
        return {
            "user_id": user_id,
            "type": "batched",
            "category": category,
            "template": config.batch_template,
            "variables": variables,
            "metadata": {
                "batched_count": len(notifications),
                "original_notification_ids": [n.get("notification_id") for n in notifications]
            }
        }
    
    async def start_flush_worker(self):
        """Background worker to flush time-based batches."""
        self.running = True
        
        while self.running:
            try:
                await self._flush_expired_batches()
            except Exception as e:
                logger.error(f"Batch flush error: {e}")
            
            await asyncio.sleep(10)  # Check every 10 seconds
    
    async def _flush_expired_batches(self):
        """Find and flush batches that have exceeded their time window."""
        
        # Scan for batch keys
        cursor = 0
        while True:
            cursor, keys = await self.redis.scan(cursor, match="batch:*")
            
            for key in keys:
                # Parse key: batch:{user_id}:{category}
                parts = key.decode().split(":")
                if len(parts) != 3:
                    continue
                
                _, user_id, category = parts
                
                if category not in self.configs:
                    continue
                
                config = self.configs[category]
                
                # Check batch age
                # (In production, store created_at in batch metadata)
                ttl = await self.redis.ttl(key)
                age = config.window_seconds + 60 - ttl
                
                if age >= config.window_seconds:
                    await self.flush_batch(user_id, category)
            
            if cursor == 0:
                break
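
A minimal sketch of how the ingestion path might use this service (the surrounding wiring is assumed):

# sketch: routing batchable notifications through the BatchingService

async def ingest(notification: dict, batcher: BatchingService):
    if await batcher.should_batch(notification):
        # Accumulate; the batch flushes on max_count or via the flush worker
        await batcher.add_to_batch(notification)
    else:
        # Non-batchable categories go straight to delivery
        await batcher.notifications.send(notification)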

Chapter 3: Digest Notifications

Digests are scheduled summaries sent at specific times.

3.1 Digest Configuration

# models/digest.py

from dataclasses import dataclass, field
from datetime import datetime, time
from typing import Optional
from enum import Enum


class DigestFrequency(Enum):
    DAILY = "daily"
    WEEKLY = "weekly"
    MONTHLY = "monthly"


@dataclass
class DigestConfig:
    """User's digest preferences."""
    user_id: str
    
    # Enabled digests
    transaction_digest: bool = True
    activity_digest: bool = True
    marketing_digest: bool = False
    
    # Timing
    frequency: DigestFrequency = DigestFrequency.DAILY
    preferred_time: time = time(18, 0)  # 6 PM
    timezone: str = "UTC"
    
    # Day of week for weekly (0=Monday)
    weekly_day: int = 0
    
    # Day of month for monthly
    monthly_day: int = 1


@dataclass
class DigestContent:
    """Content for a digest notification."""
    user_id: str
    digest_type: str
    period_start: datetime
    period_end: datetime
    
    # Summary data
    items: list[dict] = field(default_factory=list)
    summary: dict = field(default_factory=dict)
    
    @property
    def item_count(self) -> int:
        return len(self.items)

3.2 Digest Service

# services/digest.py

from datetime import datetime, timedelta
import pytz
import logging

from models.digest import DigestConfig, DigestContent, DigestFrequency
from repositories.digest_config import DigestConfigRepository
from repositories.notification import NotificationRepository
from services.notification import NotificationService

logger = logging.getLogger(__name__)


class DigestService:
    """
    Service for generating and sending digest notifications.
    
    Runs on a schedule to send daily/weekly/monthly summaries.
    """
    
    def __init__(
        self,
        config_repo: DigestConfigRepository,
        notification_repo: NotificationRepository,
        notification_service: NotificationService
    ):
        self.configs = config_repo
        self.notifications = notification_repo
        self.send = notification_service
    
    async def process_digests(self, frequency: DigestFrequency):
        """
        Process digests for all users with this frequency.
        
        Called by scheduled job at appropriate time.
        """
        
        # Get users who should receive digest now
        users = await self._get_users_for_digest(frequency)
        
        logger.info(f"Processing {frequency.value} digests for {len(users)} users")
        
        for config in users:
            try:
                await self._generate_and_send_digest(config)
            except Exception as e:
                logger.error(f"Failed to send digest to {config.user_id}: {e}")
    
    async def _get_users_for_digest(
        self,
        frequency: DigestFrequency
    ) -> list[DigestConfig]:
        """Get users who should receive digest now."""
        
        now = datetime.now(pytz.utc)  # timezone-aware, so astimezone() converts correctly
        
        # Get all configs with this frequency
        configs = await self.configs.get_by_frequency(frequency)
        
        # Filter by preferred time (with 30-minute window)
        eligible = []
        
        for config in configs:
            user_tz = pytz.timezone(config.timezone)
            user_now = now.astimezone(user_tz)
            
            # Check if current time is within window of preferred time
            pref_hour = config.preferred_time.hour
            pref_minute = config.preferred_time.minute
            
            if (user_now.hour == pref_hour and 
                abs(user_now.minute - pref_minute) < 30):
                
                # For weekly, check day of week
                if frequency == DigestFrequency.WEEKLY:
                    if user_now.weekday() != config.weekly_day:
                        continue
                
                # For monthly, check day of month
                if frequency == DigestFrequency.MONTHLY:
                    if user_now.day != config.monthly_day:
                        continue
                
                eligible.append(config)
        
        return eligible
    
    async def _generate_and_send_digest(self, config: DigestConfig):
        """Generate and send digest for a user."""
        
        # Determine period
        period_end = datetime.utcnow()
        
        if config.frequency == DigestFrequency.DAILY:
            period_start = period_end - timedelta(days=1)
        elif config.frequency == DigestFrequency.WEEKLY:
            period_start = period_end - timedelta(weeks=1)
        else:
            period_start = period_end - timedelta(days=30)
        
        # Generate each enabled digest type
        if config.transaction_digest:
            await self._send_transaction_digest(config, period_start, period_end)
        
        if config.activity_digest:
            await self._send_activity_digest(config, period_start, period_end)
    
    async def _send_transaction_digest(
        self,
        config: DigestConfig,
        period_start: datetime,
        period_end: datetime
    ):
        """Generate and send transaction digest."""
        
        # Get transactions for period
        transactions = await self.notifications.get_by_type_and_period(
            user_id=config.user_id,
            notification_type="transaction",
            start=period_start,
            end=period_end
        )
        
        if not transactions:
            return  # No transactions, no digest
        
        # Calculate summary
        summary = {
            "total_count": len(transactions),
            "total_received": sum(
                t.variables.get("amount", 0) 
                for t in transactions 
                if t.variables.get("direction") == "received"
            ),
            "total_sent": sum(
                t.variables.get("amount", 0)
                for t in transactions
                if t.variables.get("direction") == "sent"
            ),
        }
        
        # Create digest content
        content = DigestContent(
            user_id=config.user_id,
            digest_type="transaction",
            period_start=period_start,
            period_end=period_end,
            items=[t.to_dict() for t in transactions[:10]],  # Top 10
            summary=summary
        )
        
        # Send digest notification
        await self.send.send({
            "user_id": config.user_id,
            "type": "digest",
            "template": "transaction_digest",
            "variables": {
                "period": config.frequency.value,
                "summary": summary,
                "items": content.items,
                "item_count": content.item_count,
            },
            "channels": ["email"],  # Digests typically go to email
        })
        
        logger.info(f"Sent transaction digest to {config.user_id}")

Part III: Scheduling and Delayed Delivery

Chapter 4: Scheduled Notifications

Users and systems need to schedule notifications for future delivery.

4.1 Scheduler Service

# services/scheduler.py

from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional
import asyncio
import logging
import uuid

from models.notification import Notification, NotificationStatus
from repositories.scheduled_notification import ScheduledNotificationRepository
from services.notification import NotificationService

logger = logging.getLogger(__name__)


@dataclass
class ScheduledNotification:
    """A notification scheduled for future delivery."""
    schedule_id: str
    notification_id: str
    user_id: str
    
    # Schedule details
    scheduled_at: datetime
    timezone: str = "UTC"
    
    # Recurrence (optional)
    is_recurring: bool = False
    recurrence_pattern: Optional[str] = None  # cron expression
    recurrence_end: Optional[datetime] = None
    
    # Status
    status: str = "pending"  # pending, sent, cancelled
    last_sent_at: Optional[datetime] = None
    
    # The notification to send
    notification_payload: Optional[dict] = None


class SchedulerService:
    """
    Service for scheduled notification delivery.
    
    Uses database polling with leader election for reliability.
    Week 5 Concept: Leader Election
    """
    
    def __init__(
        self,
        scheduled_repo: ScheduledNotificationRepository,
        notification_service: NotificationService,
        leader_election,
        poll_interval_seconds: int = 10
    ):
        self.scheduled = scheduled_repo
        self.notifications = notification_service
        self.leader = leader_election
        self.poll_interval = poll_interval_seconds
        self.running = False
    
    async def schedule(
        self,
        notification: dict,
        scheduled_at: datetime,
        timezone: str = "UTC",
        recurrence: Optional[str] = None
    ) -> str:
        """
        Schedule a notification for future delivery.
        
        Returns schedule_id.
        """
        
        # Convert to UTC if timezone specified
        if timezone != "UTC":
            scheduled_at = self._to_utc(scheduled_at, timezone)
        
        # Validate scheduled time is in future
        if scheduled_at <= datetime.utcnow():
            raise ValueError("scheduled_at must be in the future")
        
        # Create scheduled notification
        scheduled = ScheduledNotification(
            schedule_id=str(uuid.uuid4()),
            notification_id=notification.get("notification_id") or str(uuid.uuid4()),
            user_id=notification["user_id"],
            scheduled_at=scheduled_at,
            timezone=timezone,
            is_recurring=recurrence is not None,
            recurrence_pattern=recurrence,
            notification_payload=notification
        )
        
        await self.scheduled.save(scheduled)
        
        logger.info(
            f"Scheduled notification {scheduled.schedule_id} "
            f"for {scheduled_at}"
        )
        
        return scheduled.schedule_id
    
    async def cancel(self, schedule_id: str) -> bool:
        """Cancel a scheduled notification."""
        
        scheduled = await self.scheduled.get(schedule_id)
        if not scheduled:
            return False
        
        if scheduled.status == "sent":
            return False  # Already sent
        
        await self.scheduled.update_status(schedule_id, "cancelled")
        
        logger.info(f"Cancelled scheduled notification {schedule_id}")
        return True
    
    async def start_processor(self):
        """
        Start the scheduled notification processor.
        
        Only runs on the leader node.
        """
        self.running = True
        
        while self.running:
            try:
                # Only process if we're the leader
                if await self.leader.is_leader():
                    await self._process_due_notifications()
                    
            except Exception as e:
                logger.error(f"Scheduler error: {e}")
            
            await asyncio.sleep(self.poll_interval)
    
    async def _process_due_notifications(self):
        """Process notifications that are due for delivery."""
        
        now = datetime.utcnow()
        
        # Get due notifications (with some buffer for processing time)
        due = await self.scheduled.get_due(
            before=now + timedelta(seconds=5),
            limit=100
        )
        
        for scheduled in due:
            try:
                await self._send_scheduled(scheduled)
            except Exception as e:
                logger.error(
                    f"Failed to send scheduled {scheduled.schedule_id}: {e}"
                )
    
    async def _send_scheduled(self, scheduled: ScheduledNotification):
        """Send a scheduled notification."""
        
        # Send the notification
        await self.notifications.send(scheduled.notification_payload)
        
        # Update status
        if scheduled.is_recurring:
            # Calculate next occurrence
            next_time = self._calculate_next_occurrence(
                scheduled.scheduled_at,
                scheduled.recurrence_pattern
            )
            
            if next_time and (not scheduled.recurrence_end or 
                             next_time <= scheduled.recurrence_end):
                # Schedule next occurrence
                await self.scheduled.update(
                    scheduled.schedule_id,
                    {
                        "scheduled_at": next_time,
                        "last_sent_at": datetime.utcnow()
                    }
                )
            else:
                # Recurrence ended
                await self.scheduled.update_status(scheduled.schedule_id, "completed")
        else:
            # One-time notification - mark as sent
            await self.scheduled.update_status(scheduled.schedule_id, "sent")
        
        logger.info(f"Sent scheduled notification {scheduled.schedule_id}")
    
    def _to_utc(self, dt: datetime, timezone: str) -> datetime:
        """Convert datetime to UTC."""
        import pytz
        tz = pytz.timezone(timezone)
        local_dt = tz.localize(dt)
        return local_dt.astimezone(pytz.UTC).replace(tzinfo=None)
    
    def _calculate_next_occurrence(
        self,
        current: datetime,
        pattern: str
    ) -> Optional[datetime]:
        """Calculate next occurrence from cron pattern."""
        from croniter import croniter
        
        try:
            cron = croniter(pattern, current)
            return cron.get_next(datetime)
        except Exception:
            return None
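
A hypothetical usage sketch (payloads and the cron expression are illustrative):

# sketch: one-off and recurring schedules

from datetime import datetime, timedelta

async def schedule_examples(scheduler: SchedulerService):
    # One-off reminder two hours from now
    schedule_id = await scheduler.schedule(
        {"user_id": "user_42", "template": "portfolio_reminder", "variables": {}},
        scheduled_at=datetime.utcnow() + timedelta(hours=2),
    )
    # Recurring weekly report: every Monday at 09:00 (standard cron syntax)
    await scheduler.schedule(
        {"user_id": "user_42", "template": "weekly_report", "variables": {}},
        scheduled_at=datetime.utcnow() + timedelta(minutes=1),
        recurrence="0 9 * * 1",
    )
    # Scheduled notifications can be cancelled until they are sent
    await scheduler.cancel(schedule_id)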

4.2 "Best Time to Send" Feature

# services/send_time_optimizer.py

from datetime import datetime, time, timedelta
from typing import Optional
import pytz

from repositories.user_engagement import UserEngagementRepository


class SendTimeOptimizer:
    """
    Optimizes notification delivery time based on user engagement patterns.
    
    Analyzes when users typically engage with notifications
    and suggests optimal send times.
    """
    
    def __init__(self, engagement_repo: UserEngagementRepository):
        self.engagement = engagement_repo
    
    async def get_optimal_time(
        self,
        user_id: str,
        channel: str,
        default_time: Optional[time] = None
    ) -> time:
        """
        Get optimal send time for a user.
        
        Based on historical engagement patterns.
        """
        
        # Get user's engagement history
        history = await self.engagement.get_hourly_engagement(
            user_id,
            channel,
            days=30
        )
        
        if not history or sum(history.values()) < 10:
            # Not enough data - use default or fallback
            return default_time or time(10, 0)  # 10 AM default
        
        # Find hour with highest engagement
        best_hour = max(history, key=history.get)
        
        # Add some randomization to avoid thundering herd
        import random
        minute = random.randint(0, 59)
        
        return time(best_hour, minute)
    
    async def record_engagement(
        self,
        user_id: str,
        channel: str,
        engaged_at: datetime
    ):
        """Record when a user engaged with a notification."""
        
        hour = engaged_at.hour
        await self.engagement.increment(user_id, channel, hour)
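
Combining the optimizer with the scheduler (a sketch; the wiring and payload are assumed):

# sketch: schedule a send at the user's best engagement hour

from datetime import datetime, time, timedelta

async def schedule_at_best_time(
    user_id: str,
    payload: dict,
    optimizer: SendTimeOptimizer,
    scheduler,
):
    best = await optimizer.get_optimal_time(user_id, "push", default_time=time(10, 0))
    now = datetime.utcnow()
    send_at = now.replace(hour=best.hour, minute=best.minute, second=0, microsecond=0)
    if send_at <= now:
        send_at += timedelta(days=1)  # next occurrence of that time
    await scheduler.schedule(payload, scheduled_at=send_at)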

Part IV: Multi-Channel Orchestration

Chapter 5: Saga Pattern for Multi-Channel Delivery

Critical notifications need to reach the user through multiple channels with proper coordination.

5.1 Multi-Channel Saga

# services/multichannel_saga.py

from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
from typing import Optional
import asyncio
import logging
import uuid

logger = logging.getLogger(__name__)


class ChannelStatus(Enum):
    PENDING = "pending"
    SENDING = "sending"
    SENT = "sent"
    DELIVERED = "delivered"
    FAILED = "failed"
    SKIPPED = "skipped"


@dataclass
class ChannelAttempt:
    """Track delivery attempt for one channel."""
    channel: str
    status: ChannelStatus = ChannelStatus.PENDING
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None
    provider_message_id: Optional[str] = None
    error: Optional[str] = None


@dataclass
class MultiChannelDelivery:
    """
    Orchestrates delivery across multiple channels.
    
    Week 5 Concept: Saga Pattern
    """
    delivery_id: str
    notification_id: str
    user_id: str
    
    # Channel configuration
    primary_channels: list[str] = field(default_factory=list)  # Send simultaneously
    fallback_channels: list[str] = field(default_factory=list)  # Try if primary fails
    
    # Timing
    fallback_delay_seconds: int = 60  # Wait before trying fallback
    
    # State
    channel_attempts: dict[str, ChannelAttempt] = field(default_factory=dict)
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None
    
    # Result
    any_delivered: bool = False
    all_failed: bool = False


class MultiChannelSagaService:
    """
    Saga orchestrator for multi-channel notification delivery.
    
    Handles:
    - Simultaneous delivery to multiple primary channels
    - Fallback to secondary channels on failure
    - Tracking and status aggregation
    """
    
    def __init__(
        self,
        channel_workers: dict,  # channel -> worker
        delivery_repo,
        notification_service
    ):
        self.workers = channel_workers
        self.deliveries = delivery_repo
        self.notifications = notification_service
    
    async def execute(
        self,
        notification: dict,
        primary_channels: list[str],
        fallback_channels: Optional[list[str]] = None,
        fallback_delay: int = 60
    ) -> MultiChannelDelivery:
        """
        Execute multi-channel delivery saga.
        
        Strategy:
        1. Send to all primary channels simultaneously
        2. If any primary succeeds, mark as delivered
        3. If all primary fail, wait and try fallbacks
        4. Track all attempts for debugging
        """
        
        delivery = MultiChannelDelivery(
            delivery_id=str(uuid.uuid4()),
            notification_id=notification["notification_id"],
            user_id=notification["user_id"],
            primary_channels=primary_channels,
            fallback_channels=fallback_channels or [],
            fallback_delay_seconds=fallback_delay,
            started_at=datetime.utcnow()
        )
        
        # Initialize channel attempts
        for channel in primary_channels + (fallback_channels or []):
            delivery.channel_attempts[channel] = ChannelAttempt(channel=channel)
        
        # Save initial state
        await self.deliveries.save(delivery)
        
        # Step 1: Send to primary channels simultaneously
        primary_results = await self._send_to_channels(
            notification,
            primary_channels,
            delivery
        )
        
        # Check if any primary succeeded
        primary_success = any(
            r.status in [ChannelStatus.SENT, ChannelStatus.DELIVERED]
            for r in primary_results
        )
        
        if primary_success:
            delivery.any_delivered = True
            delivery.completed_at = datetime.utcnow()
            await self.deliveries.update(delivery)
            return delivery
        
        # Step 2: All primary failed - try fallbacks
        if fallback_channels:
            logger.info(
                f"Primary channels failed for {delivery.delivery_id}, "
                f"trying fallbacks in {fallback_delay}s"
            )
            
            # Wait before fallback (give primary channels time to recover)
            await asyncio.sleep(fallback_delay)
            
            # Check again if any primary succeeded (via webhook callback)
            delivery = await self.deliveries.get(delivery.delivery_id)
            if delivery.any_delivered:
                return delivery
            
            # Try fallback channels
            fallback_results = await self._send_to_channels(
                notification,
                fallback_channels,
                delivery
            )
            
            fallback_success = any(
                r.status in [ChannelStatus.SENT, ChannelStatus.DELIVERED]
                for r in fallback_results
            )
            
            if fallback_success:
                delivery.any_delivered = True
        
        # All channels failed
        if not delivery.any_delivered:
            delivery.all_failed = True
            logger.error(
                f"All channels failed for notification {notification['notification_id']}"
            )
        
        delivery.completed_at = datetime.utcnow()
        await self.deliveries.update(delivery)
        
        return delivery
    
    async def _send_to_channels(
        self,
        notification: dict,
        channels: list[str],
        delivery: MultiChannelDelivery
    ) -> list[ChannelAttempt]:
        """Send to multiple channels simultaneously."""
        
        tasks = []
        for channel in channels:
            task = self._send_to_channel(notification, channel, delivery)
            tasks.append(task)
        
        results = await asyncio.gather(*tasks, return_exceptions=True)
        
        return [
            r if isinstance(r, ChannelAttempt) 
            else ChannelAttempt(channel=channels[i], status=ChannelStatus.FAILED, error=str(r))
            for i, r in enumerate(results)
        ]
    
    async def _send_to_channel(
        self,
        notification: dict,
        channel: str,
        delivery: MultiChannelDelivery
    ) -> ChannelAttempt:
        """Send to a single channel."""
        
        attempt = delivery.channel_attempts[channel]
        attempt.status = ChannelStatus.SENDING
        attempt.started_at = datetime.utcnow()
        
        try:
            worker = self.workers.get(channel)
            if not worker:
                raise ValueError(f"No worker for channel: {channel}")
            
            result = await worker.deliver(notification)
            
            if result.success:
                attempt.status = ChannelStatus.SENT
                attempt.provider_message_id = result.provider_message_id
            else:
                attempt.status = ChannelStatus.FAILED
                attempt.error = result.error_message
                
        except Exception as e:
            attempt.status = ChannelStatus.FAILED
            attempt.error = str(e)
        
        attempt.completed_at = datetime.utcnow()
        
        # Update delivery state
        await self.deliveries.update_channel_attempt(
            delivery.delivery_id,
            channel,
            attempt
        )
        
        return attempt
    
    async def handle_delivery_callback(
        self,
        notification_id: str,
        channel: str,
        status: ChannelStatus
    ):
        """
        Handle delivery confirmation callback from provider.
        
        Updates saga state when delivery is confirmed.
        """
        
        delivery = await self.deliveries.get_by_notification(notification_id)
        if not delivery:
            return
        
        if channel in delivery.channel_attempts:
            delivery.channel_attempts[channel].status = status
            
            if status == ChannelStatus.DELIVERED:
                delivery.any_delivered = True
            
            await self.deliveries.update(delivery)

5.2 Escalation Workflows

# services/escalation.py

from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional
import logging

logger = logging.getLogger(__name__)


@dataclass
class EscalationRule:
    """Rule for notification escalation."""
    name: str
    trigger_condition: str  # no_response, no_open, no_click
    wait_duration: timedelta
    escalation_channel: str
    max_escalations: int = 3


class EscalationService:
    """
    Handles notification escalation for critical messages.
    
    If user doesn't respond to notification within time window,
    escalate to more intrusive channel.
    
    Example: Push → SMS → Phone call (for fraud alerts)
    """
    
    ESCALATION_RULES = {
        "security_alert": [
            EscalationRule(
                name="push_to_sms",
                trigger_condition="no_open",
                wait_duration=timedelta(minutes=5),
                escalation_channel="sms"
            ),
            EscalationRule(
                name="sms_to_call",
                trigger_condition="no_response",
                wait_duration=timedelta(minutes=15),
                escalation_channel="voice_call"
            ),
        ],
        "payment_confirmation": [
            EscalationRule(
                name="push_to_email",
                trigger_condition="no_open",
                wait_duration=timedelta(hours=1),
                escalation_channel="email"
            ),
        ],
    }
    
    def __init__(
        self,
        notification_repo,
        notification_service,
        scheduler
    ):
        self.notifications = notification_repo
        self.send = notification_service
        self.scheduler = scheduler
    
    async def setup_escalation(
        self,
        notification_id: str,
        category: str
    ):
        """Set up escalation workflow for a notification."""
        
        rules = self.ESCALATION_RULES.get(category, [])
        if not rules:
            return
        
        # The scheduler requires a user_id on every payload
        notification = await self.notifications.get(notification_id)
        if not notification:
            return
        
        for i, rule in enumerate(rules):
            # Schedule escalation check
            check_time = datetime.utcnow() + rule.wait_duration
            
            await self.scheduler.schedule(
                {
                    "type": "escalation_check",
                    "user_id": notification.user_id,
                    "notification_id": notification_id,
                    "rule_index": i,
                    "category": category,
                },
                scheduled_at=check_time
            )
    
    async def check_escalation(
        self,
        notification_id: str,
        rule_index: int,
        category: str
    ):
        """Check if escalation is needed and execute if so."""
        
        rules = self.ESCALATION_RULES.get(category, [])
        if rule_index >= len(rules):
            return
        
        rule = rules[rule_index]
        
        # Get notification status
        notification = await self.notifications.get(notification_id)
        if not notification:
            return
        
        # Check trigger condition
        should_escalate = False
        
        if rule.trigger_condition == "no_open":
            should_escalate = notification.read_at is None
            
        elif rule.trigger_condition == "no_response":
            # Check if user took any action
            should_escalate = not notification.metadata.get("user_responded", False)
        
        if should_escalate:
            # Send via escalation channel
            await self.send.send({
                "user_id": notification.user_id,
                "type": notification.type,
                "template": f"{notification.template}_escalation",
                "variables": notification.variables,
                "channels": [rule.escalation_channel],
                "metadata": {
                    "escalation_level": rule_index + 1,
                    "original_notification_id": notification_id,
                }
            })
            
            logger.info(
                f"Escalated notification {notification_id} to {rule.escalation_channel}"
            )
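
The scheduler delivers these "escalation_check" payloads like any other notification, so the send path needs a small dispatcher that routes them back here instead of to a channel (a sketch; the hook point is assumed):

# sketch: dispatching scheduled escalation checks

async def handle_scheduled_payload(
    payload: dict,
    escalation: EscalationService,
    notification_service,
):
    if payload.get("type") == "escalation_check":
        await escalation.check_escalation(
            payload["notification_id"],
            payload["rule_index"],
            payload["category"],
        )
    else:
        await notification_service.send(payload)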

Part V: Real-Time In-App Notifications

Chapter 6: Notification Center

6.1 In-App Notification Model

# models/in_app.py

from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional, List
from enum import Enum


class InAppNotificationState(Enum):
    UNREAD = "unread"
    READ = "read"
    ARCHIVED = "archived"


@dataclass
class InAppNotification:
    """In-app notification for notification center."""
    notification_id: str
    user_id: str
    
    # Content
    title: str
    body: str
    icon: Optional[str] = None
    image_url: Optional[str] = None
    action_url: Optional[str] = None
    
    # Categorization
    category: str = "general"
    priority: str = "medium"
    
    # State
    state: InAppNotificationState = InAppNotificationState.UNREAD
    
    # Timestamps
    created_at: datetime = field(default_factory=datetime.utcnow)
    read_at: Optional[datetime] = None
    archived_at: Optional[datetime] = None
    
    # Grouping
    group_key: Optional[str] = None  # For notification grouping
    
    # Metadata
    metadata: dict = field(default_factory=dict)


@dataclass
class NotificationGroup:
    """Grouped notifications in notification center."""
    group_key: str
    title: str
    notifications: List[InAppNotification]
    unread_count: int
    latest_at: datetime

6.2 Notification Center Service

# services/notification_center.py

from datetime import datetime
from typing import Optional, List
import json
import logging

from models.in_app import InAppNotification, InAppNotificationState, NotificationGroup

logger = logging.getLogger(__name__)


class NotificationCenterService:
    """
    Service for managing in-app notification center.
    
    Features:
    - Store notifications for offline users
    - Track read/unread state
    - Group similar notifications
    - Sync across devices
    """
    
    def __init__(
        self,
        redis_client,
        websocket_gateway,
        max_notifications: int = 100
    ):
        self.redis = redis_client
        self.ws = websocket_gateway
        self.max_notifications = max_notifications
    
    async def add_notification(
        self,
        notification: InAppNotification
    ) -> bool:
        """
        Add notification to user's notification center.
        
        Also pushes via WebSocket if user is online.
        """
        
        user_id = notification.user_id
        
        # Store in Redis list
        key = f"inbox:{user_id}"
        
        await self.redis.lpush(key, json.dumps(notification.to_dict()))
        
        # Trim to max size
        await self.redis.ltrim(key, 0, self.max_notifications - 1)
        
        # Increment unread count
        await self.redis.incr(f"unread:{user_id}")
        
        # Push via WebSocket if online
        if await self._is_user_online(user_id):
            await self.ws.send_to_user(
                user_id,
                {
                    "type": "notification",
                    "payload": notification.to_dict()
                }
            )
        
        return True
    
    async def get_notifications(
        self,
        user_id: str,
        limit: int = 50,
        offset: int = 0,
        state: Optional[InAppNotificationState] = None
    ) -> List[InAppNotification]:
        """Get notifications for user's notification center."""
        
        key = f"inbox:{user_id}"
        
        # Get from Redis
        data = await self.redis.lrange(key, offset, offset + limit - 1)
        
        notifications = []
        for item in data:
            notif = InAppNotification.from_dict(json.loads(item))
            
            # Filter by state if specified
            if state and notif.state != state:
                continue
            
            notifications.append(notif)
        
        return notifications
    
    async def get_grouped_notifications(
        self,
        user_id: str,
        limit: int = 50
    ) -> List[NotificationGroup]:
        """Get notifications grouped by category/type."""
        
        notifications = await self.get_notifications(user_id, limit=limit)
        
        # Group by group_key or category
        groups = {}
        
        for notif in notifications:
            key = notif.group_key or notif.category
            
            if key not in groups:
                groups[key] = NotificationGroup(
                    group_key=key,
                    title=self._get_group_title(key),
                    notifications=[],
                    unread_count=0,
                    latest_at=notif.created_at
                )
            
            groups[key].notifications.append(notif)
            
            if notif.state == InAppNotificationState.UNREAD:
                groups[key].unread_count += 1
            
            if notif.created_at > groups[key].latest_at:
                groups[key].latest_at = notif.created_at
        
        # Sort by latest
        sorted_groups = sorted(
            groups.values(),
            key=lambda g: g.latest_at,
            reverse=True
        )
        
        return sorted_groups
    
    async def mark_as_read(
        self,
        user_id: str,
        notification_ids: List[str]
    ) -> int:
        """Mark notifications as read."""
        
        key = f"inbox:{user_id}"
        marked_count = 0
        
        # Get all notifications
        data = await self.redis.lrange(key, 0, -1)
        
        updated = []
        for item in data:
            notif = json.loads(item)
            
            if notif["notification_id"] in notification_ids:
                if notif["state"] == InAppNotificationState.UNREAD.value:
                    notif["state"] = InAppNotificationState.READ.value
                    notif["read_at"] = datetime.utcnow().isoformat()
                    marked_count += 1
            
            updated.append(json.dumps(notif))
        
        # Update list
        if updated:
            pipe = self.redis.pipeline()
            pipe.delete(key)
            pipe.rpush(key, *updated)
            await pipe.execute()
        
        # Decrement unread count
        if marked_count > 0:
            await self.redis.decrby(f"unread:{user_id}", marked_count)
        
        return marked_count
    
    async def mark_all_as_read(self, user_id: str) -> int:
        """Mark all notifications as read."""
        
        notifications = await self.get_notifications(user_id, limit=self.max_notifications)
        
        unread_ids = [
            n.notification_id 
            for n in notifications 
            if n.state == InAppNotificationState.UNREAD
        ]
        
        if unread_ids:
            return await self.mark_as_read(user_id, unread_ids)
        
        return 0
    
    async def get_unread_count(self, user_id: str) -> int:
        """Get unread notification count."""
        count = await self.redis.get(f"unread:{user_id}")
        return int(count) if count else 0
    
    async def delete_notification(
        self,
        user_id: str,
        notification_id: str
    ) -> bool:
        """Delete a notification."""
        
        key = f"inbox:{user_id}"
        
        # Find and remove
        data = await self.redis.lrange(key, 0, -1)
        
        for item in data:
            notif = json.loads(item)
            if notif["notification_id"] == notification_id:
                await self.redis.lrem(key, 1, item)
                
                # Update unread count if was unread
                if notif["state"] == InAppNotificationState.UNREAD.value:
                    await self.redis.decr(f"unread:{user_id}")
                
                return True
        
        return False
    
    async def _is_user_online(self, user_id: str) -> bool:
        """Check if user has active WebSocket connection."""
        return await self.redis.exists(f"online:{user_id}")
    
    def _get_group_title(self, group_key: str) -> str:
        """Get display title for notification group."""
        titles = {
            "transaction": "Transactions",
            "social": "Social",
            "marketing": "Promotions",
            "security": "Security",
        }
        return titles.get(group_key, group_key.replace("_", " ").title())
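
A minimal FastAPI sketch exposing the center to clients (route shapes are illustrative; `center` is assumed to be a NotificationCenterService built at startup):

# sketch: REST endpoints for the notification center

from fastapi import APIRouter

router = APIRouter(prefix="/notifications")

@router.get("")
async def list_notifications(user_id: str, limit: int = 50, offset: int = 0):
    items = await center.get_notifications(user_id, limit=limit, offset=offset)
    return {
        "items": [n.to_dict() for n in items],
        "unread_count": await center.get_unread_count(user_id),
    }

@router.post("/read")
async def mark_read(user_id: str, notification_ids: list[str]):
    return {"marked": await center.mark_as_read(user_id, notification_ids)}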

6.3 WebSocket Gateway

# services/websocket_gateway.py

import asyncio
import json
from datetime import datetime
from typing import Dict, Optional
import logging
from fastapi import WebSocket, WebSocketDisconnect

logger = logging.getLogger(__name__)


class WebSocketGateway:
    """
    WebSocket gateway for real-time notifications.
    
    Manages WebSocket connections and message delivery.
    """
    
    def __init__(self, redis_client):
        self.redis = redis_client
        self.connections: Dict[str, WebSocket] = {}  # user_id -> websocket
        self.heartbeat_interval = 30  # seconds
    
    async def connect(self, websocket: WebSocket, user_id: str):
        """Handle new WebSocket connection."""
        
        await websocket.accept()
        
        self.connections[user_id] = websocket
        
        # Mark user as online in Redis (for distributed setup)
        await self.redis.setex(
            f"online:{user_id}",
            self.heartbeat_interval + 10,
            self._get_server_id()
        )
        
        logger.info(f"User {user_id} connected via WebSocket")
        
        # Send initial state
        await self._send_initial_state(websocket, user_id)
        
        # Start heartbeat
        asyncio.create_task(self._heartbeat_loop(user_id))
    
    async def disconnect(self, user_id: str):
        """Handle WebSocket disconnection."""
        
        if user_id in self.connections:
            del self.connections[user_id]
        
        await self.redis.delete(f"online:{user_id}")
        
        logger.info(f"User {user_id} disconnected")
    
    async def send_to_user(
        self,
        user_id: str,
        message: dict
    ) -> bool:
        """Send message to a specific user."""
        
        # Check local connections first
        if user_id in self.connections:
            websocket = self.connections[user_id]
            try:
                await websocket.send_json(message)
                return True
            except Exception as e:
                logger.error(f"Failed to send to {user_id}: {e}")
                await self.disconnect(user_id)
                return False
        
        # User might be connected to different server
        # Publish to Redis pub/sub for distributed delivery
        await self.redis.publish(
            f"ws:{user_id}",
            json.dumps(message)
        )
        
        return True
    
    async def broadcast(
        self,
        user_ids: list[str],
        message: dict
    ):
        """Broadcast message to multiple users."""
        
        tasks = [
            self.send_to_user(user_id, message)
            for user_id in user_ids
        ]
        
        await asyncio.gather(*tasks, return_exceptions=True)
    
    async def handle_message(
        self,
        user_id: str,
        message: dict
    ):
        """Handle incoming WebSocket message from client."""
        
        msg_type = message.get("type")
        
        if msg_type == "ping":
            await self.send_to_user(user_id, {"type": "pong"})
            
        elif msg_type == "mark_read":
            notification_ids = message.get("notification_ids", [])
            # Delegate to the in-app service's mark_as_read
            # (service wiring omitted here for brevity)
            
        elif msg_type == "subscribe":
            # Handle subscription to specific channels
            pass
    
    async def _send_initial_state(self, websocket: WebSocket, user_id: str):
        """Send initial state when user connects."""
        
        # Get unread count
        unread_count = await self.redis.get(f"unread:{user_id}")
        
        await websocket.send_json({
            "type": "init",
            "payload": {
                "unread_count": int(unread_count) if unread_count else 0,
                "connected_at": datetime.utcnow().isoformat(),
            }
        })
    
    async def _heartbeat_loop(self, user_id: str):
        """Send periodic heartbeats to keep connection alive."""
        
        while user_id in self.connections:
            await asyncio.sleep(self.heartbeat_interval)
            
            if user_id not in self.connections:
                break
            
            try:
                await self.connections[user_id].send_json({"type": "ping"})
                
                # Refresh online status in Redis
                await self.redis.setex(
                    f"online:{user_id}",
                    self.heartbeat_interval + 10,
                    self._get_server_id()
                )
                
            except Exception:
                await self.disconnect(user_id)
                break
    
    def _get_server_id(self) -> str:
        """Get unique identifier for this server instance."""
        import socket
        return socket.gethostname()


# FastAPI WebSocket endpoint
from fastapi import APIRouter

router = APIRouter()

# A single gateway instance shared by the app. FastAPI won't inject an
# arbitrary class into a websocket handler's parameters, so wire it at
# module level (redis_client is created during app startup).
gateway = WebSocketGateway(redis_client)

@router.websocket("/ws/{user_id}")
async def websocket_endpoint(websocket: WebSocket, user_id: str):
    await gateway.connect(websocket, user_id)
    
    try:
        while True:
            data = await websocket.receive_json()
            await gateway.handle_message(user_id, data)
            
    except WebSocketDisconnect:
        await gateway.disconnect(user_id)
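
send_to_user() publishes to Redis pub/sub when the target user isn't connected locally, but nothing above consumes those messages yet. Each gateway instance needs a listener for that. Here's a minimal sketch, assuming a redis.asyncio client created with decode_responses=True; run_pubsub_listener is a hypothetical helper you'd start once per server (for example, in a startup hook).

# workers/ws_pubsub.py — completes the distributed path of send_to_user() (sketch)

import json


async def run_pubsub_listener(gateway: WebSocketGateway):
    """Deliver messages published on ws:<user_id> to sockets held by THIS server."""
    pubsub = gateway.redis.pubsub()
    await pubsub.psubscribe("ws:*")  # one pattern covers every user channel
    
    async for msg in pubsub.listen():
        if msg["type"] != "pmessage":
            continue  # skip subscription confirmations
        
        user_id = msg["channel"].split(":", 1)[1]
        websocket = gateway.connections.get(user_id)
        if websocket is None:
            continue  # user is on a different server, or offline
        
        try:
            await websocket.send_json(json.loads(msg["data"]))
        except Exception:
            await gateway.disconnect(user_id)

Pattern-subscribing to ws:* keeps the wiring simple at the cost of every server seeing every message; per-server channels are a common optimization once fan-out grows.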

Part VI: User Preference Management

Chapter 7: Preference Service

7.1 Hierarchical Preferences

# services/preferences.py

from datetime import time
from typing import Optional, Dict, Any
import logging

from exceptions import ConflictError, NotFoundError  # app-level errors (assumed module path)
from models.preferences import UserPreferences, ChannelPreference, CategoryPreference
from repositories.preferences import PreferencesRepository
from services.cache import CacheService

logger = logging.getLogger(__name__)


class PreferenceService:
    """
    Manages user notification preferences.
    
    Features:
    - Hierarchical preferences (global → category → channel)
    - Optimistic locking for concurrent updates
    - Cache with invalidation
    
    Week 5 Concept: Consistency (Optimistic Locking)
    """
    
    def __init__(
        self,
        preference_repo: PreferencesRepository,
        cache: CacheService
    ):
        self.preferences = preference_repo
        self.cache = cache
    
    async def get_preferences(self, user_id: str) -> UserPreferences:
        """Get user preferences with caching."""
        
        # Check cache
        cache_key = f"prefs:{user_id}"
        cached = await self.cache.get(cache_key)
        
        if cached:
            return UserPreferences.from_dict(cached)
        
        # Get from database
        prefs = await self.preferences.get(user_id)
        
        if not prefs:
            # Create default preferences
            prefs = UserPreferences(user_id=user_id)
            await self.preferences.save(prefs)
        
        # Cache
        await self.cache.set(cache_key, prefs.to_dict(), ttl=300)
        
        return prefs
    
    async def update_preferences(
        self,
        user_id: str,
        updates: Dict[str, Any],
        expected_version: int
    ) -> UserPreferences:
        """
        Update user preferences with optimistic locking.
        
        Raises ConflictError if version mismatch.
        """
        
        # Get current preferences
        current = await self.preferences.get(user_id)
        
        if not current:
            raise NotFoundError(f"Preferences not found for {user_id}")
        
        # Check version for optimistic locking
        if current.version != expected_version:
            raise ConflictError(
                f"Version mismatch: expected {expected_version}, got {current.version}"
            )
        
        # Apply updates
        for key, value in updates.items():
            if hasattr(current, key):
                setattr(current, key, value)
        
        # Increment version
        current.version += 1
        
        # Save
        await self.preferences.update(current)
        
        # Invalidate cache
        await self.cache.delete(f"prefs:{user_id}")
        
        logger.info(f"Updated preferences for {user_id} to version {current.version}")
        
        return current
    
    async def set_channel_preference(
        self,
        user_id: str,
        channel: str,
        enabled: bool,
        expected_version: int
    ) -> UserPreferences:
        """Set preference for a specific channel."""
        
        prefs = await self.get_preferences(user_id)
        
        if channel not in prefs.channels:
            prefs.channels[channel] = ChannelPreference()
        
        prefs.channels[channel].enabled = enabled
        
        return await self.update_preferences(
            user_id,
            {"channels": prefs.channels},
            expected_version
        )
    
    async def set_category_preference(
        self,
        user_id: str,
        category: str,
        enabled: bool,
        channels: list[str],
        expected_version: int
    ) -> UserPreferences:
        """Set preference for a notification category."""
        
        prefs = await self.get_preferences(user_id)
        
        prefs.categories[category] = CategoryPreference(
            enabled=enabled,
            channels=channels
        )
        
        return await self.update_preferences(
            user_id,
            {"categories": prefs.categories},
            expected_version
        )
    
    async def set_quiet_hours(
        self,
        user_id: str,
        enabled: bool,
        start: Optional[str] = None,  # "22:00"
        end: Optional[str] = None,    # "08:00"
        expected_version: Optional[int] = None
    ) -> UserPreferences:
        """Set quiet hours configuration."""
        
        prefs = await self.get_preferences(user_id)
        
        prefs.quiet_hours.enabled = enabled
        
        if start:
            prefs.quiet_hours.start = time.fromisoformat(start)
        if end:
            prefs.quiet_hours.end = time.fromisoformat(end)
        
        return await self.update_preferences(
            user_id,
            {"quiet_hours": prefs.quiet_hours},
            expected_version or prefs.version
        )
    
    async def unsubscribe(
        self,
        user_id: str,
        category: str,
        channel: Optional[str] = None
    ):
        """
        Handle unsubscribe request.
        
        If channel specified, unsubscribe from that channel for category.
        Otherwise, unsubscribe from category entirely.
        """
        
        prefs = await self.get_preferences(user_id)
        
        if category not in prefs.categories:
            prefs.categories[category] = CategoryPreference()
        
        if channel:
            # Remove channel from category
            if channel in prefs.categories[category].channels:
                prefs.categories[category].channels.remove(channel)
        else:
            # Disable entire category
            prefs.categories[category].enabled = False
        
        await self.update_preferences(
            user_id,
            {"categories": prefs.categories},
            prefs.version
        )
        
        logger.info(f"User {user_id} unsubscribed from {category}/{channel or 'all'}")
    
    def is_notification_allowed(
        self,
        prefs: UserPreferences,
        category: str,
        channel: str
    ) -> bool:
        """Check if notification is allowed based on preferences."""
        
        # Global check
        if not prefs.global_enabled:
            return False
        
        # Channel check
        channel_pref = prefs.channels.get(channel)
        if channel_pref and not channel_pref.enabled:
            return False
        
        # Category check
        cat_pref = prefs.categories.get(category)
        if cat_pref:
            if not cat_pref.enabled:
                return False
            if channel not in cat_pref.channels:
                return False
        
        return True
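
Optimistic locking pushes conflict handling to the caller: on a version mismatch, re-read and retry. A minimal sketch of that loop (toggle_email_channel is an illustrative helper, not part of the service):

# Caller-side retry loop for optimistic locking (illustrative helper)

async def toggle_email_channel(
    service: PreferenceService,
    user_id: str,
    enabled: bool
) -> UserPreferences:
    for attempt in range(3):  # bound retries so contention can't loop forever
        prefs = await service.get_preferences(user_id)
        try:
            return await service.set_channel_preference(
                user_id, "email", enabled,
                expected_version=prefs.version,
            )
        except ConflictError:
            # Another writer bumped the version first; re-read and retry
            continue
    raise ConflictError(f"Gave up after 3 attempts for {user_id}")

Because update_preferences invalidates the cache on every successful write, the re-read after a conflict sees the new version rather than a stale cached copy.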

Summary

What We Built Today

DAY 3 SUMMARY: ADVANCED FEATURES

TEMPLATE SYSTEM
├── A/B testing with variants
├── Weighted random selection
├── Consistent user assignment
└── Statistical significance checking

BATCHING
├── Count-based batching
├── Time-window batching
├── Hybrid strategy
└── Automatic flush worker

DIGESTS
├── Daily/weekly/monthly summaries
├── Timezone-aware scheduling
├── Configurable per user
└── Category-specific digests

SCHEDULING
├── Future delivery scheduling
├── Recurring notifications (cron)
├── Timezone conversion
└── Cancellation support

MULTI-CHANNEL SAGA
├── Simultaneous primary channels
├── Fallback on failure
├── Delivery tracking per channel
└── Escalation workflows

REAL-TIME IN-APP
├── Notification center
├── Read/unread state
├── Notification grouping
└── WebSocket delivery

PREFERENCES
├── Hierarchical (global → category → channel)
├── Quiet hours
├── Optimistic locking
└── Unsubscribe handling

Week 1-5 Concepts Applied

Concept                       Where Applied
Leader Election (Week 5)      Scheduler processor runs only on leader
Saga Pattern (Week 5)         Multi-channel delivery orchestration
Optimistic Locking (Week 5)   Preference updates with version check
Caching (Week 4)              Template and preference caching
Background Workers (Week 3)   Batch flush, digest generation

What's Coming Tomorrow

Day 4: Scale, Reliability & Edge Cases
"What breaks at scale? Everything."

  • Scaling challenges (campaigns, hot users)
  • Circuit breakers and provider failover
  • Every edge case you'll face in production
  • Failure scenarios and recovery
  • Cost optimization strategies

Interview Tip of the Day

INTERVIEW TIP: SHOW DEPTH ON ADVANCED FEATURES

When asked about a notification system, mention advanced features:

"Beyond basic delivery, a production notification system needs:

1. A/B testing for optimizing message engagement
2. Batching to avoid overwhelming users
3. Scheduled delivery with timezone awareness
4. Multi-channel orchestration with fallbacks

For example, a security alert might go to push AND SMS 
simultaneously. If neither gets a response in 5 minutes, 
we escalate to a phone call. This is a saga pattern where 
we track each channel's state independently."

This shows:
- You've built real systems
- You think about user experience
- You understand operational complexity

End of Week 6, Day 3

Tomorrow: Day 4 — Scale, Reliability & Edge Cases