Week 5 — Day 4: Conflict Resolution
System Design Mastery Series
Preface
Yesterday, we learned about workflow orchestration with Temporal — how to coordinate complex distributed transactions with durable execution. But there's a scenario that even the best orchestration can't handle:
THE OFFLINE PROBLEM
User opens shopping cart on phone (offline):
- Adds item A
- Adds item B
- Removes item C
Simultaneously, user opens cart on laptop:
- Removes item A
- Adds item D
- Changes quantity of item C to 5
Both devices come online and sync.
Question: What should the cart contain?
Option 1: Last-write-wins
→ Phone synced last, so phone's version wins
→ Lost: item D, the quantity change to C, the removal of A
→ User is confused and frustrated
Option 2: Laptop-write-wins
→ Lost: items A and B, the removal of C
→ User is confused and frustrated
Option 3: Merge somehow?
→ How do you merge "remove C" with "change C quantity to 5"?
→ This is the hard problem.
This is the conflict resolution problem — when concurrent updates happen to the same data, how do we reconcile them?
Today, you'll learn:
- Why last-write-wins loses data
- Vector clocks for detecting conflicts
- CRDTs for automatic conflict resolution
- When to use each approach
Part I: Foundations
Chapter 1: The Conflict Problem
1.1 Why Conflicts Happen
CAUSES OF CONFLICTS
1. NETWORK PARTITIONS
┌─────────────┐ ┌─────────────┐
│ Node A │ PARTITION │ Node B │
│ X = 1 │◄──────────────────▶│ X = 2 │
└─────────────┘ └─────────────┘
Both nodes accept writes during partition.
When partition heals: X = 1 or X = 2?
2. CONCURRENT CLIENTS
Client 1: Read X=0, Write X=1 (at T=0.001)
Client 2: Read X=0, Write X=2 (at T=0.002)
Both read the same value, both write.
Result depends on arrival order.
3. OFFLINE OPERATION
Mobile app works offline.
User makes changes.
Syncs when online.
Server has different changes from other devices.
4. REPLICATION LAG
Write to primary: X=1
Async replication to replica (delayed)
Read from replica: X=0 (stale)
Write based on stale read: X=0+1=1 (lost update)
1.2 The Lost Update Problem
LOST UPDATE EXAMPLE
Initial: Counter = 100
User A reads counter: 100
User B reads counter: 100
User A increments: writes 101
User B increments: writes 101
Final value: 101
Expected value: 102
One increment was LOST.
This happens because:
- Both users read the same initial value
- Both computed new value independently
- Last write overwrote the first
IN A DISTRIBUTED SYSTEM:
Node 1 Node 2
─────── ───────
Cart: [A, B] Cart: [A, B]
User adds C User adds D
Cart: [A, B, C] Cart: [A, B, D]
────── Sync ──────
Result with LWW:
If Node 2 synced last: [A, B, D]
Lost: item C
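The counter interleaving above can be reproduced in a few lines. This is a minimal sketch, not part of the original material: the in-memory `store` dict and the `read`, `write`, and `compare_and_set` helpers are hypothetical stand-ins for a shared database row. The second half shows one common mitigation (compare-and-set with a retry), included only to contrast with the blind overwrite.

```python
# Hypothetical in-memory store standing in for a shared database row.
store = {"counter": 100}


def read(key):
    return store[key]


def write(key, value):
    store[key] = value


# Both users read before either one writes (the interleaving shown above).
a_seen = read("counter")        # User A sees 100
b_seen = read("counter")        # User B sees 100
write("counter", a_seen + 1)    # User A writes 101
write("counter", b_seen + 1)    # User B also writes 101, overwriting A
lost_update_result = store["counter"]


def compare_and_set(key, expected, new_value):
    """Write only if the stored value is still what the caller read."""
    if store[key] != expected:
        return False
    store[key] = new_value
    return True


# Same interleaving, but stale writes are rejected and retried.
store["counter"] = 100
a_seen = read("counter")
b_seen = read("counter")
a_ok = compare_and_set("counter", a_seen, a_seen + 1)   # succeeds
b_ok = compare_and_set("counter", b_seen, b_seen + 1)   # rejected: value changed
if not b_ok:
    fresh = read("counter")                              # B retries from a fresh read
    compare_and_set("counter", fresh, fresh + 1)
cas_result = store["counter"]

print(lost_update_result, cas_result)  # 101 102
```

Compare-and-set needs a single authority for the value, so it does not help the offline or partitioned cases. Those are what the rest of this section addresses.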
1.3 Conflict Resolution Strategies
CONFLICT RESOLUTION STRATEGIES
1. LAST-WRITE-WINS (LWW)
├── Simple: Timestamp decides winner
├── Problem: Loses data
├── Use when: Data loss is acceptable
└── Example: Cache updates, status fields
2. FIRST-WRITE-WINS
├── First write is preserved
├── Subsequent writes rejected
├── Use when: Immutability matters
└── Example: ID generation, audit logs
3. APPLICATION-LEVEL MERGE
├── Application defines merge logic
├── Most flexible, most complex
├── Use when: Domain-specific rules exist
└── Example: Git merge, document editing
4. USER RESOLUTION
├── Show conflict to user
├── User chooses winner
├── Use when: User input is available
└── Example: File sync (Dropbox), contacts
5. AUTOMATIC MERGE (CRDTs)
├── Data structures that auto-merge
├── No conflicts by design
├── Use when: Offline-first, collaboration
└── Example: Shopping cart, collaborative text
Chapter 2: Last-Write-Wins (LWW)
2.1 How LWW Works
LAST-WRITE-WINS MECHANISM
Each write includes a timestamp.
On conflict, highest timestamp wins.
Write 1: {value: "A", timestamp: 1000}
Write 2: {value: "B", timestamp: 1001}
Conflict resolution: "B" wins (higher timestamp)
IMPLEMENTATION:
┌────────────────────────────────────────────────────────────────────────┐
│ LWW REGISTER │
│ │
│ State: {value: V, timestamp: T} │
│ │
│ Write(new_value, new_timestamp): │
│ if new_timestamp > state.timestamp: │
│ state = {value: new_value, timestamp: new_timestamp} │
│ │
│ Merge(other_state): │
│ if other_state.timestamp > state.timestamp: │
│ state = other_state │
│ │
└────────────────────────────────────────────────────────────────────────┘
2.2 Problems with LWW
LWW PROBLEMS
1. CLOCK SKEW
├── Different machines have different clocks
├── Write with "earlier" timestamp might actually be later
├── NTP helps but doesn't guarantee synchronization
└── Can lose the "actually latest" write
2. DATA LOSS
├── Concurrent writes = one is discarded
├── No merge, just overwrite
├── Users lose work
└── Acceptable only when the discarded write carries nothing worth keeping
3. TIE-BREAKING
├── What if timestamps are equal?
├── Need secondary comparison (node ID, random)
├── Still arbitrary, can lose data
└── Millisecond precision often insufficient
4. CAUSALITY VIOLATIONS
├── LWW doesn't track causality
├── Can't tell if writes are concurrent or sequential
├── May overwrite with "older" logical state
└── Confusing user experience
WHEN LWW IS ACCEPTABLE:
✓ Single-value updates (not increments)
✓ Self-contained overwrites (new value does not depend on the old one)
✓ Data can be regenerated
✓ User doesn't care about lost concurrent updates
Examples:
- User's current location (latest is best)
- Profile picture (latest is best)
- Cache entries (can re-fetch)
- "Last seen" timestamp
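To make the clock-skew problem concrete, here is a small sketch with made-up numbers. The `Stamped` class, the `lww_merge` helper, and the 5-second skew are all assumptions chosen for illustration. Node B's clock runs behind, so a write that really happened later carries a smaller timestamp and loses.

```python
from dataclasses import dataclass


@dataclass
class Stamped:
    value: str
    timestamp: float  # What the writing node's local clock said


def lww_merge(a: Stamped, b: Stamped) -> Stamped:
    """Keep whichever write has the higher timestamp."""
    return a if a.timestamp >= b.timestamp else b


SKEW_B = -5.0  # Assumed: node B's clock is 5 seconds behind real time

# Real time 100s: node A (accurate clock) writes.
first = Stamped("written first, on A", timestamp=100.0)
# Real time 102s: node B writes, but its clock reads 97s.
second = Stamped("written second, on B", timestamp=102.0 + SKEW_B)

winner = lww_merge(first, second)
print(winner.value)  # written first, on A
```

The write that actually happened last is silently dropped, and nothing in the stored data reveals that it ever existed.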
2.3 LWW Implementation
# Last-Write-Wins Register Implementation
from dataclasses import dataclass
from typing import Any, Optional
import time
@dataclass
class LWWValue:
"""Value with timestamp for LWW."""
value: Any
timestamp: float # Unix timestamp with microseconds
node_id: str # For tie-breaking
def __lt__(self, other: 'LWWValue') -> bool:
"""Compare for ordering (used in conflict resolution)."""
if self.timestamp != other.timestamp:
return self.timestamp < other.timestamp
# Tie-break by node ID (deterministic)
return self.node_id < other.node_id
class LWWRegister:
"""
Last-Write-Wins Register.
Simple conflict resolution: latest timestamp wins.
"""
def __init__(self, node_id: str):
self.node_id = node_id
self.state: Optional[LWWValue] = None
def get(self) -> Optional[Any]:
"""Get current value."""
return self.state.value if self.state else None
def set(self, value: Any) -> LWWValue:
"""Set value with current timestamp."""
new_state = LWWValue(
value=value,
timestamp=time.time(),
node_id=self.node_id
)
if self.state is None or self.state < new_state:
self.state = new_state
return new_state
def merge(self, other: LWWValue):
"""Merge with value from another node."""
if self.state is None or self.state < other:
self.state = other
def get_state(self) -> Optional[LWWValue]:
"""Get full state for replication."""
return self.state
# Usage example
def demo_lww():
# Two nodes with same data
node_a = LWWRegister("node-a")
node_b = LWWRegister("node-b")
# Concurrent writes
state_a = node_a.set("value from A")
time.sleep(0.001) # Tiny delay
state_b = node_b.set("value from B")
# Sync
node_a.merge(state_b)
node_b.merge(state_a)
# Both nodes now have same value (B wins due to later timestamp)
print(node_a.get()) # "value from B"
print(node_b.get()) # "value from B"
Chapter 3: Vector Clocks
3.1 What Are Vector Clocks?
Vector clocks track causality — they tell us whether one event happened before another, or if events are concurrent.
VECTOR CLOCKS EXPLAINED
A vector clock is a map: {node_id: counter}
Each node maintains its own counter.
When a node does something:
1. Increment its own counter
2. Include full vector clock with message
When receiving a message:
1. Merge vector clocks (take max of each counter)
2. Increment own counter
EXAMPLE:
Initial state:
Node A: {A: 0, B: 0, C: 0}
Node B: {A: 0, B: 0, C: 0}
Node C: {A: 0, B: 0, C: 0}
A writes X=1:
A's clock: {A: 1, B: 0, C: 0}
A sends to B:
B receives {A: 1, B: 0, C: 0}
B merges: max of each = {A: 1, B: 0, C: 0}
B increments: {A: 1, B: 1, C: 0}
Meanwhile, C writes X=2 (concurrent!):
C's clock: {A: 0, B: 0, C: 1}
Now we can detect:
A's write {A: 1, B: 0, C: 0}
C's write {A: 0, B: 0, C: 1}
Neither is "before" the other — they're CONCURRENT.
We have a CONFLICT that needs resolution.
3.2 Comparing Vector Clocks
VECTOR CLOCK COMPARISON
Given two vector clocks V1 and V2:
V1 < V2 (V1 happened before V2):
∀i: V1[i] ≤ V2[i] AND ∃j: V1[j] < V2[j]
V1 = V2 (Same event):
∀i: V1[i] = V2[i]
V1 || V2 (Concurrent):
Neither V1 < V2 nor V2 < V1
(Some V1[i] > V2[i] AND some V1[j] < V2[j])
EXAMPLES:
{A:1, B:0} < {A:1, B:1} → First happened before second
{A:2, B:1} < {A:3, B:2} → First happened before second
{A:1, B:2} || {A:2, B:1} → Concurrent (conflict!)
{A:1, B:1} = {A:1, B:1} → Same
WHY THIS MATTERS:
If V1 < V2:
The write with V2 supersedes V1.
No conflict — just apply V2.
If V1 || V2:
Concurrent writes — CONFLICT!
Need resolution strategy.
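The comparison rules can be checked directly against the four examples listed above. This is a standalone sketch using plain dicts for clocks. The function name `compare` and the string results are illustrative choices, and a missing node is treated as counter 0.

```python
def compare(v1: dict, v2: dict) -> str:
    """Classify v1 relative to v2: 'before', 'after', 'equal', or 'concurrent'."""
    nodes = set(v1) | set(v2)
    v1_ahead = any(v1.get(n, 0) > v2.get(n, 0) for n in nodes)
    v2_ahead = any(v2.get(n, 0) > v1.get(n, 0) for n in nodes)
    if v1_ahead and v2_ahead:
        return "concurrent"   # Each side has seen something the other has not
    if v1_ahead:
        return "after"
    if v2_ahead:
        return "before"
    return "equal"


print(compare({"A": 1, "B": 0}, {"A": 1, "B": 1}))  # before
print(compare({"A": 2, "B": 1}, {"A": 3, "B": 2}))  # before
print(compare({"A": 1, "B": 2}, {"A": 2, "B": 1}))  # concurrent
print(compare({"A": 1, "B": 1}, {"A": 1, "B": 1}))  # equal
```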
3.3 Vector Clock Implementation
# Vector Clock Implementation
from dataclasses import dataclass, field
from typing import Dict, Any, Optional, Tuple
from enum import Enum
class ClockComparison(Enum):
BEFORE = "before" # V1 < V2
AFTER = "after" # V1 > V2
EQUAL = "equal" # V1 = V2
CONCURRENT = "concurrent" # V1 || V2
@dataclass
class VectorClock:
"""
Vector clock for tracking causality.
"""
clocks: Dict[str, int] = field(default_factory=dict)
def increment(self, node_id: str) -> 'VectorClock':
"""Increment this node's clock."""
new_clocks = self.clocks.copy()
new_clocks[node_id] = new_clocks.get(node_id, 0) + 1
return VectorClock(clocks=new_clocks)
def merge(self, other: 'VectorClock') -> 'VectorClock':
"""Merge with another vector clock (take max of each)."""
all_nodes = set(self.clocks.keys()) | set(other.clocks.keys())
new_clocks = {
node: max(
self.clocks.get(node, 0),
other.clocks.get(node, 0)
)
for node in all_nodes
}
return VectorClock(clocks=new_clocks)
def compare(self, other: 'VectorClock') -> ClockComparison:
"""Compare with another vector clock."""
all_nodes = set(self.clocks.keys()) | set(other.clocks.keys())
self_greater = False
other_greater = False
for node in all_nodes:
self_val = self.clocks.get(node, 0)
other_val = other.clocks.get(node, 0)
if self_val > other_val:
self_greater = True
elif self_val < other_val:
other_greater = True
if self_greater and other_greater:
return ClockComparison.CONCURRENT
elif self_greater:
return ClockComparison.AFTER
elif other_greater:
return ClockComparison.BEFORE
else:
return ClockComparison.EQUAL
def copy(self) -> 'VectorClock':
"""Create a copy."""
return VectorClock(clocks=self.clocks.copy())
@dataclass
class VersionedValue:
"""Value with vector clock for conflict detection."""
value: Any
clock: VectorClock
class VectorClockStore:
"""
Key-value store with vector clock conflict detection.
"""
def __init__(self, node_id: str):
self.node_id = node_id
self.data: Dict[str, list[VersionedValue]] = {} # Key -> list of versions
def get(self, key: str) -> list[VersionedValue]:
"""
Get all versions of a key.
If there are conflicts, returns multiple versions.
Application must resolve.
"""
return self.data.get(key, [])
def put(
self,
key: str,
value: Any,
context: Optional[VectorClock] = None
) -> VectorClock:
"""
Put a value, optionally based on a previous read.
Args:
key: Key to write
value: New value
context: Vector clock from previous read (for conflict detection)
Returns:
New vector clock for this write
"""
# Get current versions
current_versions = self.data.get(key, [])
# Determine new clock
if context:
# Based on previous read - increment context
new_clock = context.increment(self.node_id)
else:
# Fresh write - merge all current clocks and increment
merged = VectorClock()
for version in current_versions:
merged = merged.merge(version.clock)
new_clock = merged.increment(self.node_id)
new_version = VersionedValue(value=value, clock=new_clock)
# Remove versions that this write supersedes
remaining = []
for version in current_versions:
comparison = new_clock.compare(version.clock)
if comparison == ClockComparison.AFTER:
# New write supersedes this version - discard
pass
elif comparison == ClockComparison.CONCURRENT:
# Conflict - keep both
remaining.append(version)
elif comparison == ClockComparison.BEFORE:
# This shouldn't happen if context was used correctly
remaining.append(version)
remaining.append(new_version)
self.data[key] = remaining
return new_clock
def merge_remote(self, key: str, remote_version: VersionedValue):
"""Merge a version received from another node."""
current_versions = self.data.get(key, [])
# Check if remote version supersedes, is superseded, or conflicts
remaining = []
dominated = False
for version in current_versions:
comparison = remote_version.clock.compare(version.clock)
if comparison == ClockComparison.AFTER:
# Remote supersedes local - discard local
pass
elif comparison == ClockComparison.BEFORE:
# Local supersedes remote - keep local, ignore remote
remaining.append(version)
dominated = True
elif comparison == ClockComparison.CONCURRENT:
# Conflict - keep both
remaining.append(version)
else: # EQUAL
# Same version - keep one
remaining.append(version)
dominated = True
if not dominated:
remaining.append(remote_version)
self.data[key] = remaining
# Example: Shopping cart with conflict detection
class ConflictAwareCart:
"""
Shopping cart that detects conflicts.
When conflicts exist, shows user both versions.
"""
def __init__(self, store: VectorClockStore, user_id: str):
self.store = store
self.user_id = user_id
self.last_read_clock: Optional[VectorClock] = None
def get_cart(self) -> Tuple[list[dict], bool]:
"""
Get cart contents.
Returns:
(items, has_conflicts)
"""
versions = self.store.get(f"cart:{self.user_id}")
if len(versions) == 0:
return [], False
if len(versions) == 1:
self.last_read_clock = versions[0].clock
return versions[0].value, False
# Multiple versions = conflict!
# For now, just return the first one and flag conflict
self.last_read_clock = versions[0].clock
return versions[0].value, True
def get_all_versions(self) -> list[list[dict]]:
"""Get all conflicting versions for user resolution."""
versions = self.store.get(f"cart:{self.user_id}")
return [v.value for v in versions]
def update_cart(self, items: list[dict]):
"""Update cart based on last read."""
self.last_read_clock = self.store.put(
f"cart:{self.user_id}",
items,
context=self.last_read_clock
)
def resolve_conflict(self, chosen_items: list[dict]):
"""Resolve conflict by choosing/merging versions."""
# Get all current versions
versions = self.store.get(f"cart:{self.user_id}")
# Merge all clocks
merged_clock = VectorClock()
for version in versions:
merged_clock = merged_clock.merge(version.clock)
# Write resolution with merged clock
self.last_read_clock = self.store.put(
f"cart:{self.user_id}",
chosen_items,
context=merged_clock
)
Chapter 4: CRDTs (Conflict-free Replicated Data Types)
4.1 What Are CRDTs?
CRDTs are data structures that can be replicated across nodes, updated independently, and automatically merged without conflicts.
CRDT PRINCIPLE
A CRDT guarantees:
1. Any two replicas can be merged
2. Merge is commutative: merge(A, B) = merge(B, A)
3. Merge is associative: merge(merge(A, B), C) = merge(A, merge(B, C))
4. Merge is idempotent: merge(A, A) = A
Result: Order of merges doesn't matter!
All replicas eventually converge to same state.
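The three merge laws can be tested mechanically. The sketch below checks them by brute force over a handful of sample states for a per-node-maximum merge (the merge a grow-only counter uses), then shows that a naive "add the counters" merge fails idempotence. The sample states and the names `merge` and `bad_merge` are made up for the check.

```python
from itertools import product


def merge(a: dict, b: dict) -> dict:
    """Per-node maximum: the merge used by a grow-only counter."""
    return {n: max(a.get(n, 0), b.get(n, 0)) for n in set(a) | set(b)}


def bad_merge(a: dict, b: dict) -> dict:
    """Per-node sum: looks plausible, but double-counts on repeated merges."""
    return {n: a.get(n, 0) + b.get(n, 0) for n in set(a) | set(b)}


states = [{}, {"A": 1}, {"A": 2, "B": 1}, {"B": 3, "C": 1}]

commutative = all(merge(x, y) == merge(y, x)
                  for x, y in product(states, repeat=2))
associative = all(merge(merge(x, y), z) == merge(x, merge(y, z))
                  for x, y, z in product(states, repeat=3))
idempotent = all(merge(x, x) == x for x in states)
bad_idempotent = all(bad_merge(x, x) == x for x in states)

print(commutative, associative, idempotent, bad_idempotent)
# True True True False
```

Idempotence is what makes it safe to deliver the same state twice, which is why retransmission and periodic full-state sync cannot corrupt a state-based CRDT.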
TWO TYPES OF CRDTs:
1. STATE-BASED (CvRDT)
├── Ship entire state between nodes
├── Merge by combining states
└── Simpler, but more bandwidth
2. OPERATION-BASED (CmRDT)
├── Ship operations between nodes
├── Apply operations to reach same state
└── Less bandwidth, but needs reliable delivery (each operation applied exactly once)
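The delivery requirement is easiest to see with a duplicated message. In this sketch, an operation-based counter receives the same increment twice. Without deduplication it over-counts, while a state-based counter absorbs the repeat because its merge is idempotent. The `OpCounter` class, the operation IDs, and `merge_state` are illustrative assumptions, not a reference design.

```python
class OpCounter:
    """Operation-based counter: replicas exchange increments, not state."""

    def __init__(self, dedupe: bool):
        self.dedupe = dedupe
        self.value = 0
        self.seen_ops = set()   # IDs of operations already applied

    def apply(self, op_id: str, delta: int) -> None:
        if self.dedupe and op_id in self.seen_ops:
            return              # Duplicate delivery: ignore
        self.seen_ops.add(op_id)
        self.value += delta


# The network delivers op-1 twice (for example, after a retry).
delivered = [("op-1", 1), ("op-2", 1), ("op-1", 1)]

naive = OpCounter(dedupe=False)
careful = OpCounter(dedupe=True)
for op_id, delta in delivered:
    naive.apply(op_id, delta)
    careful.apply(op_id, delta)


def merge_state(a: dict, b: dict) -> dict:
    """State-based merge (per-node max); repeating it changes nothing."""
    return {n: max(a.get(n, 0), b.get(n, 0)) for n in set(a) | set(b)}


local_state = {}
remote_state = {"A": 2}          # Node A has incremented twice
for _ in range(3):               # Same state delivered three times
    local_state = merge_state(local_state, remote_state)

print(naive.value, careful.value, sum(local_state.values()))  # 3 2 2
```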
4.2 Common CRDT Types
COMMON CRDTS
1. G-COUNTER (Grow-only Counter)
├── Each node has its own counter
├── Can only increment
├── Value = sum of all node counters
└── Use: View counts, likes
2. PN-COUNTER (Positive-Negative Counter)
├── Two G-Counters: one for + one for -
├── Value = positive - negative
└── Use: Up/down votes, inventory deltas (cannot enforce a floor such as "never below zero")
3. LWW-REGISTER (Last-Writer-Wins Register)
├── Single value with timestamp
├── Highest timestamp wins
└── Use: Simple fields, status
4. MV-REGISTER (Multi-Value Register)
├── Keeps all concurrent values
├── Application resolves
└── Use: When concurrent values matter
5. G-SET (Grow-only Set)
├── Can only add elements
├── Merge = union
└── Use: Tags, followers
6. OR-SET (Observed-Remove Set)
├── Can add and remove elements
├── Add wins over concurrent remove
└── Use: Shopping cart, todo list
7. LWW-MAP (Last-Writer-Wins Map)
├── Map of key → LWW-Register
└── Use: User profiles, settings
8. RGA (Replicated Growable Array)
├── Ordered list with insert/delete
├── Used for collaborative text
└── Use: Google Docs-style editing
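Of the types listed, the MV-Register is the one that ties back to vector clocks most directly. The following is a minimal sketch assuming each value is stored with a plain-dict vector clock. The class and method names, and the `dominates` helper, are illustrative and not taken from any particular library.

```python
def dominates(a: dict, b: dict) -> bool:
    """True if clock a is strictly newer than clock b."""
    nodes = set(a) | set(b)
    return a != b and all(a.get(n, 0) >= b.get(n, 0) for n in nodes)


class MVRegister:
    """Multi-value register: keeps every value not superseded by another."""

    def __init__(self, node_id: str):
        self.node_id = node_id
        self.versions = []  # List of (value, clock) pairs

    def write(self, value) -> None:
        # The new clock covers everything this replica has seen,
        # so the write replaces all locally known versions.
        clock = {}
        for _, seen in self.versions:
            for node, count in seen.items():
                clock[node] = max(clock.get(node, 0), count)
        clock[self.node_id] = clock.get(self.node_id, 0) + 1
        self.versions = [(value, clock)]

    def values(self) -> list:
        return [value for value, _ in self.versions]

    def merge(self, other: "MVRegister") -> None:
        combined = self.versions + other.versions
        kept = []
        for value, clock in combined:
            if any(dominates(c, clock) for _, c in combined):
                continue                      # Superseded by a newer write
            if (value, clock) not in kept:
                kept.append((value, clock))   # Drop exact duplicates
        self.versions = kept


a, b = MVRegister("A"), MVRegister("B")
a.write("red")
b.write("blue")            # Concurrent with A's write
a.merge(b)
b.merge(a)
print(sorted(a.values()))  # ['blue', 'red']  (both survive)

a.write("green")           # A has seen both, so this supersedes them
b.merge(a)
print(b.values())          # ['green']
```

The application (or the user) decides what to do when `values()` returns more than one entry. The register only guarantees that no concurrent write is dropped.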
4.3 CRDT Implementations
# CRDT Implementations
from dataclasses import dataclass, field
from typing import Dict, Set, Any, Optional, Tuple
import time
import uuid
# =============================================================================
# G-Counter (Grow-only Counter)
# =============================================================================
class GCounter:
"""
Grow-only counter CRDT.
Each node can only increment its own counter.
Value is sum of all counters.
"""
def __init__(self, node_id: str):
self.node_id = node_id
self.counts: Dict[str, int] = {}
def increment(self, amount: int = 1):
"""Increment this node's counter."""
if amount < 0:
raise ValueError("G-Counter can only increment")
self.counts[self.node_id] = self.counts.get(self.node_id, 0) + amount
def value(self) -> int:
"""Get total count across all nodes."""
return sum(self.counts.values())
def merge(self, other: 'GCounter'):
"""Merge with another G-Counter."""
all_nodes = set(self.counts.keys()) | set(other.counts.keys())
self.counts = {
node: max(
self.counts.get(node, 0),
other.counts.get(node, 0)
)
for node in all_nodes
}
def get_state(self) -> Dict[str, int]:
"""Get state for replication."""
return self.counts.copy()
@classmethod
def from_state(cls, node_id: str, state: Dict[str, int]) -> 'GCounter':
"""Create from replicated state."""
counter = cls(node_id)
counter.counts = state.copy()
return counter
# =============================================================================
# PN-Counter (Positive-Negative Counter)
# =============================================================================
class PNCounter:
"""
Positive-Negative counter CRDT.
Supports both increment and decrement.
Uses two G-Counters internally.
"""
def __init__(self, node_id: str):
self.positive = GCounter(node_id)
self.negative = GCounter(node_id)
def increment(self, amount: int = 1):
"""Increment counter."""
self.positive.increment(amount)
def decrement(self, amount: int = 1):
"""Decrement counter."""
self.negative.increment(amount)
def value(self) -> int:
"""Get current value (positive - negative)."""
return self.positive.value() - self.negative.value()
def merge(self, other: 'PNCounter'):
"""Merge with another PN-Counter."""
self.positive.merge(other.positive)
self.negative.merge(other.negative)
# =============================================================================
# G-Set (Grow-only Set)
# =============================================================================
class GSet:
"""
Grow-only set CRDT.
Elements can only be added, never removed.
Merge is union.
"""
def __init__(self):
self.elements: Set[Any] = set()
def add(self, element: Any):
"""Add element to set."""
self.elements.add(element)
def contains(self, element: Any) -> bool:
"""Check if element is in set."""
return element in self.elements
def value(self) -> Set[Any]:
"""Get all elements."""
return self.elements.copy()
def merge(self, other: 'GSet'):
"""Merge with another G-Set (union)."""
self.elements = self.elements | other.elements
# =============================================================================
# OR-Set (Observed-Remove Set) - Add-Wins
# =============================================================================
@dataclass
class ORSetElement:
"""Element in OR-Set with unique tags."""
value: Any
tags: Set[str] = field(default_factory=set)
class ORSet:
"""
Observed-Remove Set CRDT.
Supports both add and remove.
Add wins over concurrent remove.
Each add creates a unique tag.
Remove deletes all known tags.
If any tag survives, element is present.
"""
def __init__(self, node_id: str):
self.node_id = node_id
# Map from value to {tag: tombstoned}
self.elements: Dict[Any, Dict[str, bool]] = {}
def add(self, value: Any):
"""Add element with new unique tag."""
tag = f"{self.node_id}:{uuid.uuid4()}"
if value not in self.elements:
self.elements[value] = {}
self.elements[value][tag] = False # Not tombstoned
def remove(self, value: Any):
"""Remove element by tombstoning all known tags."""
if value in self.elements:
for tag in self.elements[value]:
self.elements[value][tag] = True # Tombstone
def contains(self, value: Any) -> bool:
"""Check if element is in set (any non-tombstoned tag)."""
if value not in self.elements:
return False
return any(
not tombstoned
for tombstoned in self.elements[value].values()
)
def value(self) -> Set[Any]:
"""Get all non-removed elements."""
return {
v for v in self.elements
if self.contains(v)
}
def merge(self, other: 'ORSet'):
"""Merge with another OR-Set."""
all_values = set(self.elements.keys()) | set(other.elements.keys())
for value in all_values:
self_tags = self.elements.get(value, {})
other_tags = other.elements.get(value, {})
# Merge tags
merged_tags = {}
all_tag_ids = set(self_tags.keys()) | set(other_tags.keys())
for tag in all_tag_ids:
# Tags are globally unique, so a tag missing from one replica
# has simply not been seen there yet (tombstones are never dropped).
# The True defaults below are never used: every branch reads only
# from a side that actually has the tag.
self_tomb = self_tags.get(tag, True)
other_tomb = other_tags.get(tag, True)
# - Tag in both replicas: tombstoned if either side tombstoned it
# - Tag in one replica only: take that replica's state as-is
if tag in self_tags and tag in other_tags:
merged_tags[tag] = self_tomb or other_tomb
elif tag in self_tags:
merged_tags[tag] = self_tomb
else:
merged_tags[tag] = other_tomb
if merged_tags:
self.elements[value] = merged_tags
# =============================================================================
# LWW-Map (Last-Writer-Wins Map)
# =============================================================================
@dataclass
class LWWEntry:
    """Entry in LWW-Map."""
    value: Any
    timestamp: float
    node_id: str = ""  # Writer's node ID; breaks ties between equal timestamps
    deleted: bool = False

    def version(self) -> Tuple[float, str]:
        """Ordering key: timestamp first, then node ID as tie-breaker."""
        return (self.timestamp, self.node_id)


class LWWMap:
    """
    Last-Writer-Wins Map CRDT.

    Each key is a LWW-Register.
    Supports add, update, and delete.
    """

    def __init__(self, node_id: str):
        self.node_id = node_id
        self.entries: Dict[str, LWWEntry] = {}

    def set(self, key: str, value: Any):
        """Set a key-value pair."""
        entry = LWWEntry(value=value, timestamp=time.time(),
                         node_id=self.node_id, deleted=False)
        current = self.entries.get(key)
        if current is None or current.version() < entry.version():
            self.entries[key] = entry

    def delete(self, key: str):
        """Delete a key by writing a tombstone entry."""
        entry = LWWEntry(value=None, timestamp=time.time(),
                         node_id=self.node_id, deleted=True)
        current = self.entries.get(key)
        if current is None or current.version() < entry.version():
            self.entries[key] = entry

    def get(self, key: str) -> Optional[Any]:
        """Get value for key."""
        entry = self.entries.get(key)
        if entry is None or entry.deleted:
            return None
        return entry.value

    def keys(self) -> Set[str]:
        """Get all non-deleted keys."""
        return {k for k, v in self.entries.items() if not v.deleted}

    def to_dict(self) -> Dict[str, Any]:
        """Get as regular dict."""
        return {k: v.value for k, v in self.entries.items() if not v.deleted}

    def merge(self, other: 'LWWMap'):
        """Merge with another LWW-Map."""
        for key, other_entry in other.entries.items():
            self_entry = self.entries.get(key)
            # Compare (timestamp, node_id), not the timestamp alone.
            # Keeping "self" on a timestamp tie would let each replica keep
            # its own value, and the replicas would never converge.
            if self_entry is None or other_entry.version() > self_entry.version():
                self.entries[key] = other_entry
# =============================================================================
# Shopping Cart CRDT (OR-Set based)
# =============================================================================
@dataclass
class CartItem:
"""Item in shopping cart."""
product_id: str
quantity: int
def __hash__(self):
return hash(self.product_id)
def __eq__(self, other):
if isinstance(other, CartItem):
return self.product_id == other.product_id
return False
class ShoppingCartCRDT:
"""
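Shopping cart using OR-Set + PN-Counter.
OR-Set for items (add/remove)
PN-Counter for quantities
Note: remove_item() does not reset the quantity counter, so re-adding
a removed item resumes from its old quantity rather than starting at zero.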
"""
def __init__(self, node_id: str):
self.node_id = node_id
self.items = ORSet(node_id) # Tracks which items are in cart
self.quantities: Dict[str, PNCounter] = {} # Quantity per item
def add_item(self, product_id: str, quantity: int = 1):
"""Add item to cart."""
self.items.add(product_id)
if product_id not in self.quantities:
self.quantities[product_id] = PNCounter(self.node_id)
self.quantities[product_id].increment(quantity)
def remove_item(self, product_id: str):
"""Remove item from cart entirely."""
self.items.remove(product_id)
def update_quantity(self, product_id: str, delta: int):
"""Change quantity (positive or negative)."""
if product_id not in self.quantities:
self.quantities[product_id] = PNCounter(self.node_id)
if delta > 0:
self.quantities[product_id].increment(delta)
else:
self.quantities[product_id].decrement(abs(delta))
def get_items(self) -> Dict[str, int]:
"""Get cart contents."""
result = {}
for product_id in self.items.value():
if product_id in self.quantities:
qty = self.quantities[product_id].value()
if qty > 0:
result[product_id] = qty
return result
def merge(self, other: 'ShoppingCartCRDT'):
"""Merge with another cart."""
# Merge item sets
self.items.merge(other.items)
# Merge quantity counters
all_products = set(self.quantities.keys()) | set(other.quantities.keys())
for product_id in all_products:
if product_id not in self.quantities:
self.quantities[product_id] = PNCounter(self.node_id)
if product_id in other.quantities:
self.quantities[product_id].merge(other.quantities[product_id])
# Usage example
def demo_cart_crdt():
# Two devices, same user
phone = ShoppingCartCRDT("phone")
laptop = ShoppingCartCRDT("laptop")
# Phone adds items
phone.add_item("apple", 3)
phone.add_item("banana", 2)
# Laptop adds different items (concurrently, offline)
laptop.add_item("orange", 5)
laptop.add_item("apple", 1) # Also adds apple!
# Sync
phone.merge(laptop)
laptop.merge(phone)
# Both devices have same cart
print(phone.get_items())
# {'apple': 4, 'banana': 2, 'orange': 5}
# Apple quantity = 3 + 1 = 4 (merged!)
print(laptop.get_items())
# {'apple': 4, 'banana': 2, 'orange': 5}
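The demo above only exercises concurrent adds. The case from the preface, where one device adds item A while the other removes it, is where the OR-Set's add-wins rule matters. The sketch below uses a deliberately tiny, self-contained OR-Set (`TinyORSet`, a hypothetical name) in the common "add-tags plus removed-tags" formulation rather than the tombstone-flag layout used earlier. The observable behavior is intended to be the same.

```python
import uuid


class TinyORSet:
    """Minimal OR-Set: each add gets a unique tag; remove covers seen tags only."""

    def __init__(self):
        self.adds = set()      # (value, tag) pairs
        self.removed = set()   # Tags that have been removed

    def add(self, value):
        self.adds.add((value, uuid.uuid4().hex))

    def remove(self, value):
        # Only tags this replica has observed can be removed.
        self.removed |= {tag for v, tag in self.adds if v == value}

    def merge(self, other: "TinyORSet"):
        self.adds |= other.adds
        self.removed |= other.removed

    def value(self) -> set:
        return {v for v, tag in self.adds if tag not in self.removed}


phone, laptop = TinyORSet(), TinyORSet()

# Shared starting point: item A is in the cart on both devices.
phone.add("A")
laptop.merge(phone)

# Offline, concurrently:
laptop.remove("A")   # Removes the one tag the laptop has seen
phone.add("A")       # Creates a fresh tag the laptop has never seen

phone.merge(laptop)
laptop.merge(phone)
print(phone.value(), laptop.value())  # {'A'} {'A'}  (the concurrent add wins)

# A remove issued after seeing every tag does take effect everywhere.
phone.remove("A")
laptop.merge(phone)
print(laptop.value())  # set()
```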
Part II: Production Implementation
Chapter 5: Choosing the Right Approach
# Decision Framework for Conflict Resolution
from enum import Enum
from dataclasses import dataclass
from typing import Any, Callable, Optional
class ConflictStrategy(Enum):
LAST_WRITE_WINS = "lww"
VECTOR_CLOCK = "vector_clock"
CRDT = "crdt"
APPLICATION_MERGE = "app_merge"
USER_RESOLUTION = "user_resolution"
@dataclass
class DataCharacteristics:
"""Characteristics of data to determine conflict strategy."""
# Can data be regenerated if lost?
regenerable: bool
# Is latest value always the "correct" value?
latest_is_correct: bool
# Can concurrent updates be automatically merged?
mergeable: bool
# Is user available to resolve conflicts?
user_available: bool
# Is data structure simple (register) or complex (set, counter)?
is_simple_value: bool
# Does order of operations matter?
order_dependent: bool
def choose_strategy(characteristics: DataCharacteristics) -> ConflictStrategy:
"""
Choose conflict resolution strategy based on data characteristics.
"""
# If latest is correct and data is simple, LWW is fine
if characteristics.latest_is_correct and characteristics.is_simple_value:
return ConflictStrategy.LAST_WRITE_WINS
# If data is naturally mergeable (counters, sets), use CRDT
if characteristics.mergeable and not characteristics.order_dependent:
return ConflictStrategy.CRDT
# If user is available and data is important, ask them
if characteristics.user_available and not characteristics.regenerable:
return ConflictStrategy.USER_RESOLUTION
# If order matters or complex merge logic needed
if characteristics.order_dependent:
return ConflictStrategy.VECTOR_CLOCK
# Default to application-level merge
return ConflictStrategy.APPLICATION_MERGE
# Example usage
STRATEGY_MAPPING = {
"user_location": choose_strategy(DataCharacteristics(
regenerable=True,
latest_is_correct=True,
mergeable=False,
user_available=False,
is_simple_value=True,
order_dependent=False
)), # LWW
"shopping_cart": choose_strategy(DataCharacteristics(
regenerable=False,
latest_is_correct=False,
mergeable=True,
user_available=False,
is_simple_value=False,
order_dependent=False
)), # CRDT
"document": choose_strategy(DataCharacteristics(
regenerable=False,
latest_is_correct=False,
mergeable=True,
user_available=True,
is_simple_value=False,
order_dependent=True
)), # USER_RESOLUTION (the user_available check runs before the order_dependent check)
"view_count": choose_strategy(DataCharacteristics(
regenerable=True,
latest_is_correct=False,
mergeable=True,
user_available=False,
is_simple_value=True,
order_dependent=False
)), # CRDT (G-Counter)
}
Chapter 6: Real-World CRDT Systems
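# Production CRDT System
import asyncio
import json
import logging
from dataclasses import dataclass, field
from typing import Dict, Any, Set, Optional, List
from datetime import datetime
# PNCounter, ORSet, and LWWMap are the classes defined in section 4.3
logger = logging.getLogger(__name__)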
@dataclass
class ReplicationMessage:
"""Message for CRDT state replication."""
source_node: str
timestamp: datetime
crdt_type: str
key: str
state: Any
class CRDTReplicationManager:
"""
Manages CRDT replication across nodes.
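Handles:
- State-based replication (full state is published on every change)
- Broadcast to peers over a shared message-queue topic
- Periodic anti-entropy to catch missed updates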
"""
def __init__(
self,
node_id: str,
peer_nodes: List[str],
message_queue, # Kafka, RabbitMQ, etc.
):
self.node_id = node_id
self.peer_nodes = peer_nodes
self.mq = message_queue
# Local CRDT stores
self.counters: Dict[str, PNCounter] = {}
self.sets: Dict[str, ORSet] = {}
self.maps: Dict[str, LWWMap] = {}
# Track what we've sent to avoid redundant replication
self.last_sent: Dict[str, datetime] = {}
async def start(self):
"""Start replication manager."""
# Start background tasks
asyncio.create_task(self._receive_loop())
asyncio.create_task(self._anti_entropy_loop())
# =========================================================================
# Local Operations
# =========================================================================
def get_counter(self, key: str) -> PNCounter:
"""Get or create counter."""
if key not in self.counters:
self.counters[key] = PNCounter(self.node_id)
return self.counters[key]
def increment_counter(self, key: str, amount: int = 1):
"""Increment counter and trigger replication."""
counter = self.get_counter(key)
counter.increment(amount)
asyncio.create_task(self._replicate_counter(key))
def get_set(self, key: str) -> ORSet:
"""Get or create OR-Set."""
if key not in self.sets:
self.sets[key] = ORSet(self.node_id)
return self.sets[key]
def add_to_set(self, key: str, value: Any):
"""Add to set and trigger replication."""
s = self.get_set(key)
s.add(value)
asyncio.create_task(self._replicate_set(key))
def remove_from_set(self, key: str, value: Any):
"""Remove from set and trigger replication."""
s = self.get_set(key)
s.remove(value)
asyncio.create_task(self._replicate_set(key))
# =========================================================================
# Replication
# =========================================================================
async def _replicate_counter(self, key: str):
"""Replicate counter state to peers."""
counter = self.counters[key]
message = ReplicationMessage(
source_node=self.node_id,
timestamp=datetime.utcnow(),
crdt_type="pn_counter",
key=key,
state={
"positive": counter.positive.get_state(),
"negative": counter.negative.get_state()
}
)
await self.mq.publish("crdt_replication", json.dumps(message.__dict__, default=str))
async def _replicate_set(self, key: str):
"""Replicate set state to peers."""
s = self.sets[key]
message = ReplicationMessage(
source_node=self.node_id,
timestamp=datetime.utcnow(),
crdt_type="or_set",
key=key,
state=self._serialize_or_set(s)
)
await self.mq.publish("crdt_replication", json.dumps(message.__dict__, default=str))
async def _receive_loop(self):
"""Receive and apply replication messages."""
async for raw_message in self.mq.subscribe("crdt_replication"):
try:
data = json.loads(raw_message)
# Skip our own messages
if data["source_node"] == self.node_id:
continue
await self._apply_replication(data)
except Exception as e:
logger.error(f"Failed to apply replication: {e}")
async def _apply_replication(self, data: dict):
"""Apply a replication message."""
crdt_type = data["crdt_type"]
key = data["key"]
state = data["state"]
if crdt_type == "pn_counter":
counter = self.get_counter(key)
# Create remote counter from state
remote = PNCounter(data["source_node"])
remote.positive.counts = state["positive"]
remote.negative.counts = state["negative"]
# Merge
counter.merge(remote)
elif crdt_type == "or_set":
s = self.get_set(key)
remote = self._deserialize_or_set(state, data["source_node"])
s.merge(remote)
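    async def _anti_entropy_loop(self):
        """
        Periodically sync full state with a random peer.
        Catches any updates missed by the broadcast path.
        """
        import random  # Imported once here rather than on every iteration
        while True:
            await asyncio.sleep(60)  # Every minute
            if not self.peer_nodes:
                continue  # random.choice raises IndexError on an empty sequence
            # Pick a random peer and request a full state sync
            peer = random.choice(list(self.peer_nodes))
            await self._sync_with_peer(peer)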
    async def _sync_with_peer(self, peer: str):
        """Full state sync with a peer."""
        # In practice, this would be a direct RPC call to `peer`.
        # For now, `peer` is unused and we just broadcast our full state.
        # Snapshot the keys first: the dicts can change while we await.
        for key in list(self.counters):
            await self._replicate_counter(key)
        for key in list(self.sets):
            await self._replicate_set(key)
def _serialize_or_set(self, s: ORSet) -> dict:
"""Serialize OR-Set for transmission."""
return {
str(k): {tag: tomb for tag, tomb in v.items()}
for k, v in s.elements.items()
}
def _deserialize_or_set(self, data: dict, node_id: str) -> ORSet:
"""Deserialize OR-Set from transmission."""
s = ORSet(node_id)
for k, tags in data.items():
s.elements[k] = {tag: tomb for tag, tomb in tags.items()}
return s
Part III: Real-World Application
Chapter 7: Case Studies
7.1 Case Study: Riak (Dynamo-style)
RIAK CONFLICT RESOLUTION
Riak is a distributed key-value store using:
├── Vector clocks for conflict detection
├── Sibling values for concurrent updates
└── Application-level or automatic resolution
HOW IT WORKS:
1. Write with context (vector clock from read)
2. If the context descends from the stored version, the write replaces it
3. If concurrent write detected, create "sibling"
4. On read, return all siblings for resolution
CONFIGURATION OPTIONS:
allow_mult: true
→ Keep siblings, application resolves
allow_mult: false
→ LWW, last write wins
BUILT-IN CRDT SUPPORT:
├── Counters (PN-Counter)
├── Sets (OR-Set)
├── Maps (nested CRDTs)
├── Flags (boolean)
└── Registers (LWW)
Example: Riak Set
# Illustrative pseudocode: `riak.update_set` is a placeholder, not a real client call
# Add to set
riak.update_set("bucket", "shopping-cart",
add=["item-1", "item-2"])
# Remove from set
riak.update_set("bucket", "shopping-cart",
remove=["item-1"])
# Automatic merge across replicas
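The write-with-context flow listed under HOW IT WORKS can be sketched with a small in-memory store. This is only an illustration under simplifying assumptions, not Riak's implementation: `SiblingStore`, `descends`, and `merge_clocks` are hypothetical names, a single node coordinates every write, and clocks are plain dicts.

```python
from typing import Dict, List, Tuple

Clock = Dict[str, int]  # node_id -> counter


def descends(a: Clock, b: Clock) -> bool:
    """True if clock `a` has seen every event recorded in clock `b`."""
    return all(a.get(node, 0) >= count for node, count in b.items())


def merge_clocks(clocks: List[Clock]) -> Clock:
    """Entry-wise maximum of several clocks."""
    merged: Clock = {}
    for clock in clocks:
        for node, count in clock.items():
            merged[node] = max(merged.get(node, 0), count)
    return merged


class SiblingStore:
    """Hypothetical single-node store that keeps concurrent values as siblings."""

    def __init__(self, node_id: str):
        self.node_id = node_id
        self.counter = 0
        self.versions: Dict[str, List[Tuple[Clock, str]]] = {}

    def get(self, key: str) -> Tuple[List[str], Clock]:
        """Return all sibling values plus the context to send with the next write."""
        stored = self.versions.get(key, [])
        return [value for _, value in stored], merge_clocks([c for c, _ in stored])

    def put(self, key: str, value: str, context: Clock) -> None:
        """Replace every version the writer has seen; keep the rest as siblings."""
        stored = self.versions.get(key, [])
        survivors = [(c, v) for c, v in stored if not descends(context, c)]
        self.counter += 1
        new_clock = dict(context)
        new_clock[self.node_id] = self.counter
        self.versions[key] = survivors + [(new_clock, value)]


store = SiblingStore("node-a")
_, ctx = store.get("cart")
store.put("cart", "item-1", ctx)          # first writer
store.put("cart", "item-2", ctx)          # second writer used the same stale context
siblings, ctx = store.get("cart")         # both values come back as siblings
store.put("cart", "item-1,item-2", ctx)   # application-resolved write replaces both
```

The key design choice is that `put` discards only the versions covered by the writer's context, so a write based on a stale read can never silently overwrite a value the writer has not seen.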
7.2 Case Study: Google Docs (OT/CRDT)
GOOGLE DOCS COLLABORATIVE EDITING
Google Docs uses Operational Transformation (OT),
but CRDTs are becoming more common (Yjs, Automerge).
OPERATIONAL TRANSFORMATION:
Starting document: "abcdefghij"
Client A: Insert "X" at position 5 → "abcdeXfghij"
Client B: Insert "Y" at position 3 → "abcYdefghij"
Without transformation (each applies the other's op as-is):
A applies B's op: Insert "Y" at 3 → "abcYdeXfghij"
B applies A's op: Insert "X" at 5 → "abcYdXefghij"
Result: Different documents!
With transformation (each transforms the incoming op against its own):
A receives B's insert at 3 (before its own insert at 5)
B's op stays: Insert "Y" at position 3
B receives A's insert at 5 (after its own insert at 3)
A's op shifts right by one: Insert "X" at position 6
Both get: "abcYdeXfghij"
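The index-shifting rule in this example can be sketched for single-character inserts. This is a minimal illustration, not how any production OT system is written: `Insert`, `apply_op`, and `transform` are hypothetical names, and the `site` field is an assumed tie-breaker for inserts at the same position.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Insert:
    pos: int
    char: str
    site: str  # Assumed tie-breaker when two inserts target the same position


def apply_op(doc: str, op: Insert) -> str:
    """Insert op.char into doc at op.pos."""
    return doc[:op.pos] + op.char + doc[op.pos:]


def transform(op: Insert, against: Insert) -> Insert:
    """Adjust `op` so it can be applied after `against` has already been applied."""
    shifted = against.pos < op.pos or (
        against.pos == op.pos and against.site < op.site
    )
    return Insert(op.pos + 1, op.char, op.site) if shifted else op


base = "abcdefghij"
op_a = Insert(5, "X", "A")
op_b = Insert(3, "Y", "B")

# Each site applies its own op first, then the transformed remote op
doc_at_a = apply_op(apply_op(base, op_a), transform(op_b, op_a))
doc_at_b = apply_op(apply_op(base, op_b), transform(op_a, op_b))
```

Both sites end with "abcYdeXfghij". Real OT systems also need transforms for deletes and for longer histories of concurrent operations, which is where most of the complexity lives.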
CRDT ALTERNATIVE (RGA - Replicated Growable Array):
Each character has unique ID: (node_id, sequence)
Insert references the character it comes after.
Deletes are tombstones.
Advantages:
├── No central server needed
├── Works offline
├── Guaranteed convergence
└── Convergence is easier to reason about than with OT
Sequence CRDTs in practice: Automerge (RGA-based), Yjs (YATA, a related algorithm), xi-editor
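The three RGA rules above (unique IDs, insert-after references, tombstones) can be sketched as follows. This is a simplified teaching version, not the code of any library: `TinyRGA` is a hypothetical name, ops are plain tuples, and the sketch assumes ops are delivered in causal order (a character's parent always arrives before it).

```python
from typing import List, Optional, Tuple

CharId = Tuple[int, str]  # (logical clock, node_id), compared as a tuple


class TinyRGA:
    """Simplified RGA-style sequence; assumes causal delivery of ops."""

    def __init__(self, node_id: str):
        self.node_id = node_id
        self.clock = 0
        self.items: List[list] = []  # each item: [char_id, char, deleted]

    def _index(self, char_id: CharId) -> int:
        for i, item in enumerate(self.items):
            if item[0] == char_id:
                return i
        raise KeyError(char_id)

    def insert_after(self, parent: Optional[CharId], char: str) -> tuple:
        """Local insert; returns the op to send to other replicas."""
        self.clock += 1
        op = ("insert", (self.clock, self.node_id), parent, char)
        self.apply(op)
        return op

    def delete(self, char_id: CharId) -> tuple:
        """Local delete; returns the op to send to other replicas."""
        op = ("delete", char_id, None, "")
        self.apply(op)
        return op

    def apply(self, op: tuple) -> None:
        kind, char_id, parent, char = op
        if kind == "delete":
            self.items[self._index(char_id)][2] = True  # Tombstone, never removed
            return
        if any(item[0] == char_id for item in self.items):
            return  # Duplicate delivery is a no-op
        self.clock = max(self.clock, char_id[0])
        i = 0 if parent is None else self._index(parent) + 1
        # Skip concurrent inserts (and their descendants) that have larger IDs
        while i < len(self.items) and self.items[i][0] > char_id:
            i += 1
        self.items.insert(i, [char_id, char, False])

    def text(self) -> str:
        return "".join(char for _, char, deleted in self.items if not deleted)


phone, laptop = TinyRGA("phone"), TinyRGA("laptop")
op_h = phone.insert_after(None, "H")
op_i = phone.insert_after(op_h[1], "i")
op_y = laptop.insert_after(None, "Y")
op_o = laptop.insert_after(op_y[1], "o")
for op in (op_y, op_o):
    phone.apply(op)
for op in (op_h, op_i):
    laptop.apply(op)
```

Both replicas end with the text "HiYo": each word stays contiguous, and the tuple ordering of the IDs decides which word comes first.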
7.3 Case Study: Redis CRDTs
REDIS ENTERPRISE CRDTs
Redis Enterprise supports CRDTs for active-active replication:
SUPPORTED TYPES:
1. CRDT Strings (LWW)
SET key value
→ Last write wins across datacenters
2. CRDT Counters
INCRBY key amount
→ Merged across datacenters (no lost increments)
3. CRDT Sets
SADD key member
SREM key member
→ Add-wins semantics
4. CRDT Sorted Sets
ZADD key score member
→ LWW per member, add-wins for membership
5. CRDT Hashes
HSET key field value
→ LWW per field
CONFLICT RESOLUTION RULES:
├── Strings: LWW based on wall-clock + vector clock
├── Counters: Sum of all increments
├── Sets: Union (add-wins)
├── Sorted Sets: LWW for score, add-wins for membership
└── Hashes: LWW per field
EXAMPLE: Global Session Store
# Datacenter US
HSET session:user123 last_seen "2024-01-15T10:00:00Z"
HSET session:user123 cart_count 5
# Datacenter EU (concurrent)
HSET session:user123 last_seen "2024-01-15T10:00:01Z"
HSET session:user123 preferences "dark_mode"
# After sync, both have:
{
last_seen: "2024-01-15T10:00:01Z", # LWW
cart_count: 5,
preferences: "dark_mode"
}
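The per-field LWW rule behind this session example can be sketched in a few lines. This is an illustration of the rule only, not Redis Enterprise's implementation: `merge_lww_hash` is a hypothetical helper, and the (timestamp, node_id, value) layout is an assumption made for the sketch.

```python
from typing import Dict, Tuple

# field -> (timestamp, node_id, value); node_id breaks timestamp ties
LWWHash = Dict[str, Tuple[float, str, str]]


def merge_lww_hash(local: LWWHash, remote: LWWHash) -> LWWHash:
    """Keep, for every field, the entry with the larger (timestamp, node_id)."""
    merged = dict(local)
    for field, entry in remote.items():
        current = merged.get(field)
        if current is None or entry[:2] > current[:2]:
            merged[field] = entry
    return merged


us = {
    "last_seen": (1705312800.0, "us", "2024-01-15T10:00:00Z"),
    "cart_count": (1705312800.5, "us", "5"),
}
eu = {
    "last_seen": (1705312801.0, "eu", "2024-01-15T10:00:01Z"),
    "preferences": (1705312801.2, "eu", "dark_mode"),
}

merged_us = merge_lww_hash(us, eu)  # what the US datacenter holds after sync
merged_eu = merge_lww_hash(eu, us)  # what the EU datacenter holds after sync
```

Because the comparison uses the same (timestamp, node_id) key on both sides, the two datacenters end up with identical hashes regardless of merge direction.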
Chapter 8: Common Mistakes
8.1 Mistake 1: Using LWW When Order Matters
❌ WRONG: LWW for sequential operations
# Counter implemented with LWW
counter_value = 0
# User A reads: 0
# User B reads: 0
# User A writes: 1 (timestamp: 1000)
# User B writes: 1 (timestamp: 1001)
# Result: 1 (should be 2!)
✅ CORRECT: Use CRDT counter
counter = PNCounter("node-1")
# User A increments
counter.increment()
# User B increments (concurrent)
other_counter = PNCounter("node-2")
other_counter.increment()
# Merge
counter.merge(other_counter)
# Result: 2 ✓
8.2 Mistake 2: Not Handling Concurrent Removes
❌ WRONG: Simple set with concurrent add/remove
# Initial: {A, B}
# Node 1: Remove B
set1 = {A}
# Node 2 (concurrent): Add C
set2 = {A, B, C}
# Merge with union: {A, B, C}
# But Node 1 wanted B removed!
✅ CORRECT: Use OR-Set
# Initial: {A, B} on both nodes
orset = ORSet("node-1")
orset.add("A")
orset.add("B")
orset2 = ORSet("node-2")
orset2.merge(orset) # Node 2 now has A, B
# Node 1: Remove B (tombstones B's tags)
orset.remove("B")
# Node 2 (concurrent): Add C (new tag)
orset2.add("C")
# Merge
orset.merge(orset2)
# Result: {A, C}
# B's tags were tombstoned, C has new tag
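To make the tag mechanics concrete, here is a self-contained miniature OR-Set. It is a sketch for illustration and not the `ORSet` class used elsewhere in this document: `TinyORSet` is a hypothetical name, and it stores a tag-to-tombstone map per element.

```python
import uuid
from typing import Dict, Set


class TinyORSet:
    """Minimal OR-Set: element -> {tag: tombstoned?}."""

    def __init__(self) -> None:
        self.elements: Dict[str, Dict[str, bool]] = {}

    def add(self, value: str) -> None:
        # Every add gets a fresh unique tag
        self.elements.setdefault(value, {})[uuid.uuid4().hex] = False

    def remove(self, value: str) -> None:
        # Tombstone only the tags this replica has observed
        for tag in self.elements.get(value, {}):
            self.elements[value][tag] = True

    def merge(self, other: "TinyORSet") -> None:
        for value, tags in other.elements.items():
            mine = self.elements.setdefault(value, {})
            for tag, tombstoned in tags.items():
                # A tombstone seen by either replica sticks
                mine[tag] = mine.get(tag, False) or tombstoned

    def value(self) -> Set[str]:
        # Present if at least one tag is still live
        return {
            v for v, tags in self.elements.items()
            if any(not tombstoned for tombstoned in tags.values())
        }


node1, node2 = TinyORSet(), TinyORSet()
node1.add("B")
node2.merge(node1)   # both replicas start with {B}
node1.remove("B")    # node 1 tombstones the tag it has seen
node2.add("B")       # node 2 concurrently re-adds B under a new tag
node1.merge(node2)
node2.merge(node1)
```

After the merges both replicas contain B: the remove only covered the tag node 1 had observed, so the concurrent add survives (add-wins).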
8.3 Mistake 3: Vector Clock Explosion
❌ WRONG: Creating new vector clock entry per client
# Every client gets its own entry
{
"client-1": 1,
"client-2": 1,
"client-3": 1,
...
"client-1000000": 1
}
# Vector clock becomes huge!
# Memory explosion, slow comparisons
✅ CORRECT: Use node-based clocks, not client-based
# Clients write through servers
# Vector clock has entry per SERVER, not per CLIENT
{
"server-us-east": 1000,
"server-us-west": 999,
"server-eu": 1001
}
# Fixed number of entries (number of servers)
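A short sketch of why routing writes through servers keeps the clock bounded. The helper name `record_write` and the round-robin routing are assumptions made for the illustration.

```python
from typing import Dict


def record_write(clock: Dict[str, int], server_id: str) -> Dict[str, int]:
    """Return a copy of `clock` advanced for the server that coordinated the write."""
    updated = dict(clock)
    updated[server_id] = updated.get(server_id, 0) + 1
    return updated


servers = ["server-us-east", "server-us-west", "server-eu"]
clock: Dict[str, int] = {}
for client_number in range(1000):
    # Hypothetical routing: each client's write lands on one of three servers
    clock = record_write(clock, servers[client_number % len(servers)])
```

A thousand clients have written, yet the clock still has three entries, one per server.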
8.4 Mistake Checklist
- Don't use LWW for additive operations — Use counters, sets
- Don't use simple sets — Use OR-Set for add/remove
- Don't create unbounded vector clocks — Limit to server nodes
- Don't forget tombstone garbage collection — Clean up old tombstones
- Do consider semantics — Add-wins vs remove-wins
- Do test with concurrent operations — Simulate network partitions
Part IV: Interview Preparation
Chapter 9: Interview Tips
9.1 Key Phrases
INTRODUCING CONFLICT RESOLUTION:
"When we have concurrent updates to the same data, we need
a conflict resolution strategy. The choice depends on the
data semantics — is latest always correct? Can updates be
merged? Is user intervention possible?"
ON LWW:
"Last-write-wins is the simplest approach — timestamp decides.
It works when the latest value is the 'right' value, like a
user's current location. But it loses data when concurrent
updates both matter, like a shopping cart."
ON VECTOR CLOCKS:
"Vector clocks track causality — they tell us if one update
happened before another, or if they're concurrent. If concurrent,
we know we have a conflict that needs resolution. The application
can then merge or ask the user."
ON CRDTs:
"CRDTs are data structures designed to merge automatically.
A counter CRDT sums increments from all nodes. A set CRDT
takes the union. They're perfect for collaborative apps
where you need offline support and automatic sync."
ON CHOOSING:
"For a shopping cart, I'd use an OR-Set CRDT. Items can be
added and removed, and concurrent adds should both appear.
For a simple status field, LWW is fine. For a document,
I'd use operational transformation or a sequence CRDT."
9.2 Common Questions
| Question | Good Answer |
|---|---|
| "How do you handle conflicts in a distributed shopping cart?" | "I'd use an OR-Set CRDT. Each item addition creates a unique tag. Removes tombstone existing tags. Concurrent adds from different devices both survive because they have different tags. This gives us add-wins semantics which matches user expectations." |
| "When would you use LWW vs CRDTs?" | "LWW when latest is correct (location, status) or data is regenerable (cache). CRDTs when concurrent updates should merge (counters, sets) or offline support is needed. Vector clocks when I need to detect conflicts for manual resolution." |
| "How do CRDTs guarantee convergence?" | "CRDTs have merge operations that are commutative, associative, and idempotent. This means the order of merges doesn't matter — all replicas reach the same state. G-Counter sums node counts, OR-Set unions with tags, etc." |
| "What's the difference between G-Set and OR-Set?" | "G-Set only allows adds — merge is union. OR-Set allows removes by using unique tags per add. When you remove, you tombstone the tag. Concurrent add of same item creates new tag, so it survives the remove. Add-wins semantics." |
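The convergence answer in the table can be checked directly on a G-Counter. The sketch below assumes a G-Counter is represented as a plain dict of per-node counts; `merge` and `value` are illustrative helper names.

```python
from typing import Dict

GCounterState = Dict[str, int]  # node_id -> count


def merge(a: GCounterState, b: GCounterState) -> GCounterState:
    """Entry-wise maximum: the G-Counter merge."""
    return {node: max(a.get(node, 0), b.get(node, 0)) for node in a.keys() | b.keys()}


def value(state: GCounterState) -> int:
    return sum(state.values())


a = {"n1": 3}
b = {"n1": 1, "n2": 2}
c = {"n3": 5}

commutative = merge(a, b) == merge(b, a)
associative = merge(merge(a, b), c) == merge(a, merge(b, c))
idempotent = merge(a, a) == a
total = value(merge(merge(a, b), c))
```

All three properties hold for this merge, and the merged value is 10 whichever order the replicas exchange state in.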
Chapter 10: Practice Problems
Problem 1: Collaborative Whiteboard
Setup: Design conflict resolution for a collaborative whiteboard where users can add and move shapes.
Requirements:
- Users can add shapes (rectangle, circle)
- Users can move shapes
- Users can delete shapes
- Works offline
Questions:
- What CRDTs would you use?
- How do you handle concurrent moves of the same shape?
- How do you handle move + delete conflict?
CRDTs:
- Shape registry: OR-Set of shape IDs
- Shape properties: LWW-Map per shape (x, y, width, height, color)
Concurrent moves:
- Each shape position is LWW-Register
- Latest move wins (acceptable for whiteboard)
- Alternative: Average positions if needed
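Move + delete:
- A move only writes to the shape's LWW-Map; it does not re-add the shape ID to the OR-Set
- So a concurrent delete wins: after merge the shape is not in the set and the moved position is ignored
- A move that happens after a delete is also ignored (shape not in set)
- If a move should resurrect the shape, have move_shape re-add the ID so add-wins keeps it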
class WhiteboardCRDT:
    def __init__(self, node_id):
        self.node_id = node_id  # Needed by add_shape below
        self.shapes = ORSet(node_id)  # Set of shape IDs
        self.properties: Dict[str, LWWMap] = {}  # Per-shape properties
    def add_shape(self, shape_id, properties):
        self.shapes.add(shape_id)
        self.properties[shape_id] = LWWMap(self.node_id)
        for k, v in properties.items():
            self.properties[shape_id].set(k, v)
    def move_shape(self, shape_id, x, y):
        # Moves on deleted or unknown shapes are ignored
        if shape_id in self.shapes.value():
            self.properties[shape_id].set("x", x)
            self.properties[shape_id].set("y", y)
    def delete_shape(self, shape_id):
        self.shapes.remove(shape_id)
Problem 2: Distributed Like Counter
Setup: Design a like counter for social media posts that works globally.
Requirements:
- Users can like and unlike
- Count should be accurate (no lost likes)
- Low latency reads from any region
Questions:
- What CRDT would you use?
- How do you handle like + unlike from same user?
- How do you handle displaying count efficiently?
CRDT:
- Use OR-Set for user IDs who liked
- Count = size of set
- Unlike removes user from set
Like + unlike:
- OR-Set handles this correctly
- Add wins over concurrent remove
- Last state wins for sequential like/unlike
Efficient display:
- Cache materialized count locally
- Refresh the cached count on like, unlike, and merge (the class below recounts the set; a production version could apply deltas)
- Don't recount set on every read
class LikeCounterCRDT:
def __init__(self, node_id, post_id):
self.likers = ORSet(node_id)
self.post_id = post_id
self._cached_count = 0
def like(self, user_id):
self.likers.add(user_id)
self._update_cache()
def unlike(self, user_id):
self.likers.remove(user_id)
self._update_cache()
def get_count(self) -> int:
return self._cached_count
def _update_cache(self):
self._cached_count = len(self.likers.value())
def merge(self, other):
self.likers.merge(other.likers)
self._update_cache()
Chapter 11: Sample Interview Dialogue
Scenario: Design Offline-First Note Taking App
Interviewer: "Design a note-taking app that works offline and syncs across devices."
You: "Before I design, let me understand the conflict scenarios. Users might edit the same note on different devices while offline. When they come online, we need to merge those edits."
Interviewer: "Yes, that's the main challenge."
You: "For conflict resolution, I have a few options. Let me think through them..."
You sketch on the whiteboard:
Conflict Resolution Options:
1. LWW (Last-Write-Wins)
- Simple but loses edits
- User adds paragraph on phone, laptop edit wins
- BAD for notes
2. Manual merge (show conflict)
- Show both versions, user picks
- Annoying for frequent conflicts
- OK as fallback
3. Automatic merge (CRDT)
- Text CRDT like RGA or Yjs
- Concurrent edits both appear
- BEST for collaborative editing
You: "I'd use a text CRDT like Yjs or Automerge. Each character has a unique ID based on node and sequence number. Inserts reference the character they come after. Deletes are tombstones."
Interviewer: "How does that handle concurrent edits?"
You: "Say I type 'Hello' on my phone while you type 'World' on the laptop, both at position 0. Each character gets a unique ID:
Phone: H(phone,1) e(phone,2) l(phone,3) l(phone,4) o(phone,5)
Laptop: W(laptop,1) o(laptop,2) r(laptop,3) l(laptop,4) d(laptop,5)
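When we merge, the CRDT uses the IDs to determine order. With RGA-style ordering, each character references the one it was typed after, so each word stays contiguous, and a deterministic ordering of the IDs decides which word comes first: 'HelloWorld' or 'WorldHello', the same on every replica. Both edits appear, with no data loss."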
Interviewer: "What about the note structure itself, not just the text?"
You: "For the notes collection, I'd use an OR-Set of note IDs. Each note's metadata (title, tags, folder) uses an LWW-Map — last edit to each field wins. The note content uses the text CRDT.
class NotesCRDT:
def __init__(self, node_id):
self.notes = ORSet(node_id) # Set of note IDs
self.metadata: Dict[str, LWWMap] = {} # Per-note metadata
self.content: Dict[str, TextCRDT] = {} # Per-note content
This gives us:
- Add/delete notes: OR-Set semantics (add-wins)
- Metadata: LWW per field
- Content: Full text merge"
Interviewer: "What about conflicts in the same line of text?"
You: "The text CRDT handles this at the character level. If I insert 'A' after position 5 and you insert 'B' after position 5, both characters appear with deterministic ordering. It might look like 'AB' or 'BA' depending on our node IDs, but both edits are preserved.
For a note-taking app, this is usually fine. If users need more control, I could fall back to showing the conflict and letting them choose, but with text CRDTs that's rarely necessary."
Summary
DAY 4 KEY TAKEAWAYS
THE CONFLICT PROBLEM:
• Concurrent updates create conflicts
• Network partitions, offline operation, replication lag
• Must choose resolution strategy
RESOLUTION STRATEGIES:
1. LAST-WRITE-WINS (LWW)
├── Simple: Timestamp wins
├── Loses data
└── Use for: Location, status, cache
2. VECTOR CLOCKS
├── Detect concurrent vs causal
├── Application resolves
└── Use for: Conflict detection
3. CRDTs
├── Automatic merge
├── No conflicts by design
└── Use for: Collaborative, offline-first
CRDT TYPES:
• G-Counter: Grow-only counter (sum)
• PN-Counter: Positive-negative counter
• G-Set: Grow-only set (union)
• OR-Set: Observed-remove set (add-wins)
• LWW-Map: Last-writer-wins map
• RGA: Replicated sequence (text)
CHOOSING STRATEGY:
• Latest is correct → LWW
• Need to detect conflicts → Vector clocks
• Can auto-merge → CRDT
• Need user input → Manual resolution
DEFAULT APPROACH:
• Shopping cart: OR-Set CRDT
• Counters/votes: PN-Counter CRDT
• Simple fields: LWW
• Collaborative text: RGA/Yjs CRDT
• Complex merge: Vector clocks + application logic
Part V: Production Patterns
Chapter 12: Hybrid Approaches
12.1 Combining Strategies
# Hybrid Conflict Resolution System
import time  # Used for LWW timestamps in HybridDocument.set
from dataclasses import dataclass
from typing import Any, Dict, Optional, Callable, List
from enum import Enum
class ResolutionType(Enum):
LWW = "lww"
CRDT = "crdt"
VECTOR_CLOCK = "vector_clock"
CUSTOM = "custom"
@dataclass
class FieldConfig:
"""Configuration for how to resolve conflicts for a field."""
resolution_type: ResolutionType
crdt_type: Optional[str] = None # For CRDT fields
custom_resolver: Optional[Callable] = None # For custom fields
class HybridDocument:
"""
Document with different resolution strategies per field.
Example: User profile
- name: LWW (latest wins)
- email: LWW (latest wins)
- tags: OR-Set CRDT (union of tags)
- view_count: G-Counter CRDT (sum)
- bio: LWW in the example below (a CUSTOM resolver is an option for long text)
"""
def __init__(self, node_id: str, field_configs: Dict[str, FieldConfig]):
self.node_id = node_id
self.configs = field_configs
# Storage for different types
self.lww_fields: Dict[str, LWWValue] = {}
self.crdt_fields: Dict[str, Any] = {} # Various CRDT types
self.vector_clock = VectorClock()
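    def get(self, field: str) -> Any:
        """Get field value."""
        config = self.configs.get(field)
        if not config:
            raise ValueError(f"Unknown field: {field}")
        if config.resolution_type in (ResolutionType.LWW, ResolutionType.CUSTOM):
            # CUSTOM fields are stored like LWW fields; only their merge differs
            lww = self.lww_fields.get(field)
            return lww.value if lww else None
        elif config.resolution_type == ResolutionType.CRDT:
            crdt = self.crdt_fields.get(field)
            if crdt is None:
                return self._default_crdt_value(config.crdt_type)
            return self._get_crdt_value(crdt, config.crdt_type)
        return None
    def set(self, field: str, value: Any):
        """Set field value."""
        config = self.configs.get(field)
        if not config:
            raise ValueError(f"Unknown field: {field}")
        self.vector_clock = self.vector_clock.increment(self.node_id)
        if config.resolution_type in (ResolutionType.LWW, ResolutionType.CUSTOM):
            self.lww_fields[field] = LWWValue(
                value=value,
                timestamp=time.time(),
                node_id=self.node_id
            )
        elif config.resolution_type == ResolutionType.CRDT:
            self._update_crdt(field, value, config.crdt_type)
    def merge(self, other: 'HybridDocument'):
        """Merge with another document."""
        # Merge vector clock
        self.vector_clock = self.vector_clock.merge(other.vector_clock)
        # Merge each field according to its config
        for field, config in self.configs.items():
            if config.resolution_type == ResolutionType.LWW:
                self._merge_lww(field, other)
            elif config.resolution_type == ResolutionType.CRDT:
                self._merge_crdt(field, other, config.crdt_type)
            elif config.resolution_type == ResolutionType.CUSTOM:
                self._merge_custom(field, other, config.custom_resolver)
    def _merge_custom(self, field: str, other: 'HybridDocument', resolver: Optional[Callable]):
        """Merge a CUSTOM field by passing both values to its resolver."""
        self_val = self.lww_fields.get(field)
        other_val = other.lww_fields.get(field)
        if other_val is None:
            return
        if self_val is None:
            self.lww_fields[field] = other_val
            return
        if resolver is None:
            raise ValueError(f"No custom_resolver configured for field: {field}")
        # The resolver should be commutative and idempotent so replicas converge
        self.lww_fields[field] = LWWValue(
            value=resolver(self_val.value, other_val.value),
            timestamp=max(self_val.timestamp, other_val.timestamp),
            node_id=self.node_id
        )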
    def _merge_lww(self, field: str, other: 'HybridDocument'):
        """Merge LWW field. Timestamp ties are broken by node_id so replicas agree."""
        self_val = self.lww_fields.get(field)
        other_val = other.lww_fields.get(field)
        if other_val is None:
            return
        if self_val is None or (
            (other_val.timestamp, other_val.node_id) > (self_val.timestamp, self_val.node_id)
        ):
            self.lww_fields[field] = other_val
    def _merge_crdt(self, field: str, other: 'HybridDocument', crdt_type: str):
        """Merge CRDT field."""
        other_crdt = other.crdt_fields.get(field)
        if other_crdt is None:
            return
        if field not in self.crdt_fields:
            # Create our own instance instead of aliasing the remote object
            self.crdt_fields[field] = self._create_crdt(crdt_type)
        self.crdt_fields[field].merge(other_crdt)
def _update_crdt(self, field: str, value: Any, crdt_type: str):
"""Update CRDT field based on type."""
if field not in self.crdt_fields:
self.crdt_fields[field] = self._create_crdt(crdt_type)
crdt = self.crdt_fields[field]
if crdt_type == "g_counter":
crdt.increment(value)
elif crdt_type == "pn_counter":
if value >= 0:
crdt.increment(value)
else:
crdt.decrement(abs(value))
elif crdt_type == "or_set":
crdt.add(value)
elif crdt_type == "lww_map":
for k, v in value.items():
crdt.set(k, v)
def _create_crdt(self, crdt_type: str):
"""Create CRDT of specified type."""
if crdt_type == "g_counter":
return GCounter(self.node_id)
elif crdt_type == "pn_counter":
return PNCounter(self.node_id)
elif crdt_type == "or_set":
return ORSet(self.node_id)
elif crdt_type == "lww_map":
return LWWMap(self.node_id)
raise ValueError(f"Unknown CRDT type: {crdt_type}")
def _get_crdt_value(self, crdt, crdt_type: str):
"""Get value from CRDT."""
if crdt_type in ("g_counter", "pn_counter"):
return crdt.value()
elif crdt_type == "or_set":
return crdt.value()
elif crdt_type == "lww_map":
return crdt.to_dict()
return None
def _default_crdt_value(self, crdt_type: str):
"""Default value for CRDT type."""
if crdt_type in ("g_counter", "pn_counter"):
return 0
elif crdt_type == "or_set":
return set()
elif crdt_type == "lww_map":
return {}
return None
# Example: User Profile
def create_user_profile(node_id: str) -> HybridDocument:
"""Create user profile with hybrid conflict resolution."""
configs = {
"name": FieldConfig(ResolutionType.LWW),
"email": FieldConfig(ResolutionType.LWW),
"bio": FieldConfig(ResolutionType.LWW),
"tags": FieldConfig(ResolutionType.CRDT, crdt_type="or_set"),
"view_count": FieldConfig(ResolutionType.CRDT, crdt_type="g_counter"),
"settings": FieldConfig(ResolutionType.CRDT, crdt_type="lww_map"),
}
return HybridDocument(node_id, configs)
# Usage
profile_us = create_user_profile("us-east")
profile_eu = create_user_profile("eu-west")
# US updates name
profile_us.set("name", "John Doe")
profile_us.set("tags", "developer")
# EU updates (concurrent)
profile_eu.set("name", "John D.") # Different name
profile_eu.set("tags", "python") # Different tag
# Merge
profile_us.merge(profile_eu)
profile_eu.merge(profile_us)
# Result:
# name: "John D." (LWW - EU was later)
# tags: {"developer", "python"} (OR-Set - both tags kept)
12.2 Conflict Detection with Resolution Options
# Conflict Detection and Resolution Service
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional
from enum import Enum
class ConflictType(Enum):
NO_CONFLICT = "no_conflict"
CONCURRENT_UPDATE = "concurrent_update"
DELETE_UPDATE = "delete_update"
SEMANTIC_CONFLICT = "semantic_conflict"
@dataclass
class Conflict:
"""Represents a detected conflict."""
type: ConflictType
field: str
local_value: Any
remote_value: Any
local_clock: VectorClock
remote_clock: VectorClock
suggested_resolution: Optional[Any] = None
class ConflictDetector:
"""
Detects and classifies conflicts for resolution.
"""
def __init__(self, semantic_rules: Dict[str, Callable] = None):
self.semantic_rules = semantic_rules or {}
def detect_conflicts(
self,
local: Dict[str, Any],
local_clock: VectorClock,
remote: Dict[str, Any],
remote_clock: VectorClock
) -> List[Conflict]:
"""Detect all conflicts between local and remote state."""
conflicts = []
# Check clock relationship
comparison = local_clock.compare(remote_clock)
if comparison == ClockComparison.BEFORE:
# Local is older - no conflict, just update
return []
if comparison == ClockComparison.AFTER:
# Local is newer - no conflict, remote should update
return []
if comparison == ClockComparison.EQUAL:
# Same version - no conflict
return []
# CONCURRENT - check each field
all_fields = set(local.keys()) | set(remote.keys())
for field in all_fields:
local_val = local.get(field)
remote_val = remote.get(field)
if local_val == remote_val:
continue
conflict = self._classify_conflict(
field, local_val, remote_val,
local_clock, remote_clock
)
if conflict.type != ConflictType.NO_CONFLICT:
conflicts.append(conflict)
return conflicts
def _classify_conflict(
self,
field: str,
local_val: Any,
remote_val: Any,
local_clock: VectorClock,
remote_clock: VectorClock
) -> Conflict:
"""Classify a field conflict."""
# Check for delete vs update
if local_val is None and remote_val is not None:
return Conflict(
type=ConflictType.DELETE_UPDATE,
field=field,
local_value=local_val,
remote_value=remote_val,
local_clock=local_clock,
remote_clock=remote_clock,
suggested_resolution=remote_val # Keep the value
)
if local_val is not None and remote_val is None:
return Conflict(
type=ConflictType.DELETE_UPDATE,
field=field,
local_value=local_val,
remote_value=remote_val,
local_clock=local_clock,
remote_clock=remote_clock,
suggested_resolution=local_val # Keep the value
)
# Check semantic rules
if field in self.semantic_rules:
rule = self.semantic_rules[field]
suggested = rule(local_val, remote_val)
return Conflict(
type=ConflictType.SEMANTIC_CONFLICT,
field=field,
local_value=local_val,
remote_value=remote_val,
local_clock=local_clock,
remote_clock=remote_clock,
suggested_resolution=suggested
)
# Default: concurrent update
return Conflict(
type=ConflictType.CONCURRENT_UPDATE,
field=field,
local_value=local_val,
remote_value=remote_val,
local_clock=local_clock,
remote_clock=remote_clock
)
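class UnresolvedConflictError(Exception):
    """Raised when some conflicts have no suggested resolution or strategy."""
    def __init__(self, conflicts: List[Conflict]):
        super().__init__(f"{len(conflicts)} unresolved conflict(s)")
        self.conflicts = conflicts
class ConflictResolver:
    """
    Resolves conflicts using various strategies.
    """
    def __init__(self):
        self.strategies: Dict[str, Callable] = {}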
def register_strategy(self, field: str, strategy: Callable):
"""Register a resolution strategy for a field."""
self.strategies[field] = strategy
def resolve(self, conflicts: List[Conflict]) -> Dict[str, Any]:
"""Resolve all conflicts and return merged state."""
resolved = {}
unresolved = []
for conflict in conflicts:
if conflict.suggested_resolution is not None:
resolved[conflict.field] = conflict.suggested_resolution
elif conflict.field in self.strategies:
strategy = self.strategies[conflict.field]
resolved[conflict.field] = strategy(
conflict.local_value,
conflict.remote_value
)
else:
unresolved.append(conflict)
if unresolved:
raise UnresolvedConflictError(unresolved)
return resolved
def auto_resolve(
self,
conflicts: List[Conflict],
default_strategy: str = "lww"
) -> Dict[str, Any]:
"""Auto-resolve conflicts with default strategy."""
resolved = {}
for conflict in conflicts:
if conflict.suggested_resolution is not None:
resolved[conflict.field] = conflict.suggested_resolution
elif conflict.field in self.strategies:
strategy = self.strategies[conflict.field]
resolved[conflict.field] = strategy(
conflict.local_value,
conflict.remote_value
)
            elif default_strategy == "lww":
                # Conflict carries no timestamps, so this keeps the local value.
                # It assumes local is "latest"; it is not a true timestamp-based LWW.
                resolved[conflict.field] = conflict.local_value
            elif default_strategy == "keep_remote":
                resolved[conflict.field] = conflict.remote_value
            elif default_strategy == "merge_arrays":
                if isinstance(conflict.local_value, list) and isinstance(conflict.remote_value, list):
                    resolved[conflict.field] = list(set(conflict.local_value) | set(conflict.remote_value))
                else:
                    resolved[conflict.field] = conflict.local_value
            else:
                # Fail loudly instead of silently dropping the field
                raise ValueError(f"Unknown default strategy: {default_strategy}")
return resolved
# Example semantic rules
SEMANTIC_RULES = {
"quantity": lambda local, remote: max(local, remote), # Take larger quantity
"price": lambda local, remote: min(local, remote), # Take lower price
"updated_at": lambda local, remote: max(local, remote), # Latest timestamp
"tags": lambda local, remote: list(set(local) | set(remote)), # Union
}
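The detector above only looks at individual fields when the two clocks are concurrent. A minimal sketch of that comparison, using plain dicts as clocks, is shown below. The `ClockComparison` member names mirror the ones referenced in `detect_conflicts`, but this enum and the `compare` function are stand-ins written for illustration, not the document's `VectorClock` class.

```python
from enum import Enum
from typing import Dict


class ClockComparison(Enum):
    BEFORE = "before"
    AFTER = "after"
    EQUAL = "equal"
    CONCURRENT = "concurrent"


def compare(local: Dict[str, int], remote: Dict[str, int]) -> ClockComparison:
    """Compare two vector clocks entry by entry (missing entries count as 0)."""
    nodes = local.keys() | remote.keys()
    local_ahead = any(local.get(n, 0) > remote.get(n, 0) for n in nodes)
    remote_ahead = any(remote.get(n, 0) > local.get(n, 0) for n in nodes)
    if local_ahead and remote_ahead:
        return ClockComparison.CONCURRENT
    if local_ahead:
        return ClockComparison.AFTER
    if remote_ahead:
        return ClockComparison.BEFORE
    return ClockComparison.EQUAL


causal = compare({"a": 1}, {"a": 2, "b": 1})          # remote has seen everything local has
concurrent = compare({"a": 2, "b": 0}, {"a": 1, "b": 1})  # each side has an unseen update

# Only the concurrent case needs a semantic rule, e.g. "take the larger quantity"
quantity = max(3, 5) if concurrent is ClockComparison.CONCURRENT else 5
```

Keeping the clock check separate from the per-field rules means causally ordered updates never reach the semantic rules at all.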
Chapter 13: Tombstone Management
13.1 Garbage Collecting Tombstones
# Tombstone Garbage Collection
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Dict, Set, Optional
import asyncio
@dataclass
class TombstoneRecord:
"""Record of a tombstone for garbage collection."""
key: str
tag: str
deleted_at: datetime
deleted_by: str
synced_to: Set[str] # Nodes that have seen this tombstone
class TombstoneGC:
"""
Garbage collector for CRDT tombstones.
Problem: OR-Set tombstones accumulate forever
Solution: GC tombstones after all nodes have seen them
Requirements:
- All nodes must see tombstone before GC
- Need to track which nodes have synced
- Safe to GC after grace period if node is known dead
"""
def __init__(
self,
node_id: str,
all_nodes: Set[str],
grace_period: timedelta = timedelta(days=7)
):
self.node_id = node_id
self.all_nodes = all_nodes
self.grace_period = grace_period
self.tombstones: Dict[str, TombstoneRecord] = {}
def record_tombstone(self, key: str, tag: str):
"""Record a new tombstone."""
record = TombstoneRecord(
key=key,
tag=tag,
deleted_at=datetime.utcnow(),
deleted_by=self.node_id,
synced_to={self.node_id}
)
self.tombstones[f"{key}:{tag}"] = record
def mark_synced(self, key: str, tag: str, node_id: str):
"""Mark that a node has seen this tombstone."""
record_key = f"{key}:{tag}"
if record_key in self.tombstones:
self.tombstones[record_key].synced_to.add(node_id)
def get_collectable(self) -> list[TombstoneRecord]:
"""Get tombstones that can be garbage collected."""
collectable = []
now = datetime.utcnow()
for record in self.tombstones.values():
# Check if all nodes have seen it
if record.synced_to >= self.all_nodes:
collectable.append(record)
continue
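# Otherwise collect once the grace period has passed.
# NOTE: this does not verify that the missing nodes are actually dead;
# a slow node that syncs later could reintroduce the deleted element.
if now - record.deleted_at > self.grace_period:
collectable.append(record)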
return collectable
    async def run_gc(self, crdt: ORSet):
        """Run garbage collection on an OR-Set."""
        collectable = self.get_collectable()
        for record in collectable:
            # Remove from CRDT internal state
            tags = crdt.elements.get(record.key)
            if tags is not None:
                # Only drop tags that are actually tombstoned
                if tags.get(record.tag):
                    del tags[record.tag]
                # Clean up empty entries
                if not tags:
                    del crdt.elements[record.key]
            # Remove from tombstone tracking
            del self.tombstones[f"{record.key}:{record.tag}"]
async def gc_loop(self, crdt: ORSet, interval: int = 3600):
"""Periodic GC loop."""
while True:
await asyncio.sleep(interval)
await self.run_gc(crdt)
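The third requirement in the TombstoneGC docstring (collect after the grace period only if the missing node is known dead) can be expressed as a standalone predicate. This is a sketch under an added assumption: `dead_nodes` comes from some membership or failure-detection mechanism that is not part of the original class, and `is_collectable` is a hypothetical helper name.

```python
from datetime import datetime, timedelta, timezone
from typing import Set


def is_collectable(
    synced_to: Set[str],
    all_nodes: Set[str],
    dead_nodes: Set[str],  # Assumption: supplied by a membership service
    deleted_at: datetime,
    now: datetime,
    grace_period: timedelta,
) -> bool:
    """Decide whether a tombstone can be dropped safely."""
    live_nodes = all_nodes - dead_nodes
    if not live_nodes <= synced_to:
        return False  # A live node has not seen the tombstone yet
    if all_nodes <= synced_to:
        return True  # Everyone has seen it
    # Only dead nodes are missing: wait out the grace period first
    return now - deleted_at > grace_period


now = datetime(2024, 1, 15, tzinfo=timezone.utc)
nodes = {"us", "eu", "ap"}
old = now - timedelta(days=8)
week = timedelta(days=7)

all_acked = is_collectable({"us", "eu", "ap"}, nodes, set(), old, now, week)
live_node_missing = is_collectable({"us", "eu"}, nodes, set(), old, now, week)
dead_node_missing = is_collectable({"us", "eu"}, nodes, {"ap"}, old, now, week)
```

With this rule a tombstone is never dropped while a live node could still hold the element it deletes, which is the failure mode that resurrects removed items.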
📚 Further Reading
Papers
- "A Comprehensive Study of Convergent and Commutative Replicated Data Types" by Shapiro et al.
- "Conflict-free Replicated Data Types" by Shapiro et al.
- "Dynamo: Amazon's Highly Available Key-value Store" — Vector clocks section
Libraries
- Yjs: https://yjs.dev/ — Text CRDT for collaborative editing
- Automerge: https://automerge.org/ — JSON CRDT library
- Riak: https://riak.com/ — Database with CRDT support
Books
- "Designing Data-Intensive Applications" by Martin Kleppmann — Chapter 5
Videos
- Martin Kleppmann CRDTs: Search "Martin Kleppmann CRDT" on YouTube
End of Day 4: Conflict Resolution
Tomorrow: Day 5 — Leader Election and Coordination. We'll learn about fencing tokens, split-brain prevention, and how ZooKeeper/etcd enable distributed coordination.