Himanshu Kukreja

Week 5 — Day 1: Consistency Models

System Design Mastery Series


Preface

Last week, you mastered caching. You learned to cache aggressively, invalidate strategically, and serve stale data for performance. You accepted "eventual consistency" as a trade-off.

But what does "consistent" actually mean?

THE CONSISTENCY CONFUSION

Developer A: "Our system is eventually consistent."
Developer B: "What does that mean exactly?"
Developer A: "Eventually, all replicas will have the same data."
Developer B: "How long is 'eventually'? Seconds? Hours?"
Developer A: "Uh... it depends?"

Developer C: "We need strong consistency for this feature."
Developer D: "What's the difference from eventual?"
Developer C: "Strong means... stronger? More consistent?"
Developer D: "That's not helpful."

The problem: "Consistency" means different things 
in different contexts, and most engineers can't 
articulate the precise guarantees they need.

Today, we'll fix that confusion. You'll learn:

  • The precise definitions of consistency models
  • When each model is appropriate
  • How to implement systems with specific consistency guarantees
  • The real meaning of the CAP theorem

Part I: Foundations

Chapter 1: What Is Consistency?

1.1 The Core Question

Consistency answers a simple question: When a write happens, who sees it and when?

THE VISIBILITY QUESTION

Timeline:
  T=0: User A writes X=1 to Node 1
  T=1: User B reads X from Node 2
  
  Question: What does User B see?

Possible answers:
  A) X=1 (the new value) ← Strong consistency
  B) X=0 (the old value) ← Eventual consistency (temporarily)
  C) Error ("data unavailable") ← Choosing consistency over availability
  D) It depends on what User B has already seen ← Causal consistency
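
The timeline above can be replayed in a few lines. This is a minimal sketch, not a real replication protocol: the `Node` class and its `inbox` queue are hypothetical stand-ins for two replicas connected by asynchronous replication.

```python
class Node:
    """One replica with a queue of replicated writes it has not applied yet."""

    def __init__(self):
        self.data = {"X": 0}
        self.inbox = []  # (key, value) pairs still "on the network"

    def deliver(self):
        """Apply every replicated write that has arrived."""
        for key, value in self.inbox:
            self.data[key] = value
        self.inbox.clear()


node1, node2 = Node(), Node()

# T=0: User A writes X=1 to Node 1; replication to Node 2 is still in flight.
node1.data["X"] = 1
node2.inbox.append(("X", 1))

# T=1: User B reads from Node 2 before the update has been applied.
read_before = node2.data["X"]

# Later: the replicated write arrives and Node 2 catches up.
node2.deliver()
read_after = node2.data["X"]
```

Here `read_before` is answer B (the old value) and `read_after` is the converged value. A strongly consistent system would have to either delay the T=1 read or route it to Node 1.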

1.2 Why Distributed Systems Make This Hard

SINGLE MACHINE vs DISTRIBUTED

Single Machine:
┌─────────────────────────────────┐
│           Memory                │
│    ┌─────────────────────┐      │
│    │      X = 1          │      │
│    └─────────────────────┘      │
│              ▲                  │
│    Write ────┴──── Read         │
│    (instant visibility)         │
└─────────────────────────────────┘

Write happens, read sees it. Simple.


Distributed System:
┌─────────────┐         ┌─────────────┐
│   Node 1    │         │   Node 2    │
│  ┌───────┐  │ network │  ┌───────┐  │
│  │ X = 1 │  │ ──────► │  │ X = ? │  │
│  └───────┘  │ (delay) │  └───────┘  │
└─────────────┘         └─────────────┘
      ▲                       ▲
    Write                   Read
    (T=0)                   (T=1)

Write happens on Node 1.
Read happens on Node 2.
Network delay means Node 2 might not have the update yet.
What does the read return?

1.3 The Fundamental Trade-off

You cannot have all three simultaneously:

  1. Instant visibility (strong consistency)
  2. Low latency (fast responses)
  3. High availability (always responds)

THE CONSISTENCY TRADE-OFF TRIANGLE

                    Instant Visibility
                          ▲
                         /│\
                        / │ \
                       /  │  \
                      /   │   \
                     /    │    \
                    /     │     \
                   /      │      \
                  /───────┼───────\
        Low Latency ──────┴────── High Availability

Pick any two:
• Instant + Low Latency → Sacrifice availability (reject reads during replication)
• Instant + High Availability → Sacrifice latency (wait for all replicas)
• Low Latency + High Availability → Sacrifice instant visibility (eventual consistency)
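
One way to get a feel for the triangle is to model how long a write waits for acknowledgements. The sketch below is illustrative only: `write_latency` and the round-trip numbers are invented for this example, and an unreachable replica is modelled as infinite latency.

```python
import math


def write_latency(replica_rtts_ms: list[float], required_acks: int) -> float:
    """Time until `required_acks` replicas have acknowledged a write.

    Each entry is one replica's round-trip time in milliseconds;
    math.inf models a replica that is unreachable.
    """
    if not 1 <= required_acks <= len(replica_rtts_ms):
        raise ValueError("required_acks must be between 1 and the replica count")
    # The write completes when the k-th fastest acknowledgement arrives.
    return sorted(replica_rtts_ms)[required_acks - 1]


healthy = [2.0, 15.0, 80.0]
partitioned = [2.0, 15.0, math.inf]  # third replica is cut off

fast_weak = write_latency(healthy, required_acks=1)      # one ack is enough
slow_strong = write_latency(healthy, required_acks=3)    # slowest replica decides
blocked = write_latency(partitioned, required_acks=3)    # never completes
still_up = write_latency(partitioned, required_acks=1)   # stays available
```

Waiting for every replica buys instant visibility at the price of the slowest round trip, and it blocks entirely once a replica is unreachable. Waiting for one replica keeps latency and availability but lets other replicas lag.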

Chapter 2: The Consistency Spectrum

2.1 Overview of Consistency Models

CONSISTENCY MODELS (Strongest to Weakest)

┌────────────────────────────────────────────────────────────────────────┐
│                                                                        │
│  LINEARIZABILITY (Strongest)                                           │
│  "Behaves like a single copy"                                          │
│  └── Every operation appears to happen atomically at some point        │
│      between invocation and response                                   │
│                                                                        │
├────────────────────────────────────────────────────────────────────────┤
│                                                                        │
│  SEQUENTIAL CONSISTENCY                                                │
│  "All nodes see the same order"                                        │
│  └── Operations appear in some sequential order consistent with        │
│      the order in which each client issued its own operations          │
│                                                                        │
├────────────────────────────────────────────────────────────────────────┤
│                                                                        │
│  CAUSAL CONSISTENCY                                                    │
│  "Cause before effect"                                                 │
│  └── If operation A causally precedes B, everyone sees A before B      │
│      Concurrent operations may be seen in different orders             │
│                                                                        │
├────────────────────────────────────────────────────────────────────────┤
│                                                                        │
│  READ-YOUR-WRITES                                                      │
│  "See your own updates"                                                │
│  └── A client always sees its own previous writes                      │
│      May not see other clients' writes immediately                     │
│                                                                        │
├────────────────────────────────────────────────────────────────────────┤
│                                                                        │
│  MONOTONIC READS                                                       │
│  "Never go backwards"                                                  │
│  └── Once you read a value, you never read an older value              │
│                                                                        │
├────────────────────────────────────────────────────────────────────────┤
│                                                                        │
│  EVENTUAL CONSISTENCY (Weakest)                                        │
│  "Eventually converges"                                                │
│  └── If no new writes, all replicas will eventually have same value    │
│      No guarantees about when or what you see before convergence       │
│                                                                        │
└────────────────────────────────────────────────────────────────────────┘

2.2 Linearizability (Strong Consistency)

The gold standard. The system behaves as if there's only one copy of the data.

LINEARIZABILITY EXPLAINED

Requirement: Every operation appears to take effect atomically 
at some point between its start and end.

Example:
                    Time ────────────────────────────►
                    
Client A: ├───Write(X=1)───┤
                                 ├───Read(X)───┤ → Must return 1
                                 
Client B:      ├───Read(X)───┤
               Could return 0 or 1, depending on where 
               Write(X=1) takes effect

If B's read returns 1, then any read that STARTS after B's read
must also return 1 (or a newer value).


Real-world analogy:
  Imagine a global clock visible to everyone.
  Each operation gets a timestamp.
  Everyone agrees on the order based on timestamps.

When to use:

  • Financial transactions
  • Inventory reservations
  • Unique constraint enforcement
  • Leader election

Cost:

  • Higher latency (must coordinate)
  • Lower availability (rejects operations during partitions)
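
The requirement can be checked mechanically for tiny histories. The following is a brute-force sketch for a single register, assuming an initial value of 0; `Op` and `is_linearizable` are illustrative names, the timings are invented to mirror the diagram above, and the search is exponential, so it is only suitable for a handful of operations.

```python
from dataclasses import dataclass
from itertools import permutations


@dataclass(frozen=True)
class Op:
    kind: str     # "write" or "read"
    value: int    # value written, or value the read returned
    start: float  # invocation time
    end: float    # response time


def is_linearizable(history: list, initial: int = 0) -> bool:
    """Return True if some total order explains the history."""
    for order in permutations(history):
        # Real-time rule: an op that finished before another started
        # must not be placed after it.
        respects_time = all(
            not (later.end < earlier.start)
            for i, earlier in enumerate(order)
            for later in order[i + 1:]
        )
        if not respects_time:
            continue
        # Register rule: each read returns the latest write in this order.
        current, ok = initial, True
        for op in order:
            if op.kind == "write":
                current = op.value
            elif op.value != current:
                ok = False
                break
        if ok:
            return True
    return False


write = Op("write", 1, start=0, end=4)
overlapping_read = [write, Op("read", 0, start=2, end=5), Op("read", 1, start=6, end=8)]
stale_after_write = [write, Op("read", 0, start=6, end=8)]
goes_backwards = [
    Op("write", 1, start=0, end=8),
    Op("read", 1, start=2, end=5),
    Op("read", 0, start=5.5, end=7),
]
```

`overlapping_read` is accepted because the read that overlaps the write may take effect before it. The other two histories are rejected: one returns the old value after the write has completed, and the other returns the old value after a previous read already observed the new one.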

2.3 Sequential Consistency

Weaker than linearizable, but still provides a global order.

SEQUENTIAL CONSISTENCY EXPLAINED

Requirement: All nodes see operations in the SAME order,
and each client's own operations keep their issue order,
but the global order doesn't have to match real time.

Example:
Real time:
  Client A: Write(X=1) at T=0
  Client B: Write(X=2) at T=1
  
Sequential consistency allows:
  All nodes see: Write(X=2), then Write(X=1)
  As long as ALL nodes agree on this order

Real-world analogy:
  Like a message board where posts appear in order,
  but the order might not match when they were written.
  Everyone sees the same order of posts.

Difference from linearizability:
  Linearizable: If Write(X=1) finishes before Write(X=2) starts,
                everyone must see X=1 before X=2
  Sequential:   Writes from different clients might appear in
                either order, as long as everyone agrees
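
A small brute-force check makes the "everyone agrees on one order" rule concrete. This is a sketch for a single register with initial value 0; `is_sequentially_consistent` and the input format are assumptions made for this example, and the search only scales to a few operations.

```python
from itertools import permutations


def is_sequentially_consistent(per_client_ops: dict, initial: int = 0) -> bool:
    """Return True if one shared order explains every client's observations.

    per_client_ops maps a client name to its operations in issue order,
    each written as ("write", value) or ("read", value_returned).
    """
    tagged = [
        (client, index, op)
        for client, ops in per_client_ops.items()
        for index, op in enumerate(ops)
    ]
    for order in permutations(tagged):
        last_index = {}
        current = initial
        valid = True
        for client, index, (kind, value) in order:
            # Program-order rule: a client's operations stay in issue order.
            if index < last_index.get(client, -1):
                valid = False
                break
            last_index[client] = index
            # Register rule: a read returns the latest write in this order.
            if kind == "write":
                current = value
            elif value != current:
                valid = False
                break
        if valid:
            return True
    return False


# Everyone can agree on "Write(X=2), then Write(X=1)", whatever the wall clock said.
agreed = {"A": [("write", 1)], "B": [("write", 2)], "C": [("read", 2), ("read", 1)]}

# Readers C and D observe the two writes in opposite orders.
disagree = dict(agreed, D=[("read", 1), ("read", 2)])
```

`agreed` passes even if Client A's write finished first in real time, which is exactly what linearizability would forbid. `disagree` fails because no single order satisfies both readers.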

2.4 Causal Consistency

Respects cause-and-effect relationships.

CAUSAL CONSISTENCY EXPLAINED

Requirement: If operation A "happens before" B (causally),
everyone sees A before B. Concurrent operations can be in any order.

Example: Social Media Comments
  
  Alice posts: "I'm getting married!"     (Post P1)
  Bob comments: "Congratulations!"        (Comment C1 on P1)
  
  C1 is causally dependent on P1 (Bob saw the post, then commented)
  
  Causal consistency guarantees:
  ✓ Everyone sees P1 before C1
  
  Carol posts: "Nice weather today"       (Post P2, concurrent with P1)
  
  P2 is concurrent with P1 (no causal relationship)
  
  Causal consistency allows:
  ✓ Some users see P1, P2, C1
  ✓ Some users see P2, P1, C1
  ✗ No user sees C1 before P1

How it works:
  Track causal dependencies (vector clocks, version vectors)
  Delay applying updates until their dependencies are satisfied

When to use:

  • Social media feeds
  • Collaborative applications
  • Chat systems
  • Any system where cause-effect matters but global order doesn't

2.5 Read-Your-Writes

A client always sees its own updates.

READ-YOUR-WRITES EXPLAINED

Requirement: If a client writes X, subsequent reads by that 
SAME CLIENT will see X (or a later value).

Example: Profile Update
  
  User updates profile picture
  User refreshes page
  
  Without read-your-writes:
    User might see old picture (read went to stale replica)
    User thinks: "Did my update work? Let me try again..."
  
  With read-your-writes:
    User always sees their new picture
    Confidence in the system

Implementation strategies:
  1. Sticky sessions: Route user to same replica
  2. Write to leader, read from leader (for that user)
  3. Track write timestamp, reject stale reads
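
Strategy 1 (sticky sessions) can be sketched as a deterministic mapping from session to replica. The function name `pick_replica` and the replica names are hypothetical, and the sketch assumes the chosen replica applies that user's writes before it serves their reads.

```python
import hashlib


def pick_replica(session_id: str, replicas: list[str]) -> str:
    """Map a session to one replica so repeat requests land on the same node."""
    if not replicas:
        raise ValueError("at least one replica is required")
    # Use a stable hash; Python's built-in hash() is randomized per process.
    digest = hashlib.sha256(session_id.encode("utf-8")).digest()
    index = int.from_bytes(digest[:8], "big") % len(replicas)
    return replicas[index]


replicas = ["replica-a", "replica-b", "replica-c"]
first = pick_replica("session-123", replicas)
second = pick_replica("session-123", replicas)
```

Simple modulo routing reshuffles most sessions whenever the replica list changes, so the guarantee can lapse during failover unless it is combined with one of the other strategies.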

2.6 Monotonic Reads

Never go backwards in time.

MONOTONIC READS EXPLAINED

Requirement: Once a client reads value V, it never 
reads a value older than V.

Example: Reading a Counter
  
  Counter value over time: 5 → 6 → 7 → 8
  
  Without monotonic reads:
    Read 1: Returns 7 (from replica A)
    Read 2: Returns 5 (from stale replica B)
    User thinks the counter went backwards!
  
  With monotonic reads:
    Read 1: Returns 7
    Read 2: Returns 7, 8, or newer (never 5 or 6)

Implementation:
  Track the "version" of last read
  Only accept reads from replicas with >= that version

2.7 Eventual Consistency

The weakest guarantee, but the most available.

EVENTUAL CONSISTENCY EXPLAINED

Requirement: If no new writes occur, all replicas will 
EVENTUALLY converge to the same value.

What it DOESN'T guarantee:
  - When convergence happens
  - What you see before convergence
  - Read-your-writes
  - Monotonic reads
  - Causal ordering

Example: DNS
  
  You update your domain's IP address
  Propagation time: "up to 48 hours"
  Different users see different IPs during propagation
  Eventually, everyone sees the new IP

When it's acceptable:
  - Caching (Week 4!)
  - View counts, likes
  - Analytics data
  - Non-critical reads
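
Convergence needs some rule for reconciling replicas once they exchange state. A minimal sketch using last-write-wins is shown below; `merge_lww`, the timestamps, and the addresses are made up for illustration, and real systems differ in how they assign timestamps and break ties.

```python
def merge_lww(local: dict, remote: dict) -> dict:
    """Merge two replica states with last-write-wins.

    Each state maps key -> (timestamp, value). The entry with the larger
    timestamp wins; ties fall back to comparing the values.
    """
    merged = dict(local)
    for key, entry in remote.items():
        if key not in merged or entry > merged[key]:
            merged[key] = entry
    return merged


replica_a = {"example.com": (1, "192.0.2.10")}
replica_b = {"example.com": (2, "192.0.2.20"), "example.org": (1, "192.0.2.30")}

# The replicas exchange state in both directions (anti-entropy).
replica_a, replica_b = merge_lww(replica_a, replica_b), merge_lww(replica_b, replica_a)
```

After the exchange both replicas hold the same entries. The cost of last-write-wins is that the losing write is silently discarded, which is acceptable for the use cases listed above and risky for anything that must not lose updates.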

2.8 Comparison Table

Model             Guarantee             Latency  Availability  Use Case
----------------  --------------------  -------  ------------  ---------------------
Linearizable      Single-copy behavior  High     Low           Transactions, locks
Sequential        Global order agreed   Medium   Medium        Logs, message queues
Causal            Cause before effect   Low      High          Social, collaboration
Read-your-writes  See own updates       Low      High          User profiles
Monotonic reads   Never go back         Low      High          Counters, feeds
Eventual          Eventually same       Lowest   Highest       Caching, analytics

Chapter 3: CAP Theorem in Practice

3.1 The Theorem (Correctly Stated)

CAP THEOREM

In a distributed system experiencing a network PARTITION,
you must choose between CONSISTENCY and AVAILABILITY.

C - Consistency: Every read sees the most recent write (linearizability)
A - Availability: Every request to a non-failing node receives a non-error response
P - Partition Tolerance: System continues despite network failures

Key insight: P is not optional. Networks WILL partition.
So the choice is really: CP or AP during partitions.

During NORMAL operation (no partition):
  You CAN have both C and A!
  
During PARTITION:
  Choose C: Reject requests to maintain consistency
  Choose A: Allow stale reads/writes to maintain availability
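
The two choices can be sketched as a single flag on a node. Everything here is a simplified assumption: the `Node` class, the `Unavailable` exception, and the idea that a node knows how many cluster members it can currently reach.

```python
class Unavailable(Exception):
    """Raised when a node refuses a request to protect consistency."""


class Node:
    def __init__(self, mode: str, cluster_size: int):
        self.mode = mode                # "CP" or "AP"
        self.cluster_size = cluster_size
        self.reachable = cluster_size   # nodes reachable, counting itself
        self.value = 0

    def _has_majority(self) -> bool:
        return self.reachable > self.cluster_size // 2

    def write(self, value: int) -> None:
        if self.mode == "CP" and not self._has_majority():
            raise Unavailable("cannot reach a majority of the cluster")
        self.value = value  # an AP node accepts the write and reconciles later

    def read(self) -> int:
        if self.mode == "CP" and not self._has_majority():
            raise Unavailable("cannot confirm the latest value")
        return self.value  # may be stale on an AP node


cp_node = Node("CP", cluster_size=3)
ap_node = Node("AP", cluster_size=3)
cp_node.reachable = ap_node.reachable = 1  # both land on the minority side

ap_node.write(42)
try:
    cp_node.write(42)
    cp_rejected = False
except Unavailable:
    cp_rejected = True
```

The CP node refuses the write while it is cut off from the majority, and the AP node accepts it at the risk of diverging from the rest of the cluster.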

3.2 CP vs AP: Real Examples

CP SYSTEMS (Consistency over Availability)

Examples:
  - Traditional RDBMS with sync replication
  - ZooKeeper, etcd (coordination services)
  - Google Spanner (uses GPS + atomic clocks)

Behavior during partition:
  - Minority partition becomes unavailable
  - Majority partition continues with consistency
  - "It's better to be unavailable than wrong"

Use for:
  - Financial systems
  - Inventory management
  - Leader election
  - Anything where incorrect data is worse than no data


AP SYSTEMS (Availability over Consistency)

Examples:
  - Cassandra (by default)
  - DynamoDB (eventually consistent reads)
  - DNS
  - Caching layers

Behavior during partition:
  - All nodes continue accepting reads/writes
  - Different nodes may have different data
  - Conflicts resolved later (last-write-wins, CRDTs, etc.)

Use for:
  - Shopping carts
  - User sessions
  - View counts
  - Anything where availability is critical
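
For conflict resolution, view counts are a good fit for a CRDT. Below is a minimal sketch of a grow-only counter (G-Counter); the class and node names are illustrative and not taken from any particular library.

```python
class GCounter:
    """Grow-only counter CRDT: one slot per node, merged by element-wise max."""

    def __init__(self, node_id: str):
        self.node_id = node_id
        self.counts: dict[str, int] = {}

    def increment(self, amount: int = 1) -> None:
        if amount < 0:
            raise ValueError("a grow-only counter cannot decrease")
        # Each node only ever updates its own slot.
        self.counts[self.node_id] = self.counts.get(self.node_id, 0) + amount

    def value(self) -> int:
        return sum(self.counts.values())

    def merge(self, other: "GCounter") -> None:
        # Taking the max per slot makes merging order-independent and repeatable.
        for node_id, count in other.counts.items():
            self.counts[node_id] = max(self.counts.get(node_id, 0), count)


east, west = GCounter("east"), GCounter("west")

# During a partition each side keeps counting views independently.
east.increment(3)
west.increment(2)

# After the partition heals, the nodes exchange state.
east.merge(west)
west.merge(east)
```

Both nodes end at the same total without coordination, and merging the same state twice changes nothing. That property is what lets AP systems reconcile safely after a partition.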

3.3 The Modern View: Tunable Consistency

TUNABLE CONSISTENCY

Modern databases don't force a single choice.
You can choose consistency per-operation.

DynamoDB:
  - Eventually consistent reads (default, fast, cheap)
  - Strongly consistent reads (option, slower, costlier)

Cassandra:
  - Consistency level per query
  - ONE: Read/write to any single replica (fast, weak)
  - QUORUM: Read/write to majority (balanced)
  - ALL: Read/write to all replicas (slow, strong)

Your application:
  - Browse products: Eventually consistent (cache hit)
  - Check inventory for display: Eventually consistent
  - Reserve inventory: Strongly consistent
  - Checkout: Strongly consistent
  - View order history: Read-your-writes

Chapter 4: Implementing Consistency

4.1 Implementing Read-Your-Writes

# Read-Your-Writes Implementation

import asyncio
import time
from dataclasses import dataclass
from typing import Any


@dataclass
class WriteRecord:
    """Track a client's last write."""
    key: str
    timestamp: float
    replica_id: str


class ReadYourWritesClient:
    """
    Client that guarantees read-your-writes consistency.
    
    Tracks writes and ensures subsequent reads see them.
    """
    
    def __init__(self, replicas: list, session_id: str):
        self.replicas = replicas
        self.session_id = session_id
        self.last_writes: dict[str, WriteRecord] = {}
    
    async def write(self, key: str, value: Any) -> bool:
        """Write to primary and track the write."""
        primary = self._get_primary()
        
        timestamp = time.time()
        success = await primary.write(key, value, timestamp)
        
        if success:
            # Track this write for read-your-writes
            self.last_writes[key] = WriteRecord(
                key=key,
                timestamp=timestamp,
                replica_id=primary.id
            )
        
        return success
    
    async def read(self, key: str) -> Any:
        """Read with read-your-writes guarantee."""
        last_write = self.last_writes.get(key)
        
        if last_write:
            # We wrote this key - ensure we see our write
            return await self._read_with_min_version(key, last_write.timestamp)
        else:
            # We didn't write this key - any replica is fine
            return await self._read_from_any(key)
    
    async def _read_with_min_version(self, key: str, min_timestamp: float) -> Any:
        """Read ensuring we see at least version min_timestamp."""
        
        # Strategy 1: Read from the replica we wrote to.
        # If it is unreachable, fall through instead of failing the read.
        for replica in self.replicas:
            if replica.id == self.last_writes[key].replica_id:
                try:
                    return await replica.read(key)
                except Exception:
                    break
        
        # Strategy 2: Query replicas until we find one with our version
        for replica in self.replicas:
            try:
                value, timestamp = await replica.read_with_version(key)
            except Exception:
                continue
            if timestamp >= min_timestamp:
                return value
        
        # Strategy 3: Wait for replication (with timeout)
        return await self._wait_for_replication(key, min_timestamp)
    
    async def _wait_for_replication(
        self, 
        key: str, 
        min_timestamp: float,
        timeout: float = 5.0
    ) -> Any:
        """Wait for a replica to catch up."""
        deadline = time.time() + timeout
        
        while time.time() < deadline:
            for replica in self.replicas:
                value, timestamp = await replica.read_with_version(key)
                if timestamp >= min_timestamp:
                    return value
            
            await asyncio.sleep(0.1)
        
        raise TimeoutError(f"Replica did not catch up within {timeout}s")
    
    async def _read_from_any(self, key: str) -> Any:
        """Read from any available replica."""
        for replica in self.replicas:
            try:
                return await replica.read(key)
            except Exception:
                continue
        raise Exception("All replicas unavailable")
    
    def _get_primary(self):
        """Get the primary replica for writes."""
        return self.replicas[0]  # Simplified

4.2 Implementing Monotonic Reads

# Monotonic Reads Implementation

@dataclass
class VersionedValue:
    """Value with version for comparison."""
    value: Any
    version: int  # Logical clock or timestamp


class MonotonicReadsClient:
    """
    Client that guarantees monotonic reads.
    
    Once you read version N, you never read version < N.
    """
    
    def __init__(self, replicas: list, session_id: str):
        self.replicas = replicas
        self.session_id = session_id
        # Track minimum version seen per key
        self.min_versions: dict[str, int] = {}
    
    async def read(self, key: str) -> Any:
        """Read with monotonic guarantee."""
        min_version = self.min_versions.get(key, 0)
        
        # Try replicas until we find one with sufficient version
        for replica in self._shuffled_replicas():
            try:
                result = await replica.read_versioned(key)
                
                if result.version >= min_version:
                    # Update our minimum version
                    self.min_versions[key] = result.version
                    return result.value
                else:
                    # This replica is stale, try another
                    continue
                    
            except Exception:
                continue
        
        # All replicas are stale or unavailable
        raise Exception(f"No replica has version >= {min_version}")
    
    def _shuffled_replicas(self):
        """Shuffle replicas for load balancing."""
        import random
        replicas = self.replicas.copy()
        random.shuffle(replicas)
        return replicas


# Combined: Read-Your-Writes + Monotonic Reads
class SessionConsistencyClient:
    """
    Combines read-your-writes and monotonic reads.
    
    This is what most users expect from a "consistent" system.
    """
    
    def __init__(self, replicas: list, session_id: str):
        self.replicas = replicas
        self.session_id = session_id
        
        # Track writes and read versions
        self.write_versions: dict[str, int] = {}
        self.read_versions: dict[str, int] = {}
    
    async def write(self, key: str, value: Any) -> int:
        """Write and track version."""
        primary = self.replicas[0]
        version = await primary.write(key, value)
        
        self.write_versions[key] = version
        self.read_versions[key] = max(
            self.read_versions.get(key, 0),
            version
        )
        
        return version
    
    async def read(self, key: str) -> Any:
        """Read with session consistency."""
        # Minimum version is max of:
        # - Our last write version
        # - Our last read version
        min_version = max(
            self.write_versions.get(key, 0),
            self.read_versions.get(key, 0)
        )
        
        for replica in self.replicas:
            try:
                result = await replica.read_versioned(key)
            except Exception:
                # Skip unreachable replicas instead of failing the whole read
                continue
            if result.version >= min_version:
                self.read_versions[key] = result.version
                return result.value
        
        raise Exception("Consistency requirement not satisfiable")

4.3 Implementing Causal Consistency

# Causal Consistency with Vector Clocks

from dataclasses import dataclass, field
from typing import Dict, Any, Optional


@dataclass
class VectorClock:
    """
    Vector clock for tracking causal dependencies.
    
    Each node maintains a counter. The vector clock
    is the collection of all node counters.
    """
    clocks: Dict[str, int] = field(default_factory=dict)
    
    def increment(self, node_id: str):
        """Increment this node's clock (on local event)."""
        self.clocks[node_id] = self.clocks.get(node_id, 0) + 1
    
    def merge(self, other: 'VectorClock'):
        """Merge with another vector clock (on receive)."""
        for node_id, count in other.clocks.items():
            self.clocks[node_id] = max(
                self.clocks.get(node_id, 0),
                count
            )
    
    def happens_before(self, other: 'VectorClock') -> bool:
        """Check if self happened before other."""
        # self < other if:
        # - All of self's clocks are <= other's
        # - At least one is strictly <
        all_leq = all(
            self.clocks.get(node, 0) <= other.clocks.get(node, 0)
            for node in set(self.clocks) | set(other.clocks)
        )
        some_lt = any(
            self.clocks.get(node, 0) < other.clocks.get(node, 0)
            for node in set(self.clocks) | set(other.clocks)
        )
        return all_leq and some_lt
    
    def concurrent_with(self, other: 'VectorClock') -> bool:
        """Check if self and other are concurrent."""
        return not self.happens_before(other) and not other.happens_before(self)
    
    def copy(self) -> 'VectorClock':
        return VectorClock(clocks=self.clocks.copy())


@dataclass
class CausalMessage:
    """Message with causal metadata."""
    msg_id: str  # Globally unique: "<node_id>:<sender's counter>"
    key: str
    value: Any
    vector_clock: VectorClock
    dependencies: list  # List of message IDs this depends on


class CausalConsistencyNode:
    """
    Node implementing causal consistency.
    
    Delays applying updates until their causal
    dependencies are satisfied.
    """
    
    def __init__(self, node_id: str):
        self.node_id = node_id
        self.vector_clock = VectorClock()
        self.data: Dict[str, Any] = {}
        self.pending: list[CausalMessage] = []
        self.delivered: set = set()  # Message IDs we've applied
    
    def local_write(self, key: str, value: Any) -> CausalMessage:
        """Perform a local write operation."""
        # Increment our clock
        self.vector_clock.increment(self.node_id)
        
        # Create message with current vector clock.
        # The ID is built from (node_id, counter) rather than id(msg),
        # because id() is only meaningful inside one Python process.
        counter = self.vector_clock.clocks[self.node_id]
        msg = CausalMessage(
            msg_id=f"{self.node_id}:{counter}",
            key=key,
            value=value,
            vector_clock=self.vector_clock.copy(),
            dependencies=list(self.delivered)
        )
        
        # Apply locally
        self.data[key] = value
        self.delivered.add(msg.msg_id)
        
        return msg  # Broadcast this to other nodes
    
    def receive_message(self, msg: CausalMessage):
        """Receive a message from another node."""
        if self._dependencies_satisfied(msg):
            self._apply_message(msg)
            self._check_pending()
        else:
            # Buffer until dependencies are satisfied
            self.pending.append(msg)
    
    def _dependencies_satisfied(self, msg: CausalMessage) -> bool:
        """Check if all dependencies have been delivered."""
        return all(dep in self.delivered for dep in msg.dependencies)
    
    def _apply_message(self, msg: CausalMessage):
        """Apply a message to local state."""
        self.data[msg.key] = msg.value
        self.vector_clock.merge(msg.vector_clock)
        self.delivered.add(msg.msg_id)
    
    def _check_pending(self):
        """Check if any pending messages can now be applied."""
        made_progress = True
        while made_progress:
            made_progress = False
            for msg in list(self.pending):
                if self._dependencies_satisfied(msg):
                    self._apply_message(msg)
                    self.pending.remove(msg)
                    made_progress = True


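import uuid


def generate_id() -> str:
    """Return a random unique ID (helper assumed by the example below)."""
    return uuid.uuid4().hex


# Example usage: Causal social media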
class CausalSocialFeed:
    """
    Social feed with causal consistency.
    
    Guarantees: Comments always appear after their parent posts.
    """
    
    def __init__(self, user_id: str, node: CausalConsistencyNode):
        self.user_id = user_id
        self.node = node
    
    def create_post(self, content: str) -> str:
        """Create a new post."""
        post_id = generate_id()
        
        msg = self.node.local_write(
            key=f"post:{post_id}",
            value={
                "id": post_id,
                "author": self.user_id,
                "content": content,
                "type": "post"
            }
        )
        
        # Broadcast msg to other nodes
        return post_id
    
    def comment_on_post(self, post_id: str, content: str) -> str:
        """Comment on an existing post."""
        comment_id = generate_id()
        
        # The comment causally depends on the post
        # (We saw the post, so our vector clock includes it)
        msg = self.node.local_write(
            key=f"comment:{comment_id}",
            value={
                "id": comment_id,
                "parent_post": post_id,
                "author": self.user_id,
                "content": content,
                "type": "comment"
            }
        )
        
        # When other nodes receive this comment,
        # they won't display it until they've seen the parent post
        return comment_id
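
To see the vector clock comparison at work on the wedding-post example from section 2.4, here is a compact standalone sketch. It uses plain dicts instead of the `VectorClock` class, and the clock values are what the increment and merge rules would be expected to produce for that scenario.

```python
def happens_before(a: dict, b: dict) -> bool:
    """True if clock `a` causally precedes clock `b`."""
    nodes = set(a) | set(b)
    no_greater = all(a.get(n, 0) <= b.get(n, 0) for n in nodes)
    some_less = any(a.get(n, 0) < b.get(n, 0) for n in nodes)
    return no_greater and some_less


def concurrent(a: dict, b: dict) -> bool:
    """True if neither clock causally precedes the other."""
    return not happens_before(a, b) and not happens_before(b, a)


p1 = {"alice": 1}             # Alice posts "I'm getting married!"
c1 = {"alice": 1, "bob": 1}   # Bob saw P1 (merge), then commented (increment)
p2 = {"carol": 1}             # Carol posts without having seen either
```

`p1` precedes `c1`, so every node must show the post before the comment. `p2` is concurrent with both, so nodes are free to place it anywhere in the feed.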

Part II: Production Implementation

Chapter 5: Consistency-Aware Data Access Layer

# Production Consistency-Aware Data Access Layer

import asyncio
from dataclasses import dataclass
from typing import Any, Optional, List, Callable
from enum import Enum
from datetime import datetime
import logging

logger = logging.getLogger(__name__)


class ConsistencyLevel(Enum):
    """Consistency levels for operations."""
    
    EVENTUAL = "eventual"
    # Read from any replica, may be stale
    # Fastest, highest availability
    
    MONOTONIC = "monotonic"
    # Never read older value than previously seen
    # Requires tracking read versions per session
    
    READ_YOUR_WRITES = "read_your_writes"
    # Always see your own writes
    # Requires tracking write versions per session
    
    SESSION = "session"
    # Combination of monotonic + read-your-writes
    # What users typically expect
    
    CAUSAL = "causal"
    # Respects causal ordering
    # Requires vector clocks or dependency tracking
    
    STRONG = "strong"
    # Linearizable, single-copy semantics
    # Slowest, may be unavailable during partitions


@dataclass
class ConsistencyContext:
    """Context for tracking consistency requirements."""
    session_id: str
    write_versions: Optional[dict] = None
    read_versions: Optional[dict] = None
    vector_clock: Optional[dict] = None
    
    def __post_init__(self):
        self.write_versions = self.write_versions or {}
        self.read_versions = self.read_versions or {}
        self.vector_clock = self.vector_clock or {}


class ConsistencyAwareRepository:
    """
    Repository with configurable consistency levels.
    
    Allows different operations to use different consistency
    levels based on their requirements.
    """
    
    def __init__(
        self,
        primary_db,              # Primary database connection
        read_replicas: list,     # Read replica connections
        cache,                   # Cache layer (Redis)
        default_level: ConsistencyLevel = ConsistencyLevel.SESSION
    ):
        self.primary = primary_db
        self.replicas = read_replicas
        self.cache = cache
        self.default_level = default_level
    
    async def read(
        self,
        key: str,
        consistency: ConsistencyLevel = None,
        context: ConsistencyContext = None
    ) -> Optional[Any]:
        """
        Read with specified consistency level.
        """
        level = consistency or self.default_level
        context = context or ConsistencyContext(session_id="default")
        
        if level == ConsistencyLevel.EVENTUAL:
            return await self._read_eventual(key)
        
        elif level == ConsistencyLevel.MONOTONIC:
            return await self._read_monotonic(key, context)
        
        elif level == ConsistencyLevel.READ_YOUR_WRITES:
            return await self._read_your_writes(key, context)
        
        elif level == ConsistencyLevel.SESSION:
            return await self._read_session(key, context)
        
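        elif level == ConsistencyLevel.STRONG:
            return await self._read_strong(key)
        
        elif level == ConsistencyLevel.CAUSAL:
            # CAUSAL is a declared level, but this read path does not handle it
            raise NotImplementedError("Causal reads are not handled by read()")
        
        else:
            raise ValueError(f"Unknown consistency level: {level}")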
    
    async def write(
        self,
        key: str,
        value: Any,
        consistency: ConsistencyLevel = None,
        context: ConsistencyContext = None
    ) -> int:
        """
        Write with specified consistency level.
        Returns version number of the write.
        """
        level = consistency or self.default_level
        context = context or ConsistencyContext(session_id="default")
        
        if level == ConsistencyLevel.STRONG:
            version = await self._write_strong(key, value)
        else:
            version = await self._write_async(key, value)
        
        # Update context for read-your-writes
        context.write_versions[key] = version
        context.read_versions[key] = max(
            context.read_versions.get(key, 0),
            version
        )
        
        return version
    
    # =========================================================================
    # Eventual Consistency
    # =========================================================================
    
    async def _read_eventual(self, key: str) -> Optional[Any]:
        """Read from cache or any replica. May be stale."""
        # Try cache first
        cached = await self.cache.get(key)
        if cached is not None:
            return cached
        
        # Try replicas (load balanced)
        import random
        replicas = self.replicas.copy()
        random.shuffle(replicas)
        
        for replica in replicas:
            try:
                result = await replica.fetch_one(
                    "SELECT value, version FROM data WHERE key = $1",
                    key
                )
                if result:
                    # Cache for future reads
                    await self.cache.set(key, result['value'], ttl=60)
                    return result['value']
            except Exception as e:
                logger.warning(f"Replica read failed: {e}")
                continue
        
        # Fall back to primary
        return await self._read_from_primary(key)
    
    # =========================================================================
    # Monotonic Reads
    # =========================================================================
    
    async def _read_monotonic(
        self,
        key: str,
        context: ConsistencyContext
    ) -> Optional[Any]:
        """Read ensuring we never go backwards."""
        min_version = context.read_versions.get(key, 0)
        
        # Try replicas that have sufficient version
        for replica in self.replicas:
            try:
                result = await replica.fetch_one(
                    """
                    SELECT value, version FROM data 
                    WHERE key = $1 AND version >= $2
                    """,
                    key, min_version
                )
                if result:
                    context.read_versions[key] = result['version']
                    return result['value']
            except Exception:
                continue
        
        # Fall back to primary (always has latest)
        result = await self._read_from_primary_versioned(key)
        if result and result['version'] >= min_version:
            context.read_versions[key] = result['version']
            return result['value']
        
        return None
    
    # =========================================================================
    # Read-Your-Writes
    # =========================================================================
    
    async def _read_your_writes(
        self,
        key: str,
        context: ConsistencyContext
    ) -> Optional[Any]:
        """Read ensuring we see our own writes."""
        write_version = context.write_versions.get(key)
        
        if write_version is None:
            # We haven't written this key, any replica is fine
            return await self._read_eventual(key)
        
        # We wrote this key, ensure we see our write
        return await self._read_with_min_version(key, write_version)
    
    async def _read_with_min_version(
        self,
        key: str,
        min_version: int
    ) -> Optional[Any]:
        """Read from replica with at least min_version."""
        # Try replicas
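        for replica in self.replicas:
            try:
                result = await replica.fetch_one(
                    """
                    SELECT value, version FROM data 
                    WHERE key = $1 AND version >= $2
                    """,
                    key, min_version
                )
            except Exception as e:
                # A failed replica should not fail the read; try the next one
                logger.warning(f"Replica read failed: {e}")
                continue
            if result:
                return result['value']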
        
        # Replicas don't have our version yet, use primary
        return await self._read_from_primary(key)
    
    # =========================================================================
    # Session Consistency (Monotonic + Read-Your-Writes)
    # =========================================================================
    
    async def _read_session(
        self,
        key: str,
        context: ConsistencyContext
    ) -> Optional[Any]:
        """Read with session consistency guarantees."""
        # Min version is max of write version and read version
        min_version = max(
            context.write_versions.get(key, 0),
            context.read_versions.get(key, 0)
        )
        
        if min_version == 0:
            return await self._read_eventual(key)
        
        result = await self._read_with_min_version(key, min_version)
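        if result is not None:
            # Record the floor this read satisfied so later reads never go below it
            # (compare against None so falsy values such as 0 or "" still count)
            context.read_versions[key] = min_version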
        
        return result
    
    # =========================================================================
    # Strong Consistency
    # =========================================================================
    
    async def _read_strong(self, key: str) -> Optional[Any]:
        """Read from primary only. Linearizable as long as there is exactly one primary."""
        return await self._read_from_primary(key)
    
    async def _write_strong(self, key: str, value: Any) -> int:
        """Write with synchronous replication."""
        async with self.primary.transaction():
            # Write to primary
            result = await self.primary.fetch_one(
                """
                INSERT INTO data (key, value, version)
                VALUES ($1, $2, 1)
                ON CONFLICT (key) DO UPDATE 
                SET value = $2, version = data.version + 1
                RETURNING version
                """,
                key, value
            )
            version = result['version']
            
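            # Synchronously replicate to all replicas before returning.
            # This is slow, and one unreachable replica fails the whole write.
            # Caveat: this runs before the primary transaction commits, so a
            # rollback can leave replicas ahead of the primary.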
            await self._sync_replicate(key, value, version)
            
            # Invalidate cache
            await self.cache.delete(key)
            
            return version
    
    async def _write_async(self, key: str, value: Any) -> int:
        """Write to primary with async replication."""
        result = await self.primary.fetch_one(
            """
            INSERT INTO data (key, value, version)
            VALUES ($1, $2, 1)
            ON CONFLICT (key) DO UPDATE 
            SET value = $2, version = data.version + 1
            RETURNING version
            """,
            key, value
        )
        version = result['version']
        
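        # Async replication (don't wait). Note: the event loop keeps only a
        # weak reference to tasks, so production code should hold a reference
        # until the task finishes.
        asyncio.create_task(self._async_replicate(key, value, version))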
        
        # Invalidate cache
        await self.cache.delete(key)
        
        return version
    
    # =========================================================================
    # Helper Methods
    # =========================================================================
    
    async def _read_from_primary(self, key: str) -> Optional[Any]:
        """Read directly from primary."""
        result = await self.primary.fetch_one(
            "SELECT value FROM data WHERE key = $1",
            key
        )
        return result['value'] if result else None
    
    async def _read_from_primary_versioned(self, key: str) -> Optional[dict]:
        """Read with version from primary."""
        return await self.primary.fetch_one(
            "SELECT value, version FROM data WHERE key = $1",
            key
        )
    
    async def _sync_replicate(self, key: str, value: Any, version: int):
        """Synchronously replicate to all replicas."""
        await asyncio.gather(*[
            replica.execute(
                """
                INSERT INTO data (key, value, version) VALUES ($1, $2, $3)
                ON CONFLICT (key) DO UPDATE SET value = $2, version = $3
                WHERE data.version < $3
                """,
                key, value, version
            )
            for replica in self.replicas
        ])
    
    async def _async_replicate(self, key: str, value: Any, version: int):
        """Asynchronously replicate to replicas."""
        for replica in self.replicas:
            try:
                await replica.execute(
                    """
                    INSERT INTO data (key, value, version) VALUES ($1, $2, $3)
                    ON CONFLICT (key) DO UPDATE SET value = $2, version = $3
                    WHERE data.version < $3
                    """,
                    key, value, version
                )
            except Exception as e:
                logger.warning(f"Async replication failed: {e}")
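
The version-floor idea behind the session reads above can be tried without a database. The following is a minimal in-memory sketch, not the repository class itself: `Session`, `Node`, and `TinyRepo` are hypothetical names, replication is triggered by hand to simulate lag, and everything is synchronous for brevity.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Tuple


@dataclass
class Session:
    """Per-session version floors (hypothetical stand-in for ConsistencyContext)."""
    write_versions: Dict[str, int] = field(default_factory=dict)
    read_versions: Dict[str, int] = field(default_factory=dict)


class Node:
    """In-memory store mapping key -> (value, version)."""

    def __init__(self) -> None:
        self.data: Dict[str, Tuple[Any, int]] = {}

    def get(self, key: str) -> Optional[Tuple[Any, int]]:
        return self.data.get(key)


class TinyRepo:
    """One primary plus lagging replicas; replication is triggered manually."""

    def __init__(self, replica_count: int) -> None:
        self.primary = Node()
        self.replicas: List[Node] = [Node() for _ in range(replica_count)]

    def write(self, key: str, value: Any, session: Session) -> int:
        current = self.primary.get(key)
        version = current[1] + 1 if current else 1
        self.primary.data[key] = (value, version)
        session.write_versions[key] = version  # remembered for read-your-writes
        return version

    def replicate(self, key: str, replica_index: int) -> None:
        """Copy one key to one replica, simulating replication catching up."""
        self.replicas[replica_index].data[key] = self.primary.data[key]

    def read_session(self, key: str, session: Session) -> Optional[Any]:
        # The floor is the newest version this session has written or seen
        floor = max(session.write_versions.get(key, 0),
                    session.read_versions.get(key, 0))
        # Prefer replicas; the primary is the last resort
        for node in self.replicas + [self.primary]:
            hit = node.get(key)
            if hit is not None and hit[1] >= floor:
                value, version = hit
                session.read_versions[key] = version  # raise the floor
                return value
        return None


repo = TinyRepo(replica_count=2)
alice, bob = Session(), Session()
repo.write("profile:1", "v1", alice)
repo.replicate("profile:1", 0)
repo.replicate("profile:1", 1)
repo.write("profile:1", "v2", alice)  # replicas still hold v1

alice_view = repo.read_session("profile:1", alice)  # floor is 2, served by primary
bob_view = repo.read_session("profile:1", bob)      # floor is 0, stale replica allowed
print(alice_view, bob_view)
```

Unlike the repository above, this sketch records the version actually read rather than the floor, which keeps the monotonic guarantee tight at the cost of returning the version alongside the value.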

Chapter 6: Real-World Consistency Patterns

6.1 The Inventory Pattern

# Inventory with Appropriate Consistency Levels

class InventoryService:
    """
    Inventory service with consistency-aware operations.
    
    Different operations have different consistency requirements:
    - Display: Eventual consistency (from cache)
    - Reservation: Strong consistency (prevent oversell)
    - Release: Strong consistency (prevent under-count)
    """
    
    def __init__(
        self,
        repository: ConsistencyAwareRepository,
        cache
    ):
        self.repo = repository
        self.cache = cache
    
    async def get_display_count(self, product_id: str) -> int:
        """
        Get inventory count for DISPLAY purposes.
        
        Eventual consistency is fine - showing stale count
        on a product page is acceptable.
        """
        # Try cache first (eventual consistency)
        cache_key = f"inventory:{product_id}"
        cached = await self.cache.get(cache_key)
        
        if cached is not None:
            return cached
        
        # Read from replica (eventual consistency)
        count = await self.repo.read(
            key=f"inventory:{product_id}",
            consistency=ConsistencyLevel.EVENTUAL
        )
        
        # Cache for future reads
        if count is not None:
            await self.cache.set(cache_key, count, ttl=30)
        
        return count or 0
    
    async def reserve(
        self,
        product_id: str,
        quantity: int,
        reservation_id: str
    ) -> bool:
        """
        Reserve inventory for checkout.
        
        STRONG consistency required - we must prevent overselling.
        Uses pessimistic locking: SELECT ... FOR UPDATE holds a row lock
        until the transaction ends.
        """
        async with self.repo.primary.transaction(isolation="SERIALIZABLE"):
            # Read current inventory with lock
            result = await self.repo.primary.fetch_one(
                """
                SELECT quantity, version FROM inventory 
                WHERE product_id = $1 FOR UPDATE
                """,
                product_id
            )
            
            if not result or result['quantity'] < quantity:
                return False
            
            # Decrement inventory
            await self.repo.primary.execute(
                """
                UPDATE inventory 
                SET quantity = quantity - $2, version = version + 1
                WHERE product_id = $1
                """,
                product_id, quantity
            )
            
            # Record reservation
            await self.repo.primary.execute(
                """
                INSERT INTO reservations (id, product_id, quantity, status, created_at)
                VALUES ($1, $2, $3, 'active', NOW())
                """,
                reservation_id, product_id, quantity
            )
            
            # Invalidate cache
            await self.cache.delete(f"inventory:{product_id}")
            
            return True
    
    async def release(self, reservation_id: str) -> bool:
        """
        Release a reservation (order cancelled).
        
        STRONG consistency - must accurately restore inventory.
        """
        async with self.repo.primary.transaction():
            # Get reservation
            reservation = await self.repo.primary.fetch_one(
                """
                SELECT product_id, quantity, status FROM reservations
                WHERE id = $1 FOR UPDATE
                """,
                reservation_id
            )
            
            if not reservation or reservation['status'] != 'active':
                return False
            
            # Restore inventory
            await self.repo.primary.execute(
                """
                UPDATE inventory 
                SET quantity = quantity + $2, version = version + 1
                WHERE product_id = $1
                """,
                reservation['product_id'], reservation['quantity']
            )
            
            # Mark reservation as released
            await self.repo.primary.execute(
                """
                UPDATE reservations SET status = 'released' WHERE id = $1
                """,
                reservation_id
            )
            
            # Invalidate cache
            await self.cache.delete(f"inventory:{reservation['product_id']}")
            
            return True
    
    async def confirm(self, reservation_id: str) -> bool:
        """
        Confirm a reservation (order completed).
        
        Changes reservation status, inventory already decremented.
        """
        result = await self.repo.primary.execute(
            """
            UPDATE reservations SET status = 'confirmed'
            WHERE id = $1 AND status = 'active'
            """,
            reservation_id
        )
        return result.rowcount > 0
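
For comparison, the oversell check can also be folded into a single conditional UPDATE. This is a different technique from the SELECT ... FOR UPDATE used in `reserve` above, shown here as a minimal sketch with Python's built-in `sqlite3` module (which has no FOR UPDATE). The table layout and the standalone `reserve` function are assumptions for illustration only.

```python
import sqlite3


def reserve(conn: sqlite3.Connection, product_id: str, quantity: int) -> bool:
    """Decrement stock only if enough remains. True when the reservation succeeded."""
    with conn:  # commits on success, rolls back if an exception is raised
        cur = conn.execute(
            "UPDATE inventory SET quantity = quantity - ? "
            "WHERE product_id = ? AND quantity >= ?",
            (quantity, product_id, quantity),
        )
        # The WHERE clause makes check-and-decrement one atomic statement:
        # zero affected rows means there was not enough stock
        return cur.rowcount == 1


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE inventory (product_id TEXT PRIMARY KEY, quantity INTEGER NOT NULL)"
)
conn.execute("INSERT INTO inventory VALUES ('sku-1', 3)")
conn.commit()

first = reserve(conn, "sku-1", 2)   # 3 units available, succeeds
second = reserve(conn, "sku-1", 2)  # only 1 unit left, rejected
remaining = conn.execute(
    "SELECT quantity FROM inventory WHERE product_id = 'sku-1'"
).fetchone()[0]
print(first, second, remaining)
```

The single-statement form avoids holding a lock between a read and a write, but it cannot return why the reservation failed (missing product versus insufficient stock) without a follow-up query.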

6.2 The Profile Pattern

# User Profile with Read-Your-Writes

class ProfileService:
    """
    User profile service with session consistency.
    
    Users expect to see their own updates immediately.
    Other users' updates can be eventually consistent.
    """
    
    def __init__(self, repository: ConsistencyAwareRepository):
        self.repo = repository
    
    async def get_profile(
        self,
        user_id: str,
        viewer_id: str,
        context: ConsistencyContext
    ) -> dict:
        """
        Get a user's profile.
        
        If viewing your own profile: Read-your-writes
        If viewing someone else's: Eventual consistency
        """
        if user_id == viewer_id:
            # Viewing own profile - see your updates
            consistency = ConsistencyLevel.READ_YOUR_WRITES
        else:
            # Viewing someone else - eventual is fine
            consistency = ConsistencyLevel.EVENTUAL
        
        return await self.repo.read(
            key=f"profile:{user_id}",
            consistency=consistency,
            context=context
        )
    
    async def update_profile(
        self,
        user_id: str,
        updates: dict,
        context: ConsistencyContext
    ) -> dict:
        """Update user's profile."""
        # Merge with existing profile
        current = await self.repo.read(
            key=f"profile:{user_id}",
            consistency=ConsistencyLevel.STRONG,  # Get latest for merge
            context=context
        )
        
        updated = {**(current or {}), **updates}
        
        await self.repo.write(
            key=f"profile:{user_id}",
            value=updated,
            context=context  # Track for read-your-writes
        )
        
        return updated

Part III: Real-World Application

Chapter 7: Case Studies

7.1 Case Study: Amazon DynamoDB

DYNAMODB CONSISTENCY MODEL

DynamoDB offers tunable consistency per-read:

EVENTUALLY CONSISTENT READS (Default)
├── Read from any replica
├── May return stale data
├── Lower latency, higher throughput
├── 50% cheaper than strongly consistent
└── Use for: Product catalog, user preferences

STRONGLY CONSISTENT READS
├── Read from leader replica
├── Always returns latest data
├── Higher latency
├── May be unavailable during leader election
└── Use for: Inventory, financial data

Example API usage:
  
  # Eventually consistent (default)
  response = dynamodb.get_item(
      TableName='Products',
      Key={'product_id': {'S': '12345'}}
  )
  
  # Strongly consistent
  response = dynamodb.get_item(
      TableName='Products',
      Key={'product_id': {'S': '12345'}},
      ConsistentRead=True
  )

Key insight:
  DynamoDB forces you to think about consistency
  at the operation level, not system level.
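
The "50% cheaper" figure follows from how read capacity is metered in provisioned mode: one read capacity unit covers one strongly consistent read per second, or two eventually consistent reads per second, for an item up to 4 KB. A small sketch of that arithmetic, where `read_capacity_units` is a hypothetical helper and not part of any AWS SDK:

```python
import math


def read_capacity_units(item_size_bytes: int, strongly_consistent: bool) -> float:
    """Capacity units consumed by one read per second of an item of this size."""
    # Item size is rounded up to the next 4 KB block
    four_kb_blocks = math.ceil(item_size_bytes / 4096)
    # Eventually consistent reads are billed at half the strongly consistent rate
    return four_kb_blocks * (1.0 if strongly_consistent else 0.5)


small_strong = read_capacity_units(3000, strongly_consistent=True)
small_eventual = read_capacity_units(3000, strongly_consistent=False)
large_strong = read_capacity_units(9000, strongly_consistent=True)
large_eventual = read_capacity_units(9000, strongly_consistent=False)
print(small_strong, small_eventual, large_strong, large_eventual)
```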

7.2 Case Study: Google Spanner

GOOGLE SPANNER: STRONG CONSISTENCY AT GLOBAL SCALE

The impossible made possible:
├── Strong consistency (external consistency, i.e. linearizable transactions)
├── Global distribution (multiple continents)
└── High availability (up to 99.999% SLA for multi-region configurations)

How?

1. TRUETIME API
   ├── GPS receivers + atomic clocks in every datacenter
   ├── Returns: [earliest, latest] time bounds
   ├── Uncertainty typically < 7ms
   └── Enables: Commit-wait for global ordering

2. PAXOS FOR REPLICATION
   ├── Each shard replicated via Paxos
   ├── Writes go to Paxos leader
   ├── A write commits once a majority of the group's replicas acknowledge it
   └── Replicas can span regions, at the cost of cross-region latency

3. TWO-PHASE COMMIT WITH PAXOS
   ├── Transactions span shards
   ├── Each shard is Paxos group
   ├── Coordinator also Paxos group
   └── Survives any single failure

Trade-off:
  Write latency: ~10-20ms (commit-wait)
  Cross-region writes: ~100ms+
  
  But: TRUE global strong consistency
  No "eventually consistent" surprises
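
Commit-wait is easier to see in code. The sketch below simulates the idea with a toy clock: `SimulatedTrueTime` and `commit_with_wait` are hypothetical names, the uncertainty bound is simply assumed rather than measured, and real TrueTime is backed by GPS and atomic clocks. It illustrates the rule only, not Spanner's implementation.

```python
import time
from dataclasses import dataclass


@dataclass(frozen=True)
class TTInterval:
    earliest: float
    latest: float


class SimulatedTrueTime:
    """Toy clock: true time is assumed to lie within +/- epsilon of the local clock."""

    def __init__(self, epsilon_seconds: float) -> None:
        self.epsilon = epsilon_seconds

    def now(self) -> TTInterval:
        t = time.monotonic()
        return TTInterval(t - self.epsilon, t + self.epsilon)


def commit_with_wait(tt: SimulatedTrueTime) -> float:
    """Pick a commit timestamp, then wait until it is certainly in the past."""
    commit_ts = tt.now().latest  # no clock in the system can be ahead of this
    # Commit-wait: block until even the most pessimistic clock has passed commit_ts
    while tt.now().earliest <= commit_ts:
        time.sleep(tt.epsilon / 4)
    return commit_ts


tt = SimulatedTrueTime(epsilon_seconds=0.005)
started = time.monotonic()
ts = commit_with_wait(tt)
waited = time.monotonic() - started
print(f"waited {waited * 1000:.1f} ms for an uncertainty of {tt.epsilon * 1000:.0f} ms")
```

The wait is roughly twice the uncertainty bound, which is why keeping the bound small matters so much for write latency.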

7.3 Case Study: Facebook TAO

FACEBOOK TAO: CACHING WITH CONSISTENCY

TAO (The Associations and Objects):
  Distributed graph store for social data

Challenge:
  - Billions of reads/second
  - Strong consistency for user's own data
  - Eventual OK for social graph

Solution: Read-your-writes via sticky routing

1. WRITE PATH
   ├── Write to MySQL (source of truth)
   ├── Invalidate local cache
   ├── Async invalidation to other caches
   └── Leader cache in each region

2. READ PATH
   ├── User's own data: Route to consistent cache
   ├── Social graph: Any cache (eventual)
   └── Leader cache handles read-your-writes

3. CACHE INVALIDATION
   ├── Local: Synchronous
   ├── Cross-datacenter: Async via refill
   └── Bounded staleness: ~1 second

Key insight:
  Different consistency for different access patterns.
  Users care about their own data being fresh.
  They don't notice if friend's data is 1s stale.

Chapter 8: Common Mistakes

8.1 Mistake 1: Assuming Strong Consistency

❌ WRONG: Reading from replica after write

async def update_and_read(user_id: str, new_email: str):
    # Write to primary
    await primary.execute(
        "UPDATE users SET email = $1 WHERE id = $2",
        new_email, user_id
    )
    
    # Read from replica immediately
    user = await replica.fetch_one(
        "SELECT * FROM users WHERE id = $1",
        user_id
    )
    
    # BUG: user.email might still be the old value!
    # Replication hasn't happened yet
    return user


✅ CORRECT: Read from primary for read-your-writes

async def update_and_read(user_id: str, new_email: str):
    # Write to primary
    await primary.execute(
        "UPDATE users SET email = $1 WHERE id = $2",
        new_email, user_id
    )
    
    # Read from PRIMARY for read-your-writes guarantee
    user = await primary.fetch_one(
        "SELECT * FROM users WHERE id = $1",
        user_id
    )
    
    return user

8.2 Mistake 2: Ignoring Monotonic Reads

❌ WRONG: Reading from random replicas

async def poll_for_updates(key: str):
    """Poll for updates - broken implementation."""
    while True:
        # Each read might hit a different replica
        value = await random_replica().read(key)
        
        # BUG: Value might go backwards!
        # Replica A has version 5
        # Replica B has version 3
        # User sees: 5, 3, 5, 4, 5, 3...
        
        yield value
        await asyncio.sleep(1)


✅ CORRECT: Track and enforce monotonic reads

async def poll_for_updates(key: str):
    """Poll for updates with monotonic guarantee."""
    min_version = 0
    
    while True:
        # Find replica with at least our version
        for replica in replicas:
            result = await replica.read_versioned(key)
            if result.version >= min_version:
                min_version = result.version
                yield result.value
                break
        
        await asyncio.sleep(1)

8.3 Mistake 3: Using Strong Consistency Everywhere

❌ WRONG: Strong consistency for everything

async def get_product_page(product_id: str):
    # ALL reads from primary - unnecessary!
    product = await primary.fetch("SELECT * FROM products WHERE id = $1", product_id)
    reviews = await primary.fetch("SELECT * FROM reviews WHERE product_id = $1", product_id)
    inventory = await primary.fetch("SELECT quantity FROM inventory WHERE product_id = $1", product_id)
    
    # This doesn't scale - primary is bottleneck
    return render_page(product, reviews, inventory)


✅ CORRECT: Appropriate consistency per operation

async def get_product_page(product_id: str):
    # Product details: Eventual (from cache/replica)
    product = await cache_or_replica("products", product_id)
    
    # Reviews: Eventual (stale reviews are fine)
    reviews = await cache_or_replica("reviews", product_id)
    
    # Inventory for DISPLAY: Eventual (the cache entry is written with a short TTL, e.g. 30s)
    inventory = await cache.get(f"inventory:{product_id}")
    
    return render_page(product, reviews, inventory)


async def reserve_inventory(product_id: str, quantity: int):
    # Inventory for RESERVATION: Strong consistency
    async with primary.transaction(isolation="SERIALIZABLE"):
        # This is where strong consistency matters
        ...

8.4 Mistake Checklist

  • Don't assume replication is instant — Always consider replication lag
  • Don't read from replica after write — Use read-your-writes pattern
  • Don't use random replica selection — May break monotonic reads
  • Don't use strong consistency everywhere — It doesn't scale
  • Don't forget about sessions — Users have expectations about their data
  • Do choose consistency per operation — Different data has different needs

Part IV: Interview Preparation

Chapter 9: Interview Tips

9.1 Key Phrases

INTRODUCING CONSISTENCY REQUIREMENTS:

"Before I design this, I need to understand the consistency 
requirements. For the product catalog browsing, eventual 
consistency is fine - we can serve from cache. But for 
inventory checks at checkout, we need strong consistency 
to prevent overselling."


EXPLAINING THE TRADE-OFF:

"There's a fundamental trade-off here. Strong consistency 
means coordinating across replicas, which adds latency and 
reduces availability during partitions. Eventual consistency 
is faster and more available, but clients may see stale data. 
For this use case, I'd choose [X] because [reason]."


DISCUSSING CAP:

"CAP theorem tells us that during a network partition, we 
must choose between consistency and availability. For a 
banking system, I'd choose consistency - it's better to 
reject a transaction than process it incorrectly. For a 
shopping cart, I'd choose availability - a slightly stale 
cart is better than an error page."


SESSION CONSISTENCY:

"Users have a mental model that their actions are 
immediately visible to them. If I update my profile 
and refresh, I expect to see the update. This is 
read-your-writes consistency - not full strong 
consistency, but essential for user experience."

9.2 Common Questions

Question: "What consistency model would you use for a social media feed?"
Good answer: "Causal consistency. Comments must appear after their parent posts, but concurrent posts from different users can appear in any order. This preserves the user experience without requiring strong consistency."

Question: "How do you handle inventory for an e-commerce site?"
Good answer: "Two different consistency levels. For display, eventual consistency from cache is fine. For reservation at checkout, strong consistency with SELECT FOR UPDATE to prevent overselling."

Question: "What's the difference between eventual and strong consistency?"
Good answer: "With strong consistency, once a write completes, all subsequent reads see that write. With eventual, a write may complete but readers might still see old data for some time. The 'eventually' has no upper bound."

Question: "When would you choose CP over AP?"
Good answer: "When incorrect data is worse than unavailability. Financial transactions, inventory that can oversell, unique constraint enforcement. These systems should reject requests rather than process them wrong."

Chapter 10: Practice Problems

Problem 1: Airline Seat Selection

Setup: Design the seat selection system for an airline booking site. Multiple users may try to select the same seat simultaneously.

Requirements:

  • Show available seats on a plane map
  • Allow users to select and reserve seats
  • Prevent double-booking of the same seat

Questions:

  1. What consistency level for displaying available seats?
  2. What consistency level for reserving a seat?
  3. How would you handle the race condition when two users click the same seat?

Displaying seats: Eventual consistency acceptable. Cache the seat map with short TTL (10s). Slightly stale availability is OK for display.

Reserving seats: Strong consistency required. Use optimistic locking:

-- Attempt reservation
UPDATE seats 
SET status = 'reserved', reserved_by = $user_id, version = version + 1
WHERE flight_id = $flight_id AND seat_number = $seat 
AND status = 'available' AND version = $expected_version

-- If 0 rows affected, seat was taken

Race condition: Only one UPDATE succeeds (due to version check). Loser gets "seat already taken" message. Refresh the seat map to show updated availability.
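
The race can be reproduced with Python's built-in `sqlite3` module. This is a minimal sketch: the `seats` schema, the flight and seat identifiers, and the `reserve_seat` helper are assumptions for illustration, and the two attempts run back to back instead of truly in parallel.

```python
import sqlite3


def reserve_seat(
    conn: sqlite3.Connection,
    flight_id: str,
    seat: str,
    user_id: str,
    expected_version: int,
) -> bool:
    """Optimistic reservation: succeeds only if the row is unchanged since it was read."""
    with conn:  # commits on success, rolls back if an exception is raised
        cur = conn.execute(
            "UPDATE seats "
            "SET status = 'reserved', reserved_by = ?, version = version + 1 "
            "WHERE flight_id = ? AND seat_number = ? "
            "AND status = 'available' AND version = ?",
            (user_id, flight_id, seat, expected_version),
        )
        return cur.rowcount == 1  # 0 rows affected means the seat was taken


conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE seats (
        flight_id TEXT, seat_number TEXT, status TEXT,
        reserved_by TEXT, version INTEGER,
        PRIMARY KEY (flight_id, seat_number)
    )
    """
)
conn.execute("INSERT INTO seats VALUES ('FL100', '12A', 'available', NULL, 1)")
conn.commit()

# Both users loaded the seat map while the row was at version 1
alice_won = reserve_seat(conn, "FL100", "12A", "alice", expected_version=1)
bob_won = reserve_seat(conn, "FL100", "12A", "bob", expected_version=1)
owner = conn.execute(
    "SELECT reserved_by FROM seats WHERE flight_id = 'FL100' AND seat_number = '12A'"
).fetchone()[0]
print(alice_won, bob_won, owner)
```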

Problem 2: Collaborative Document Editing

Setup: Design a Google Docs-like collaborative editor. Multiple users edit the same document simultaneously.

Requirements:

  • Real-time updates visible to all editors
  • Works offline (changes sync when online)
  • No edits should be lost

Questions:

  1. What consistency model is appropriate?
  2. How do you handle conflicting edits?
  3. What happens when an offline user comes back online?

Consistency model: Causal consistency with conflict resolution. Edits that causally depend on each other (e.g., edit then delete) must be ordered. Concurrent edits are merged.

Conflicting edits: Use Operational Transformation (OT) or CRDTs. These automatically merge concurrent operations without losing edits.

Offline sync:

  1. Track all local edits with vector clock
  2. On reconnect, sync local edits with server
  3. Server merges using OT/CRDT
  4. Return merged result to all clients

This is covered more in Day 4 (Conflict Resolution).
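
As a preview of step 1 in the offline sync list, here is a minimal vector clock sketch. It only shows how to tell whether two edits are causally ordered or concurrent. The function names and node ids are hypothetical, and the actual merging of document content (OT or CRDT) is out of scope.

```python
from typing import Dict

VectorClock = Dict[str, int]  # node id -> number of events seen from that node


def tick(clock: VectorClock, node: str) -> VectorClock:
    """Return a copy of the clock with the node's own counter advanced by one."""
    updated = dict(clock)
    updated[node] = updated.get(node, 0) + 1
    return updated


def merge(a: VectorClock, b: VectorClock) -> VectorClock:
    """Element-wise maximum: the clock after learning about both histories."""
    return {n: max(a.get(n, 0), b.get(n, 0)) for n in a.keys() | b.keys()}


def happened_before(a: VectorClock, b: VectorClock) -> bool:
    """True when a is causally before b (a <= b everywhere and a < b somewhere)."""
    nodes = a.keys() | b.keys()
    return (all(a.get(n, 0) <= b.get(n, 0) for n in nodes)
            and any(a.get(n, 0) < b.get(n, 0) for n in nodes))


def concurrent(a: VectorClock, b: VectorClock) -> bool:
    """True when neither edit saw the other, so the edits must be merged."""
    nodes = a.keys() | b.keys()
    identical = all(a.get(n, 0) == b.get(n, 0) for n in nodes)
    return not identical and not happened_before(a, b) and not happened_before(b, a)


synced = tick({}, "server")            # last state both devices saw
laptop_edit = tick(synced, "laptop")   # edit made online
phone_edit = tick(synced, "phone")     # edit made offline
after_sync = merge(laptop_edit, phone_edit)

print(happened_before(synced, laptop_edit))  # the synced state precedes the edit
print(concurrent(laptop_edit, phone_edit))   # neither device saw the other's edit
```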


Chapter 11: Sample Interview Dialogue

Scenario: Design a Shopping Cart

Interviewer: "Design a shopping cart for an e-commerce site."

You: "Before I dive in, I'd like to clarify the consistency requirements. When a user adds an item to their cart, do they need to see it immediately? What happens if they're on multiple devices?"

Interviewer: "Yes, they should see their own changes immediately. Multiple devices should eventually sync."

You: "Got it. So I need read-your-writes consistency for the user's own cart, but eventual consistency across devices is acceptable. Let me think about the access patterns..."

You sketch on the whiteboard:

Cart Consistency Requirements:

SAME SESSION:
├── Add item to cart
├── View cart immediately
└── Must see the item I just added (read-your-writes)

CROSS-DEVICE:
├── Add item on phone
├── Open laptop 5 seconds later
└── Eventually see the item (eventual consistency OK)

CHECKOUT:
├── Need accurate cart contents
├── Need current prices
└── Strong consistency required

You: "For the data model, I'd store the cart in a primary database with read replicas. For the same session, I'd route reads to the primary after writes to guarantee read-your-writes. For cross-device, eventual consistency is fine - replicas will catch up within seconds."

Interviewer: "What if the user adds an item on their phone while offline?"

You: "Good question. For offline support, I'd store the cart locally on the device. When they come back online, I'd sync the local changes to the server. If there are conflicts - like they added the same item on both devices - I'd merge by taking the maximum quantity per item. This merge only ever keeps or grows quantities, so it never loses an added item."

Interviewer: "What about checkout? How do you ensure consistency?"

You: "At checkout, I switch to strong consistency. I'd read the cart from the primary database, verify inventory for each item with SELECT FOR UPDATE to prevent race conditions, and lock prices at that moment. The cart might have been slightly stale during browsing, but at checkout I need the ground truth."
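
The max-quantity merge from the offline answer can be sketched in a few lines. `Cart` and `merge_carts` are hypothetical names, and the sketch deliberately ignores removals: with this merge a removed item would reappear after sync unless deletions are tracked separately.

```python
from typing import Dict

Cart = Dict[str, int]  # item id -> quantity


def merge_carts(server: Cart, device: Cart) -> Cart:
    """Union of items, keeping the larger quantity when both sides have the item."""
    return {
        item: max(server.get(item, 0), device.get(item, 0))
        for item in server.keys() | device.keys()
    }


server_cart = {"book": 1, "pen": 2}
phone_cart = {"pen": 3, "mug": 1}  # edited while offline
merged = merge_carts(server_cart, phone_cart)
print(sorted(merged.items()))
```

Because the merge is commutative and idempotent, devices can sync in any order, or sync twice, and still converge on the same cart.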


Summary

DAY 1 KEY TAKEAWAYS

CONSISTENCY MODELS (Roughly Strongest to Weakest; 4 and 5 are independent session guarantees):

1. LINEARIZABLE
   └── Single-copy behavior, real-time ordering
   └── Use for: Transactions, locks, inventory

2. SEQUENTIAL
   └── Global order agreed by all nodes
   └── Use for: Logs, message queues

3. CAUSAL
   └── Cause always before effect
   └── Use for: Social media, collaboration

4. READ-YOUR-WRITES
   └── See your own updates
   └── Use for: User profiles, settings

5. MONOTONIC READS
   └── Never go backwards
   └── Use for: Feeds, counters

6. EVENTUAL
   └── Converges eventually
   └── Use for: Caching, analytics


CAP THEOREM IN PRACTICE:
  • Network partitions WILL happen
  • During partition: Choose C or A
  • Different operations can choose differently
  • Strong consistency for critical operations
  • Eventual consistency for performance


IMPLEMENTATION PATTERNS:
  • Track write versions for read-your-writes
  • Track read versions for monotonic reads
  • Route to primary for strong consistency
  • Use replicas for eventual consistency


DEFAULT APPROACH:
  Display/Browse: Eventual consistency (cache)
  User's own data: Read-your-writes
  Critical operations: Strong consistency

Part V: Advanced Patterns

Chapter 12: Quorum-Based Consistency

12.1 Understanding Quorums

QUORUM BASICS

In a system with N replicas:
  W = Number of nodes that must acknowledge a write
  R = Number of nodes that must respond to a read

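For STRONG consistency: W + R > N
  This ensures every read set overlaps every write set
  At least one node in each read has the latest acknowledged write

Example with N=3:
  W=2, R=2: Strong consistency (2+2 > 3)
  W=1, R=1: Eventual consistency (1+1 <= 3, read and write sets may not overlap)
  W=3, R=1: Strong consistency, slow writes and fast reads (suits read-heavy workloads)
  W=1, R=3: Strong consistency, fast writes and slow reads (suits write-heavy workloads)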
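
The overlap claim can be checked by brute force for small clusters. The sketch below enumerates every possible write set and read set and confirms they always share a node exactly when W + R > N. `quorums_always_overlap` is a hypothetical helper written for this check.

```python
from itertools import combinations


def quorums_always_overlap(n: int, w: int, r: int) -> bool:
    """True if every W-node write set shares at least one node with every R-node read set."""
    nodes = range(n)
    return all(
        set(write_set) & set(read_set)  # an empty intersection is falsy
        for write_set in combinations(nodes, w)
        for read_set in combinations(nodes, r)
    )


for w, r in [(2, 2), (1, 1), (3, 1), (1, 3)]:
    print(f"N=3 W={w} R={r}: overlap guaranteed = {quorums_always_overlap(3, w, r)}")
```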

12.2 Quorum Implementation

# Quorum-Based Consistency Implementation

from dataclasses import dataclass
from typing import List, Optional, Any, Tuple
import asyncio
import logging
import time

logger = logging.getLogger(__name__)


@dataclass
class QuorumConfig:
    """Configuration for quorum-based consistency."""
    n: int  # Total replicas
    w: int  # Write quorum
    r: int  # Read quorum
    
    def is_strong(self) -> bool:
        """Check if config provides strong consistency."""
        return self.w + self.r > self.n
    
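    def __post_init__(self):
        # A quorum of 0 would let reads or writes "succeed" without any replica
        if not (1 <= self.w <= self.n and 1 <= self.r <= self.n):
            raise ValueError("Quorums must be between 1 and the replica count")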


@dataclass
class VersionedValue:
    """Value with version for conflict detection."""
    value: Any
    version: int
    timestamp: float


class QuorumStore:
    """
    Distributed store with quorum-based consistency.
    
    Provides tunable consistency through W and R parameters.
    """
    
    def __init__(self, replicas: List, config: QuorumConfig):
        self.replicas = replicas
        self.config = config
    
    async def write(self, key: str, value: Any) -> bool:
        """
        Write to W replicas (quorum write).
        
        Returns True if quorum was achieved.
        """
        timestamp = time.time()
        
        # Get current max version
        current = await self._read_from_quorum(key)
        new_version = (current.version + 1) if current else 1
        
        versioned = VersionedValue(
            value=value,
            version=new_version,
            timestamp=timestamp
        )
        
        # Write to all replicas concurrently
        tasks = [
            self._write_to_replica(replica, key, versioned)
            for replica in self.replicas
        ]
        
        results = await asyncio.gather(*tasks, return_exceptions=True)
        
        # Count successful writes
        successes = sum(1 for r in results if r is True)
        
        if successes >= self.config.w:
            return True
        else:
            # Quorum not achieved - write failed
            # Could attempt rollback here
            return False
    
    async def read(self, key: str) -> Optional[Any]:
        """
        Read from R replicas (quorum read).
        
        Returns the value with highest version.
        """
        result = await self._read_from_quorum(key)
        return result.value if result else None
    
    async def _read_from_quorum(self, key: str) -> Optional[VersionedValue]:
        """Query all replicas, require R responses, return highest version."""
        tasks = [
            self._read_from_replica(replica, key)
            for replica in self.replicas
        ]
        
        results = await asyncio.gather(*tasks, return_exceptions=True)
        
        # Pair each replica with its result so the two stay aligned,
        # and drop replicas whose read raised an exception
        responses = [
            (replica, result)
            for replica, result in zip(self.replicas, results)
            if not isinstance(result, BaseException)
        ]
        
        # A replica that answers "key not found" (assumed to be None) still
        # counts toward the quorum; otherwise the first write of a new key,
        # which reads the current version first, could never succeed
        if len(responses) < self.config.r:
            raise Exception(
                f"Read quorum not achieved: {len(responses)}/{self.config.r}"
            )
        
        values = [r for _, r in responses if isinstance(r, VersionedValue)]
        if not values:
            return None
        
        latest = max(values, key=lambda v: (v.version, v.timestamp))
        
        # Optional: Read repair - update stale replicas
        await self._read_repair(key, latest, responses)
        
        return latest
    
    async def _read_repair(
        self,
        key: str,
        latest: VersionedValue,
        responses: List[Tuple[Any, Optional[VersionedValue]]]
    ):
        """Update replicas that are missing the key or hold a stale version."""
        stale_replicas = [
            replica
            for replica, value in responses
            if value is None or value.version < latest.version
        ]
        
        if stale_replicas:
            # Repair stale replicas asynchronously
            asyncio.create_task(
                self._repair_replicas(key, latest, stale_replicas)
            )
    
    async def _repair_replicas(
        self,
        key: str,
        value: VersionedValue,
        replicas: List
    ):
        """Repair stale replicas with latest value."""
        tasks = [
            self._write_to_replica(replica, key, value)
            for replica in replicas
        ]
        await asyncio.gather(*tasks, return_exceptions=True)
    
    async def _write_to_replica(
        self,
        replica,
        key: str,
        value: VersionedValue
    ) -> bool:
        """Write to a single replica."""
        try:
            await replica.set(key, value)
            return True
        except Exception as e:
            logger.warning(f"Write to replica failed: {e}")
            return False
    
    async def _read_from_replica(
        self,
        replica,
        key: str
    ) -> Optional[VersionedValue]:
        """Read from a single replica."""
        try:
            return await replica.get(key)
        except Exception as e:
            logger.warning(f"Read from replica failed: {e}")
            raise


# Usage examples
async def demo_quorum_consistency(replicas: List):
    # Strong consistency: W=2, R=2 with N=3 (W + R > N)
    strong_config = QuorumConfig(n=3, w=2, r=2)
    strong_store = QuorumStore(replicas, strong_config)
    
    # Write requires 2/3 replicas
    await strong_store.write("key", "value")
    
    # Read requires 2/3 replicas - any 2 replicas include at least one
    # that acknowledged the write, so the latest version is returned
    value = await strong_store.read("key")
    
    
    # Eventual consistency: W=1, R=1 with N=3
    eventual_config = QuorumConfig(n=3, w=1, r=1)
    eventual_store = QuorumStore(replicas, eventual_config)
    
    # Fast writes and reads, but may return stale data
    await eventual_store.write("key", "value")
    value = await eventual_store.read("key")  # Might be stale!
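
Why does W=2, R=2 with N=3 see the latest write while W=1, R=1 may not? It comes down to set overlap: if W + R > N, every possible read set shares at least one replica with every possible write set. The brute-force check below is a minimal sketch of that argument. The function name quorums_always_overlap is illustrative and not part of the QuorumStore above.

```python
from itertools import combinations


def quorums_always_overlap(n: int, w: int, r: int) -> bool:
    """Return True if every W-sized write set shares a replica with every R-sized read set."""
    replicas = range(n)
    for write_set in combinations(replicas, w):
        for read_set in combinations(replicas, r):
            if not set(write_set) & set(read_set):
                return False  # This read set could miss the write entirely
    return True


for n, w, r in [(3, 2, 2), (3, 1, 1), (3, 3, 1), (5, 3, 3), (5, 2, 3)]:
    print(f"N={n} W={w} R={r}: overlap guaranteed = {quorums_always_overlap(n, w, r)}")
```

Overlap alone only says that a read contacts a replica holding the newest acknowledged version. Concurrent writers and writes that failed after reaching some replicas still need version comparison and repair, as in the store above.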

12.3 Sloppy Quorums and Hinted Handoff

# Sloppy Quorums for High Availability

class SloppyQuorumStore(QuorumStore):
    """
    Quorum store with sloppy quorums and hinted handoff.
    
    When a replica is down, writes go to a "hint" node instead.
    When the original replica recovers, hints are handed off.
    
    This improves availability at the cost of consistency guarantees.
    """
    
    def __init__(self, replicas: List, config: QuorumConfig, all_nodes: List):
        super().__init__(replicas, config)
        self.all_nodes = all_nodes  # Including non-replica nodes
        self.hints: dict = {}  # node_id -> list of hints
    
    async def write_sloppy(self, key: str, value: Any) -> bool:
        """
        Write with sloppy quorum.
        
        If a replica is down, write to any available node as a hint.
        """
        timestamp = time.time()
        current = await self._read_from_quorum(key)
        new_version = (current.version + 1) if current else 1
        
        versioned = VersionedValue(
            value=value,
            version=new_version,
            timestamp=timestamp
        )
        
        successes = 0
        
        # Try preferred replicas first
        for replica in self.replicas:
            # _write_to_replica returns False on failure instead of raising,
            # so check its result rather than catching an exception
            if await self._write_to_replica(replica, key, versioned):
                successes += 1
                continue
            
            # Replica is down, find a substitute
            substitute = await self._find_substitute(replica)
            if substitute:
                await self._store_hint(substitute, replica.id, key, versioned)
                successes += 1
        
        return successes >= self.config.w
    
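    async def _find_substitute(self, failed_replica) -> Optional[Any]:
        """Find a healthy node outside the preferred replica set to store a hint."""
        # A preferred replica must not double as a substitute, or one node
        # could be counted twice toward the write quorum
        replica_ids = {replica.id for replica in self.replicas}
        for node in self.all_nodes:
            if node.id not in replica_ids and await node.is_healthy():
                return node
        return None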
    
    async def _store_hint(
        self,
        node,
        target_replica_id: str,
        key: str,
        value: VersionedValue
    ):
        """Store a hinted value for later handoff."""
        hint = {
            "target": target_replica_id,
            "key": key,
            "value": value
        }
        
        # Store hint on the substitute node
        await node.store_hint(hint)
    
    async def handoff_hints(self, recovered_replica):
        """
        Hand off hints to a recovered replica.
        
        Called when a replica comes back online.
        """
        for node in self.all_nodes:
            hints = await node.get_hints_for(recovered_replica.id)
            
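            for hint in hints:
                try:
                    delivered = await self._write_to_replica(
                        recovered_replica,
                        hint["key"],
                        hint["value"]
                    )
                    # _write_to_replica returns False instead of raising,
                    # so delete the hint only once the write is confirmed
                    if delivered:
                        await node.delete_hint(hint)
                    else:
                        logger.warning("Hint handoff failed; keeping hint for retry")
                except Exception as e:
                    logger.warning(f"Hint handoff failed: {e}")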
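
To see what "availability at the cost of consistency guarantees" means in practice, here is a small synchronous simulation of the same idea. It is a sketch under simplifying assumptions: Node, sloppy_write, and handoff are hypothetical in-memory stand-ins, not the replica API used by SloppyQuorumStore.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Tuple


@dataclass
class Node:
    """Hypothetical in-memory node; `up` simulates availability."""
    id: str
    up: bool = True
    data: Dict[str, str] = field(default_factory=dict)
    hints: List[Tuple[str, str, str]] = field(default_factory=list)  # (target_id, key, value)


def sloppy_write(preferred: List[Node], all_nodes: List[Node], key: str, value: str, w: int) -> bool:
    """Write to preferred replicas; park a hint on a non-replica node for each one that is down."""
    preferred_ids = {n.id for n in preferred}
    acks = 0
    for replica in preferred:
        if replica.up:
            replica.data[key] = value
            acks += 1
            continue
        # The substitute must be healthy and outside the preferred set
        substitute = next((n for n in all_nodes if n.up and n.id not in preferred_ids), None)
        if substitute is not None:
            substitute.hints.append((replica.id, key, value))
            acks += 1
    return acks >= w


def handoff(all_nodes: List[Node], recovered: Node) -> int:
    """Deliver hints addressed to `recovered`; return how many were delivered."""
    delivered = 0
    for node in all_nodes:
        remaining = []
        for target_id, key, value in node.hints:
            if target_id == recovered.id and recovered.up:
                recovered.data[key] = value
                delivered += 1
            else:
                remaining.append((target_id, key, value))
        node.hints = remaining
    return delivered


a, b, c, d = Node("a"), Node("b"), Node("c", up=False), Node("d")
preferred, cluster = [a, b, c], [a, b, c, d]

ok = sloppy_write(preferred, cluster, "cart", "v1", w=3)  # Succeeds via a hint on d
c.up = True
stale_read = c.data.get("cart")  # c is back, but the hint has not been delivered yet
delivered = handoff(cluster, c)
print(ok, stale_read, delivered, c.data, d.hints)
```

The write reports success with W=3 even though replica c never received it. Until handoff runs, a read served by c alone misses the value, which is why W + R > N no longer guarantees fresh reads under a sloppy quorum.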

Chapter 13: Consistency in Practice — Database Isolation Levels

13.1 SQL Isolation Levels Explained

DATABASE ISOLATION LEVELS

PostgreSQL and MySQL both accept the four standard SQL isolation levels:

READ UNCOMMITTED
├── Can see uncommitted changes from other transactions
├── "Dirty reads" possible
├── Rarely used in practice (PostgreSQL treats it as READ COMMITTED)
└── Fastest, least consistent

READ COMMITTED (PostgreSQL default)
├── Only sees committed changes
├── No dirty reads
├── Non-repeatable reads possible (value changes between reads)
└── Good balance for most applications

REPEATABLE READ (MySQL InnoDB default)
├── Snapshot at start of transaction
├── Same query returns same results within transaction
├── Phantom reads possible in some databases
└── Good for reports, analytics

SERIALIZABLE
├── Transactions appear to execute one at a time
├── No anomalies possible
├── Performance impact (locking or abort/retry)
└── Common choice for financial transactions, inventory
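
The difference between READ COMMITTED and REPEATABLE READ is easiest to see by running the same read twice around a concurrent commit. The toy model below is a sketch, not how any real engine is implemented: TinyMVCC and Txn are hypothetical classes that keep a version history per key and choose which version a read may see.

```python
from typing import Dict, List, Tuple


class TinyMVCC:
    """Toy multi-version store: each key keeps a (commit_ts, value) history."""

    def __init__(self) -> None:
        self.clock = 0
        self.history: Dict[str, List[Tuple[int, int]]] = {}

    def commit(self, key: str, value: int) -> None:
        """Record a committed write with the next timestamp."""
        self.clock += 1
        self.history.setdefault(key, []).append((self.clock, value))

    def read_as_of(self, key: str, ts: int) -> int:
        """Return the newest value committed at or before ts."""
        visible = [value for commit_ts, value in self.history[key] if commit_ts <= ts]
        return visible[-1]


class Txn:
    """Toy transaction that only reads."""

    def __init__(self, store: TinyMVCC, isolation: str) -> None:
        self.store = store
        self.isolation = isolation
        self.snapshot_ts = store.clock  # Fixed when the transaction starts

    def read(self, key: str) -> int:
        if self.isolation == "REPEATABLE READ":
            # Every read uses the snapshot taken at transaction start
            return self.store.read_as_of(key, self.snapshot_ts)
        # READ COMMITTED: each statement sees the latest committed data
        return self.store.read_as_of(key, self.store.clock)


store = TinyMVCC()
store.commit("stock", 10)

rc = Txn(store, "READ COMMITTED")
rr = Txn(store, "REPEATABLE READ")

first = (rc.read("stock"), rr.read("stock"))
store.commit("stock", 7)  # Another transaction commits in between
second = (rc.read("stock"), rr.read("stock"))

print("first reads: ", first)
print("second reads:", second)
```

The READ COMMITTED transaction sees the value change between its two reads (a non-repeatable read), while the REPEATABLE READ transaction keeps seeing its original snapshot.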

13.2 Choosing Isolation Levels

# Isolation Level Selection by Use Case

class TransactionManager:
    """
    Manager for database transactions with appropriate isolation.
    """
    
    def __init__(self, db_pool):
        self.db = db_pool
    
    async def browse_products(self, category_id: str) -> List[dict]:
        """
        Browsing products - READ COMMITTED is fine.
        
        We don't need repeatable reads for browsing.
        Seeing slightly different results on refresh is OK.
        """
        async with self.db.transaction(isolation="READ COMMITTED"):
            return await self.db.fetch(
                "SELECT * FROM products WHERE category_id = $1",
                category_id
            )
    
    async def generate_report(self, start_date: str, end_date: str) -> dict:
        """
        Generating a report - REPEATABLE READ.
        
        Report should be internally consistent.
        Same query should return same results throughout.
        """
        async with self.db.transaction(isolation="REPEATABLE READ"):
            total_sales = await self.db.fetch_val(
                "SELECT SUM(amount) FROM orders WHERE date BETWEEN $1 AND $2",
                start_date, end_date
            )
            
            top_products = await self.db.fetch(
                """
                SELECT product_id, SUM(quantity) as total
                FROM order_items oi
                JOIN orders o ON oi.order_id = o.id
                WHERE o.date BETWEEN $1 AND $2
                GROUP BY product_id
                ORDER BY total DESC
                LIMIT 10
                """,
                start_date, end_date
            )
            
            # Both queries see the same snapshot
            return {"total": total_sales, "top_products": top_products}
    
    async def reserve_inventory(
        self,
        product_id: str,
        quantity: int
    ) -> bool:
        """
        Reserving inventory - SERIALIZABLE.
        
        Must prevent race conditions that cause overselling.
        """
        async with self.db.transaction(isolation="SERIALIZABLE"):
            # SERIALIZABLE may block or abort a conflicting transaction
            # (serialization failure or deadlock), so callers should retry
            current = await self.db.fetch_one(
                "SELECT quantity FROM inventory WHERE product_id = $1",
                product_id
            )
            
            # Unknown product or not enough stock
            if current is None or current['quantity'] < quantity:
                return False
            
            await self.db.execute(
                "UPDATE inventory SET quantity = quantity - $1 WHERE product_id = $2",
                quantity, product_id
            )
            
            return True
    
    async def transfer_funds(
        self,
        from_account: str,
        to_account: str,
        amount: float
    ) -> bool:
        """
        Money transfer - SERIALIZABLE with explicit locks.
        
        Double protection: isolation level + explicit locks.
        """
        async with self.db.transaction(isolation="SERIALIZABLE"):
            # Lock both accounts in consistent order to prevent deadlock
            accounts = sorted([from_account, to_account])
            
            balances = await self.db.fetch(
                """
                SELECT account_id, balance FROM accounts 
                WHERE account_id = ANY($1)
                ORDER BY account_id
                FOR UPDATE
                """,
                accounts
            )
            
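            balance_map = {row['account_id']: row['balance'] for row in balances}
            
            # Both rows must exist before comparing balances
            if from_account not in balance_map or to_account not in balance_map:
                raise NotFoundError()
            
            if balance_map[from_account] < amount:
                raise InsufficientFundsError()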
            
            await self.db.execute(
                "UPDATE accounts SET balance = balance - $1 WHERE account_id = $2",
                amount, from_account
            )
            
            await self.db.execute(
                "UPDATE accounts SET balance = balance + $1 WHERE account_id = $2",
                amount, to_account
            )
            
            return True
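
One practical consequence of SERIALIZABLE: the database may abort a transaction rather than let it commit a non-serializable result (PostgreSQL reports this as SQLSTATE 40001, serialization_failure), and the application is expected to run the whole transaction again. A minimal retry wrapper might look like the sketch below. SerializationFailure, run_with_retry, and flaky_reserve are hypothetical names; a real driver raises its own exception type.

```python
import asyncio
import random
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


class SerializationFailure(Exception):
    """Hypothetical stand-in for the driver's serialization error."""


async def run_with_retry(
    txn_fn: Callable[[], Awaitable[T]],
    max_attempts: int = 5,
    base_delay: float = 0.01,
) -> T:
    """Re-run the whole transaction function when it is aborted by a conflict."""
    for attempt in range(1, max_attempts + 1):
        try:
            return await txn_fn()
        except SerializationFailure:
            if attempt == max_attempts:
                raise  # Out of attempts: surface the failure to the caller
            # Exponential backoff with jitter so competing retries spread out
            await asyncio.sleep(random.uniform(0, base_delay * 2 ** attempt))
    raise ValueError("max_attempts must be at least 1")


calls = {"n": 0}


async def flaky_reserve() -> bool:
    """Simulated transaction: aborted twice by conflicts, then commits."""
    calls["n"] += 1
    if calls["n"] < 3:
        raise SerializationFailure("simulated conflict")
    return True


result = asyncio.run(run_with_retry(flaky_reserve))
print(result, calls["n"])
```

The function passed in should contain the entire transaction, including its reads, because a retry has to re-read data that may have changed since the aborted attempt.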

13.3 Optimistic vs Pessimistic Locking

# Optimistic vs Pessimistic Locking Comparison

class LockingPatterns:
    """
    Comparison of locking strategies for consistency.
    """
    
    def __init__(self, db_pool):
        self.db = db_pool
    
    # =========================================================================
    # Pessimistic Locking: Lock first, then read/write
    # =========================================================================
    
    async def pessimistic_update(self, product_id: str, new_price: float):
        """
        Pessimistic locking with SELECT FOR UPDATE.
        
        Use when:
        - High contention expected
        - Short transaction duration
        - Can tolerate lock waiting
        """
        async with self.db.transaction():
            # Lock the row immediately
            product = await self.db.fetch_one(
                """
                SELECT * FROM products 
                WHERE id = $1 
                FOR UPDATE  -- Acquires exclusive lock
                """,
                product_id
            )
            
            if not product:
                raise NotFoundError()
            
            # Other transactions wait here until we commit
            await self.db.execute(
                "UPDATE products SET price = $1 WHERE id = $2",
                new_price, product_id
            )
    
    async def pessimistic_with_nowait(self, product_id: str, new_price: float):
        """
        Pessimistic locking with NOWAIT - fail fast if locked.
        
        Use when:
        - Can't afford to wait
        - Will retry or fail gracefully
        """
        async with self.db.transaction():
            try:
                product = await self.db.fetch_one(
                    """
                    SELECT * FROM products 
                    WHERE id = $1 
                    FOR UPDATE NOWAIT  -- Fails immediately if locked
                    """,
                    product_id
                )
            except LockNotAvailableError:
                raise ConflictError("Product is being modified")
            
            if not product:
                raise NotFoundError()
            
            await self.db.execute(
                "UPDATE products SET price = $1 WHERE id = $2",
                new_price, product_id
            )
    
    # =========================================================================
    # Optimistic Locking: Read version, check at write time
    # =========================================================================
    
    async def optimistic_update(
        self,
        product_id: str,
        new_price: float,
        expected_version: int
    ):
        """
        Optimistic locking with version check.
        
        Use when:
        - Low contention expected
        - Long-running operations (user editing form)
        - Can retry on conflict
        """
        result = await self.db.execute(
            """
            UPDATE products 
            SET price = $1, version = version + 1
            WHERE id = $2 AND version = $3
            """,
            new_price, product_id, expected_version
        )
        
        if result.rowcount == 0:
            # Either not found or version mismatch
            current = await self.db.fetch_one(
                "SELECT version FROM products WHERE id = $1",
                product_id
            )
            
            if current is None:
                raise NotFoundError()
            else:
                raise OptimisticLockError(
                    f"Version mismatch: expected {expected_version}, "
                    f"got {current['version']}"
                )
    
    async def optimistic_with_retry(
        self,
        product_id: str,
        update_fn: Callable,
        max_retries: int = 3
    ):
        """
        Optimistic locking with automatic retry.
        
        Retries on conflict with exponential backoff.
        """
        for attempt in range(max_retries):
            # Read current state
            product = await self.db.fetch_one(
                "SELECT * FROM products WHERE id = $1",
                product_id
            )
            
            if not product:
                raise NotFoundError()
            
            # Apply update function
            updated = update_fn(product)
            
            # Try to save with version check
            result = await self.db.execute(
                """
                UPDATE products 
                SET price = $1, name = $2, version = version + 1
                WHERE id = $3 AND version = $4
                """,
                updated['price'], updated['name'],
                product_id, product['version']
            )
            
            if result.rowcount > 0:
                return updated  # Success!
            
            # Conflict - back off before the next attempt (no sleep after the last one)
            if attempt < max_retries - 1:
                await asyncio.sleep(0.1 * (2 ** attempt))
        
        raise OptimisticLockError(f"Failed after {max_retries} retries")
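
The version check can be tried end to end with nothing but the standard library. The sketch below uses an in-memory SQLite database in place of the production database; the products table layout and the update_price helper are assumptions made for this example.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE products (id TEXT PRIMARY KEY, price REAL, version INTEGER)")
conn.execute("INSERT INTO products VALUES ('p1', 10.0, 1)")
conn.commit()


def update_price(conn: sqlite3.Connection, product_id: str,
                 new_price: float, expected_version: int) -> bool:
    """Apply the update only if the row still has the version the caller read."""
    cur = conn.execute(
        "UPDATE products SET price = ?, version = version + 1 "
        "WHERE id = ? AND version = ?",
        (new_price, product_id, expected_version),
    )
    conn.commit()
    return cur.rowcount == 1  # 0 rows means the version moved on (or the row is gone)


# Two editors both loaded the product at version 1
first = update_price(conn, "p1", 12.0, expected_version=1)   # Wins and bumps the version to 2
second = update_price(conn, "p1", 11.0, expected_version=1)  # Stale version, no rows updated

row = conn.execute("SELECT price, version FROM products WHERE id = 'p1'").fetchone()
print(first, second, row)
```

The second update is rejected without any lock being held between read and write. The caller decides whether to reload and retry or report the conflict to the user.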

📚 Further Reading

Papers

  • "Linearizability: A Correctness Condition for Concurrent Objects" by Herlihy & Wing
  • "Causal Memory: Definitions, Implementation, and Programming" by Ahamad et al.
  • "Eventually Consistent" by Werner Vogels (Amazon CTO)

Books

  • "Designing Data-Intensive Applications" by Martin Kleppmann — Chapters 5, 9
  • "Database Internals" by Alex Petrov — Consistency chapters


End of Day 1: Consistency Models

Tomorrow: Day 2 — Distributed Transactions: The Saga Pattern. We'll learn how to maintain consistency across multiple services without distributed locks, using compensation instead of rollback.