Himanshu Kukreja

Week 4 — Day 4: Feed Caching

System Design Mastery Series


Preface

Yesterday, you learned to prevent thundering herds. You can now protect your database from cache stampedes.

Today, we tackle one of the most interesting caching problems in system design: personalized feeds.

THE FEED PROBLEM

Your social media app has 100 million users.
Each user has a personalized feed.
Each user follows between 10 and 10,000,000 other users.

When a user opens the app:
  "Show me the latest posts from people I follow,
   sorted by time (or relevance), instantly."

Seems simple. Let's do the math:

User opens app:
  → Get list of 500 accounts they follow
  → Fetch recent posts from each account
  → Merge and sort
  → Return top 50 posts

Query time per account: 10ms
Total time (one query after another): 500 × 10ms = 5 seconds

5 seconds to load a feed? Users will leave.

Okay, let's cache the feed!

But wait:
  → 100 million users
  → Each feed: 50 posts × 1KB = 50KB
  → Total cache: 100M × 50KB = 5 terabytes

5 terabytes of in-memory cache? That's expensive.

And when someone posts:
  → Celebrity with 50 million followers posts
  → Update 50 million cached feeds?
  → That's 50 million cache writes for one post!

This is the feed caching problem.
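
The back-of-envelope numbers above are easy to re-derive. The sketch below is only an illustration: the constant and function names are made up for this example, and it assumes decimal units (1 KB = 1,000 bytes) and strictly sequential queries.

```python
# Back-of-envelope check of the feed problem numbers.
# All names are illustrative; units are decimal (1 KB = 1,000 bytes).

FOLLOWED_ACCOUNTS = 500
QUERY_MS_PER_ACCOUNT = 10
USERS = 100_000_000
POSTS_PER_FEED = 50
BYTES_PER_POST = 1_000
CELEBRITY_FOLLOWERS = 50_000_000


def sequential_read_seconds(accounts: int, ms_per_query: int) -> float:
    """Time to query every followed account one after another."""
    return accounts * ms_per_query / 1000


def total_feed_cache_bytes(users: int, posts_per_feed: int, bytes_per_post: int) -> int:
    """Memory needed to hold one fully materialized feed per user."""
    return users * posts_per_feed * bytes_per_post


read_seconds = sequential_read_seconds(FOLLOWED_ACCOUNTS, QUERY_MS_PER_ACCOUNT)
cache_bytes = total_feed_cache_bytes(USERS, POSTS_PER_FEED, BYTES_PER_POST)

print(f"Naive read time: {read_seconds:.1f} s")
print(f"Feed cache size: {cache_bytes / 1e12:.1f} TB")
print(f"Cache writes for one celebrity post: {CELEBRITY_FOLLOWERS:,}")
```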

Today, you'll learn how Twitter, Instagram, and LinkedIn solve this — and when to use each approach.


Part I: Foundations

Chapter 1: Understanding Feeds

1.1 What Is a Feed?

A feed is a personalized, time-ordered (or relevance-ordered) list of content from sources a user has chosen to follow.

FEED ANATOMY

┌───────────────────────────────────────────────────────────┐
│                      YOUR FEED                            │
├───────────────────────────────────────────────────────────┤
│                                                           │
│  ┌────────────────────────────────────────────────────┐   │
│  │ @celebrity · 2 min ago                             │   │
│  │ Just landed in Paris! 🗼                           │   │
│  │ ❤️ 45,231  💬 2,891  🔄 5,632                       │   │
│  └────────────────────────────────────────────────────┘   │
│                                                           │
│  ┌────────────────────────────────────────────────────┐   │
│  │ @friend · 15 min ago                               │   │
│  │ Great coffee at the new place downtown             │   │
│  │ ❤️ 12  💬 3  🔄 0                                   │   │
│  └────────────────────────────────────────────────────┘   │
│                                                           │
│  ┌────────────────────────────────────────────────────┐   │
│  │ @news_outlet · 1 hour ago                          │   │
│  │ Breaking: Major announcement expected...           │   │
│  │ ❤️ 8,432  💬 1,203  🔄 3,891                        │   │
│  └────────────────────────────────────────────────────┘   │
│                                                           │
│                    [Load More...]                         │
│                                                           │
└───────────────────────────────────────────────────────────┘

Components:
  - Posts from accounts user follows
  - Ordered by time (or relevance algorithm)
  - Paginated (load more on scroll)
  - Updated in near real-time

1.2 The Core Challenge

THE FEED CHALLENGE: TWO COMPETING CONCERNS

READ LATENCY:
  User opens app → Feed appears instantly (<200ms)
  Users won't wait. Every 100ms delay = users lost.

WRITE FANOUT:
  User posts → Appears in all followers' feeds
  Celebrity posts → 50 million feeds need updating

These goals conflict:

Fast reads (pre-compute everything):
  ✓ Instant feed retrieval
  ✗ Massive write amplification (celebrity problem)
  ✗ Huge storage for all pre-computed feeds

Fast writes (compute on demand):
  ✓ Single write per post
  ✗ Slow reads (must query many sources)
  ✗ Database load at read time

1.3 The Numbers

Let's understand the scale:

TWITTER-SCALE NUMBERS (approximate)

Users:                    500 million
Daily Active Users:       250 million
Tweets per day:           500 million
Tweets per second:        ~6,000

Average followers:        200
Median followers:         50
Celebrity followers:      50+ million (top accounts)

Feed requests per day:    Billions
Feed latency target:      <200ms

The math problem:
  If we pre-compute feeds:
    500M tweets × 200 avg followers = 100 billion feed insertions/day
    ≈ 1.16 million feed insertions/second (100 billion ÷ 86,400 seconds)
    
  If we compute on read:
    Billions of feed reads
    Each read: query 200 followees, merge, sort
    = Trillions of database operations/day
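
The same comparison as a small calculation. The text above only says "billions" of feed reads, so the 5 billion reads per day used here is purely an assumed figure for illustration; the other inputs come from the table.

```python
SECONDS_PER_DAY = 86_400

tweets_per_day = 500_000_000
avg_followers = 200

# Fan-out on write: every tweet is inserted into each follower's feed.
insertions_per_day = tweets_per_day * avg_followers
insertions_per_second = insertions_per_day / SECONDS_PER_DAY

# Fan-out on read: every feed request queries each followee.
# ASSUMPTION: 5 billion feed reads/day stands in for "billions".
feed_reads_per_day = 5_000_000_000
followees_per_read = 200
read_queries_per_day = feed_reads_per_day * followees_per_read

print(f"{insertions_per_day:,} feed insertions/day")
print(f"{insertions_per_second:,.0f} feed insertions/second")
print(f"{read_queries_per_day:,} followee queries/day")
```

Under these assumptions the pull model issues roughly ten times as many operations per day as the push model, which is why neither extreme is attractive on its own.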

1.4 Key Terminology

Term                  Definition
Fan-out               Distributing a post to multiple feeds
Fan-out on write      Pre-compute feeds when post is created
Fan-out on read       Compute feeds when user requests
Write amplification   One write causes many downstream writes
Read amplification    One read causes many downstream reads
Hot user              User with many followers (celebrity)
Cold user             User with few followers or inactive
Timeline              Another term for feed

Chapter 2: The Two Fundamental Approaches

2.1 Approach 1: Fan-Out on Write (Push Model)

When a user posts, immediately push to all followers' feeds.

FAN-OUT ON WRITE

User A posts "Hello World"
     │
     ▼
┌─────────────────────────────────────────────────────────────┐
│                     POST SERVICE                            │
│                                                             │
│  1. Save post to Posts table                                │
│  2. Get A's followers: [B, C, D, E, ...]                    │
│  3. For each follower, add post to their feed cache         │
│                                                             │
└─────────────────────────────────────────────────────────────┘
     │
     ├──────────────────┬──────────────────┬─────────────────┐
     │                  │                  │                 │
     ▼                  ▼                  ▼                 ▼
┌─────────┐      ┌─────────┐      ┌─────────┐      ┌─────────┐
│ B's Feed│      │ C's Feed│      │ D's Feed│      │ E's Feed│
│ Cache   │      │ Cache   │      │ Cache   │      │ Cache   │
│         │      │         │      │         │      │         │
│ [A:post]│      │ [A:post]│      │ [A:post]│      │ [A:post]│
└─────────┘      └─────────┘      └─────────┘      └─────────┘

When B opens app:
  → Read B's feed cache directly
  → Instant! No computation needed

Implementation:

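# Fan-Out on Write Implementation
#
# Assumption: redis_client is an async client that returns str values
# (for example redis-py with decode_responses=True), so IDs read back
# from sorted sets can be used as dict keys below.

import json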

class FanOutOnWriteFeedService:
    """
    Push-based feed system.
    
    When a user posts, we immediately push the post
    to all their followers' feed caches.
    """
    
    def __init__(self, redis_client, db_client, queue_client):
        self.redis = redis_client
        self.db = db_client
        self.queue = queue_client
        
        self.feed_size_limit = 800  # Keep last 800 posts per feed
    
    async def create_post(self, user_id: str, content: str) -> dict:
        """Create post and fan out to followers."""
        
        # 1. Save post to database
        post = await self.db.fetch_one(
            """
            INSERT INTO posts (user_id, content, created_at)
            VALUES ($1, $2, NOW())
            RETURNING *
            """,
            user_id, content
        )
        
        # 2. Get follower count to decide strategy
        follower_count = await self._get_follower_count(user_id)
        
        # 3. Fan out to followers
        if follower_count < 10000:
            # Small account: synchronous fan-out
            await self._fan_out_sync(post)
        else:
            # Large account: async fan-out via queue
            await self._fan_out_async(post)
        
        return dict(post)
    
    async def _fan_out_sync(self, post: dict):
        """Synchronous fan-out for small accounts."""
        user_id = post['user_id']
        post_id = post['id']
        created_at = post['created_at'].timestamp()
        
        # Get all followers
        followers = await self.db.fetch(
            "SELECT follower_id FROM follows WHERE followee_id = $1",
            user_id
        )
        
        # Push to each follower's feed
        pipe = self.redis.pipeline()
        for row in followers:
            follower_id = row['follower_id']
            feed_key = f"feed:{follower_id}"
            
            # Add to sorted set (score = timestamp for ordering)
            pipe.zadd(feed_key, {post_id: created_at})
            
            # Trim to keep only recent posts
            pipe.zremrangebyrank(feed_key, 0, -self.feed_size_limit - 1)
        
        await pipe.execute()
    
    async def _fan_out_async(self, post: dict):
        """Async fan-out via message queue for large accounts."""
        await self.queue.send("fan-out-jobs", {
            "post_id": post['id'],
            "user_id": post['user_id'],
            "created_at": post['created_at'].isoformat()
        })
    
    async def get_feed(self, user_id: str, limit: int = 50, offset: int = 0) -> list:
        """Get user's feed - just read from cache!"""
        feed_key = f"feed:{user_id}"
        
        # Get post IDs from sorted set (newest first)
        post_ids = await self.redis.zrevrange(
            feed_key,
            offset,
            offset + limit - 1
        )
        
        if not post_ids:
            return []
        
        # Fetch post details
        posts = await self._get_posts_by_ids(post_ids)
        return posts
    
    async def _get_posts_by_ids(self, post_ids: list) -> list:
        """Fetch post details, with caching."""
        posts = []
        
        # Try cache first
        pipe = self.redis.pipeline()
        for post_id in post_ids:
            pipe.get(f"post:{post_id}")
        cached = await pipe.execute()
        
        # Separate hits and misses
        to_fetch = []
        for i, (post_id, data) in enumerate(zip(post_ids, cached)):
            if data:
                posts.append((i, json.loads(data)))
            else:
                to_fetch.append((i, post_id))
        
        # Fetch missing from database
        if to_fetch:
            ids = [post_id for _, post_id in to_fetch]
            rows = await self.db.fetch(
                "SELECT * FROM posts WHERE id = ANY($1)",
                ids
            )
            
            # Cache and add to results
            row_map = {str(r['id']): r for r in rows}
            pipe = self.redis.pipeline()
            
            for idx, post_id in to_fetch:
                if post_id in row_map:
                    post = dict(row_map[post_id])
                    posts.append((idx, post))
                    pipe.setex(f"post:{post_id}", 3600, json.dumps(post, default=str))
            
            await pipe.execute()
        
        # Sort by original order
        posts.sort(key=lambda x: x[0])
        return [post for _, post in posts]
    
    async def _get_follower_count(self, user_id: str) -> int:
        """Get follower count (cached)."""
        cache_key = f"follower_count:{user_id}"
        
        cached = await self.redis.get(cache_key)
        if cached:
            return int(cached)
        
        count = await self.db.fetch_val(
            "SELECT COUNT(*) FROM follows WHERE followee_id = $1",
            user_id
        )
        
        await self.redis.setex(cache_key, 300, str(count))
        return count
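
The trim call in `_fan_out_sync`, `zremrangebyrank(feed_key, 0, -self.feed_size_limit - 1)`, is easy to misread. Ranks in a sorted set run from the lowest score (oldest post) upward, and a negative rank counts back from the newest entry, so the range "0 to -(limit + 1)" covers everything except the newest `limit` members. The pure-Python model below is only a sketch of that rank arithmetic on a `{member: score}` dict; it is not the Redis implementation.

```python
from typing import Dict, List


def zremrangebyrank(zset: Dict[str, float], start: int, stop: int) -> int:
    """Model of ZREMRANGEBYRANK on a {member: score} dict.

    Ranks are ascending by score; negative indexes count from the
    highest score, with -1 being the last (newest) element.
    Returns the number of members removed.
    """
    ordered: List[str] = sorted(zset, key=lambda m: (zset[m], m))
    n = len(ordered)
    if start < 0:
        start = max(n + start, 0)
    if stop < 0:
        stop = n + stop
    stop = min(stop, n - 1)
    doomed = ordered[start:stop + 1] if start <= stop else []
    for member in doomed:
        del zset[member]
    return len(doomed)


# post1 is the oldest entry, post5 the newest.
feed = {f"post{i}": float(i) for i in range(1, 6)}
feed_size_limit = 3
removed = zremrangebyrank(feed, 0, -feed_size_limit - 1)
print(removed, sorted(feed))  # 2 ['post3', 'post4', 'post5']
```

When the feed holds fewer entries than the limit, the computed range is empty and nothing is removed, which is why the trim can be issued unconditionally after every insert.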

Pros:

  • Extremely fast reads (just read from cache)
  • Predictable read latency
  • Simple read path

Cons:

  • Write amplification (1 post → N cache writes)
  • Celebrity problem (millions of writes per post)
  • Storage heavy (every user has cached feed)
  • Delayed delivery for large fan-outs
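
The "delayed delivery" point follows from how a large fan-out is typically drained: a worker takes a job off the queue and writes to follower feeds batch by batch. The consumer for `fan-out-jobs` is not shown in this section, so the sketch below is a guess at its general shape. `InMemoryFeeds`, `chunked`, and `fan_out_in_batches` are hypothetical names, and a plain dict stands in for Redis.

```python
from itertools import islice
from typing import Dict, Iterable, Iterator, List


def chunked(items: Iterable[str], size: int) -> Iterator[List[str]]:
    """Yield successive lists of at most `size` items."""
    iterator = iter(items)
    while True:
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch


class InMemoryFeeds:
    """Hypothetical stand-in for the feed cache: follower_id -> {post_id: score}."""

    def __init__(self) -> None:
        self.feeds: Dict[str, Dict[str, float]] = {}

    def push_batch(self, follower_ids: List[str], post_id: str, score: float) -> None:
        # With Redis this would be one pipeline of ZADD calls per batch.
        for follower_id in follower_ids:
            self.feeds.setdefault(follower_id, {})[post_id] = score


def fan_out_in_batches(
    store: InMemoryFeeds,
    post_id: str,
    score: float,
    follower_ids: Iterable[str],
    batch_size: int = 1000,
) -> int:
    """Write one post into every follower's feed; return the batch count."""
    batches = 0
    for batch in chunked(follower_ids, batch_size):
        store.push_batch(batch, post_id, score)
        batches += 1
    return batches


store = InMemoryFeeds()
followers = [f"user{i}" for i in range(2500)]
batches = fan_out_in_batches(store, "post42", 1700000000.0, followers, batch_size=1000)
print(batches, len(store.feeds))  # 3 2500
```

Followers in the last batch see the post only after every earlier batch has been written, so delivery delay grows with follower count.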

2.2 Approach 2: Fan-Out on Read (Pull Model)

When a user requests their feed, compute it on demand.

FAN-OUT ON READ

User B opens app, requests feed
     │
     ▼
┌────────────────────────────────────────────────────────────┐
│                     FEED SERVICE                           │
│                                                            │
│  1. Get B's following list: [A, C, D, ...]                 │
│  2. For each followee, get their recent posts              │
│  3. Merge all posts                                        │
│  4. Sort by time                                           │
│  5. Return top 50                                          │
│                                                            │
└────────────────────────────────────────────────────────────┘
     │
     ├──────────────────┬──────────────────┬─────────────────┐
     │                  │                  │                 │
     ▼                  ▼                  ▼                 ▼
┌─────────┐      ┌─────────┐      ┌─────────┐      ┌─────────┐
│ A's     │      │ C's     │      │ D's     │      │ E's     │
│ Posts   │      │ Posts   │      │ Posts   │      │ Posts   │
└─────────┘      └─────────┘      └─────────┘      └─────────┘

Pros: No pre-computation, no storage per user
Cons: Slow! Must query many sources per request

Implementation:

# Fan-Out on Read Implementation

import asyncio
import json

class FanOutOnReadFeedService:
    """
    Pull-based feed system.
    
    When a user requests their feed, we compute it on demand
    by fetching posts from all accounts they follow.
    """
    
    def __init__(self, redis_client, db_client):
        self.redis = redis_client
        self.db = db_client
    
    async def create_post(self, user_id: str, content: str) -> dict:
        """Create post - just save it, no fan-out!"""
        
        # Save post to database
        post = await self.db.fetch_one(
            """
            INSERT INTO posts (user_id, content, created_at)
            VALUES ($1, $2, NOW())
            RETURNING *
            """,
            user_id, content
        )
        
        # Cache the post for fast retrieval
        await self.redis.setex(
            f"post:{post['id']}",
            3600,
            json.dumps(dict(post), default=str)
        )
        
        # Add to user's posts list (for pull queries)
        await self.redis.zadd(
            f"user_posts:{user_id}",
            {str(post['id']): post['created_at'].timestamp()}
        )
        
        # Trim old posts
        await self.redis.zremrangebyrank(f"user_posts:{user_id}", 0, -1001)
        
        return dict(post)
    
    async def get_feed(self, user_id: str, limit: int = 50) -> list:
        """Get feed by pulling from all followees."""
        
        # 1. Get list of accounts user follows
        following = await self._get_following(user_id)
        
        if not following:
            return []
        
        # 2. Fetch recent posts from each followee (parallel)
        tasks = [
            self._get_user_recent_posts(followee_id)
            for followee_id in following
        ]
        
        all_posts_lists = await asyncio.gather(*tasks)
        
        # 3. Merge all posts
        all_posts = []
        for posts in all_posts_lists:
            all_posts.extend(posts)
        
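        # 4. Sort by time (newest first). Cached posts were serialized with
        #    default=str, so created_at may be a str or a datetime; comparing
        #    the string forms avoids a TypeError on mixed types (this ordering
        #    assumes all timestamps share one UTC offset).
        all_posts.sort(key=lambda p: str(p['created_at']), reverse=True)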
        
        # 5. Return top N
        return all_posts[:limit]
    
    async def _get_following(self, user_id: str) -> list:
        """Get list of accounts user follows."""
        cache_key = f"following:{user_id}"
        
        # Try cache
        cached = await self.redis.smembers(cache_key)
        if cached:
            return list(cached)
        
        # Fetch from database
        rows = await self.db.fetch(
            "SELECT followee_id FROM follows WHERE follower_id = $1",
            user_id
        )
        
        following = [str(row['followee_id']) for row in rows]
        
        # Cache for 5 minutes
        if following:
            await self.redis.sadd(cache_key, *following)
            await self.redis.expire(cache_key, 300)
        
        return following
    
    async def _get_user_recent_posts(self, user_id: str, limit: int = 100) -> list:
        """Get recent posts from a user (cached)."""
        cache_key = f"user_posts:{user_id}"
        
        # Get post IDs from sorted set
        post_ids = await self.redis.zrevrange(cache_key, 0, limit - 1)
        
        if not post_ids:
            # Cache miss - fetch from database
            rows = await self.db.fetch(
                """
                SELECT * FROM posts 
                WHERE user_id = $1 
                ORDER BY created_at DESC 
                LIMIT $2
                """,
                user_id, limit
            )
            return [dict(row) for row in rows]
        
        # Fetch post details
        return await self._get_posts_by_ids(post_ids)
    
    async def _get_posts_by_ids(self, post_ids: list) -> list:
        """Fetch post details with caching."""
        # Same as fan-out-on-write implementation
        posts = []
        
        pipe = self.redis.pipeline()
        for post_id in post_ids:
            pipe.get(f"post:{post_id}")
        cached = await pipe.execute()
        
        to_fetch = []
        for i, (post_id, data) in enumerate(zip(post_ids, cached)):
            if data:
                posts.append((i, json.loads(data)))
            else:
                to_fetch.append((i, post_id))
        
        if to_fetch:
            ids = [post_id for _, post_id in to_fetch]
            rows = await self.db.fetch(
                "SELECT * FROM posts WHERE id = ANY($1)",
                ids
            )
            
            row_map = {str(r['id']): r for r in rows}
            pipe = self.redis.pipeline()
            
            for idx, post_id in to_fetch:
                if post_id in row_map:
                    post = dict(row_map[post_id])
                    posts.append((idx, post))
                    pipe.setex(f"post:{post_id}", 3600, json.dumps(post, default=str))
            
            await pipe.execute()
        
        posts.sort(key=lambda x: x[0])
        return [post for _, post in posts]

Pros:

  • Fast writes (just save the post)
  • No write amplification
  • Storage efficient (no per-user feed cache)
  • Always fresh (computed in real-time)

Cons:

  • Slow reads (must query many sources)
  • High read amplification
  • Unpredictable latency (depends on following count)
  • Heavy database load at read time
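
Part of the read cost is the merge itself: `get_feed` concatenates every followee's posts and sorts the whole list. Since each per-author list already arrives newest-first, a k-way merge can stop once it has produced the top N. This is an alternative to the concatenate-and-sort step, not what the listing above does. The sketch uses the standard library's `heapq.merge`, a hypothetical function name, and plain numbers for `created_at`.

```python
import heapq
from itertools import islice
from typing import Dict, List


def merge_newest_first(per_author_posts: List[List[Dict]], limit: int = 50) -> List[Dict]:
    """Lazily merge lists that are each sorted newest-first; keep the top `limit`."""
    merged = heapq.merge(
        *per_author_posts,
        key=lambda post: post["created_at"],
        reverse=True,  # inputs are in descending order
    )
    return list(islice(merged, limit))


alice = [{"id": "a3", "created_at": 30}, {"id": "a1", "created_at": 10}]
bob = [{"id": "b2", "created_at": 20}]
top_two = merge_newest_first([alice, bob], limit=2)
print([post["id"] for post in top_two])  # ['a3', 'b2']
```

This reduces sorting work, but it does not remove the read amplification: every followee still has to be queried.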

2.3 Comparison

Aspect                Fan-Out on Write            Fan-Out on Read
Write latency         Slow (N cache writes)       Fast (single write)
Read latency          Fast (single cache read)    Slow (N queries)
Storage               High (feed per user)        Low (posts only)
Write amplification   High                        None
Read amplification    None                        High
Celebrity problem     Severe                      None
Freshness             Near real-time              Real-time
Best for              Read-heavy, regular users   Write-heavy, celebrities

Chapter 3: The Hybrid Approach

3.1 The Celebrity Problem

THE CELEBRITY PROBLEM

Kim Kardashian posts a photo.
She has 300 million followers.

With fan-out on write:
  → 300 million cache writes
  → At 100K writes/sec = 50 minutes to complete
  → Users don't see post for almost an hour!
  → Meanwhile, she posts again... backlog grows

This doesn't scale.

But most users aren't celebrities:
  → Average user: 200 followers
  → 200 cache writes = instant

The insight:
  Different users need different strategies.
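
The gap between the two cases is easier to appreciate as numbers. A minimal calculation, assuming the same sustained rate of 100K cache writes per second for both accounts (the function name is illustrative):

```python
def fan_out_seconds(followers: int, writes_per_second: int) -> float:
    """Time to write one post into every follower's feed at a fixed rate."""
    return followers / writes_per_second


celebrity = fan_out_seconds(300_000_000, 100_000)
regular = fan_out_seconds(200, 100_000)

print(f"Celebrity post: {celebrity / 60:.0f} minutes")
print(f"Regular post:   {regular * 1000:.0f} ms")
```

At that rate the celebrity post takes 3,000 seconds to deliver while the regular post takes about 2 milliseconds, a difference of more than six orders of magnitude from follower count alone.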

3.2 Hybrid Strategy

Use fan-out on write for regular users, fan-out on read for celebrities.

HYBRID APPROACH

User classification:
  Regular user:  < 10,000 followers  → Fan-out on write
  Celebrity:     >= 10,000 followers → Fan-out on read

When regular user posts:
  → Push to all followers' feeds (immediate)

When celebrity posts:
  → Store post, don't fan out
  → When follower reads feed:
    → Get pre-computed feed (from regular users)
    → Pull celebrity posts on demand
    → Merge and return

Implementation:

# Hybrid Feed System

import asyncio
import json
from dataclasses import dataclass
from typing import List, Set


@dataclass 
class FeedConfig:
    """Configuration for hybrid feed system."""
    celebrity_threshold: int = 10000  # Followers to be considered celebrity
    feed_cache_size: int = 800        # Posts to keep in feed cache
    celebrity_posts_limit: int = 100  # Celebrity posts to fetch per request
    feed_ttl: int = 86400             # Feed cache TTL


class HybridFeedService:
    """
    Hybrid feed system combining push and pull.
    
    Regular users (< 10K followers): Fan-out on write
    Celebrities (>= 10K followers): Fan-out on read
    
    This solves the celebrity problem while keeping
    fast reads for most users.
    """
    
    def __init__(
        self,
        redis_client,
        db_client,
        queue_client,
        config: FeedConfig = None
    ):
        self.redis = redis_client
        self.db = db_client
        self.queue = queue_client
        self.config = config or FeedConfig()
        
        # Cache of celebrity user IDs
        self._celebrity_cache: Set[str] = set()
    
    async def create_post(self, user_id: str, content: str) -> dict:
        """Create post with appropriate fan-out strategy."""
        
        # Save post
        post = await self._save_post(user_id, content)
        
        # Check if user is a celebrity
        is_celebrity = await self._is_celebrity(user_id)
        
        if is_celebrity:
            # Celebrity: No fan-out, just index the post
            await self._index_celebrity_post(post)
        else:
            # Regular user: Fan out to followers
            await self._fan_out_to_followers(post)
        
        return post
    
    async def get_feed(self, user_id: str, limit: int = 50) -> list:
        """
        Get feed by merging pre-computed feed with celebrity posts.
        """
        # 1. Get pre-computed feed (from regular users)
        precomputed = await self._get_precomputed_feed(user_id, limit * 2)
        
        # 2. Get celebrities this user follows
        followed_celebrities = await self._get_followed_celebrities(user_id)
        
        # 3. Pull recent posts from followed celebrities
        celebrity_posts = []
        if followed_celebrities:
            celebrity_posts = await self._get_celebrity_posts(
                followed_celebrities,
                limit=self.config.celebrity_posts_limit
            )
        
        # 4. Merge and sort
        all_posts = precomputed + celebrity_posts
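        # created_at is a str for posts read from the JSON cache and a
        # datetime for posts fetched from the database; compare the string
        # forms so mixed lists sort without a TypeError (assumes all
        # timestamps share one UTC offset).
        all_posts.sort(key=lambda p: str(p['created_at']), reverse=True)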
        
        # 5. Deduplicate and return top N
        seen = set()
        unique_posts = []
        for post in all_posts:
            if post['id'] not in seen:
                seen.add(post['id'])
                unique_posts.append(post)
                if len(unique_posts) >= limit:
                    break
        
        return unique_posts
    
    async def _is_celebrity(self, user_id: str) -> bool:
        """Check if user is a celebrity (cached)."""
        # Check memory cache first
        if user_id in self._celebrity_cache:
            return True
        
        # Check Redis
        is_celeb = await self.redis.sismember("celebrities", user_id)
        if is_celeb:
            self._celebrity_cache.add(user_id)
            return True
        
        # Check database
        count = await self._get_follower_count(user_id)
        if count >= self.config.celebrity_threshold:
            # Add to celebrity set
            await self.redis.sadd("celebrities", user_id)
            self._celebrity_cache.add(user_id)
            return True
        
        return False
    
    async def _save_post(self, user_id: str, content: str) -> dict:
        """Save post to database."""
        post = await self.db.fetch_one(
            """
            INSERT INTO posts (user_id, content, created_at)
            VALUES ($1, $2, NOW())
            RETURNING *
            """,
            user_id, content
        )
        
        post_dict = dict(post)
        
        # Cache the post
        await self.redis.setex(
            f"post:{post['id']}",
            3600,
            json.dumps(post_dict, default=str)
        )
        
        return post_dict
    
    async def _index_celebrity_post(self, post: dict):
        """Index celebrity post for pull-based retrieval."""
        user_id = post['user_id']
        post_id = str(post['id'])
        timestamp = post['created_at'].timestamp()
        
        # Add to celebrity's posts list
        await self.redis.zadd(
            f"celebrity_posts:{user_id}",
            {post_id: timestamp}
        )
        
        # Trim old posts
        await self.redis.zremrangebyrank(
            f"celebrity_posts:{user_id}",
            0, -self.config.celebrity_posts_limit - 1
        )
    
    async def _fan_out_to_followers(self, post: dict):
        """Fan out post to all followers' feeds."""
        user_id = post['user_id']
        post_id = str(post['id'])
        timestamp = post['created_at'].timestamp()
        
        # Get followers
        followers = await self.db.fetch(
            "SELECT follower_id FROM follows WHERE followee_id = $1",
            user_id
        )
        
        # Update each follower's feed cache
        pipe = self.redis.pipeline()
        
        for row in followers:
            follower_id = row['follower_id']
            feed_key = f"feed:{follower_id}"
            
            pipe.zadd(feed_key, {post_id: timestamp})
            pipe.zremrangebyrank(feed_key, 0, -self.config.feed_cache_size - 1)
        
        await pipe.execute()
    
    async def _get_precomputed_feed(self, user_id: str, limit: int) -> list:
        """Get pre-computed feed from cache."""
        feed_key = f"feed:{user_id}"
        
        post_ids = await self.redis.zrevrange(feed_key, 0, limit - 1)
        
        if not post_ids:
            return []
        
        return await self._get_posts_by_ids(post_ids)
    
    async def _get_followed_celebrities(self, user_id: str) -> list:
        """Get list of celebrities this user follows."""
        # Get all following
        following = await self.redis.smembers(f"following:{user_id}")
        
        if not following:
            rows = await self.db.fetch(
                "SELECT followee_id FROM follows WHERE follower_id = $1",
                user_id
            )
            following = {str(row['followee_id']) for row in rows}
            
            if following:
                await self.redis.sadd(f"following:{user_id}", *following)
                await self.redis.expire(f"following:{user_id}", 300)
        
        # Filter to celebrities only
        celebrities = await self.redis.smembers("celebrities")
        return list(following & celebrities)
    
    async def _get_celebrity_posts(
        self,
        celebrity_ids: list,
        limit: int
    ) -> list:
        """Get recent posts from celebrities."""
        all_posts = []
        
        # Fetch from each celebrity in parallel
        tasks = []
        for celeb_id in celebrity_ids:
            tasks.append(self._get_celebrity_recent_posts(celeb_id))
        
        results = await asyncio.gather(*tasks)
        
        for posts in results:
            all_posts.extend(posts)
        
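        # Sort and limit. created_at may be a str (cache hit) or a datetime
        # (database row), so compare the string forms to avoid a TypeError.
        all_posts.sort(key=lambda p: str(p['created_at']), reverse=True)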
        return all_posts[:limit]
    
    async def _get_celebrity_recent_posts(self, user_id: str) -> list:
        """Get recent posts from a celebrity."""
        post_ids = await self.redis.zrevrange(
            f"celebrity_posts:{user_id}",
            0, 99
        )
        
        if not post_ids:
            return []
        
        return await self._get_posts_by_ids(post_ids)
    
    async def _get_posts_by_ids(self, post_ids: list) -> list:
        """Fetch posts by IDs with caching."""
        posts = []
        
        # Batch fetch from cache
        pipe = self.redis.pipeline()
        for post_id in post_ids:
            pipe.get(f"post:{post_id}")
        cached = await pipe.execute()
        
        to_fetch = []
        for i, (post_id, data) in enumerate(zip(post_ids, cached)):
            if data:
                posts.append((i, json.loads(data)))
            else:
                to_fetch.append((i, post_id))
        
        # Fetch missing from database
        if to_fetch:
            ids = [pid for _, pid in to_fetch]
            rows = await self.db.fetch(
                "SELECT * FROM posts WHERE id = ANY($1)",
                ids
            )
            
            row_map = {str(r['id']): dict(r) for r in rows}
            
            for idx, post_id in to_fetch:
                if post_id in row_map:
                    posts.append((idx, row_map[post_id]))
        
        posts.sort(key=lambda x: x[0])
        return [post for _, post in posts]
    
    async def _get_follower_count(self, user_id: str) -> int:
        """Get follower count."""
        cached = await self.redis.get(f"follower_count:{user_id}")
        if cached:
            return int(cached)
        
        count = await self.db.fetch_val(
            "SELECT COUNT(*) FROM follows WHERE followee_id = $1",
            user_id
        )
        
        await self.redis.setex(f"follower_count:{user_id}", 300, str(count))
        return count
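
One detail of the listing above deserves a note. Posts read back from the `post:{id}` cache went through `json.dumps(..., default=str)`, so their `created_at` is a string, while rows fresh from the database carry a `datetime`. Any code that compares the two needs one common representation. The helper below is a minimal sketch of one way to normalize them; `as_datetime` is a hypothetical name, and treating naive timestamps as UTC is an assumption.

```python
from datetime import datetime, timezone
from typing import Any


def as_datetime(value: Any) -> datetime:
    """Return a timezone-aware datetime from a datetime or its str() form."""
    if isinstance(value, datetime):
        parsed = value
    else:
        # str(datetime) is ISO 8601 with a space separator, which
        # datetime.fromisoformat accepts.
        parsed = datetime.fromisoformat(str(value))
    if parsed.tzinfo is None:
        # ASSUMPTION: naive timestamps are in UTC.
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed


from_db = {"id": 1, "created_at": datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)}
from_cache = {"id": 2, "created_at": "2024-01-01 13:30:00+00:00"}

posts = sorted(
    [from_db, from_cache],
    key=lambda p: as_datetime(p["created_at"]),
    reverse=True,
)
print([p["id"] for p in posts])  # [2, 1]
```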

3.3 Hybrid Benefits

HYBRID APPROACH BENEFITS

Write Performance:
  Regular user (200 followers):
    → 200 cache writes (fast)
  
  Celebrity (10M followers):
    → 0 cache writes (instant!)
    → Post indexed for pull

Read Performance:
  User following 500 accounts:
    → 490 regular users: read from pre-computed feed
    → 10 celebrities: pull their posts
    → Merge the cached feed entries with up to 100 celebrity posts
    → Still fast! (one cache read + 10 small pulls)

Storage:
  → Feed caches only contain regular user posts
  → Celebrity posts stored once, pulled on demand
  → Significant storage savings

Chapter 4: Advanced Feed Patterns

4.1 Ranked Feeds (Algorithmic)

Most modern feeds aren't purely chronological — they use relevance ranking.

RANKED FEED COMPONENTS

Score = f(recency, engagement, affinity, ...)

Recency:
  → How old is the post?
  → Decay function: score decreases over time

Engagement:
  → Likes, comments, shares
  → More engagement = higher score

Affinity:
  → How much does user interact with author?
  → Frequent interactions = higher score

Content type:
  → Videos might rank higher
  → Posts with images might rank higher

Negative signals:
  → User hid similar posts
  → Low engagement on similar content
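
To make the recency component concrete, here is how an exponential decay behaves, assuming the six-hour half-life that the ranking configuration in this chapter uses as its default. The function name is illustrative.

```python
def recency_score(age_hours: float, half_life_hours: float = 6.0) -> float:
    """Exponential decay: the score halves every `half_life_hours`."""
    return 0.5 ** (age_hours / half_life_hours)


for age in (0, 6, 12, 24):
    print(f"{age:>2}h old -> {recency_score(age):.4f}")
# Prints 1.0000, 0.5000, 0.2500 and 0.0625 for the four ages.
```

A day-old post keeps only about 6% of its recency score, so it needs strong engagement or affinity to outrank fresh content.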

Implementation:

# Ranked Feed Implementation

import math
from dataclasses import dataclass
from datetime import datetime

@dataclass
class RankingConfig:
    """Configuration for feed ranking."""
    recency_weight: float = 0.3
    engagement_weight: float = 0.3
    affinity_weight: float = 0.3
    content_weight: float = 0.1
    
    recency_half_life_hours: float = 6.0  # Score halves every 6 hours


class RankedFeedService:
    """
    Feed service with relevance ranking.
    
    Instead of pure chronological order, posts are ranked
    by a combination of recency, engagement, and affinity.
    """
    
    def __init__(self, redis_client, db_client, config: RankingConfig = None):
        self.redis = redis_client
        self.db = db_client
        self.config = config or RankingConfig()
    
    async def get_ranked_feed(
        self,
        user_id: str,
        limit: int = 50
    ) -> list:
        """Get feed ranked by relevance."""
        
        # 1. Get candidate posts (more than we need)
        candidates = await self._get_candidate_posts(user_id, limit * 3)
        
        # 2. Get user's affinity scores
        affinity_scores = await self._get_affinity_scores(user_id)
        
        # 3. Score each post
        scored_posts = []
        for post in candidates:
            score = self._calculate_score(post, affinity_scores)
            scored_posts.append((score, post))
        
        # 4. Sort by score and return top N
        scored_posts.sort(key=lambda x: x[0], reverse=True)
        return [post for _, post in scored_posts[:limit]]
    
    def _calculate_score(
        self,
        post: dict,
        affinity_scores: dict
    ) -> float:
        """Calculate relevance score for a post."""
        
        # Recency score (exponential decay)
        age_hours = (datetime.utcnow() - post['created_at']).total_seconds() / 3600
        recency_score = 0.5 ** (age_hours / self.config.recency_half_life_hours)
        
        # Engagement score (log scale, capped at 1.0)
        engagement = post.get('likes', 0) + post.get('comments', 0) * 2
        engagement_score = min(1.0, math.log(engagement + 1) / 10)
        
        # Affinity score (from user interaction history)
        # Keys in affinity_scores are strings, so normalize the author ID
        author_id = str(post['user_id'])
        affinity_score = affinity_scores.get(author_id, 0.1)  # Default low
        
        # Content type score
        content_score = 0.5
        if post.get('has_image'):
            content_score = 0.7
        if post.get('has_video'):
            content_score = 0.9
        
        # Weighted combination
        total_score = (
            self.config.recency_weight * recency_score +
            self.config.engagement_weight * engagement_score +
            self.config.affinity_weight * affinity_score +
            self.config.content_weight * content_score
        )
        
        return total_score
    
    async def _get_affinity_scores(self, user_id: str) -> dict:
        """Get user's affinity scores for accounts they follow."""
        cache_key = f"affinity:{user_id}"
        
        cached = await self.redis.hgetall(cache_key)
        if cached:
            return {k: float(v) for k, v in cached.items()}
        
        # Calculate from interaction history
        interactions = await self.db.fetch(
            """
            SELECT target_user_id, 
                   COUNT(*) as interaction_count
            FROM user_interactions
            WHERE user_id = $1
            AND created_at > NOW() - INTERVAL '30 days'
            GROUP BY target_user_id
            """,
            user_id
        )
        
        # Normalize to 0-1 range
        if not interactions:
            return {}
        
        max_count = max(row['interaction_count'] for row in interactions)
        scores = {
            str(row['target_user_id']): row['interaction_count'] / max_count
            for row in interactions
        }
        
        # Cache for 1 hour
        if scores:
            await self.redis.hset(cache_key, mapping=scores)
            await self.redis.expire(cache_key, 3600)
        
        return scores
    
    async def _get_candidate_posts(self, user_id: str, limit: int) -> list:
        """Get candidate posts for ranking."""
        # Use hybrid approach to get candidates
        # Then we'll re-rank them
        
        # ... (same as hybrid feed implementation)
        pass
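
To make the weighting concrete, the sketch below reimplements the scoring formula as a standalone function and evaluates it for two hypothetical posts. The name `score_post` and the sample inputs are illustrative assumptions; the weights and half-life are the `RankingConfig` defaults, and engagement is capped at 1.0 as the production version later in this section does.

```python
import math

# Defaults mirror RankingConfig; score_post itself is a hypothetical helper.
WEIGHTS = {"recency": 0.3, "engagement": 0.3, "affinity": 0.3, "content": 0.1}
HALF_LIFE_HOURS = 6.0


def score_post(age_hours: float, likes: int, comments: int,
               affinity: float, content: float) -> float:
    """Weighted sum of the four ranking signals, each in [0, 1]."""
    recency = 0.5 ** (age_hours / HALF_LIFE_HOURS)
    engagement = min(1.0, math.log(likes + 2 * comments + 1) / 10)
    return (
        WEIGHTS["recency"] * recency
        + WEIGHTS["engagement"] * engagement
        + WEIGHTS["affinity"] * affinity
        + WEIGHTS["content"] * content
    )


# A fresh text post from a close friend vs. a 12-hour-old viral video
# from an account the user rarely interacts with.
fresh_friend = score_post(age_hours=0.0, likes=3, comments=1,
                          affinity=0.9, content=0.5)
old_viral = score_post(age_hours=12.0, likes=20000, comments=3000,
                       affinity=0.1, content=0.9)
```

With these inputs the fresh post scores about 0.67 and the viral video about 0.50, so under the default weights recency and affinity together outweigh raw engagement.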

4.2 Sharded Feed Storage

For massive scale, shard feeds across multiple Redis instances.

SHARDED FEED ARCHITECTURE

Instead of one Redis cluster for all feeds:

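Feed Shard 0:  Users with hash(user_id) % num_shards == 0
Feed Shard 1:  Users with hash(user_id) % num_shards == 1
Feed Shard 2:  Users with hash(user_id) % num_shards == 2
...

Sharding function (hash must be stable across processes):
  shard_id = hash(user_id) % num_shards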

Benefits:
  → Each shard handles subset of users
  → Horizontal scaling
  → Failure isolation
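# Sharded Feed Storage

import asyncio
import zlib
from collections import defaultdict
from typing import List


class ShardedFeedStorage:
    """
    Sharded storage for feeds.
    
    Distributes feeds across multiple Redis instances
    for horizontal scaling.
    """
    
    def __init__(self, redis_clients: List, num_shards: int):
        self.redis_clients = redis_clients
        self.num_shards = num_shards
    
    def _get_shard(self, user_id: str) -> int:
        """Get shard ID for a user."""
        # Built-in hash() is salted per process for strings, so use a
        # stable hash to keep shard assignment identical across workers
        return zlib.crc32(user_id.encode("utf-8")) % self.num_shards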
    
    def _get_client(self, user_id: str):
        """Get Redis client for a user."""
        shard_id = self._get_shard(user_id)
        return self.redis_clients[shard_id]
    
    async def add_to_feed(self, user_id: str, post_id: str, timestamp: float):
        """Add post to user's feed."""
        client = self._get_client(user_id)
        feed_key = f"feed:{user_id}"
        
        await client.zadd(feed_key, {post_id: timestamp})
        await client.zremrangebyrank(feed_key, 0, -801)
    
    async def get_feed(self, user_id: str, limit: int = 50) -> list:
        """Get user's feed."""
        client = self._get_client(user_id)
        feed_key = f"feed:{user_id}"
        
        return await client.zrevrange(feed_key, 0, limit - 1)
    
    async def fan_out(self, post: dict, follower_ids: List[str]):
        """Fan out post to multiple followers (potentially different shards)."""
        post_id = str(post['id'])
        timestamp = post['created_at'].timestamp()
        
        # Group followers by shard
        shard_followers = defaultdict(list)
        for follower_id in follower_ids:
            shard_id = self._get_shard(follower_id)
            shard_followers[shard_id].append(follower_id)
        
        # Execute fan-out per shard (parallel)
        tasks = []
        for shard_id, followers in shard_followers.items():
            client = self.redis_clients[shard_id]
            tasks.append(
                self._fan_out_to_shard(client, followers, post_id, timestamp)
            )
        
        await asyncio.gather(*tasks)
    
    async def _fan_out_to_shard(
        self,
        client,
        follower_ids: List[str],
        post_id: str,
        timestamp: float
    ):
        """Fan out to followers in a single shard."""
        pipe = client.pipeline()
        
        for follower_id in follower_ids:
            feed_key = f"feed:{follower_id}"
            pipe.zadd(feed_key, {post_id: timestamp})
            pipe.zremrangebyrank(feed_key, 0, -801)
        
        await pipe.execute()
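
One detail matters when several worker processes share the same shards: Python salts the built-in `hash()` for strings per process (unless `PYTHONHASHSEED` is fixed), so two workers can disagree about where a user's feed lives. A minimal sketch of a process-independent shard function, assuming CRC32 is an acceptable hash for this purpose; `stable_shard` and `group_by_shard` are hypothetical helper names:

```python
import zlib
from collections import defaultdict
from typing import Dict, List


def stable_shard(user_id: str, num_shards: int) -> int:
    """Map a user ID to a shard using CRC32, which is stable across processes."""
    return zlib.crc32(user_id.encode("utf-8")) % num_shards


def group_by_shard(user_ids: List[str], num_shards: int) -> Dict[int, List[str]]:
    """Group user IDs by shard so each shard receives one pipelined batch."""
    groups: Dict[int, List[str]] = defaultdict(list)
    for user_id in user_ids:
        groups[stable_shard(user_id, num_shards)].append(user_id)
    return dict(groups)
```

Note that simple modulo sharding remaps most users whenever `num_shards` changes, so the shard count is best treated as fixed for the lifetime of the cached feeds.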

4.3 Feed Warming

Pre-populate feeds for active users.

# Feed Warming for Active Users

class FeedWarmer:
    """
    Pre-populate feeds for users likely to be active.
    
    Run periodically (e.g., before peak hours) to ensure
    active users have warm feed caches.
    """
    
    def __init__(self, feed_service, redis_client, db_client):
        self.feed_service = feed_service
        self.redis = redis_client
        self.db = db_client
    
    async def warm_active_users(self, hours_ahead: int = 2):
        """Warm feeds for users predicted to be active soon."""
        
        # Get users active at this time yesterday/last week
        predicted_active = await self._predict_active_users(hours_ahead)
        
        logger.info(f"Warming feeds for {len(predicted_active)} users")
        
        # Warm each user's feed
        for user_id in predicted_active:
            try:
                await self._warm_user_feed(user_id)
            except Exception as e:
                logger.warning(f"Failed to warm feed for {user_id}: {e}")
    
    async def _predict_active_users(self, hours_ahead: int) -> List[str]:
        """Predict users who will be active in the next N hours."""
        
        # Simple heuristic: users who were active during the same
        # N-hour window yesterday
        return await self.db.fetch_column(
            """
            SELECT DISTINCT user_id 
            FROM user_sessions
            WHERE created_at BETWEEN 
                NOW() - INTERVAL '24 hours' AND 
                NOW() - INTERVAL '24 hours' + make_interval(hours => $1)
            LIMIT 100000
            """,
            hours_ahead
        )
    
    async def _warm_user_feed(self, user_id: str):
        """Populate a user's feed cache."""
        
        # Check if feed already cached
        feed_key = f"feed:{user_id}"
        exists = await self.redis.exists(feed_key)
        
        if exists:
            return  # Already warm
        
        # Compute and cache feed
        feed = await self.feed_service.get_feed(user_id, limit=100)
        
        if feed:
            pipe = self.redis.pipeline()
            for post in feed:
                pipe.zadd(feed_key, {str(post['id']): post['created_at'].timestamp()})
            pipe.expire(feed_key, 86400)
            await pipe.execute()
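
Warming users one at a time can be slow when the predicted set is large. The sketch below shows one way to run the warming calls concurrently while capping the number in flight, so the warmer does not itself overload Redis or the database. It is an illustration under assumptions: `warm_concurrently`, the `warm_one` callback, and the default limit of 50 are hypothetical and would need tuning.

```python
import asyncio
from typing import Awaitable, Callable, Iterable


async def warm_concurrently(
    user_ids: Iterable[str],
    warm_one: Callable[[str], Awaitable[None]],
    max_concurrency: int = 50,
) -> int:
    """Warm feeds with at most max_concurrency in flight; return failure count."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def guarded(user_id: str) -> bool:
        async with semaphore:
            try:
                await warm_one(user_id)
                return True
            except Exception:
                # One user's failure should not abort the whole warming run.
                return False

    results = await asyncio.gather(*(guarded(uid) for uid in user_ids))
    return sum(1 for ok in results if not ok)
```

In the class above, `warm_one` would be the `_warm_user_feed` method.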

Part II: Implementation

Chapter 5: Production Feed System

# Production Feed System - Complete Implementation

import asyncio
import json
import logging
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Dict, List, Set, Optional
from collections import defaultdict
import math

logger = logging.getLogger(__name__)


# =============================================================================
# Configuration
# =============================================================================

@dataclass
class ProductionFeedConfig:
    """Complete configuration for production feed system."""
    
    # Celebrity threshold
    celebrity_threshold: int = 10000
    
    # Cache settings
    feed_cache_size: int = 800
    feed_cache_ttl: int = 86400  # 24 hours
    post_cache_ttl: int = 3600   # 1 hour
    
    # Fan-out settings
    async_fanout_threshold: int = 1000  # Use async for >= 1000 followers
    fanout_batch_size: int = 1000
    
    # Ranking settings
    enable_ranking: bool = True
    recency_weight: float = 0.3
    engagement_weight: float = 0.3
    affinity_weight: float = 0.3
    content_weight: float = 0.1
    recency_half_life_hours: float = 6.0
    
    # Performance settings
    max_following_to_pull: int = 50  # Max celebrities to pull per request
    candidate_multiplier: int = 3   # Fetch 3x posts for ranking


# =============================================================================
# Metrics
# =============================================================================

@dataclass
class FeedMetrics:
    """Metrics for feed system."""
    feed_requests: int = 0
    posts_created: int = 0
    fanouts_sync: int = 0
    fanouts_async: int = 0
    celebrity_pulls: int = 0
    cache_hits: int = 0
    cache_misses: int = 0
    avg_feed_latency_ms: float = 0
    
    def to_dict(self) -> dict:
        return {
            'feed_requests': self.feed_requests,
            'posts_created': self.posts_created,
            'fanouts': {'sync': self.fanouts_sync, 'async': self.fanouts_async},
            'celebrity_pulls': self.celebrity_pulls,
            'cache_hit_ratio': self.cache_hits / max(1, self.cache_hits + self.cache_misses),
            'avg_feed_latency_ms': round(self.avg_feed_latency_ms, 2)
        }


# =============================================================================
# Main Feed Service
# =============================================================================

class ProductionFeedService:
    """
    Production-ready feed service.
    
    Features:
    - Hybrid push/pull model
    - Celebrity handling
    - Relevance ranking
    - Sharding support
    - Comprehensive metrics
    """
    
    def __init__(
        self,
        redis_client,
        db_client,
        queue_client,
        config: ProductionFeedConfig = None
    ):
        self.redis = redis_client
        self.db = db_client
        self.queue = queue_client
        self.config = config or ProductionFeedConfig()
        self.metrics = FeedMetrics()
        
        # Celebrity cache (in-memory for speed)
        self._celebrities: Set[str] = set()
        self._celebrities_loaded = False
    
    async def initialize(self):
        """Initialize service (call on startup)."""
        await self._load_celebrities()
    
    # =========================================================================
    # Post Creation
    # =========================================================================
    
    async def create_post(
        self,
        user_id: str,
        content: str,
        media_type: str = None
    ) -> dict:
        """
        Create a post and distribute to followers.
        
        Uses appropriate fan-out strategy based on follower count.
        """
        self.metrics.posts_created += 1
        
        # Save post
        post = await self._save_post(user_id, content, media_type)
        
        # Determine fan-out strategy
        is_celebrity = await self._is_celebrity(user_id)
        
        if is_celebrity:
            # Celebrity: Index for pull, no fan-out
            await self._index_celebrity_post(post)
            logger.debug(f"Celebrity post indexed: {post['id']}")
        else:
            # Regular user: Fan out
            follower_count = await self._get_follower_count(user_id)
            
            if follower_count < self.config.async_fanout_threshold:
                # Small fan-out: synchronous
                await self._fan_out_sync(post)
                self.metrics.fanouts_sync += 1
            else:
                # Large fan-out: asynchronous via queue
                await self._fan_out_async(post)
                self.metrics.fanouts_async += 1
        
        return post
    
    async def _save_post(
        self,
        user_id: str,
        content: str,
        media_type: str
    ) -> dict:
        """Save post to database and cache."""
        post = await self.db.fetch_one(
            """
            INSERT INTO posts (user_id, content, media_type, created_at)
            VALUES ($1, $2, $3, NOW())
            RETURNING id, user_id, content, media_type, created_at,
                      0 as likes, 0 as comments, 0 as shares
            """,
            user_id, content, media_type
        )
        
        post_dict = self._post_to_dict(post)
        
        # Cache post
        await self.redis.setex(
            f"post:{post['id']}",
            self.config.post_cache_ttl,
            json.dumps(post_dict, default=str)
        )
        
        # Add to user's posts
        await self.redis.zadd(
            f"user_posts:{user_id}",
            {str(post['id']): post['created_at'].timestamp()}
        )
        
        return post_dict
    
    async def _index_celebrity_post(self, post: dict):
        """Index celebrity post for pull queries."""
        await self.redis.zadd(
            f"celebrity_posts:{post['user_id']}",
            {str(post['id']): post['created_at'].timestamp()}
        )
        await self.redis.zremrangebyrank(
            f"celebrity_posts:{post['user_id']}",
            0, -101  # Keep last 100
        )
    
    async def _fan_out_sync(self, post: dict):
        """Synchronous fan-out for small follower counts."""
        followers = await self._get_followers(post['user_id'])
        
        if not followers:
            return
        
        post_id = str(post['id'])
        timestamp = post['created_at'].timestamp()
        
        # Batch updates
        pipe = self.redis.pipeline()
        for follower_id in followers:
            feed_key = f"feed:{follower_id}"
            pipe.zadd(feed_key, {post_id: timestamp})
            pipe.zremrangebyrank(feed_key, 0, -self.config.feed_cache_size - 1)
        
        await pipe.execute()
    
    async def _fan_out_async(self, post: dict):
        """Asynchronous fan-out via message queue."""
        await self.queue.send("fanout-jobs", {
            "post_id": str(post['id']),
            "user_id": post['user_id'],
            "timestamp": post['created_at'].timestamp()
        })
    
    # =========================================================================
    # Feed Retrieval
    # =========================================================================
    
    async def get_feed(
        self,
        user_id: str,
        limit: int = 50,
        offset: int = 0,
        ranked: bool = None
    ) -> List[dict]:
        """
        Get user's feed.
        
        Combines pre-computed feed (regular users) with
        pulled celebrity posts, optionally ranked.
        """
        start_time = datetime.utcnow()
        self.metrics.feed_requests += 1
        
        ranked = ranked if ranked is not None else self.config.enable_ranking
        
        # Get more candidates if ranking
        fetch_limit = limit * self.config.candidate_multiplier if ranked else limit
        
        # 1. Get pre-computed feed
        precomputed = await self._get_precomputed_feed(
            user_id, fetch_limit, offset
        )
        
        # 2. Get celebrity posts
        followed_celebrities = await self._get_followed_celebrities(user_id)
        
        celebrity_posts = []
        if followed_celebrities:
            # Limit celebrities to pull
            celebs_to_pull = followed_celebrities[:self.config.max_following_to_pull]
            celebrity_posts = await self._pull_celebrity_posts(celebs_to_pull)
            self.metrics.celebrity_pulls += len(celebs_to_pull)
        
        # 3. Merge
        all_posts = precomputed + celebrity_posts
        
        # 4. Deduplicate
        seen = set()
        unique_posts = []
        for post in all_posts:
            if post['id'] not in seen:
                seen.add(post['id'])
                unique_posts.append(post)
        
        # 5. Rank or sort chronologically
        # (rank even when few candidates exist, so ordering stays consistent)
        if ranked:
            affinity = await self._get_affinity_scores(user_id)
            result = self._rank_posts(unique_posts, affinity)[:limit]
        else:
            unique_posts.sort(key=lambda p: p['created_at'], reverse=True)
            result = unique_posts[:limit]
        
        # Record latency
        latency_ms = (datetime.utcnow() - start_time).total_seconds() * 1000
        self._update_latency(latency_ms)
        
        return result
    
    async def _get_precomputed_feed(
        self,
        user_id: str,
        limit: int,
        offset: int
    ) -> List[dict]:
        """Get pre-computed feed from cache."""
        feed_key = f"feed:{user_id}"
        
        post_ids = await self.redis.zrevrange(
            feed_key,
            offset,
            offset + limit - 1
        )
        
        if post_ids:
            self.metrics.cache_hits += 1
            return await self._get_posts_by_ids(post_ids)
        
        self.metrics.cache_misses += 1
        return []
    
    async def _pull_celebrity_posts(
        self,
        celebrity_ids: List[str]
    ) -> List[dict]:
        """Pull recent posts from celebrities."""
        all_posts = []
        
        # Parallel fetch
        tasks = [
            self._get_celebrity_recent_posts(celeb_id)
            for celeb_id in celebrity_ids
        ]
        
        results = await asyncio.gather(*tasks, return_exceptions=True)
        
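        for result in results:
            if isinstance(result, list):
                all_posts.extend(result)
            elif isinstance(result, Exception):
                # Degrade gracefully, but leave a trace of the failure
                logger.warning(f"Celebrity pull failed: {result}")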
        
        return all_posts
    
    async def _get_celebrity_recent_posts(
        self,
        user_id: str,
        limit: int = 20
    ) -> List[dict]:
        """Get recent posts from a celebrity."""
        post_ids = await self.redis.zrevrange(
            f"celebrity_posts:{user_id}",
            0, limit - 1
        )
        
        if not post_ids:
            return []
        
        return await self._get_posts_by_ids(post_ids)
    
    # =========================================================================
    # Ranking
    # =========================================================================
    
    def _rank_posts(
        self,
        posts: List[dict],
        affinity_scores: Dict[str, float]
    ) -> List[dict]:
        """Rank posts by relevance."""
        scored = []
        
        for post in posts:
            score = self._calculate_score(post, affinity_scores)
            scored.append((score, post))
        
        scored.sort(key=lambda x: x[0], reverse=True)
        return [post for _, post in scored]
    
    def _calculate_score(
        self,
        post: dict,
        affinity_scores: Dict[str, float]
    ) -> float:
        """Calculate relevance score for a post."""
        c = self.config
        
        # Recency score
        age_hours = (datetime.utcnow() - post['created_at']).total_seconds() / 3600
        recency = 0.5 ** (age_hours / c.recency_half_life_hours)
        
        # Engagement score (log scale, normalized)
        engagement = post.get('likes', 0) + post.get('comments', 0) * 2
        engagement_score = min(1.0, math.log(engagement + 1) / 10)
        
        # Affinity score
        affinity = affinity_scores.get(str(post['user_id']), 0.1)
        
        # Content score
        content = 0.5
        if post.get('media_type') == 'image':
            content = 0.7
        elif post.get('media_type') == 'video':
            content = 0.9
        
        return (
            c.recency_weight * recency +
            c.engagement_weight * engagement_score +
            c.affinity_weight * affinity +
            c.content_weight * content
        )
    
    async def _get_affinity_scores(self, user_id: str) -> Dict[str, float]:
        """Get user's affinity scores."""
        cache_key = f"affinity:{user_id}"
        
        cached = await self.redis.hgetall(cache_key)
        if cached:
            return {k: float(v) for k, v in cached.items()}
        
        # Calculate from interactions
        rows = await self.db.fetch(
            """
            SELECT target_user_id, COUNT(*) as cnt
            FROM user_interactions
            WHERE user_id = $1 AND created_at > NOW() - INTERVAL '30 days'
            GROUP BY target_user_id
            """,
            user_id
        )
        
        if not rows:
            return {}
        
        max_cnt = max(r['cnt'] for r in rows)
        scores = {str(r['target_user_id']): r['cnt'] / max_cnt for r in rows}
        
        if scores:
            await self.redis.hset(cache_key, mapping={k: str(v) for k, v in scores.items()})
            await self.redis.expire(cache_key, 3600)
        
        return scores
    
    # =========================================================================
    # Helpers
    # =========================================================================
    
    async def _is_celebrity(self, user_id: str) -> bool:
        """Check if user is a celebrity."""
        if not self._celebrities_loaded:
            await self._load_celebrities()
        
        return user_id in self._celebrities
    
    async def _load_celebrities(self):
        """Load celebrity set from Redis."""
        celebrities = await self.redis.smembers("celebrities")
        self._celebrities = set(celebrities) if celebrities else set()
        self._celebrities_loaded = True
    
    async def _get_follower_count(self, user_id: str) -> int:
        """Get follower count (cached)."""
        cached = await self.redis.get(f"follower_count:{user_id}")
        if cached:
            return int(cached)
        
        count = await self.db.fetch_val(
            "SELECT COUNT(*) FROM follows WHERE followee_id = $1",
            user_id
        )
        
        await self.redis.setex(f"follower_count:{user_id}", 300, str(count))
        
        # Update celebrity status
        if count >= self.config.celebrity_threshold:
            await self.redis.sadd("celebrities", user_id)
            self._celebrities.add(user_id)
        
        return count
    
    async def _get_followers(self, user_id: str) -> List[str]:
        """Get list of follower IDs."""
        rows = await self.db.fetch(
            "SELECT follower_id FROM follows WHERE followee_id = $1",
            user_id
        )
        return [str(row['follower_id']) for row in rows]
    
    async def _get_followed_celebrities(self, user_id: str) -> List[str]:
        """Get celebrities that user follows."""
        following = await self.redis.smembers(f"following:{user_id}")
        
        if not following:
            rows = await self.db.fetch(
                "SELECT followee_id FROM follows WHERE follower_id = $1",
                user_id
            )
            following = {str(row['followee_id']) for row in rows}
            
            if following:
                await self.redis.sadd(f"following:{user_id}", *following)
                await self.redis.expire(f"following:{user_id}", 300)
        
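        # Make sure the celebrity set is loaded even if initialize()
        # was never called
        if not self._celebrities_loaded:
            await self._load_celebrities()
        
        return [uid for uid in following if uid in self._celebrities]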
    
    async def _get_posts_by_ids(self, post_ids: List[str]) -> List[dict]:
        """Fetch posts by IDs with caching."""
        posts = []
        to_fetch = []
        
        # Try cache
        pipe = self.redis.pipeline()
        for post_id in post_ids:
            pipe.get(f"post:{post_id}")
        cached = await pipe.execute()
        
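        for i, (post_id, data) in enumerate(zip(post_ids, cached)):
            if data:
                # Cached JSON stores created_at as a string; convert it back
                # so sorting and scoring see datetimes on every code path
                posts.append((i, self._post_to_dict(json.loads(data))))
            else:
                to_fetch.append((i, post_id))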
        
        # Fetch missing
        if to_fetch:
            ids = [pid for _, pid in to_fetch]
            rows = await self.db.fetch(
                "SELECT * FROM posts WHERE id = ANY($1)",
                ids
            )
            
            row_map = {str(r['id']): self._post_to_dict(r) for r in rows}
            
            pipe = self.redis.pipeline()
            for idx, post_id in to_fetch:
                if post_id in row_map:
                    post = row_map[post_id]
                    posts.append((idx, post))
                    pipe.setex(
                        f"post:{post_id}",
                        self.config.post_cache_ttl,
                        json.dumps(post, default=str)
                    )
            
            await pipe.execute()
        
        posts.sort(key=lambda x: x[0])
        return [post for _, post in posts]
    
    def _post_to_dict(self, row) -> dict:
        """Convert database row to dict."""
        post = dict(row)
        if isinstance(post.get('created_at'), str):
            post['created_at'] = datetime.fromisoformat(post['created_at'])
        return post
    
    def _update_latency(self, latency_ms: float):
        """Update average latency metric."""
        n = self.metrics.feed_requests
        old_avg = self.metrics.avg_feed_latency_ms
        self.metrics.avg_feed_latency_ms = old_avg + (latency_ms - old_avg) / n
    
    def get_metrics(self) -> dict:
        """Get service metrics."""
        return self.metrics.to_dict()
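
The service above only enqueues large fan-outs on the `fanout-jobs` queue; the consumer side is not shown. A minimal sketch of what a worker could do with one job, assuming the job payload produced by `_fan_out_async` and an async Redis client with a `pipeline()` method. The function names, the `get_followers` callback, and the batching scheme are assumptions for illustration.

```python
from typing import Awaitable, Callable, Iterator, List


def chunked(items: List[str], size: int) -> Iterator[List[str]]:
    """Yield consecutive slices of at most `size` items."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


async def handle_fanout_job(
    job: dict,
    redis,                                  # async Redis client (assumed)
    get_followers: Callable[[str], Awaitable[List[str]]],
    batch_size: int = 1000,                 # mirrors fanout_batch_size
    feed_cache_size: int = 800,             # mirrors feed_cache_size
) -> int:
    """Write one post ID into every follower's feed; return feeds updated."""
    followers = await get_followers(job["user_id"])
    updated = 0
    for batch in chunked(followers, batch_size):
        # One pipeline per batch keeps each round trip bounded in size.
        pipe = redis.pipeline()
        for follower_id in batch:
            feed_key = f"feed:{follower_id}"
            pipe.zadd(feed_key, {job["post_id"]: job["timestamp"]})
            pipe.zremrangebyrank(feed_key, 0, -feed_cache_size - 1)
        await pipe.execute()
        updated += len(batch)
    return updated
```

Because `ZADD` with the same member and score is idempotent, a job that is redelivered after a worker crash can simply be processed again.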

Part III: Real-World Application

Chapter 6: How Big Tech Does It

6.1 Case Study: Twitter — Hybrid with Heavy Pull

TWITTER FEED ARCHITECTURE

Twitter's approach:
  - Primarily fan-out on read (pull-based)
  - With targeted fan-out on write for some users

Why mostly pull?
  - Twitter has many celebrities (verified accounts)
  - Fan-out on write would be extremely expensive
  - Most tweets are not seen by most followers anyway

Architecture:

  Tweet Created
       │
       ▼
  ┌────────────────────────────────────────────────────────────┐
  │                    TWEET SERVICE                           │
  │                                                            │
  │  1. Save to Manhattan (distributed DB)                     │
  │  2. Index for search                                       │
  │  3. Queue for limited fan-out (close friends feature)      │
  │                                                            │
  └────────────────────────────────────────────────────────────┘

  Timeline Request
       │
       ▼
  ┌────────────────────────────────────────────────────────────┐
  │                  TIMELINE SERVICE                          │
  │                                                            │
  │  1. Get user's following list                              │
  │  2. Fetch recent tweets from each (parallel)               │
  │  3. Merge and rank (algorithm)                             │
  │  4. Cache result briefly                                   │
  │                                                            │
  └────────────────────────────────────────────────────────────┘

Key optimizations:
  - Heavy caching of user tweet lists
  - Parallel fetching with timeouts
  - Pre-computed "best tweets" per user
  - Algorithmic ranking reduces need for completeness
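
The "parallel fetching with timeouts" optimization can be sketched as follows. This is a generic illustration of the pattern, not Twitter's code: `fetch_all_with_timeout`, the `fetch_recent` callback, and the 50 ms budget are assumptions. A slow or failing source is dropped from the response rather than delaying the whole timeline.

```python
import asyncio
from typing import Awaitable, Callable, List


async def fetch_all_with_timeout(
    author_ids: List[str],
    fetch_recent: Callable[[str], Awaitable[List[dict]]],
    timeout_seconds: float = 0.05,
) -> List[dict]:
    """Fetch each author's recent posts in parallel, skipping slow or failed sources."""

    async def fetch_one(author_id: str) -> List[dict]:
        try:
            return await asyncio.wait_for(fetch_recent(author_id), timeout_seconds)
        except Exception:
            # Covers timeouts and fetch errors: serve the feed without this author.
            return []

    results = await asyncio.gather(*(fetch_one(a) for a in author_ids))
    merged = [post for posts in results for post in posts]
    merged.sort(key=lambda p: p["created_at"], reverse=True)
    return merged
```

This trades completeness for latency, which is acceptable when a ranking step decides what the user sees anyway.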

6.2 Case Study: Instagram — Hybrid with Heavy Push

INSTAGRAM FEED ARCHITECTURE

Instagram's approach:
  - Primarily fan-out on write (push-based)
  - Pull only for mega-celebrities

Why mostly push?
  - Instagram users follow fewer accounts (avg ~150)
  - Photos/videos are high-value content
  - User expectation of seeing all followed content

Architecture:

  Post Created
       │
       ▼
  ┌────────────────────────────────────────────────────────────┐
  │                    MEDIA SERVICE                           │
  │                                                            │
  │  1. Save media to storage                                  │
  │  2. Create post record                                     │
  │  3. Check follower count                                   │
  │     < 10K: Sync fan-out                                    │
  │     >= 10K: Async fan-out (queued)                         │
  │     > 1M: Celebrity treatment (pull-based)                 │
  │                                                            │
  └────────────────────────────────────────────────────────────┘

  Feed Request
       │
       ▼
  ┌────────────────────────────────────────────────────────────┐
  │                    FEED SERVICE                            │
  │                                                            │
  │  1. Read from feed cache (Cassandra)                       │
  │  2. If following mega-celebrities, pull their posts        │
  │  3. Merge and rank                                         │
  │  4. Apply "posts you may have missed" logic                │
  │                                                            │
  └────────────────────────────────────────────────────────────┘

Key features:
  - Feed stored in Cassandra (sorted by ranking score)
  - Ranking model runs at fan-out time
  - "You're all caught up" feature stops scroll
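
The three follower-count tiers in the Media Service box can be expressed as a small decision function. This is a sketch of the tiering logic only; `FanoutStrategy`, `choose_strategy`, and the parameter names are hypothetical, and the thresholds are the ones shown in the diagram.

```python
from enum import Enum


class FanoutStrategy(Enum):
    SYNC_PUSH = "sync_push"
    ASYNC_PUSH = "async_push"
    PULL = "pull"


def choose_strategy(
    follower_count: int,
    async_threshold: int = 10_000,
    celebrity_threshold: int = 1_000_000,
) -> FanoutStrategy:
    """Pick a fan-out strategy from follower count, using the tiers in the diagram."""
    if follower_count > celebrity_threshold:
        return FanoutStrategy.PULL        # Mega-celebrity: merge at read time
    if follower_count >= async_threshold:
        return FanoutStrategy.ASYNC_PUSH  # Large audience: queue the fan-out
    return FanoutStrategy.SYNC_PUSH       # Small audience: write feeds inline
```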

6.3 Case Study: LinkedIn — Professional Feed

LINKEDIN FEED ARCHITECTURE

LinkedIn's unique challenges:
  - "Viral" posts spread beyond connections (reactions show to network)
  - Professional content requires different ranking
  - Ads mixed into feed

Architecture:

  Post/Share Created
       │
       ▼
  ┌────────────────────────────────────────────────────────────┐
  │                    FEED MIXER                              │
  │                                                            │
  │  "Follow Graph Store" - connections/followers              │
  │  "Activity Store" - posts, reactions, comments             │
  │  "Ads Service" - sponsored content                         │
  │                                                            │
  └────────────────────────────────────────────────────────────┘

  Feed Request
       │
       ▼
  ┌────────────────────────────────────────────────────────────┐
  │  1. Pull from multiple sources:                            │
  │     - Direct connections' posts                            │
  │     - Posts connections reacted to (2nd degree)            │
  │     - Hashtags followed                                    │
  │     - Company pages followed                               │
  │                                                            │
  │  2. Rank with professional relevance model                 │
  │                                                            │
  │  3. Mix in ads based on targeting                          │
  │                                                            │
  │  4. Apply diversity rules (no same author twice in a row)  │
  │                                                            │
  └────────────────────────────────────────────────────────────┘

Key insight:
  LinkedIn feed includes "2nd degree" content:
  "John Doe likes this" expands content distribution
  This makes pure push impossible - must compute on read
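
The diversity rule in step 4 ("no same author twice in a row") can be applied as a post-ranking pass. The greedy sketch below is one simple way to do it and is not LinkedIn's actual algorithm; `apply_author_diversity` and the `author_id` field are assumed names.

```python
from typing import List


def apply_author_diversity(ranked_posts: List[dict]) -> List[dict]:
    """Reorder so adjacent posts have different authors where possible,
    otherwise preserving the ranked order."""
    remaining = list(ranked_posts)
    result: List[dict] = []
    while remaining:
        last_author = result[-1]["author_id"] if result else None
        # Take the highest-ranked post by a different author than the last one.
        for index, post in enumerate(remaining):
            if post["author_id"] != last_author:
                result.append(remaining.pop(index))
                break
        else:
            # Only the last author's posts are left; keep them in ranked order.
            result.extend(remaining)
            break
    return result
```

The pass is quadratic in the worst case, which is fine for a single page of 50 to 150 candidates.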

6.4 Summary: Industry Approaches

Platform    Primary Strategy   Celebrity Handling        Feed Size
---------   ----------------   -----------------------   ----------------
Twitter     Pull               All users pull            ~200 following
Instagram   Push               Pull for >1M followers    ~150 following
Facebook    Hybrid             Complex ranking           ~300 friends
LinkedIn    Pull               2nd degree content        ~500 connections
TikTok      Pull (algorithm)   Minimal follow concept    Algorithmic

Chapter 7: Common Mistakes

7.1 Mistake 1: Pure Push at Scale

❌ WRONG: Fan-out everything to everyone

async def create_post(user_id: str, content: str):
    post = await save_post(user_id, content)
    
    followers = await get_all_followers(user_id)  # Could be millions!
    
    for follower in followers:
        await add_to_feed(follower, post)  # Millions of operations!
    
    return post

# Celebrity with 10M followers posts
# = 10M synchronous cache writes
# = Minutes or hours to complete
# = Post not visible for ages


✅ CORRECT: Strategy based on follower count

async def create_post(user_id: str, content: str):
    post = await save_post(user_id, content)
    
    follower_count = await get_follower_count(user_id)
    
    if follower_count < CELEBRITY_THRESHOLD:
        # Push for regular users
        if follower_count < 1000:
            await fan_out_sync(post)
        else:
            await fan_out_async(post)
    else:
        # Index for pull (celebrities)
        await index_celebrity_post(post)
    
    return post
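
The `fan_out_async` branch above typically means splitting the follower list into fixed-size jobs that workers consume from a queue. Here is a minimal sketch of that chunking step. The job shape, the function name `build_fan_out_jobs`, and the batch size are assumptions; the queue itself is left out.

```python
from typing import Dict, Iterator, List

FAN_OUT_BATCH_SIZE = 1000  # assumed value; tune for your queue and workers


def build_fan_out_jobs(
    post_id: str,
    follower_ids: List[str],
    batch_size: int = FAN_OUT_BATCH_SIZE,
) -> Iterator[Dict[str, object]]:
    """Yield one queue job per batch of followers.

    Each job is small enough for a worker to finish quickly and can be
    retried on its own if it fails.
    """
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    for start in range(0, len(follower_ids), batch_size):
        yield {
            "post_id": post_id,
            "follower_ids": follower_ids[start:start + batch_size],
        }
```

With 2,500 followers and the default batch size this yields three jobs, so a failed batch delays only the followers in that batch.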

7.2 Mistake 2: Unbounded Feed Size

❌ WRONG: Keep all posts forever

async def add_to_feed(user_id: str, post_id: str, timestamp: float):
    await redis.zadd(f"feed:{user_id}", {post_id: timestamp})
    # Never trimmed!
    # Feed grows forever
    # Memory exhaustion


✅ CORRECT: Trim to a reasonable size

FEED_SIZE_LIMIT = 800  # Keep last 800 posts

async def add_to_feed(user_id: str, post_id: str, timestamp: float):
    # Queue both commands and send them in one round trip
    pipe = redis.pipeline()
    pipe.zadd(f"feed:{user_id}", {post_id: timestamp})
    # Rank 0 is the oldest entry; keep only the newest FEED_SIZE_LIMIT
    pipe.zremrangebyrank(f"feed:{user_id}", 0, -FEED_SIZE_LIMIT - 1)
    await pipe.execute()
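
To see why the rank range from `0` to `-FEED_SIZE_LIMIT - 1` keeps exactly the newest entries, here is a pure-Python model of the same trim. It mimics sorted-set rank semantics with a list sorted by ascending score. It is an illustration, not a Redis client call, and the function name is made up for this example.

```python
from typing import List, Tuple


def trim_like_zremrangebyrank(
    entries: List[Tuple[float, str]], limit: int
) -> List[Tuple[float, str]]:
    """Model ZREMRANGEBYRANK key 0 -(limit + 1) on a sorted set.

    `entries` holds (score, member) pairs. Rank 0 is the lowest score
    (the oldest post), so removing ranks 0..-(limit + 1) drops everything
    except the `limit` highest-scored (newest) members.
    """
    ordered = sorted(entries)  # ascending by score, like a sorted set
    stop = len(ordered) - (limit + 1)  # resolve the negative rank -(limit + 1)
    if stop < 0:
        return ordered  # fewer than limit + 1 entries: nothing to remove
    return ordered[stop + 1:]
```

With ten entries and a limit of three, ranks 0 through 6 are removed and the three newest remain.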

7.3 Mistake 3: Ignoring Inactive Users

❌ WRONG: Fan out to all followers equally

async def fan_out(post: dict, followers: list[str]):
    for follower in followers:
        await add_to_feed(follower, post)
    
    # Fans out to users who haven't logged in for years
    # Wasted writes


✅ CORRECT: Prioritize active users

async def fan_out(post: dict, followers: list[str]):
    # Get activity status ('active', 'recent', or 'dormant') per follower
    activity = await get_user_activity_batch(followers)
    
    # Prioritize by activity
    active = [f for f in followers if activity[f] == 'active']
    recent = [f for f in followers if activity[f] == 'recent']
    
    # Always fan out to active users first
    await fan_out_batch(post, active)
    
    # Then fan out to recently active users
    await fan_out_batch(post, recent)
    
    # Skip dormant users (their feed is rebuilt by pull when they return),
    # or fan out to them in the background with low priority
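
The `get_user_activity_batch` helper above is not defined in this article. One way it could bucket followers is by time since they were last seen. The sketch below assumes that approach; the thresholds and function names are illustrative, not prescribed values.

```python
from typing import Dict

ACTIVE_WINDOW_SECONDS = 24 * 3600        # assumed: seen within the last day
RECENT_WINDOW_SECONDS = 30 * 24 * 3600   # assumed: seen within the last 30 days


def classify_activity(last_seen: float, now: float) -> str:
    """Map a last-seen Unix timestamp to 'active', 'recent', or 'dormant'."""
    age = now - last_seen
    if age <= ACTIVE_WINDOW_SECONDS:
        return "active"
    if age <= RECENT_WINDOW_SECONDS:
        return "recent"
    return "dormant"


def classify_batch(last_seen_by_user: Dict[str, float], now: float) -> Dict[str, str]:
    """Classify many users at once; input maps user_id to last-seen time."""
    return {
        user_id: classify_activity(last_seen, now)
        for user_id, last_seen in last_seen_by_user.items()
    }
```

Passing `now` in explicitly keeps the function deterministic, which makes the thresholds easy to test and tune.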

7.4 Mistake 4: Sequential Feed Assembly

❌ WRONG: Sequential fetches

async def get_feed(user_id: str) -> list:
    following = await get_following(user_id)
    
    all_posts = []
    for followee in following:
        posts = await get_user_posts(followee)  # Sequential!
        all_posts.extend(posts)
    
    # 500 following = 500 sequential queries
    # At 10ms each = 5 seconds!
    return all_posts


✅ CORRECT: Parallel fetches

import asyncio

async def get_feed(user_id: str) -> list:
    following = await get_following(user_id)
    
    # Parallel fetch: start every query at once and wait for all of them
    tasks = [get_user_posts(followee) for followee in following]
    results = await asyncio.gather(*tasks)
    
    all_posts = []
    for posts in results:
        all_posts.extend(posts)
    
    # 500 following = 1 parallel batch
    # Total time ≈ slowest single query (if the backend can absorb 500 concurrent queries)
    return all_posts
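
Launching one task per followee works for a few hundred accounts, but unbounded concurrency can exhaust connection pools. A hedged variant caps in-flight queries with `asyncio.Semaphore`. Here `fetch_posts` stands in for `get_user_posts`, and the limit of 50 is an assumption.

```python
import asyncio
from typing import Awaitable, Callable, List

MAX_CONCURRENT_FETCHES = 50  # assumed cap; size it to your connection pool


async def fetch_all_bounded(
    followees: List[str],
    fetch_posts: Callable[[str], Awaitable[List[dict]]],
    limit: int = MAX_CONCURRENT_FETCHES,
) -> List[dict]:
    """Fetch posts for every followee with at most `limit` queries in flight."""
    semaphore = asyncio.Semaphore(limit)

    async def fetch_one(followee: str) -> List[dict]:
        async with semaphore:  # wait here when `limit` fetches are running
            return await fetch_posts(followee)

    results = await asyncio.gather(*(fetch_one(f) for f in followees))
    return [post for posts in results for post in posts]
```

Total time becomes roughly the number of followees divided by `limit`, multiplied by the per-query latency, which is still far below the sequential cost.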

7.5 Mistake Checklist

Before deploying a feed system:

  • Celebrity handling — Different strategy for high-follower users
  • Feed size limits — Trim old posts
  • Activity awareness — Prioritize active users
  • Parallel operations — Don't sequentially fetch
  • Async fan-out — Queue large fan-outs
  • Monitoring — Track fan-out times, feed latency
  • Graceful degradation — Serve partial feed if slow
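
The last checklist item, serving a partial feed when some sources are slow, can be sketched with `asyncio.wait` and a deadline. The 150 ms budget and the names `fetch_with_deadline` and `fetch_posts` are assumptions for illustration.

```python
import asyncio
from typing import Awaitable, Callable, List


async def fetch_with_deadline(
    sources: List[str],
    fetch_posts: Callable[[str], Awaitable[List[dict]]],
    timeout_seconds: float = 0.15,  # assumed latency budget
) -> List[dict]:
    """Return posts from sources that answer in time and drop the rest."""
    if not sources:
        return []
    tasks = [asyncio.ensure_future(fetch_posts(source)) for source in sources]
    done, pending = await asyncio.wait(tasks, timeout=timeout_seconds)
    for task in pending:
        task.cancel()  # stop slow queries instead of letting them pile up
    posts: List[dict] = []
    for task in tasks:  # original order keeps the output deterministic
        if task in done and not task.cancelled() and task.exception() is None:
            posts.extend(task.result())
    return posts
```

Sources that fail or time out are simply skipped, so the caller gets a slightly incomplete feed rather than an error or a long wait.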

Part IV: Interview Preparation

Chapter 8: Interview Tips

8.1 Key Phrases

INTRODUCING THE PROBLEM:

"The key challenge with feeds is the read/write trade-off. 
We can optimize for fast reads by pre-computing feeds, but 
that creates massive write amplification when someone with 
millions of followers posts."


EXPLAINING HYBRID APPROACH:

"I'd use a hybrid model. Regular users with under 10,000 
followers get fan-out on write—their posts are pushed to 
followers' feeds immediately. Celebrities get fan-out on 
read—their posts are pulled when followers request their 
feed. This solves the celebrity problem while keeping reads 
fast for most users."


DISCUSSING TRADE-OFFS:

"The trade-off is storage versus compute. Fan-out on write 
uses more storage—we're caching feeds for every user—but 
reads are instant. Fan-out on read saves storage but every 
feed request requires computation. For a social network 
where reads vastly outnumber writes, the storage trade-off 
is usually worth it."


ON RANKING:

"Modern feeds aren't purely chronological. I'd rank by a 
combination of recency, engagement, and user affinity. 
This means we fetch more candidates than we need and 
score them, returning the top N. The ranking happens 
at read time, using cached affinity scores."
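
The ranking answer above can be backed by a small scoring sketch. The formula, the 6-hour half-life, and the field names are illustrative assumptions, not a production ranking model.

```python
import math
from dataclasses import dataclass
from typing import Dict, List

RECENCY_HALF_LIFE_SECONDS = 6 * 3600  # assumed: recency weight halves every 6 hours


@dataclass
class Candidate:
    post_id: str
    author_id: str
    created_at: float  # Unix seconds
    engagement: int    # e.g. likes + comments


def score(candidate: Candidate, affinity: Dict[str, float], now: float) -> float:
    """Combine recency, engagement, and user affinity into one number."""
    age = max(0.0, now - candidate.created_at)
    recency = 0.5 ** (age / RECENCY_HALF_LIFE_SECONDS)
    engagement = math.log1p(candidate.engagement)  # dampen very popular posts
    author_affinity = affinity.get(candidate.author_id, 0.0)
    return recency * (1.0 + engagement) * (1.0 + author_affinity)


def rank(
    candidates: List[Candidate],
    affinity: Dict[str, float],
    now: float,
    top_n: int = 50,
) -> List[Candidate]:
    """Score every candidate and return the top N, best first."""
    ordered = sorted(candidates, key=lambda c: score(c, affinity, now), reverse=True)
    return ordered[:top_n]
```

The `affinity` dict plays the role of the cached affinity scores: it is looked up at read time rather than recomputed per request.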

8.2 Common Questions

Q: "What about consistency? A user posts, immediately checks the feed, and doesn't see the post?"
A: "For fan-out on write, the post should appear almost instantly. For accounts that use async fan-out, there's a brief delay. We could add the user's own posts client-side immediately, or use read-your-writes consistency by checking the posts table for the requesting user's recent posts."

Q: "How do you handle unfollows?"
A: "When user A unfollows user B, we remove B's posts from A's feed cache. For active users, this is a small operation. For cached feeds of inactive users, we can clean up lazily on next access."

Q: "What about deletions?"
A: "When a post is deleted, we publish a deletion event. The feed service removes that post_id from feeds. Since feeds are stored as sorted sets of IDs, we just remove the ID, which is O(log N) per feed. We also mark the post as deleted in cache so it's filtered at read time."

Q: "How do you handle someone following a new account?"
A: "When user A follows user B, we backfill A's feed with B's recent posts. If B is a celebrity, those posts will be pulled anyway. If B is a regular user, we add their recent posts to A's feed cache, re-sorted by timestamp."

Chapter 9: Practice Problems

Problem 1: News Feed Design

Setup: Design a news feed for a social network with 500 million users. Users follow accounts and see posts in reverse chronological order.

Requirements:

  • Average user follows 200 accounts
  • 1% of users have >100K followers (influencers)
  • Feed must load in <200ms
  • 10 billion feed requests per day

Questions:

  1. Push, pull, or hybrid?
  2. How do you store feeds?
  3. How do you handle influencers?

Strategy: Hybrid

Regular users (< 100K followers): Fan-out on write
Influencers (>= 100K followers): Fan-out on read

Storage:
  - Redis sorted sets for feed cache (per user)
  - Keep last 500 posts per feed
  - ~500 bytes per post reference
  - 500M users × 500 × 0.5KB = 125TB
  - Use Redis Cluster with sharding

Feed retrieval:
  1. Get pre-computed feed from Redis
  2. Get followed influencers
  3. Pull influencer posts
  4. Merge (already sorted in cache + sort influencer posts)
  5. Return top 50

Influencer handling:
  - Store influencer posts in separate sorted set
  - Pull max 20 influencers per request
  - Cache influencer post lists aggressively
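
The feed retrieval steps above can be sketched with `heapq.merge`, which combines inputs that are already sorted without re-sorting the whole set. The sketch assumes each input is a list of `(timestamp, post_id)` pairs ordered newest first; the function name and data shape are illustrative.

```python
import heapq
from itertools import islice
from typing import List, Tuple

FeedEntry = Tuple[float, str]  # (timestamp, post_id); assumed shape


def merge_feed(
    precomputed: List[FeedEntry],
    influencer_lists: List[List[FeedEntry]],
    limit: int = 50,
) -> List[FeedEntry]:
    """Merge the cached feed with per-influencer post lists, newest first.

    Every input must already be sorted by timestamp descending, which is
    what reading a sorted set in reverse order returns.
    """
    merged = heapq.merge(
        precomputed,
        *influencer_lists,
        key=lambda entry: entry[0],
        reverse=True,
    )
    return list(islice(merged, limit))  # stop as soon as we have `limit` posts
```

Because the merge is lazy, only about `limit` entries are consumed in total, no matter how long the individual lists are.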

Problem 2: Activity Feed with Reactions

Setup: Design a LinkedIn-style activity feed where you see not just posts from connections, but also posts they reacted to.

Requirements:

  • User sees: direct posts, plus posts that connections liked or commented on
  • Must avoid showing same post twice
  • Must show "John liked this" attribution

Questions:

  1. How does this change the architecture?
  2. How do you track "2nd degree" content?
  3. How do you deduplicate?

This requires a pull-based approach

Why pull?
  - When John likes a post, it should appear in his connections' feeds
  - That's fan-out-on-reaction which multiplies write load
  - Better to compute at read time

Architecture:

Feed request:
  1. Get direct posts from connections (standard)
  2. Get recent reactions from connections
  3. For each reaction, fetch the post (if not already included)
  4. Add attribution ("John liked this")
  5. Merge, dedupe, rank

Deduplication:
  - Track seen post_ids
  - If post appears via multiple paths (direct + reaction), keep direct
  - Add reaction info as metadata

Data model:
  reactions table: (user_id, post_id, reaction_type, created_at)
  
  Query: Get posts my connections reacted to
  
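  -- Duplicates per post are collapsed in the dedup step, so DISTINCT is not
  -- needed (and would conflict with ORDER BY created_at in PostgreSQL)
  SELECT r.post_id, r.user_id, r.reaction_type, r.created_at
  FROM reactions r
  WHERE r.user_id IN (my_connections)  -- placeholder for the connection ID list
  AND r.created_at > NOW() - INTERVAL '7 days'
  ORDER BY r.created_at DESC
  LIMIT 100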

Cache reactions per user for fast lookup.
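
The deduplication rules above (keep the direct post, attach reactions as metadata) can be sketched as a single pass over both sources. The `FeedItem` shape and the function name are assumptions for illustration.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Tuple


@dataclass
class FeedItem:
    post_id: str
    via_direct: bool  # True if the post came from a direct connection
    reacted_by: List[str] = field(default_factory=list)  # for "John liked this"


def build_activity_feed(
    direct_post_ids: List[str],
    reactions: List[Tuple[str, str]],  # (connection_name, post_id)
) -> List[FeedItem]:
    """Merge direct posts and reacted-to posts, one FeedItem per post_id."""
    items: Dict[str, FeedItem] = {}
    for post_id in direct_post_ids:
        items.setdefault(post_id, FeedItem(post_id, via_direct=True))
    for connection, post_id in reactions:
        # A post already present via the direct path keeps via_direct=True.
        item = items.setdefault(post_id, FeedItem(post_id, via_direct=False))
        if connection not in item.reacted_by:
            item.reacted_by.append(connection)
    return list(items.values())
```

Keying on `post_id` guarantees each post appears once, while `reacted_by` carries the attribution needed to render "John liked this".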

Problem 3: Real-Time Feed Updates

Setup: Users want to see new posts appear in their feed without refreshing. Design real-time feed updates.

Requirements:

  • New posts appear within 5 seconds
  • Don't overwhelm clients with updates
  • Handle users following many active accounts

Questions:

  1. Push to client or poll?
  2. How do you handle high-frequency posters?
  3. How do you scale WebSocket connections?

Approach: Hybrid push with rate limiting

Architecture:

Post created
     │
     ▼
Fan-out service
     │
     ├──▶ Feed cache (as before)
     │
     └──▶ Real-time notification service
              │
              ▼
         ┌──────────────────────────────────┐
         │  For each online follower:       │
         │  - Check rate limit              │
         │  - If OK, push via WebSocket     │
         │  - If rate limited, batch        │
         └──────────────────────────────────┘

Rate limiting:
  - Max 1 push per user per 5 seconds
  - If multiple posts in 5s window, batch them
  - "3 new posts" indicator instead of individual pushes

WebSocket scaling:
  - Sticky sessions to WebSocket servers
  - Use Redis pub/sub for cross-server messaging
  - Connection servers subscribe to user channels

Client handling:
  - Receive push notification "new posts available"
  - Client decides when to fetch (immediate or on scroll-to-top)
  - Avoid jarring UI updates while reading

Fallback:
  - If WebSocket disconnects, fall back to polling
  - Poll every 30 seconds when app is in foreground
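
The rate-limiting rule above (at most one push per user per 5 seconds, with extra posts batched into a count) can be sketched as a small in-memory class. The class name, method names, and the explicit `now` argument are assumptions; a real service would keep this state in a shared store and call the flush on a timer.

```python
from typing import Dict, Optional

PUSH_INTERVAL_SECONDS = 5.0  # from the rate-limiting rule above


class PushBatcher:
    """Track pending new-post counts and allow one push per user per interval."""

    def __init__(self, interval: float = PUSH_INTERVAL_SECONDS) -> None:
        self._interval = interval
        self._last_push: Dict[str, float] = {}
        self._pending: Dict[str, int] = {}

    def on_new_post(self, user_id: str, now: float) -> Optional[int]:
        """Record one new post. Return the count to push now, or None if rate limited."""
        self._pending[user_id] = self._pending.get(user_id, 0) + 1
        last = self._last_push.get(user_id)
        if last is not None and now - last < self._interval:
            return None  # held back; it will go out with the next batch
        self._last_push[user_id] = now
        return self._pending.pop(user_id)

    def flush_due(self, now: float) -> Dict[str, int]:
        """Return pending counts for users whose interval has elapsed."""
        due: Dict[str, int] = {}
        for user_id in list(self._pending):
            last = self._last_push.get(user_id, float("-inf"))
            if now - last >= self._interval:
                due[user_id] = self._pending.pop(user_id)
                self._last_push[user_id] = now
        return due
```

The returned count is what the client renders as the "3 new posts" indicator; the posts themselves are fetched only when the user asks for them.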

Chapter 10: Mock Interview

Scenario: Design Instagram Feed

Interviewer: "Let's design the Instagram feed. Walk me through your approach."

You: "Sure. Let me start by understanding the scale and requirements.

Instagram has about 2 billion users, with maybe 500 million daily active. The average user follows around 150 accounts. Users expect to see posts from everyone they follow, and the feed should load instantly.

The core challenge is: when someone posts, how do we get that post into the feeds of all their followers?"

Interviewer: "Exactly. How would you approach that?"

You: "I'd use a hybrid push/pull model. The key insight is that most users have relatively few followers, but some celebrities have millions. These need different strategies.

For regular users with under, say, 10,000 followers, I'd use fan-out on write. When they post, we immediately push that post to all their followers' feed caches. Since the follower count is small, this is fast: typically a few hundred cache writes, and never more than 10,000.

For celebrities with 10,000+ followers, I'd use fan-out on read. When they post, we just store the post and index it. When a follower requests their feed, we pull the celebrity posts on demand and merge them with the pre-computed feed."

Interviewer: "How do you store the feeds?"

You: "I'd use Redis sorted sets. The key is feed:{user_id}, and the members are post IDs with timestamps as scores. This gives us efficient:

  • Adding posts: ZADD — O(log N)
  • Getting feed: ZREVRANGE — O(log N + M) for M posts
  • Trimming old posts: ZREMRANGEBYRANK — O(log N + M)

For 500 million users, each with a feed of 500 posts (just IDs), we're looking at maybe 100-200 TB of data. That's a lot, but Redis Cluster can handle it with sharding.

I'd shard by user_id, so each user's feed is on one shard. This keeps feed reads fast: just one shard hit."

Interviewer: "Walk me through what happens when I open Instagram."

You: "When you open the app:

  1. Fetch pre-computed feed: We read your feed from Redis—ZREVRANGE feed:{your_id} 0 49. This is instant, maybe 2ms.

  2. Check for celebrity follows: We look up which accounts you follow are celebrities. This is cached.

  3. Pull celebrity posts: For each celebrity you follow, we fetch their recent posts. These are stored separately in celebrity_posts:{user_id}. We do this in parallel—maybe 20 celebrity accounts, all fetched at once.

  4. Merge and rank: We combine your pre-computed feed with the celebrity posts. Instagram doesn't use pure chronological order—there's a ranking algorithm considering engagement, your affinity with the poster, content type, etc.

  5. Return top 50: We return the top-ranked posts for your initial feed load.

Total time: maybe 50-100ms, well under the 200ms target."

Interviewer: "What happens if a celebrity you follow posts while you have the app open?"

You: "Good question. There are a few approaches:

The simplest is polling. The app periodically checks for new posts—maybe every 30-60 seconds. The server returns a count of new posts, and the user can tap 'New Posts' to refresh.

For more real-time updates, we could use push notifications via WebSocket. When a celebrity posts, we notify online followers. But we'd rate-limit this—maybe batch notifications if multiple posts come in quickly.

The client shows a subtle 'New posts' indicator rather than immediately inserting posts into the feed, which would be jarring."

Interviewer: "How do you handle someone unfollowing an account?"

You: "When user A unfollows user B:

  1. Update the follow relationship in the database.

  2. If B is a regular user, remove B's posts from A's feed cache. We can do this by scanning A's feed and removing posts where user_id = B. Or we track which posts came from whom.

  3. If B is a celebrity, nothing to do—their posts were being pulled dynamically anyway. Next feed request just won't include B.

  4. Invalidate the cached 'following' list for A.

The deletion doesn't need to be instant—it's okay if A sees one more post from B before the cache updates. That's an acceptable trade-off for simplicity."
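
Step 2 of the unfollow answer depends on knowing which feed entries came from B. Below is a sketch of the "track which posts came from whom" option, using in-memory dicts as a stand-in for the feed cache. The class and method names are hypothetical.

```python
from typing import Dict, List, Set


class FeedCache:
    """In-memory stand-in for per-user feed sorted sets plus an author index."""

    def __init__(self) -> None:
        # user_id -> {post_id: timestamp}
        self._scores: Dict[str, Dict[str, float]] = {}
        # user_id -> author_id -> post_ids from that author in the user's feed
        self._by_author: Dict[str, Dict[str, Set[str]]] = {}

    def add(self, user_id: str, author_id: str, post_id: str, timestamp: float) -> None:
        self._scores.setdefault(user_id, {})[post_id] = timestamp
        self._by_author.setdefault(user_id, {}).setdefault(author_id, set()).add(post_id)

    def remove_author(self, user_id: str, author_id: str) -> int:
        """Drop every post by author_id from user_id's feed; return how many."""
        post_ids = self._by_author.get(user_id, {}).pop(author_id, set())
        feed = self._scores.get(user_id, {})
        for post_id in post_ids:
            feed.pop(post_id, None)
        return len(post_ids)

    def latest(self, user_id: str, count: int = 50) -> List[str]:
        """Return up to `count` post IDs, newest first."""
        feed = self._scores.get(user_id, {})
        return sorted(feed, key=lambda post_id: feed[post_id], reverse=True)[:count]
```

The author index costs extra memory per feed entry, but it turns an unfollow into a direct lookup instead of a scan that must fetch each post's author.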


Summary

DAY 4 KEY TAKEAWAYS

THE FEED PROBLEM:
• Personalized feeds for millions of users
• Posts must appear in followers' feeds quickly
• Celebrity problem: 1 post → millions of feed updates

TWO FUNDAMENTAL APPROACHES:

Fan-Out on Write (Push):
  • When user posts, push to all followers' feeds
  • ✓ Fast reads (just read cache)
  • ✗ Write amplification
  • ✗ Celebrity problem (millions of writes per post)
  • Use for: Regular users with <10K followers

Fan-Out on Read (Pull):
  • When user reads feed, compute on demand
  • ✓ Fast writes (just save post)
  • ✓ No celebrity problem
  • ✗ Slow reads (must query many sources)
  • Use for: Celebrities, algorithmic feeds

HYBRID APPROACH (Production):
  • Regular users: Push (fan-out on write)
  • Celebrities: Pull (fan-out on read)
  • Merge at read time
  • Best of both worlds

ADVANCED PATTERNS:
  • Ranked feeds (not just chronological)
  • Sharded storage for scale
  • Feed warming for active users
  • Activity-based fan-out (skip dormant users)

KEY DECISIONS:
  • Celebrity threshold: 10K-100K followers
  • Feed cache size: 500-1000 posts
  • Parallel fetching for pull queries
  • Trim old posts to bound storage

INDUSTRY EXAMPLES:
  • Twitter: Mostly pull (many celebrities)
  • Instagram: Mostly push (fewer celebrities)
  • LinkedIn: Pull (2nd degree content)

📚 Further Reading

Papers

  • "Feeding Frenzy: Selectively Materializing Users' Event Feeds" — SIGMOD
  • "The Anatomy of a Large-Scale Social Search Engine" — WWW

Books

  • "Designing Data-Intensive Applications" by Martin Kleppmann — Chapter 11

End of Day 4: Feed Caching

Tomorrow: Day 5 — Multi-Tier Caching. We'll explore how to design caching across multiple layers: CDN, API Gateway, Application, and Database. You'll learn what belongs at each layer, how to handle invalidation across tiers, and how to design for both authenticated and anonymous traffic.