Week 5 — Day 5: Leader Election and Coordination
System Design Mastery Series
Preface
This week, we've built a comprehensive understanding of consistency and coordination:
- Day 1: Consistency models — how to reason about when reads see writes
- Day 2: Saga pattern — coordinating distributed transactions
- Day 3: Workflow orchestration — durable execution with Temporal
- Day 4: Conflict resolution — CRDTs and vector clocks for concurrent updates
But there's one critical pattern we haven't addressed:
THE LEADER PROBLEM
Many distributed systems need exactly ONE node to be "in charge":
├── Database primary (only one accepts writes)
├── Job scheduler (only one assigns work)
├── Distributed lock holder (only one accesses resource)
├── Kafka partition leader (only one serves reads/writes)
└── Cache invalidation coordinator (only one broadcasts)
The challenge:
Node A: "I am the leader!"
Node B: "No, I am the leader!"
Both act as leader → DATA CORRUPTION
This is called SPLIT-BRAIN.
Today, we'll learn how to elect leaders safely and prevent split-brain scenarios.
Part I: Foundations
Chapter 1: Why Leader Election Is Hard
1.1 The Split-Brain Problem
SPLIT-BRAIN SCENARIO
Initial state:
Node A is leader, Node B is follower
Both connected to each other
Network partition occurs:
┌─────────────┐ ┌─────────────┐
│ Node A │ ✗ │ Node B │
│ (leader) │◄───────▶│ (follower) │
└─────────────┘ └─────────────┘
Node B can't reach Node A:
- Is Node A dead? Or just network issue?
- If A is dead, B should become leader
- If A is alive, B becoming leader = SPLIT-BRAIN
Node B decides to become leader:
┌─────────────┐ ┌─────────────┐
│ Node A │ ✗ │ Node B │
│ (leader) │ │ (leader) │
└─────────────┘ └─────────────┘
BOTH nodes accept writes!
When partition heals:
- Two different versions of data
- Which is correct?
- DATA LOSS or CORRUPTION
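The divergence is easy to reproduce in a few lines. The sketch below (all names, such as `NodeStore`, are illustrative, not part of any real system) shows two replicas that each accept a write during a partition and disagree once it heals:

```python
# Toy illustration of split-brain divergence during a network partition.

class NodeStore:
    """One node's local copy of the data."""
    def __init__(self, name: str):
        self.name = name
        self.data: dict = {}

    def write(self, key: str, value: str):
        self.data[key] = value

# Before the partition: A is leader, B is follower.
node_a, node_b = NodeStore("A"), NodeStore("B")

# During the partition, B promotes itself; both nodes accept writes.
node_a.write("balance", "100")   # client routed to A
node_b.write("balance", "250")   # client routed to B

# Partition heals: the replicas disagree and neither is authoritative.
conflict = node_a.data["balance"] != node_b.data["balance"]
```

Reconciling `conflict` after the fact requires guessing which write to keep, which is exactly the data loss the rest of this chapter works to prevent.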
1.2 Why Timeouts Aren't Enough
THE TIMEOUT TRAP
Naive approach: "If leader doesn't respond in 5 seconds, elect new leader"
Problems:
1. NETWORK DELAYS
├── Leader is alive but network is slow
├── Follower times out
├── Follower becomes new leader
└── Two leaders!
2. GARBAGE COLLECTION PAUSES
├── Leader's JVM does GC for 6 seconds
├── Followers time out
├── Elect new leader
├── Old leader comes back
└── Two leaders!
3. CLOCK SKEW
├── Leader's clock is slow
├── Its "lease" hasn't expired (by its clock)
├── Followers' clocks show lease expired
├── Elect new leader
└── Two leaders!
4. PROCESS PAUSES
├── Leader gets swapped out by OS
├── Followers time out
├── Elect new leader
├── Leader gets scheduled again
└── Two leaders!
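All four failure modes share one shape: the old leader is alive but silent, and nothing forces it to step down. A toy simulation with a fake clock (names like `maybe_promote` are invented for illustration) makes the resulting double leadership concrete:

```python
# Simulated GC-pause scenario with a fake clock: a naive timeout rule
# produces two simultaneous leaders.

TIMEOUT = 5.0  # seconds without a heartbeat before taking over

class Node:
    def __init__(self, name: str):
        self.name = name
        self.is_leader = False
        self.last_heartbeat_seen = 0.0

    def maybe_promote(self, now: float):
        # Naive rule: no heartbeat for TIMEOUT seconds -> become leader.
        if now - self.last_heartbeat_seen > TIMEOUT:
            self.is_leader = True

leader = Node("A"); leader.is_leader = True
follower = Node("B")

# t=0: follower sees a heartbeat. Then the leader's process pauses
# (GC, swap-out, slow network) and sends nothing.
follower.last_heartbeat_seen = 0.0
follower.maybe_promote(now=6.0)   # follower times out, self-promotes

# t=7: the pause ends. Nothing told the old leader to step down,
# so it still believes it is leader -> two leaders at once.
two_leaders = leader.is_leader and follower.is_leader
```

The fix is not a better timeout value; it is making the resources themselves reject the stale leader, which is what fencing tokens (Chapter 2) do.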
1.3 Requirements for Safe Leader Election
SAFE LEADER ELECTION REQUIREMENTS
1. SAFETY (no split-brain)
└── At most ONE leader at any time
2. LIVENESS (progress)
└── Eventually a leader is elected
3. LEADER DETECTION
└── All nodes agree on who the leader is
4. LEADER FAILURE HANDLING
└── New leader elected when old one fails
5. NETWORK PARTITION HANDLING
└── System behaves safely during partitions
FUNDAMENTAL LIMIT (FLP Impossibility):
In an asynchronous system where even one process may crash,
no deterministic algorithm can guarantee both:
- Safety (no split-brain)
- Liveness (always make progress)
In practice: favor safety, accept temporary unavailability
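Favoring safety usually means requiring a strict majority before anyone may lead, so a minority partition simply stays leaderless rather than risking a second leader. A minimal sketch of that quorum check (the helper name `can_elect` is illustrative):

```python
# Minimal quorum check: favor safety by refusing leadership without
# votes from a strict majority of the cluster.

def can_elect(votes: int, cluster_size: int) -> bool:
    """True only if `votes` is a strict majority of the cluster."""
    return votes >= cluster_size // 2 + 1

# A 5-node cluster split 3 / 2 by a partition:
majority_side = can_elect(votes=3, cluster_size=5)   # may elect a leader
minority_side = can_elect(votes=2, cluster_size=5)   # stays leaderless
```

The minority side gives up liveness (no leader, no writes) to preserve safety; when the partition heals it rejoins the majority's leader.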
Chapter 2: Fencing Tokens
2.1 The Fencing Token Pattern
Fencing tokens solve the "zombie leader" problem — when an old leader doesn't know it's been replaced.
FENCING TOKEN MECHANISM
Each leader election generates a MONOTONICALLY INCREASING token.
Election 1: Leader A gets token 1
Election 2: Leader B gets token 2
Election 3: Leader C gets token 3
Resources (databases, services) track the highest token seen.
They REJECT requests with lower tokens.
EXAMPLE: Preventing Zombie Writes
Timeline:
T=0: Leader A elected with token=1
T=1: Leader A sends write with token=1 → ACCEPTED
T=2: Leader A has GC pause (appears dead)
T=3: Leader B elected with token=2
T=4: Leader B sends write with token=2 → ACCEPTED
T=5: Leader A wakes up, sends write with token=1
→ REJECTED (token < highest seen)
Without fencing token:
T=5: Leader A sends write → ACCEPTED → DATA CORRUPTION
With fencing token:
T=5: Leader A sends write → REJECTED → SAFE
2.2 Fencing Token Implementation
# Fencing Token Implementation
from dataclasses import dataclass
from typing import Optional, Any
from datetime import datetime, timedelta
import threading
import time
@dataclass
class LeaderLease:
"""A leader's lease with fencing token."""
leader_id: str
token: int
expires_at: datetime
def is_valid(self) -> bool:
"""Check if lease is still valid."""
return datetime.utcnow() < self.expires_at
class FencedResource:
"""
A resource protected by fencing tokens.
Rejects operations from stale leaders.
"""
def __init__(self, resource_id: str):
self.resource_id = resource_id
self.highest_token_seen = 0
self.lock = threading.Lock()
self.data: dict = {}
def write(self, key: str, value: Any, token: int) -> bool:
"""
Write to resource with fencing token.
Returns True if write was accepted.
"""
with self.lock:
if token < self.highest_token_seen:
# Stale leader - reject
return False
self.highest_token_seen = token
self.data[key] = value
return True
def read(self, key: str) -> Optional[Any]:
"""Read from resource."""
return self.data.get(key)
class FencedLeaderClient:
"""
Client that uses fencing tokens for leader operations.
"""
def __init__(self, client_id: str):
self.client_id = client_id
self.current_lease: Optional[LeaderLease] = None
def set_lease(self, lease: LeaderLease):
"""Set the current lease (from election)."""
self.current_lease = lease
def execute_as_leader(
self,
resource: FencedResource,
key: str,
value: Any
) -> bool:
"""
Execute a write operation as leader.
Includes fencing token validation.
"""
if not self.current_lease:
raise RuntimeError("Not a leader - no lease")
if not self.current_lease.is_valid():
raise RuntimeError("Lease expired")
# Include fencing token with request
success = resource.write(
key,
value,
self.current_lease.token
)
if not success:
# We're a zombie leader - stop operations
self.current_lease = None
raise RuntimeError("Fenced - no longer leader")
return True
# Example: Database with fencing
class FencingError(Exception):
    """Raised when a request arrives with a stale fencing token."""
class FencedDatabaseClient:
"""Database client that enforces fencing tokens."""
def __init__(self, db_connection):
self.db = db_connection
self.highest_token_seen = 0
async def execute(self, query: str, params: tuple, token: int) -> Any:
"""
Execute query with fencing token check.
"""
# First, check token
if token < self.highest_token_seen:
raise FencingError(f"Stale token {token} < {self.highest_token_seen}")
self.highest_token_seen = token
# Execute the actual query
result = await self.db.execute(query, params)
return result
# Example: Distributed lock with fencing
class FencedLock:
"""
Distributed lock that provides fencing tokens.
"""
def __init__(self, lock_service, lock_name: str):
self.service = lock_service
self.lock_name = lock_name
self.token: Optional[int] = None
async def acquire(self, ttl_seconds: int = 30) -> int:
"""
Acquire lock and return fencing token.
The token should be passed to all protected resources.
"""
result = await self.service.acquire_lock(
self.lock_name,
ttl_seconds=ttl_seconds
)
self.token = result.fencing_token
return self.token
async def release(self):
"""Release the lock."""
if self.token:
await self.service.release_lock(self.lock_name, self.token)
self.token = None
def get_token(self) -> Optional[int]:
"""Get current fencing token."""
return self.token
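To see the mechanism end to end, here is a compact, self-contained replay of the T=0 to T=5 timeline from section 2.1. `MiniFencedResource` mirrors the `FencedResource` class above, reduced to a dict plus the token check:

```python
# Self-contained replay of the zombie-write timeline from section 2.1.
import threading
from typing import Any

class MiniFencedResource:
    """Accepts a write only if its token >= the highest token seen."""
    def __init__(self):
        self.highest_token_seen = 0
        self.data: dict = {}
        self.lock = threading.Lock()

    def write(self, key: str, value: Any, token: int) -> bool:
        with self.lock:
            if token < self.highest_token_seen:
                return False          # stale leader: fenced off
            self.highest_token_seen = token
            self.data[key] = value
            return True

resource = MiniFencedResource()

accepted_a1 = resource.write("k", "from A", token=1)  # T=1: leader A
accepted_b  = resource.write("k", "from B", token=2)  # T=4: leader B
accepted_a2 = resource.write("k", "zombie", token=1)  # T=5: A wakes up
```

The zombie write at T=5 is rejected and leader B's value survives, with no coordination between A and B required at write time.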
Chapter 3: Consensus Algorithms
3.1 The Consensus Problem
CONSENSUS: Getting nodes to agree
All nodes must agree on:
├── Who is the leader
├── What is the current value
├── What operations to apply
└── In what order
REQUIREMENTS:
1. AGREEMENT
└── All non-faulty nodes decide same value
2. VALIDITY
└── Decided value was proposed by some node
3. TERMINATION
└── All non-faulty nodes eventually decide
4. INTEGRITY
└── Each node decides at most once
FAMOUS ALGORITHMS:
├── Paxos (Lamport, 1989)
│ └── Correct but hard to understand
│
├── Raft (Ongaro & Ousterhout, 2014)
│ └── Designed for understandability
│
├── Zab (Apache ZooKeeper)
│ └── Used in ZooKeeper
│
└── PBFT (Castro & Liskov, 1999)
└── For Byzantine fault tolerance
3.2 Raft Overview
RAFT CONSENSUS ALGORITHM
Nodes have three states:
├── LEADER: Handles all client requests
├── FOLLOWER: Replicates leader's log
└── CANDIDATE: Trying to become leader
LEADER ELECTION IN RAFT:
1. All nodes start as FOLLOWERS
2. Followers expect heartbeats from leader
3. If no heartbeat in election timeout → become CANDIDATE
4. Candidate increments TERM and requests votes
5. Node votes for first candidate in each term
6. Candidate with majority votes becomes LEADER
7. Leader sends heartbeats to maintain leadership
TERMS (Logical Clock):
Term 1: Leader A
Term 2: Leader A (re-elected)
Term 3: Leader B (A failed)
Term 4: Leader B (re-elected)
Term 5: Leader C
Terms act like fencing tokens!
Requests from old terms are rejected.
SAFETY GUARANTEE:
At most one leader per term.
Leader's log is authoritative.
Committed entries are never lost.
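The "at most one leader per term" guarantee rests on simple quorum arithmetic: any two majorities of the same cluster overlap in at least one node (which refuses to vote twice in a term), and a cluster of n = 2f + 1 nodes tolerates f failures. A quick sketch (helper names are illustrative):

```python
# Quorum arithmetic behind Raft's election safety.

def majority(n: int) -> int:
    """Votes needed to win an election in an n-node cluster."""
    return n // 2 + 1

def tolerated_failures(n: int) -> int:
    """Failures an n-node cluster survives while keeping a quorum."""
    return (n - 1) // 2

# Two majorities must overlap, so two candidates can't both win a term:
n = 5
overlap_guaranteed = majority(n) + majority(n) > n
```

This is also why clusters are sized with odd node counts: going from 3 to 4 nodes raises the quorum from 2 to 3 without tolerating any additional failures.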
LEADER ELECTION FLOW:
┌─────────────────────────────────────────────────────────────────────────┐
│ RAFT ELECTION │
│ │
│ Follower Candidate Leader │
│ ──────── ───────── ────── │
│ │
│ Election timeout │ │
│ ──────────────────────────▶ │ │
│ │ │
│ Increment term │ │
│ Vote for self │ │
│ Send RequestVote │ │
│ │ │
│ ◀────RequestVote────────────│ │
│ │ │
│ ────────Vote────────────────▶ │
│ │ │
│ │ Majority votes │
│ │ ──────────────────────────────────────▶ │
│ │ │
│ Send heartbeats │
│ ◀───────────────────────────────────────── AppendEntries ───────────── │
│ │
└─────────────────────────────────────────────────────────────────────────┘
3.3 Raft Implementation Sketch
# Simplified Raft Implementation
import asyncio
import random
from dataclasses import dataclass, field
from typing import Optional, List, Dict, Set
from enum import Enum
from datetime import datetime, timedelta
import logging
logger = logging.getLogger(__name__)
class NodeState(Enum):
FOLLOWER = "follower"
CANDIDATE = "candidate"
LEADER = "leader"
@dataclass
class LogEntry:
"""Entry in the Raft log."""
term: int
index: int
command: str
data: dict
@dataclass
class RequestVoteRequest:
term: int
candidate_id: str
last_log_index: int
last_log_term: int
@dataclass
class RequestVoteResponse:
term: int
vote_granted: bool
@dataclass
class AppendEntriesRequest:
term: int
leader_id: str
prev_log_index: int
prev_log_term: int
entries: List[LogEntry]
leader_commit: int
@dataclass
class AppendEntriesResponse:
term: int
success: bool
class RaftNode:
"""
Simplified Raft consensus node.
This implementation focuses on leader election.
Log replication is simplified for clarity.
"""
def __init__(
self,
node_id: str,
peers: List[str],
rpc_client, # Client to communicate with peers
election_timeout_ms: tuple = (150, 300), # Random range
heartbeat_interval_ms: int = 50
):
self.node_id = node_id
self.peers = peers
self.rpc = rpc_client
self.election_timeout_range = election_timeout_ms
self.heartbeat_interval = heartbeat_interval_ms / 1000
# Persistent state
self.current_term = 0
self.voted_for: Optional[str] = None
self.log: List[LogEntry] = []
# Volatile state
self.state = NodeState.FOLLOWER
self.commit_index = 0
self.last_applied = 0
# Leader state
self.next_index: Dict[str, int] = {}
self.match_index: Dict[str, int] = {}
# Timing
self.last_heartbeat = datetime.utcnow()
self.election_timeout = self._random_election_timeout()
# Control
self.running = False
def _random_election_timeout(self) -> float:
"""Get random election timeout in seconds."""
min_ms, max_ms = self.election_timeout_range
return random.randint(min_ms, max_ms) / 1000
async def start(self):
"""Start the Raft node."""
self.running = True
asyncio.create_task(self._run_loop())
async def stop(self):
"""Stop the Raft node."""
self.running = False
async def _run_loop(self):
"""Main loop for Raft node."""
while self.running:
if self.state == NodeState.LEADER:
await self._leader_loop()
else:
await self._follower_candidate_loop()
await asyncio.sleep(0.01) # Small delay
async def _follower_candidate_loop(self):
"""Loop for follower/candidate state."""
now = datetime.utcnow()
time_since_heartbeat = (now - self.last_heartbeat).total_seconds()
if time_since_heartbeat > self.election_timeout:
# Election timeout - start election
await self._start_election()
async def _leader_loop(self):
"""Loop for leader state - send heartbeats."""
await self._send_heartbeats()
await asyncio.sleep(self.heartbeat_interval)
async def _start_election(self):
"""Start a leader election."""
self.state = NodeState.CANDIDATE
self.current_term += 1
self.voted_for = self.node_id
self.election_timeout = self._random_election_timeout()
self.last_heartbeat = datetime.utcnow()
logger.info(f"Node {self.node_id} starting election for term {self.current_term}")
# Request votes from all peers
votes_received = 1 # Vote for self
last_log_index = len(self.log) - 1 if self.log else -1
last_log_term = self.log[last_log_index].term if self.log else 0
request = RequestVoteRequest(
term=self.current_term,
candidate_id=self.node_id,
last_log_index=last_log_index,
last_log_term=last_log_term
)
# Send to all peers in parallel
tasks = [
self._request_vote(peer, request)
for peer in self.peers
]
responses = await asyncio.gather(*tasks, return_exceptions=True)
for response in responses:
if isinstance(response, Exception):
continue
if response.term > self.current_term:
# Discovered higher term - step down
self._step_down(response.term)
return
if response.vote_granted:
votes_received += 1
# Check if we won
majority = (len(self.peers) + 1) // 2 + 1
if votes_received >= majority and self.state == NodeState.CANDIDATE:
self._become_leader()
async def _request_vote(self, peer: str, request: RequestVoteRequest) -> RequestVoteResponse:
"""Request vote from a peer."""
try:
response = await self.rpc.request_vote(peer, request)
return response
except Exception as e:
logger.warning(f"Failed to request vote from {peer}: {e}")
raise
def _become_leader(self):
"""Transition to leader state."""
self.state = NodeState.LEADER
logger.info(f"Node {self.node_id} became leader for term {self.current_term}")
# Initialize leader state
next_idx = len(self.log)
for peer in self.peers:
self.next_index[peer] = next_idx
self.match_index[peer] = 0
def _step_down(self, term: int):
"""Step down to follower state."""
self.current_term = term
self.state = NodeState.FOLLOWER
self.voted_for = None
logger.info(f"Node {self.node_id} stepping down, term={term}")
async def _send_heartbeats(self):
"""Send heartbeats to all peers."""
tasks = [
self._send_append_entries(peer)
for peer in self.peers
]
await asyncio.gather(*tasks, return_exceptions=True)
async def _send_append_entries(self, peer: str):
"""Send AppendEntries to a peer."""
prev_log_index = self.next_index.get(peer, 0) - 1
prev_log_term = 0
if prev_log_index >= 0 and prev_log_index < len(self.log):
prev_log_term = self.log[prev_log_index].term
# Get entries to send
entries = self.log[self.next_index.get(peer, 0):]
request = AppendEntriesRequest(
term=self.current_term,
leader_id=self.node_id,
prev_log_index=prev_log_index,
prev_log_term=prev_log_term,
entries=entries,
leader_commit=self.commit_index
)
try:
response = await self.rpc.append_entries(peer, request)
if response.term > self.current_term:
self._step_down(response.term)
return
if response.success:
# Update next_index and match_index
self.next_index[peer] = prev_log_index + len(entries) + 1
self.match_index[peer] = self.next_index[peer] - 1
else:
# Decrement next_index and retry
self.next_index[peer] = max(0, self.next_index.get(peer, 1) - 1)
except Exception as e:
logger.warning(f"Failed to send AppendEntries to {peer}: {e}")
# ==========================================================================
# RPC Handlers
# ==========================================================================
async def handle_request_vote(self, request: RequestVoteRequest) -> RequestVoteResponse:
"""Handle RequestVote RPC from a candidate."""
# If request term is higher, update our term
if request.term > self.current_term:
self._step_down(request.term)
# Deny vote if our term is higher
if request.term < self.current_term:
return RequestVoteResponse(
term=self.current_term,
vote_granted=False
)
# Check if we can vote for this candidate
can_vote = (
self.voted_for is None or
self.voted_for == request.candidate_id
)
# Check if candidate's log is at least as up-to-date
last_log_index = len(self.log) - 1 if self.log else -1
last_log_term = self.log[last_log_index].term if self.log else 0
log_ok = (
request.last_log_term > last_log_term or
(request.last_log_term == last_log_term and
request.last_log_index >= last_log_index)
)
if can_vote and log_ok:
self.voted_for = request.candidate_id
self.last_heartbeat = datetime.utcnow() # Reset election timer
return RequestVoteResponse(
term=self.current_term,
vote_granted=True
)
return RequestVoteResponse(
term=self.current_term,
vote_granted=False
)
async def handle_append_entries(self, request: AppendEntriesRequest) -> AppendEntriesResponse:
"""Handle AppendEntries RPC from leader."""
# If request term is higher, update our term
if request.term > self.current_term:
self._step_down(request.term)
# Reject if term is lower
if request.term < self.current_term:
return AppendEntriesResponse(
term=self.current_term,
success=False
)
# Valid AppendEntries from leader - reset election timer
self.last_heartbeat = datetime.utcnow()
self.state = NodeState.FOLLOWER
# Check log consistency
if request.prev_log_index >= 0:
if request.prev_log_index >= len(self.log):
return AppendEntriesResponse(
term=self.current_term,
success=False
)
if self.log[request.prev_log_index].term != request.prev_log_term:
# Conflicting entry - delete and all following
self.log = self.log[:request.prev_log_index]
return AppendEntriesResponse(
term=self.current_term,
success=False
)
# Append new entries
for entry in request.entries:
if entry.index < len(self.log):
if self.log[entry.index].term != entry.term:
self.log = self.log[:entry.index]
self.log.append(entry)
else:
self.log.append(entry)
# Update commit index
if request.leader_commit > self.commit_index:
self.commit_index = min(
request.leader_commit,
len(self.log) - 1 if self.log else 0
)
return AppendEntriesResponse(
term=self.current_term,
success=True
)
# ==========================================================================
# Client API
# ==========================================================================
def is_leader(self) -> bool:
"""Check if this node is the leader."""
return self.state == NodeState.LEADER
def get_leader_info(self) -> dict:
"""Get information about current leadership."""
return {
"node_id": self.node_id,
"state": self.state.value,
"term": self.current_term,
"is_leader": self.is_leader()
}
Chapter 4: Coordination Services
4.1 ZooKeeper
ZOOKEEPER OVERVIEW
ZooKeeper is a centralized coordination service that provides:
├── Configuration management
├── Naming registry
├── Distributed synchronization (locks)
├── Group membership
└── Leader election
ZOOKEEPER DATA MODEL:
Hierarchical namespace (like filesystem):
/
├── /config
│ ├── /config/database
│ └── /config/cache
├── /locks
│ ├── /locks/resource-1
│ └── /locks/resource-2
├── /leader-election
│ └── /leader-election/candidates
└── /service-discovery
├── /service-discovery/api-server
└── /service-discovery/web-server
Each node (znode) can:
├── Store data (up to 1MB)
├── Have children
├── Be ephemeral (deleted when session ends)
└── Be sequential (auto-incrementing suffix)
KEY FEATURES:
1. EPHEMERAL NODES
└── Automatically deleted when client disconnects
└── Perfect for leader election, locks
2. SEQUENTIAL NODES
└── ZooKeeper appends monotonic counter
└── /lock/lock-0000000001, /lock/lock-0000000002
└── Natural ordering for fair locks
3. WATCHES
└── Get notified when znode changes
└── One-time trigger (must re-register)
└── Used for leader change notification
4. SESSIONS
└── Client maintains session with ZK cluster
└── Heartbeats keep session alive
└── Session timeout = client considered dead
4.2 Leader Election with ZooKeeper
# Leader Election with ZooKeeper
from kazoo.client import KazooClient
from kazoo.recipe.election import Election
from kazoo.recipe.watchers import DataWatch
import threading
import time
import logging
from typing import Optional
logger = logging.getLogger(__name__)
class ZooKeeperLeaderElection:
"""
Leader election using ZooKeeper.
Uses ephemeral sequential nodes for fair election.
"""
def __init__(
self,
zk_hosts: str,
election_path: str,
node_id: str,
on_leader: callable,
on_follower: callable
):
self.zk = KazooClient(hosts=zk_hosts)
self.election_path = election_path
self.node_id = node_id
self.on_leader = on_leader
self.on_follower = on_follower
self.is_leader = False
        self.my_node: Optional[str] = None
self.running = False
def start(self):
"""Start the leader election."""
self.zk.start()
self.running = True
# Ensure election path exists
self.zk.ensure_path(self.election_path)
# Create ephemeral sequential node
self.my_node = self.zk.create(
f"{self.election_path}/candidate-",
value=self.node_id.encode(),
ephemeral=True,
sequence=True
)
logger.info(f"Created election node: {self.my_node}")
# Check if we're the leader
self._check_leadership()
def stop(self):
"""Stop the leader election."""
self.running = False
if self.my_node:
try:
self.zk.delete(self.my_node)
except Exception:
pass
self.zk.stop()
def _check_leadership(self):
"""Check if we're the leader."""
# Get all candidates
candidates = self.zk.get_children(self.election_path)
candidates.sort() # Sort by sequence number
my_name = self.my_node.split("/")[-1]
my_index = candidates.index(my_name)
if my_index == 0:
# We're first - we're the leader!
if not self.is_leader:
self.is_leader = True
logger.info(f"Node {self.node_id} became leader")
self.on_leader()
else:
# Watch the node before us
self.is_leader = False
watch_node = f"{self.election_path}/{candidates[my_index - 1]}"
@self.zk.DataWatch(watch_node)
def watch_predecessor(data, stat, event):
if event and event.type == "DELETED":
# Predecessor died - check leadership again
self._check_leadership()
return False # Stop watching
return True # Keep watching
self.on_follower()
def get_leader(self) -> str:
"""Get current leader's node ID."""
candidates = self.zk.get_children(self.election_path)
candidates.sort()
if candidates:
leader_path = f"{self.election_path}/{candidates[0]}"
data, _ = self.zk.get(leader_path)
return data.decode()
return None
# Using Kazoo's built-in Election recipe
class KazooLeaderElection:
"""
Simpler leader election using Kazoo's Election recipe.
"""
def __init__(
self,
zk_hosts: str,
election_path: str,
node_id: str
):
self.zk = KazooClient(hosts=zk_hosts)
self.election = None
self.election_path = election_path
self.node_id = node_id
self.leader_event = threading.Event()
def start(self):
"""Start and run for election."""
self.zk.start()
self.election = Election(self.zk, self.election_path, self.node_id)
def run_for_leader(self, leader_func: callable):
"""
Run for leader - blocks until elected.
leader_func is called when we become leader.
When leader_func returns (or raises), leadership is released.
"""
self.election.run(leader_func)
def contenders(self) -> list:
"""Get list of election contenders."""
return self.election.contenders()
def stop(self):
"""Stop the election."""
self.election.cancel()
self.zk.stop()
# Example usage
def leader_task():
"""Task to run while we're the leader."""
logger.info("I am the leader! Starting leader duties...")
try:
while True:
# Do leader work
process_as_leader()
time.sleep(1)
except Exception as e:
logger.error(f"Leader task failed: {e}")
raise # Will release leadership
def main():
election = KazooLeaderElection(
zk_hosts="localhost:2181",
election_path="/my-service/leader",
node_id="node-1"
)
election.start()
# This blocks until we're elected, then runs leader_task
# When leader_task returns/fails, we release leadership
election.run_for_leader(leader_task)
4.3 Distributed Locks with ZooKeeper
# Distributed Locks with ZooKeeper
from kazoo.client import KazooClient
from kazoo.recipe.lock import Lock
from contextlib import contextmanager
import time
class LockAcquisitionError(Exception):
    """Raised when the lock cannot be acquired before the timeout."""
class DistributedLock:
"""
Distributed lock using ZooKeeper.
Features:
- Automatic release on disconnect (ephemeral nodes)
- Fair ordering (sequential nodes)
- Fencing token support
"""
def __init__(self, zk_hosts: str, lock_path: str, identifier: str):
self.zk = KazooClient(hosts=zk_hosts)
self.lock_path = lock_path
self.identifier = identifier
self.lock = None
self.fencing_token: int = None
def start(self):
"""Connect to ZooKeeper."""
self.zk.start()
self.lock = Lock(self.zk, self.lock_path, self.identifier)
def stop(self):
"""Disconnect from ZooKeeper."""
self.zk.stop()
def acquire(self, timeout: float = None, blocking: bool = True) -> bool:
"""
Acquire the lock.
Returns True if acquired, False if timeout/non-blocking fail.
"""
acquired = self.lock.acquire(blocking=blocking, timeout=timeout)
if acquired:
# Get fencing token from our node's sequence number
self.fencing_token = self._get_fencing_token()
return acquired
def release(self):
"""Release the lock."""
self.lock.release()
self.fencing_token = None
def get_fencing_token(self) -> int:
"""Get the fencing token for this lock acquisition."""
return self.fencing_token
def _get_fencing_token(self) -> int:
"""Extract fencing token from ZooKeeper sequential node."""
# The lock creates nodes like /lock/lock-0000000001
# Extract the sequence number as fencing token
if hasattr(self.lock, 'node') and self.lock.node:
node_name = self.lock.node.split("/")[-1]
# Extract sequence number (last 10 digits)
return int(node_name[-10:])
return 0
@contextmanager
def acquired(self, timeout: float = None):
"""Context manager for lock acquisition."""
if self.acquire(timeout=timeout):
try:
yield self.fencing_token
finally:
self.release()
else:
raise LockAcquisitionError("Failed to acquire lock")
# Example: Protected critical section
async def critical_section_example():
"""Example of using distributed lock with fencing."""
lock = DistributedLock(
zk_hosts="localhost:2181",
lock_path="/locks/critical-resource",
identifier="worker-1"
)
lock.start()
try:
with lock.acquired(timeout=10) as fencing_token:
print(f"Acquired lock with token: {fencing_token}")
# Use fencing token when accessing resources
            await database.execute(
                "UPDATE resource SET value = $1, fencing_token = $2 "
                "WHERE fencing_token <= $2",
                new_value,
                fencing_token
            )
except LockAcquisitionError:
print("Could not acquire lock")
finally:
lock.stop()
4.4 etcd for Coordination
# Leader Election and Locks with etcd
import etcd3
from typing import Optional, Callable
import threading
import time
class EtcdLeaderElection:
"""
Leader election using etcd.
Uses etcd's lease and election mechanisms.
"""
def __init__(
self,
etcd_host: str,
etcd_port: int,
election_name: str,
node_id: str,
lease_ttl: int = 10
):
self.client = etcd3.client(host=etcd_host, port=etcd_port)
self.election_name = election_name
self.node_id = node_id
self.lease_ttl = lease_ttl
self.lease = None
self.election = None
self.is_leader = False
def campaign(self, on_elected: Callable):
"""
Campaign for leadership.
Blocks until elected, then calls on_elected.
"""
# Create lease
self.lease = self.client.lease(self.lease_ttl)
# Create election
self.election = self.client.election(self.election_name)
# Campaign (blocks until elected)
self.election.campaign(self.node_id.encode(), lease=self.lease)
self.is_leader = True
# Start lease keepalive in background
keepalive_thread = threading.Thread(target=self._keepalive_loop, daemon=True)
keepalive_thread.start()
# Run leader function
try:
on_elected()
finally:
self.resign()
def _keepalive_loop(self):
"""Keep lease alive while we're leader."""
while self.is_leader:
try:
self.lease.refresh()
time.sleep(self.lease_ttl / 3)
except Exception as e:
print(f"Keepalive failed: {e}")
self.is_leader = False
break
def resign(self):
"""Resign from leadership."""
self.is_leader = False
if self.election:
self.election.resign()
if self.lease:
self.lease.revoke()
def get_leader(self) -> Optional[str]:
"""Get current leader."""
if self.election:
leader = self.election.leader()
if leader:
return leader.value.decode()
return None
class EtcdDistributedLock:
"""
Distributed lock using etcd.
"""
def __init__(
self,
etcd_host: str,
etcd_port: int,
lock_name: str,
ttl: int = 30
):
self.client = etcd3.client(host=etcd_host, port=etcd_port)
self.lock_name = lock_name
self.ttl = ttl
self.lock = None
def acquire(self, timeout: float = None) -> bool:
"""Acquire the lock."""
self.lock = self.client.lock(self.lock_name, ttl=self.ttl)
return self.lock.acquire(timeout=timeout)
def release(self):
"""Release the lock."""
if self.lock:
self.lock.release()
self.lock = None
def refresh(self):
"""Refresh the lock's TTL."""
if self.lock:
self.lock.refresh()
@property
def is_acquired(self) -> bool:
"""Check if lock is acquired."""
return self.lock is not None and self.lock.is_acquired()
# Example: Job scheduler with leader election
class DistributedJobScheduler:
"""
Job scheduler that uses leader election.
Only the leader schedules jobs.
"""
def __init__(self, etcd_host: str, node_id: str):
self.election = EtcdLeaderElection(
etcd_host=etcd_host,
etcd_port=2379,
election_name="job-scheduler",
node_id=node_id
)
self.running = False
def start(self):
"""Start the scheduler."""
self.running = True
# Campaign for leadership (blocks until elected)
self.election.campaign(self._run_as_leader)
def _run_as_leader(self):
"""Run scheduler as leader."""
print(f"Became leader, starting job scheduling...")
while self.running and self.election.is_leader:
# Schedule jobs
self._schedule_pending_jobs()
time.sleep(1)
print("No longer leader, stopping scheduling")
def _schedule_pending_jobs(self):
"""Schedule pending jobs (leader only)."""
# Implementation would fetch pending jobs and assign to workers
pass
def stop(self):
"""Stop the scheduler."""
self.running = False
self.election.resign()
Part II: Production Patterns
Chapter 5: Common Patterns
5.1 The Lease Pattern
# Lease-Based Leadership
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional
import asyncio
@dataclass
class Lease:
"""A time-limited lease for leadership."""
holder: str
token: int
expires_at: datetime
def is_valid(self) -> bool:
return datetime.utcnow() < self.expires_at
def remaining_time(self) -> timedelta:
return self.expires_at - datetime.utcnow()
class LeaseBasedLeader:
    """
    Leader election using time-based leases.

    The leader must renew its lease before expiration.
    If the lease expires, another node can claim leadership.
    """

    def __init__(
        self,
        node_id: str,
        lease_store,  # Database or coordination service
        lease_duration: timedelta = timedelta(seconds=30),
        renewal_interval: timedelta = timedelta(seconds=10)
    ):
        self.node_id = node_id
        self.store = lease_store
        self.lease_duration = lease_duration
        self.renewal_interval = renewal_interval
        self.current_lease: Optional[Lease] = None
        self.is_leader = False
        self.running = False

    async def start(self):
        """Start the leader election process."""
        self.running = True
        asyncio.create_task(self._election_loop())

    async def stop(self):
        """Stop and release leadership."""
        self.running = False
        if self.is_leader:
            await self._release_lease()

    async def _election_loop(self):
        """Main election loop."""
        while self.running:
            try:
                if self.is_leader:
                    await self._renew_lease()
                else:
                    await self._try_acquire_lease()
            except Exception as e:
                logger.error(f"Election loop error: {e}")
                self.is_leader = False
            await asyncio.sleep(self.renewal_interval.total_seconds())

    async def _try_acquire_lease(self):
        """Try to acquire the leadership lease."""
        # Atomic compare-and-set in the database
        result = await self.store.try_acquire_lease(
            lease_key="leader",
            holder=self.node_id,
            duration=self.lease_duration
        )
        if result.acquired:
            self.current_lease = Lease(
                holder=self.node_id,
                token=result.token,
                expires_at=result.expires_at
            )
            self.is_leader = True
            logger.info(f"Acquired leadership with token {result.token}")

    async def _renew_lease(self):
        """Renew the leadership lease."""
        if not self.current_lease or not self.current_lease.is_valid():
            self.is_leader = False
            return
        result = await self.store.renew_lease(
            lease_key="leader",
            holder=self.node_id,
            current_token=self.current_lease.token,
            duration=self.lease_duration
        )
        if result.renewed:
            self.current_lease = Lease(
                holder=self.node_id,
                token=result.token,
                expires_at=result.expires_at
            )
        else:
            # Lost leadership
            self.is_leader = False
            self.current_lease = None
            logger.warning("Lost leadership - lease not renewed")

    async def _release_lease(self):
        """Explicitly release the lease."""
        if self.current_lease:
            await self.store.release_lease(
                lease_key="leader",
                holder=self.node_id,
                token=self.current_lease.token
            )
        self.current_lease = None
        self.is_leader = False

    def get_fencing_token(self) -> Optional[int]:
        """Get the fencing token for leader operations."""
        if self.current_lease and self.is_leader:
            return self.current_lease.token
        return None
# Database-backed lease store
from types import SimpleNamespace

class PostgresLeaseStore:
    """
    Lease store backed by PostgreSQL.

    Acquisition and renewal use atomic conditional INSERT/UPDATE
    statements, so they are race-free without an external lock manager.
    Results are returned as attribute-style objects so callers can
    write result.acquired, result.token, etc.
    """

    def __init__(self, db_pool):
        self.db = db_pool

    async def try_acquire_lease(
        self,
        lease_key: str,
        holder: str,
        duration: timedelta
    ) -> SimpleNamespace:
        """Try to acquire a lease atomically."""
        expires_at = datetime.utcnow() + duration
        # Atomic: insert if not exists, or take over if expired
        result = await self.db.fetchrow("""
            INSERT INTO leases (lease_key, holder, token, expires_at)
            VALUES ($1, $2, 1, $3)
            ON CONFLICT (lease_key) DO UPDATE
            SET holder = $2,
                token = leases.token + 1,
                expires_at = $3
            WHERE leases.expires_at < NOW()
            RETURNING token, expires_at
        """, lease_key, holder, expires_at)
        if result:
            return SimpleNamespace(
                acquired=True,
                token=result["token"],
                expires_at=result["expires_at"]
            )
        return SimpleNamespace(acquired=False, token=None, expires_at=None)

    async def renew_lease(
        self,
        lease_key: str,
        holder: str,
        current_token: int,
        duration: timedelta
    ) -> SimpleNamespace:
        """Renew an existing lease."""
        expires_at = datetime.utcnow() + duration
        # Only renew if we still hold the lease with the same token
        result = await self.db.fetchrow("""
            UPDATE leases
            SET expires_at = $4,
                token = token + 1
            WHERE lease_key = $1
              AND holder = $2
              AND token = $3
            RETURNING token, expires_at
        """, lease_key, holder, current_token, expires_at)
        if result:
            return SimpleNamespace(
                renewed=True,
                token=result["token"],
                expires_at=result["expires_at"]
            )
        return SimpleNamespace(renewed=False, token=None, expires_at=None)

    async def release_lease(
        self,
        lease_key: str,
        holder: str,
        token: int
    ):
        """Release a lease."""
        await self.db.execute("""
            DELETE FROM leases
            WHERE lease_key = $1
              AND holder = $2
              AND token = $3
        """, lease_key, holder, token)
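The store above assumes a `leases` table along these lines (the schema is not shown in the original; column types are an assumption):

```sql
CREATE TABLE leases (
    lease_key   TEXT PRIMARY KEY,   -- e.g. "leader"
    holder      TEXT NOT NULL,      -- node id of the current holder
    token       BIGINT NOT NULL,    -- monotonically increasing fencing token
    expires_at  TIMESTAMPTZ NOT NULL
);
```

The `PRIMARY KEY` on `lease_key` is what makes `ON CONFLICT (lease_key)` work, and keeping `token` in the same row lets acquisition and token increment happen in one atomic statement.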
5.2 Preventing Split-Brain with Quorums
# Quorum-Based Leader Election
class QuorumLeaderElection:
    """
    Leader election requiring quorum agreement.
    Prevents split-brain by requiring a majority.
    """

    def __init__(
        self,
        node_id: str,
        all_nodes: list[str],
        rpc_client
    ):
        self.node_id = node_id
        self.all_nodes = all_nodes
        self.rpc = rpc_client
        self.quorum_size = len(all_nodes) // 2 + 1
        self.current_term = 0
        self.voted_for: Optional[str] = None
        self.is_leader = False

    async def try_become_leader(self) -> bool:
        """
        Attempt to become leader.
        Returns True if elected with quorum support.
        """
        self.current_term += 1
        self.voted_for = self.node_id
        votes = 1  # Vote for self

        # Request votes from all other nodes
        for node in self.all_nodes:
            if node == self.node_id:
                continue
            try:
                response = await self.rpc.request_vote(
                    node,
                    term=self.current_term,
                    candidate=self.node_id
                )
                if response.vote_granted:
                    votes += 1
            except Exception as e:
                logger.warning(f"Failed to get vote from {node}: {e}")

        # Check if we have quorum
        if votes >= self.quorum_size:
            self.is_leader = True
            logger.info(f"Elected leader with {votes}/{len(self.all_nodes)} votes")
            return True
        logger.info(f"Election failed: got {votes} votes, needed {self.quorum_size}")
        return False

    async def verify_leadership(self) -> bool:
        """
        Verify we still have quorum support.
        Call periodically to ensure we're still the valid leader.
        """
        if not self.is_leader:
            return False
        confirmations = 1  # Self
        for node in self.all_nodes:
            if node == self.node_id:
                continue
            try:
                response = await self.rpc.heartbeat(
                    node,
                    term=self.current_term,
                    leader=self.node_id
                )
                if response.acknowledged:
                    confirmations += 1
            except Exception:
                pass  # Node unreachable
        if confirmations < self.quorum_size:
            # Lost quorum - step down
            self.is_leader = False
            logger.warning("Lost quorum - stepping down")
            return False
        return True
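To see why majority quorums prevent split-brain, here is a minimal single-threaded simulation (class and node names are illustrative; real protocols like Raft add term comparisons and log checks). Any two majorities of the same cluster overlap in at least one node, and each node votes at most once per term, so two candidates can never both win the same term:

```python
class VoterNode:
    """Grants at most one vote per term."""
    def __init__(self):
        self.voted_for = {}  # term -> candidate voted for

    def request_vote(self, term: int, candidate: str) -> bool:
        # First candidate to ask in a term gets the vote; later ones are refused
        if term not in self.voted_for:
            self.voted_for[term] = candidate
        return self.voted_for[term] == candidate

def run_election(candidate: str, term: int, voters: dict) -> bool:
    quorum = len(voters) // 2 + 1
    votes = sum(1 for v in voters.values() if v.request_vote(term, candidate))
    return votes >= quorum

voters = {name: VoterNode() for name in ["n1", "n2", "n3", "n4", "n5"]}
# Two candidates compete in the same term:
a_wins = run_election("A", term=1, voters=voters)
b_wins = run_election("B", term=1, voters=voters)
print(a_wins, b_wins)  # True False - at most one winner per term
```

However the vote requests interleave, at most one candidate can collect 3 of 5 votes, which is exactly the property that rules out two simultaneous leaders.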
Part III: Real-World Application
Chapter 6: Case Studies
6.1 Case Study: Kafka Partition Leadership
KAFKA PARTITION LEADERSHIP
Each Kafka partition has ONE leader replica.
LEADERSHIP ASSIGNMENT:
├── Controller (special broker) assigns leaders
├── Controller is itself elected via ZooKeeper
├── Leaders handle all reads and writes
└── Followers replicate from leader
LEADER ELECTION TRIGGER:
├── Broker failure (leader dies)
├── Controlled shutdown (graceful)
├── Partition reassignment (rebalance)
└── Preferred leader election
ISR (In-Sync Replicas):
├── Replicas that are caught up with leader
├── New leader must be from ISR (data safety)
└── If ISR is empty, either wait or allow an "unclean" election
FENCING IN KAFKA:
├── Epoch number (like fencing token)
├── Incremented on each leader change
├── Replicas reject requests from old leaders
└── Prevents zombie leader writes
EXAMPLE FAILURE SCENARIO:
Before:
Partition 0: Leader=Broker1, ISR=[Broker1, Broker2, Broker3]
Broker1 fails:
Controller detects via ZooKeeper session timeout
Controller selects new leader from ISR (Broker2)
Controller increments epoch
Controller notifies all brokers
After:
Partition 0: Leader=Broker2, ISR=[Broker2, Broker3], Epoch=2
If Broker1 comes back:
Broker1 tries to act as leader with Epoch=1
Other brokers reject (epoch too old)
Broker1 becomes follower, catches up
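The epoch check in this failure scenario can be sketched as a toy model (this is illustrative, not Kafka's actual broker code):

```python
class Replica:
    """Follower that rejects requests from stale-epoch leaders."""
    def __init__(self):
        self.leader_epoch = 0

    def handle_leader_request(self, epoch: int) -> bool:
        if epoch < self.leader_epoch:
            return False  # request from a stale leader - reject
        self.leader_epoch = epoch
        return True

replica = Replica()
assert replica.handle_leader_request(epoch=1)       # Broker1 leads with epoch 1
assert replica.handle_leader_request(epoch=2)       # failover: Broker2, epoch 2
assert not replica.handle_leader_request(epoch=1)   # returning Broker1 is rejected
```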
6.2 Case Study: Redis Sentinel
REDIS SENTINEL LEADER ELECTION
Sentinel monitors Redis and handles failover.
COMPONENTS:
├── Multiple Sentinel instances (typically 3+)
├── Each Sentinel monitors Redis master and replicas
├── Sentinels communicate via pub/sub and direct connections
└── Quorum required for failover decisions
FAILOVER PROCESS:
1. DETECTION
├── Sentinel can't reach master (SDOWN - subjective down)
├── Sentinel asks other Sentinels
└── Quorum agrees master is down (ODOWN - objective down)
2. SENTINEL LEADER ELECTION
├── Sentinels elect a leader to perform failover
├── Uses Raft-like voting
├── Need majority vote
└── Leader performs the actual failover
3. REPLICA SELECTION
├── Leader selects best replica
├── Criteria: priority, replication offset, runid
└── Selected replica promoted to master
4. CONFIGURATION PROPAGATION
├── Other replicas reconfigured to follow new master
├── Clients notified of new master
└── Old master (if returns) becomes replica
CONFIGURATION EPOCH:
├── Monotonically increasing counter
├── Incremented on each failover
├── Used as fencing token
└── Prevents stale configuration
6.3 Case Study: Kubernetes Controller Leader Election
KUBERNETES LEADER ELECTION
Kubernetes controllers use leader election for HA.
MECHANISM:
├── Uses Kubernetes API (Lease, ConfigMap, or Endpoints)
├── Lease object with holder identity and renewal time
├── Controller that holds lease is the leader
└── Other controllers are standby
LEASE OBJECT:
apiVersion: coordination.k8s.io/v1
kind: Lease
metadata:
  name: my-controller
  namespace: kube-system
spec:
  holderIdentity: controller-pod-xyz
  leaseDurationSeconds: 15
  acquireTime: "2024-01-15T10:00:00Z"
  renewTime: "2024-01-15T10:00:10Z"
  leaseTransitions: 5
CODE EXAMPLE (Go):
lock := &resourcelock.LeaseLock{
    LeaseMeta: metav1.ObjectMeta{
        Name:      "my-controller",
        Namespace: "kube-system",
    },
    Client: client.CoordinationV1(),
    LockConfig: resourcelock.ResourceLockConfig{
        Identity: podName,
    },
}
leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
    Lock:          lock,
    LeaseDuration: 15 * time.Second,
    RenewDeadline: 10 * time.Second,
    RetryPeriod:   2 * time.Second,
    Callbacks: leaderelection.LeaderCallbacks{
        OnStartedLeading: func(ctx context.Context) {
            // Start controller logic
            runController(ctx)
        },
        OnStoppedLeading: func() {
            // Lost leadership - exit and let the orchestrator restart us
            os.Exit(0)
        },
    },
})
Chapter 7: Common Mistakes
7.1 Mistake 1: Not Using Fencing Tokens
❌ WRONG: Lock without fencing
# Process A acquires lock
lock.acquire()
# Process A does slow operation
# Lock expires during operation
# Process B acquires lock
# Process A still thinks it has lock
# Process A writes to database
# Process B also writes to database
# CORRUPTED DATA!
✅ CORRECT: Lock with fencing token
# Process A acquires lock with token
token = lock.acquire() # token = 33
# Process A sends write with token
database.write(data, fencing_token=33)
# Lock expires, Process B acquires
token_b = lock.acquire() # token = 34
# Process A tries to write (stale token)
database.write(data, fencing_token=33) # REJECTED
# Process B writes (valid token)
database.write(data, fencing_token=34) # ACCEPTED
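The correct flow above can be made concrete with a resource that tracks the highest token it has seen (a minimal sketch; `FencedStore` is an illustrative name, not a real library):

```python
class FencedStore:
    """Rejects writes carrying a fencing token lower than the highest seen."""
    def __init__(self):
        self.highest_token = 0
        self.data = {}

    def write(self, key, value, fencing_token: int) -> bool:
        if fencing_token < self.highest_token:
            return False  # stale leader - reject
        # Equal tokens are allowed: the same leader may retry
        self.highest_token = fencing_token
        self.data[key] = value
        return True

store = FencedStore()
assert store.write("x", "from A", fencing_token=33)          # ACCEPTED
assert store.write("x", "from B", fencing_token=34)          # ACCEPTED, raises the bar
assert not store.write("x", "late write", fencing_token=33)  # REJECTED: A's token is stale
```

The store never needs to know who the "real" leader is; comparing tokens is enough, which is why fencing works even when the old leader is merely paused rather than dead.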
7.2 Mistake 2: Assuming Network Is Reliable
❌ WRONG: Simple timeout-based failure detection
if not heartbeat_received_in(seconds=5):
    # Assume leader is dead
    become_leader()  # DANGEROUS!
✅ CORRECT: Quorum-based detection
async def check_leader_status():
    responses = await ask_all_nodes("is_leader_alive?")
    alive_votes = sum(1 for r in responses if r.alive)
    if alive_votes < quorum:
        # Fewer than a quorum can see the leader -
        # safe to treat it as dead and elect a new one
        await start_election()
7.3 Mistake 3: Ignoring Clock Skew
❌ WRONG: Relying on wall-clock time for lease
class BadLease:
    def is_valid(self):
        # Problem: the wall clock might be wrong!
        return datetime.now() < self.expires_at

    def acquire(self):
        # If my clock is behind, I think the lease is valid for longer.
        # Other nodes with correct clocks elect a new leader.
        # Split-brain!
        ...
✅ CORRECT: Use monotonic clocks and short leases
class BetterLease:
    def is_valid(self):
        # Use a monotonic clock (can't go backwards)
        elapsed = time.monotonic() - self.acquired_at_monotonic
        return elapsed < self.duration_seconds

    def should_renew(self):
        # Renew well before expiration
        elapsed = time.monotonic() - self.acquired_at_monotonic
        return elapsed > self.duration_seconds * 0.5
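A self-contained version of the monotonic-clock lease can be exercised like this (the class name, constructor, and short durations are illustrative, chosen so the demo runs in a fraction of a second):

```python
import time

class MonotonicLease:
    """Lease validity measured against a monotonic clock, not wall time."""
    def __init__(self, duration_seconds: float):
        self.duration_seconds = duration_seconds
        self.acquired_at_monotonic = time.monotonic()

    def is_valid(self) -> bool:
        elapsed = time.monotonic() - self.acquired_at_monotonic
        return elapsed < self.duration_seconds

    def should_renew(self) -> bool:
        # Renew once half the lease has elapsed
        elapsed = time.monotonic() - self.acquired_at_monotonic
        return elapsed > self.duration_seconds * 0.5

lease = MonotonicLease(duration_seconds=0.2)
print(lease.is_valid())      # True right after acquisition
time.sleep(0.15)
print(lease.should_renew())  # True past the halfway point
time.sleep(0.1)
print(lease.is_valid())      # False once the lease has expired
```

Because `time.monotonic()` never jumps backwards, NTP corrections or manual clock changes cannot make an expired lease look valid again on this node.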
7.4 Mistake Checklist
- Always use fencing tokens — Prevent zombie leaders
- Require quorum for elections — Prevent split-brain
- Use monotonic clocks — Avoid clock skew issues
- Renew leases early — Buffer for network delays
- Handle election failures — Retry with backoff
- Test network partitions — Chaos engineering
- Monitor leader changes — Alert on frequent failovers
Part IV: Interview Preparation
Chapter 8: Interview Tips
8.1 Key Phrases
INTRODUCING LEADER ELECTION:
"For this system, we need exactly one node to be the leader.
I'd use a coordination service like ZooKeeper or etcd for
leader election. They provide consensus-based primitives
that guarantee at most one leader."
ON SPLIT-BRAIN:
"The main danger is split-brain — two nodes both thinking
they're the leader. To prevent this, I'd require quorum
agreement and use fencing tokens. Resources reject requests
from old leaders based on their fencing token."
ON FENCING TOKENS:
"A fencing token is a monotonically increasing number given
to each new leader. When the leader makes requests, it
includes the token. Resources track the highest token seen
and reject lower ones. This prevents zombie leaders from
corrupting data."
ON COORDINATION SERVICES:
"ZooKeeper and etcd provide consensus-based coordination.
They use Raft or Zab for internal consensus, ensuring
their state is consistent even during failures. We can
build leader election on top using ephemeral nodes or
leases."
ON FAILURE DETECTION:
"Detecting leader failure is tricky — a slow network looks
like a dead leader. I'd use a lease-based approach with
short TTLs. If the leader doesn't renew its lease, it
expires and others can claim leadership. Combined with
fencing tokens, this is safe."
8.2 Common Questions
| Question | Good Answer |
|---|---|
| "How do you prevent split-brain?" | "Three mechanisms: (1) Quorum-based election requiring majority, (2) Fencing tokens that resources use to reject stale leaders, (3) Leases that expire if not renewed. A zombie leader's requests get rejected because its fencing token is stale." |
| "What if ZooKeeper itself fails?" | "ZooKeeper runs as a cluster (typically 3-5 nodes) with its own consensus. It can tolerate minority failures. If majority fails, it becomes unavailable (CP system). Applications should handle ZK unavailability gracefully — typically the current leader continues until ZK recovers." |
| "How do you choose lease duration?" | "Trade-off between failover time and false positives. Shorter leases = faster failover but more risk of spurious elections during network hiccups. I'd start with 10-30 seconds, renew at 1/3 of lease duration, and tune based on network reliability." |
| "Raft vs Paxos?" | "Both solve consensus. Paxos is older, proven correct, but hard to understand and implement. Raft was designed for understandability with clear leader election and log replication phases. Most new systems use Raft (etcd, CockroachDB, TiKV)." |
Chapter 9: Practice Problems
Problem 1: Job Scheduler Leader
Setup: Design leader election for a distributed job scheduler. Only the leader should assign jobs to workers.
Requirements:
- At most one scheduler assigns jobs at a time
- Fast failover (< 30 seconds)
- Workers should not receive duplicate job assignments
Questions:
- How do you elect the leader?
- How do you prevent duplicate job assignments during failover?
- What happens to in-progress jobs during failover?
Leader Election:
# Use etcd lease-based election (EtcdLeaderElection is an
# illustrative wrapper, not a real library class)
election = EtcdLeaderElection(
    election_name="/scheduler/leader",
    lease_ttl=10  # 10 second lease
)
# Campaign for leadership; run_scheduler is invoked while leader
election.campaign(run_scheduler)
Preventing Duplicates:
# Include fencing token in job assignments
# Include the fencing token in job assignments
def assign_job(job_id, worker_id, fencing_token):
    # Atomic: only assign if the token is the highest seen
    db.execute("""
        UPDATE jobs
        SET worker_id = $2, assigned_token = $3
        WHERE id = $1
          AND (assigned_token IS NULL OR assigned_token < $3)
    """, job_id, worker_id, fencing_token)
In-Progress Jobs:
- Jobs have a timeout; if not completed in time, they become available again
- New leader scans for timed-out jobs
- Workers heartbeat progress to prevent timeout
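The reclaim scan the new leader performs can be sketched in memory (the job fields and statuses here are illustrative; in practice this would be an UPDATE against the jobs table):

```python
def reclaim_timed_out_jobs(jobs: list, timeout_seconds: float, now: float) -> list:
    """Return ids of in-progress jobs whose last heartbeat is too old,
    marking them available for reassignment."""
    reclaimed = []
    for job in jobs:
        stale = now - job["last_heartbeat"] > timeout_seconds
        if job["status"] == "in_progress" and stale:
            job["status"] = "pending"   # available for reassignment
            job["worker_id"] = None
            reclaimed.append(job["id"])
    return reclaimed

jobs = [
    {"id": "j1", "status": "in_progress", "worker_id": "w1", "last_heartbeat": 100.0},
    {"id": "j2", "status": "in_progress", "worker_id": "w2", "last_heartbeat": 155.0},
    {"id": "j3", "status": "done",        "worker_id": "w3", "last_heartbeat": 90.0},
]
print(reclaim_timed_out_jobs(jobs, timeout_seconds=30, now=160.0))  # ['j1']
```

Worker heartbeats keep `last_heartbeat` fresh, so only jobs whose worker has actually gone silent are reassigned.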
Problem 2: Database Primary Failover
Setup: Design automatic failover for a database with one primary and two replicas.
Requirements:
- No data loss on failover
- Minimal downtime
- Prevent split-brain (two primaries)
Questions:
- How do you detect primary failure?
- How do you select the new primary?
- How do you prevent the old primary from accepting writes?
Failure Detection:
# Consensus-based detection: all replicas monitor the
# primary, and a majority must agree that it is down
async def check_primary():
    responses = await ask_replicas("can_reach_primary?")
    if sum(not r.can_reach for r in responses) >= majority:
        # A majority can't reach the primary - initiate failover
        await initiate_failover()
New Primary Selection:
# Select the replica with the most up-to-date data,
# using replication position (LSN) as the criterion
def select_new_primary(replicas):
    # Sort by replication position (descending)
    replicas.sort(key=lambda r: r.replication_lsn, reverse=True)
    return replicas[0]
Preventing Split-Brain:
# Use fencing via network isolation or STONITH:
# 1. Revoke the old primary's network access to storage, or
# 2. Use a fencing token in the storage layer
async def fence_old_primary():
    # Increment the epoch in the coordination service
    new_epoch = await etcd.increment("/db/epoch")
    # The storage layer rejects writes tagged with an older
    # epoch, so the old primary's writes fail
Chapter 10: Sample Interview Dialogue
Scenario: Design Leader Election for Distributed Cache Invalidation
Interviewer: "Design a system where only one node broadcasts cache invalidation messages to prevent duplicate invalidations."
You: "I'll use leader election to ensure only one node is the invalidator. Let me walk through the design..."
You draw on the whiteboard:
Cache Invalidation System:
┌───────────────────────────────────────────────────────────────────────┐
│ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ Server │ │ Server │ │ Server │ │
│ │ A │ │ B │ │ C │ │
│ │(leader) │ │(standby)│ │(standby)│ │
│ └────┬────┘ └────┬────┘ └────┬────┘ │
│ │ │ │ │
│ └───────────────┼───────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────┐ │
│ │ ZooKeeper │ │
│ │ Cluster │ │
│ └─────────────┘ │
│ │
└───────────────────────────────────────────────────────────────────────┘
You: "I'd use ZooKeeper for leader election. Each server creates an ephemeral sequential node. The server with the lowest sequence number is the leader. If the leader dies, its ephemeral node is deleted, and the next server becomes leader."
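The lowest-sequence-number rule can be sketched as a pure function (the ZooKeeper interaction itself is omitted; node names follow the typical `prefix_<sequence>` convention ZooKeeper appends to sequential znodes):

```python
def current_leader(znodes: list) -> str:
    """The candidate whose ephemeral sequential znode has the
    lowest sequence number is the leader."""
    # Names look like "serverA_0000000011"
    return min(znodes, key=lambda name: int(name.rsplit("_", 1)[1]))

nodes = ["serverB_0000000012", "serverA_0000000011", "serverC_0000000013"]
print(current_leader(nodes))  # serverA_0000000011
# Leader dies: its ephemeral znode disappears, next-lowest takes over
nodes.remove("serverA_0000000011")
print(current_leader(nodes))  # serverB_0000000012
```

In practice each candidate watches only the znode immediately below its own, so a leader failure wakes exactly one successor instead of causing a thundering herd.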
Interviewer: "What if the leader is slow, not dead?"
You: "This is the zombie leader problem. To handle it, I'd use fencing tokens. Each time a new leader is elected, the epoch increments. When the leader broadcasts invalidations, it includes its epoch.
# Leader broadcasts with its epoch
def broadcast_invalidation(key, epoch):
    message = {
        'key': key,
        'epoch': epoch,
        'timestamp': time.time()
    }
    kafka.send('invalidations', message)

# Cache servers filter by epoch
last_seen_epoch = 0

def handle_invalidation(message):
    global last_seen_epoch
    if message['epoch'] < last_seen_epoch:
        return  # Ignore stale leader
    last_seen_epoch = message['epoch']
    cache.invalidate(message['key'])
A zombie leader's messages have an old epoch and get ignored."
Interviewer: "How fast can you failover?"
You: "ZooKeeper session timeout is typically 10-30 seconds. After the leader's session expires, election happens immediately. So failover is roughly the session timeout.
For faster failover, I could:
- Use shorter session timeouts (but risk spurious failovers)
- Have standbys watch the leader's node and prepare to take over
- Use a separate 'leader health' znode with shorter TTL
In practice, 10-15 seconds is acceptable for cache invalidation. Missing a few invalidations during failover is okay — caches have TTL anyway."
Interviewer: "What if ZooKeeper is unavailable?"
You: "ZooKeeper itself is a replicated system — it tolerates minority failures. If the majority of ZooKeeper nodes fail, it becomes unavailable.
During ZooKeeper unavailability:
- Current leader continues operating (its session is still valid locally)
- No new elections can happen
- If leader crashes during ZK outage, no failover until ZK recovers
For critical systems, I'd run ZooKeeper across multiple availability zones and monitor it closely. The current leader continues working until ZK explicitly tells it to stop (session expiration)."
Summary
DAY 5 KEY TAKEAWAYS
THE SPLIT-BRAIN PROBLEM:
• Two leaders = data corruption
• Can't rely on timeouts alone
• Need consensus for safety
FENCING TOKENS:
• Monotonically increasing per election
• Include with all leader operations
• Resources reject stale tokens
• Prevents zombie leader writes
CONSENSUS ALGORITHMS:
• Raft: Understandable, widely used
• Paxos: Classic, proven
• Zab: ZooKeeper's protocol
• All guarantee at most one leader per term
COORDINATION SERVICES:
• ZooKeeper: Mature, Zab consensus
• etcd: Modern, Raft consensus
• Both provide: leader election, locks, watches
PATTERNS:
• Lease-based: Time-limited leadership
• Quorum-based: Majority agreement
• Ephemeral nodes: Auto-cleanup on failure
• Sequential nodes: Fair ordering
PRODUCTION CHECKLIST:
• Always use fencing tokens
• Require quorum for elections
• Use monotonic clocks for leases
• Test network partition scenarios
• Monitor leader transitions
DEFAULT APPROACH:
• Use etcd or ZooKeeper (don't build your own)
• Fencing tokens for all protected resources
• 10-30 second lease duration
• Renew at 1/3 of lease duration
📚 Further Reading
Papers
- "In Search of an Understandable Consensus Algorithm" (Raft) — Ongaro & Ousterhout
- "Paxos Made Simple" — Leslie Lamport
- "ZooKeeper: Wait-free coordination" — Hunt et al.
Documentation
- ZooKeeper: https://zookeeper.apache.org/doc/
- etcd: https://etcd.io/docs/
- Raft Visualization: https://raft.github.io/
Books
- "Designing Data-Intensive Applications" by Martin Kleppmann — Chapter 8, 9
- "Database Internals" by Alex Petrov — Distributed consensus chapters
Week 5 Summary
This week covered Consistency and Coordination:
| Day | Topic | Key Concept |
|---|---|---|
| 1 | Consistency Models | Spectrum from linearizable to eventual |
| 2 | Saga Pattern | Distributed transactions with compensation |
| 3 | Workflow Orchestration | Durable execution with Temporal |
| 4 | Conflict Resolution | CRDTs and vector clocks |
| 5 | Leader Election | Fencing tokens and consensus |
Coming up in Week 6: Designing a complete Notification Platform, applying everything we've learned.
End of Day 5: Leader Election and Coordination
End of Week 5: Consistency and Coordination