Week 2 – Day 5: Distributed Cron & Job Scheduling
System Design Mastery Series
🎯 Week 2 Finale: Bringing It All Together
Preface: The Week 2 Journey
We've come a long way this week. Let's see where we started and where we are now:
WEEK 2: FAILURE-FIRST DESIGN

Day 1: Timeouts
├── "Don't wait forever for slow services"
└── Timeout budgets, cascading timeouts

Day 2: Idempotency
├── "Safe to retry without duplicate effects"
└── Idempotency keys, request fingerprinting

Day 3: Circuit Breakers
├── "Stop calling broken services"
└── Fail fast, protect the system

Day 4: Webhooks
├── "Reliable async delivery to external systems"
└── At-least-once delivery, retries, DLQ

Day 5: Distributed Cron (TODAY)
├── "Run scheduled jobs exactly once across multiple servers"
└── Leader election, fencing tokens, distributed coordination
Today's challenge is the culmination of everything we've learned. Distributed cron combines:
- Timeouts (Day 1): Jobs that run too long
- Idempotency (Day 2): Jobs that might run twice
- Circuit Breakers (Day 3): External dependencies that fail
- Async Delivery (Day 4): Jobs that trigger webhooks and other asynchronous side effects
And it adds a new critical concept: distributed coordination.
The Fundamental Problem
On a single server, cron is simple:
# /etc/crontab
0 * * * * /scripts/send_daily_report.sh
*/5 * * * * /scripts/cleanup_old_sessions.sh
0 0 * * * /scripts/generate_invoices.sh
The OS guarantees:
- Jobs run at the scheduled time
- Only one instance of each job runs
- Jobs survive server restarts (cron daemon restarts)
But in a distributed system:
Server 1 [cron]         Server 2 [cron]         Server 3 [cron]
10:00 AM: Run job!      10:00 AM: Run job!      10:00 AM: Run job!
        │                       │                       │
        └───────────────────────┼───────────────────────┘
                                ▼
                       JOB RUNS 3 TIMES! 💥
The distributed cron problem: How do you ensure a scheduled job runs exactly once across multiple servers, even when:
- Servers crash
- Networks partition
- Clocks drift
- Deployments happen
This is one of the hardest problems in distributed systems, and it's what we'll solve today.
Part I: Foundations
Chapter 1: Why Distributed Cron Is Hard
1.1 The Illusion of Simplicity
At first glance, the solution seems obvious:
Attempt 1: "Just use a database lock"
def run_scheduled_job(job_id):
# Try to acquire lock
if db.execute("UPDATE jobs SET locked = true WHERE id = ? AND locked = false", job_id):
try:
execute_job(job_id)
finally:
db.execute("UPDATE jobs SET locked = false WHERE id = ?", job_id)
Problem: What if the server crashes while holding the lock?
10:00:00 - Server 1 acquires lock
10:00:01 - Server 1 starts job
10:00:05 - Server 1 crashes (power failure)
Lock is still held!
10:00:06 - Server 2 tries to run job β blocked
10:01:00 - Server 3 tries to run job β blocked
...
Job never runs again until manual intervention
Attempt 2: "Add a lock timeout"
def run_scheduled_job(job_id):
lock_until = datetime.now() + timedelta(minutes=5)
if db.execute("""
UPDATE jobs
SET locked_until = ?
WHERE id = ? AND (locked_until IS NULL OR locked_until < NOW())
""", lock_until, job_id):
try:
execute_job(job_id)
finally:
db.execute("UPDATE jobs SET locked_until = NULL WHERE id = ?", job_id)
Problem: What if the job takes longer than the timeout?
10:00:00 - Server 1 acquires lock (expires at 10:05:00)
10:00:01 - Server 1 starts job (a slow report generation)
10:05:00 - Lock expires!
10:05:01 - Server 2 acquires lock, starts SAME job
10:08:00 - Server 1 finishes job
10:08:01 - Server 1 releases lock (but it's Server 2's lock now!)
10:10:00 - Server 2 finishes job
Result: Job ran twice, and lock state is corrupted
Attempt 3: "Use distributed locking with Redis"
def run_scheduled_job(job_id):
lock = redis.set(f"lock:{job_id}", server_id, nx=True, ex=300)
if lock:
try:
execute_job(job_id)
finally:
# Only release if we still own the lock
if redis.get(f"lock:{job_id}") == server_id:
redis.delete(f"lock:{job_id}")
Problem: This is still vulnerable to the "slow job" problem and also has a race condition in the finally block.
10:05:00 - Lock expires
10:05:01 - Server 2 acquires lock
10:05:02 - Server 1's GET returns server_id (stale data in flight)
10:05:03 - Server 1 DELETEs the lock (but it's Server 2's lock!)
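The release race has a well-known mitigation: do the "check that I still own the lock, then delete it" step atomically inside Redis with a small Lua script. Below is a minimal sketch using redis-py, reusing the `server_id` and `execute_job` placeholders from the attempts above; note that it closes the release race but, on its own, still does not solve the slow-job problem.

import redis

r = redis.Redis()

# Compare-and-delete in one atomic step: only delete the lock if we still own it.
RELEASE_LOCK = r.register_script("""
if redis.call('get', KEYS[1]) == ARGV[1] then
    return redis.call('del', KEYS[1])
else
    return 0
end
""")

def run_scheduled_job_safely(job_id: str, server_id: str):
    # NX = only set if absent, EX = auto-expire so a crashed holder cannot deadlock us
    if r.set(f"lock:{job_id}", server_id, nx=True, ex=300):
        try:
            execute_job(job_id)  # same placeholder as in the attempts above
        finally:
            # Atomic check-and-release: a lock that expired and was re-acquired
            # by another server is left untouched.
            RELEASE_LOCK(keys=[f"lock:{job_id}"], args=[server_id])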
1.2 The Core Challenges
These attempts reveal the fundamental challenges:
| Challenge | Description |
|---|---|
| Mutual Exclusion | Only one server should run a job at a time |
| Deadlock Prevention | Crashed servers shouldn't hold locks forever |
| Split Brain | Network partitions can cause multiple leaders |
| Clock Skew | Servers' clocks may disagree on "now" |
| Exactly-Once Execution | Jobs should run once, not zero, not twice |
| Failure Recovery | Crashed jobs should be detected and handled |
1.3 The Fundamental Impossibility
The FLP Impossibility Result (Fischer, Lynch, Paterson, 1985) proves that in an asynchronous distributed system with even one faulty process, it's impossible to guarantee consensus will be reached.
In practical terms: You cannot have perfect distributed cron.
But you can get very close with the right trade-offs:
Perfect:   Job runs exactly once, on time, every time
           (impossible in distributed systems)

Practical: Job runs at-least-once; idempotency handles duplicates
           Job may be slightly delayed during failures
           Missed executions are detected and alerted on
Chapter 2: Leader Election
The foundation of distributed cron is leader election: choosing one server to be responsible for scheduling and triggering jobs.
2.1 What Is Leader Election?
LEADER ELECTION

Before Election:                 After Election:

Server 1 (wants to lead)         Server 1 (runs cron)   → LEADER   (schedules jobs)
Server 2 (wants to lead)         Server 2 (standby)     → FOLLOWER (ready to take over)
Server 3 (wants to lead)         Server 3 (standby)     → FOLLOWER (ready to take over)
Only the leader runs the cron scheduler. Followers monitor the leader and are ready to take over if it fails.
2.2 The Split-Brain Problem
The most dangerous scenario in leader election:
Network Partition:

                   ┌─────────────────┐
                   │    PARTITION    │
                   └────────┬────────┘
           ┌────────────────┴────────────────┐
           ▼                                 ▼
    Servers 1, 2, 3                    Servers 4, 5
    "Server 1 is still leader"         "We can't see Server 1...
                                        Elect new leader: 4!"
           ▼                                 ▼
    Server 1 runs jobs                 Server 4 runs jobs

                   TWO LEADERS! 💥
When the partition heals, you have two servers that both think they're the leader, both running jobs.
2.3 Fencing Tokens
The solution to split-brain is fencing tokens (also called epoch numbers or generation IDs):
FENCING TOKENS

Every time a new leader is elected, increment a global counter.
The leader includes this token in all operations.
Resources reject operations with old tokens.

Timeline:

Token 1: Server A becomes leader
         Server A: "Run job with token=1"
         Database: "OK, token 1 accepted"

Token 2: Network partition, Server B becomes leader
         Server B: "Run job with token=2"
         Database: "OK, token 2 accepted"

[Partition heals]
         Server A (stale): "Run job with token=1"
         Database: "REJECTED - token 1 < current token 2"

Server A realizes it's no longer leader and steps down.
Implementation:
class FencedJobExecutor:
def __init__(self, db):
self.db = db
def execute_job(self, job_id: str, fencing_token: int, job_func):
"""Execute job only if fencing token is current."""
# Atomically check and update token
result = self.db.execute("""
UPDATE jobs
SET last_execution_token = ?,
last_execution_started = NOW()
WHERE id = ?
AND (last_execution_token IS NULL OR last_execution_token < ?)
RETURNING id
""", fencing_token, job_id, fencing_token)
if not result:
raise StaleLeaderError(f"Token {fencing_token} is stale")
try:
job_func()
# Mark completion with token
self.db.execute("""
UPDATE jobs
SET last_execution_completed = NOW()
WHERE id = ? AND last_execution_token = ?
""", job_id, fencing_token)
except Exception as e:
# Mark failure with token
self.db.execute("""
UPDATE jobs
SET last_execution_error = ?
WHERE id = ? AND last_execution_token = ?
""", str(e), job_id, fencing_token)
raise
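A brief usage sketch for the executor above (hypothetical wiring: `db`, `current_token`, `send_daily_report`, and `step_down` are assumptions): the current leader passes its fencing token into every execution, so a demoted leader fails fast instead of doing duplicate work.

executor = FencedJobExecutor(db)

try:
    # current_token was handed out by the coordination service at election time
    executor.execute_job("daily-report", fencing_token=current_token, job_func=send_daily_report)
except StaleLeaderError:
    # A newer leader has already run this job; stop scheduling and rejoin the election
    step_down()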
2.4 Why ZooKeeper and etcd Exist
Implementing leader election correctly is so hard that dedicated systems exist just for this:
ZooKeeper (Apache):
- Created by Yahoo for Hadoop
- Uses ZAB (ZooKeeper Atomic Broadcast) consensus
- Provides: leader election, distributed locks, configuration management
- Used by: Kafka, HBase, Solr
etcd (CoreOS/CNCF):
- Created by CoreOS (now a CNCF project); best known as the datastore behind Kubernetes
- Uses Raft consensus
- Provides: key-value store, leader election, watch notifications
- Used by: Kubernetes, CoreDNS, many cloud-native tools
Consul (HashiCorp):
- Uses Raft consensus
- Provides: service discovery, health checking, leader election
- Used by: Many HashiCorp tools, service meshes
Why use these instead of building your own?
| DIY Implementation | Dedicated System |
|---|---|
| Months of development | Days of integration |
| Subtle bugs (split-brain, etc.) | Battle-tested |
| No monitoring/tooling | Rich observability |
| Your team maintains it | Community maintains it |
The rule: Unless you're building a distributed database or similar infrastructure, use an existing coordination service.
2.5 Leader Election with etcd
import etcd3
from threading import Thread, Event
import time
class EtcdLeaderElection:
"""Leader election using etcd leases."""
def __init__(self, etcd_client, election_name: str, ttl: int = 10):
self.client = etcd_client
self.election_name = election_name
self.ttl = ttl
self.lease = None
self.is_leader = False
self.leader_key = f"/elections/{election_name}/leader"
self.stop_event = Event()
def campaign(self, candidate_id: str, on_elected, on_demoted):
"""Campaign to become leader."""
while not self.stop_event.is_set():
try:
# Create a lease that must be renewed
self.lease = self.client.lease(self.ttl)
# Try to become leader
success, _ = self.client.transaction(
compare=[
self.client.transactions.create(self.leader_key) == 0
],
success=[
self.client.transactions.put(
self.leader_key,
candidate_id,
lease=self.lease
)
],
failure=[]
)
if success:
self.is_leader = True
on_elected()
self._maintain_leadership(on_demoted)
else:
# Someone else is leader, watch for changes
self._wait_for_leader_change()
except Exception as e:
if self.is_leader:
self.is_leader = False
on_demoted()
time.sleep(1)
def _maintain_leadership(self, on_demoted):
"""Keep refreshing lease while leader."""
while self.is_leader and not self.stop_event.is_set():
try:
self.lease.refresh()
time.sleep(self.ttl / 3)
            except Exception:
self.is_leader = False
on_demoted()
break
def _wait_for_leader_change(self):
"""Watch for leader key deletion."""
events, cancel = self.client.watch(self.leader_key)
for event in events:
if isinstance(event, etcd3.events.DeleteEvent):
cancel()
return # Leader gone, try to become leader
if self.stop_event.is_set():
cancel()
return
def resign(self):
"""Voluntarily give up leadership."""
if self.is_leader and self.lease:
self.lease.revoke()
self.is_leader = False
def stop(self):
"""Stop the election process."""
self.stop_event.set()
self.resign()
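A sketch of how a node might wire this class up, assuming a local etcd on the default port (the callbacks here just print; a real scheduler would start or stop its scheduling loop):

import etcd3
from threading import Thread

client = etcd3.client(host="localhost", port=2379)
election = EtcdLeaderElection(client, election_name="cron-scheduler", ttl=10)

def on_elected():
    print("Became leader - start the scheduler loop")

def on_demoted():
    print("Lost leadership - stop scheduling immediately")

# campaign() blocks, so run it in a background thread
Thread(
    target=election.campaign,
    args=("node-1", on_elected, on_demoted),
    daemon=True
).start()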
Chapter 3: Job Execution Guarantees
3.1 The Spectrum of Guarantees
JOB EXECUTION GUARANTEES (from weakest to strongest)

AT-MOST-ONCE: "Fire and forget"
- Simple and fast
- May lose jobs
- Use for: metrics/logs, cache warming, best-effort work

AT-LEAST-ONCE: "Retry until done"
- Reliable, but may duplicate
- Needs idempotent jobs
- Use for: most jobs, email sending, report generation

EXACTLY-ONCE: "Magic" (a myth)
- Requires idempotency; complex and expensive
- Use for: financial and critical audit jobs where exactly-once semantics are needed
3.2 Implementing At-Least-Once Job Execution
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
from typing import Callable, Optional
import uuid
class JobStatus(Enum):
PENDING = "pending"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
DEAD = "dead" # Exceeded all retries
@dataclass
class ScheduledJob:
id: str
name: str
cron_expression: str
handler: str # e.g., "app.jobs.send_daily_report"
# Execution tracking
status: JobStatus = JobStatus.PENDING
last_run_at: Optional[datetime] = None
next_run_at: Optional[datetime] = None
# Current execution
execution_id: Optional[str] = None
execution_started_at: Optional[datetime] = None
execution_token: Optional[int] = None
# Retry configuration
max_retries: int = 3
retry_count: int = 0
timeout_seconds: int = 3600 # 1 hour default
# Failure tracking
last_error: Optional[str] = None
consecutive_failures: int = 0
class AtLeastOnceJobExecutor:
"""
Job executor with at-least-once guarantee.
Guarantees:
- Jobs will run at least once (may run multiple times on failures)
- Failed jobs will be retried
- Stuck jobs will be detected and re-run
- Jobs are idempotent (caller's responsibility)
"""
def __init__(self, db, job_registry, fencing_token: int):
self.db = db
self.job_registry = job_registry
self.fencing_token = fencing_token
def execute_job(self, job: ScheduledJob) -> bool:
"""
Execute a scheduled job with at-least-once semantics.
Returns True if job completed successfully.
"""
execution_id = str(uuid.uuid4())
# Try to claim the job
claimed = self._claim_job(job.id, execution_id)
if not claimed:
return False # Another instance is running it
try:
# Get the handler function
handler = self.job_registry.get_handler(job.handler)
if not handler:
raise ValueError(f"Unknown job handler: {job.handler}")
# Execute with timeout
self._execute_with_timeout(handler, job.timeout_seconds)
# Mark success
self._mark_completed(job.id, execution_id)
return True
except TimeoutError:
self._mark_failed(job.id, execution_id, "Job timed out")
return False
except Exception as e:
self._mark_failed(job.id, execution_id, str(e))
return False
def _claim_job(self, job_id: str, execution_id: str) -> bool:
"""
Atomically claim a job for execution.
Uses fencing token to prevent stale leaders from claiming.
"""
result = self.db.execute("""
UPDATE scheduled_jobs
SET status = 'running',
execution_id = ?,
execution_started_at = NOW(),
execution_token = ?
WHERE id = ?
AND status IN ('pending', 'failed')
AND (execution_token IS NULL OR execution_token < ?)
RETURNING id
""", execution_id, self.fencing_token, job_id, self.fencing_token)
return result is not None
def _execute_with_timeout(self, handler: Callable, timeout: int):
"""Execute handler with timeout."""
import signal
def timeout_handler(signum, frame):
raise TimeoutError("Job execution timed out")
signal.signal(signal.SIGALRM, timeout_handler)
signal.alarm(timeout)
try:
handler()
finally:
signal.alarm(0)
def _mark_completed(self, job_id: str, execution_id: str):
"""Mark job as completed."""
self.db.execute("""
UPDATE scheduled_jobs
SET status = 'completed',
last_run_at = NOW(),
next_run_at = ?,
retry_count = 0,
consecutive_failures = 0,
last_error = NULL
WHERE id = ? AND execution_id = ?
""", self._calculate_next_run(job_id), job_id, execution_id)
def _mark_failed(self, job_id: str, execution_id: str, error: str):
"""Mark job as failed, potentially scheduling retry."""
job = self.db.get_job(job_id)
if job.retry_count < job.max_retries:
# Schedule retry
retry_delay = self._calculate_retry_delay(job.retry_count)
self.db.execute("""
UPDATE scheduled_jobs
SET status = 'pending',
next_run_at = NOW() + INTERVAL ? SECOND,
retry_count = retry_count + 1,
consecutive_failures = consecutive_failures + 1,
last_error = ?
WHERE id = ? AND execution_id = ?
""", retry_delay, error, job_id, execution_id)
else:
# Move to dead status
self.db.execute("""
UPDATE scheduled_jobs
SET status = 'dead',
last_error = ?,
consecutive_failures = consecutive_failures + 1
WHERE id = ? AND execution_id = ?
""", error, job_id, execution_id)
# Alert operations
self._alert_dead_job(job, error)
def _calculate_retry_delay(self, retry_count: int) -> int:
"""Exponential backoff for retries."""
base_delay = 60 # 1 minute
max_delay = 3600 # 1 hour
delay = base_delay * (2 ** retry_count)
return min(delay, max_delay)
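One refinement worth considering, echoing Day 4's retry guidance (this is an addition, not part of the executor above): add jitter to the backoff so that many jobs failing at once don't all retry at the same instant.

import random

def calculate_retry_delay_with_jitter(retry_count: int) -> int:
    """Exponential backoff capped at one hour, with full jitter."""
    base_delay = 60    # 1 minute
    max_delay = 3600   # 1 hour
    delay = min(base_delay * (2 ** retry_count), max_delay)
    # Full jitter: pick a random delay in [0, delay] to spread retries out
    return random.randint(0, delay)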
3.3 Detecting and Recovering Stuck Jobs
What happens when a job starts but the server crashes mid-execution?
class StuckJobDetector:
"""
Detect and recover jobs that are stuck in 'running' state.
A job is considered stuck if:
- Status is 'running'
- Started more than timeout_seconds ago
- No heartbeat received recently
"""
def __init__(self, db, alert_service):
self.db = db
self.alert_service = alert_service
def find_stuck_jobs(self) -> list:
"""Find jobs that appear to be stuck."""
return self.db.query("""
SELECT * FROM scheduled_jobs
WHERE status = 'running'
AND execution_started_at < NOW() - INTERVAL timeout_seconds SECOND
AND (last_heartbeat_at IS NULL
OR last_heartbeat_at < NOW() - INTERVAL 60 SECOND)
""")
    def recover_stuck_jobs(self):
        """Mark stuck jobs as failed so the retry logic reschedules them."""
stuck_jobs = self.find_stuck_jobs()
for job in stuck_jobs:
# Log the stuck job
logger.warning(
"Recovering stuck job",
job_id=job.id,
execution_id=job.execution_id,
started_at=job.execution_started_at
)
# Check if it's a chronic problem
if job.consecutive_failures >= 3:
self.alert_service.alert(
severity="high",
message=f"Job {job.name} has failed {job.consecutive_failures} times",
job_id=job.id
)
            # Mark as failed (the retry logic will reschedule it on the next pass)
self.db.execute("""
UPDATE scheduled_jobs
SET status = 'failed',
last_error = 'Job stuck - server may have crashed',
consecutive_failures = consecutive_failures + 1
WHERE id = ? AND execution_id = ?
""", job.id, job.execution_id)
def run_detector(self, interval: int = 60):
"""Run stuck job detection periodically."""
while True:
try:
self.recover_stuck_jobs()
except Exception as e:
logger.error("Stuck job detector error", error=str(e))
time.sleep(interval)
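A minimal way to run the detector alongside the rest of the service (the `db` and `alert_service` objects are assumed to exist, as in the class above):

from threading import Thread

detector = StuckJobDetector(db, alert_service)

# Run detection in the background. If this thread dies, recovery is delayed,
# but the detector itself never causes duplicate executions.
Thread(target=detector.run_detector, kwargs={"interval": 60}, daemon=True).start()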
3.4 Job Heartbeats
For long-running jobs, use heartbeats to signal progress:
class HeartbeatJobExecutor:
"""
Job executor with heartbeat support for long-running jobs.
"""
def __init__(self, db, heartbeat_interval: int = 30):
self.db = db
self.heartbeat_interval = heartbeat_interval
self.heartbeat_thread = None
self.stop_heartbeat = False
def execute_with_heartbeat(self, job: ScheduledJob, handler: Callable):
"""Execute job while sending heartbeats."""
self.stop_heartbeat = False
# Start heartbeat thread
self.heartbeat_thread = Thread(
target=self._heartbeat_loop,
args=(job.id, job.execution_id)
)
self.heartbeat_thread.start()
try:
handler()
finally:
self.stop_heartbeat = True
self.heartbeat_thread.join()
def _heartbeat_loop(self, job_id: str, execution_id: str):
"""Send periodic heartbeats."""
while not self.stop_heartbeat:
try:
self.db.execute("""
UPDATE scheduled_jobs
SET last_heartbeat_at = NOW()
WHERE id = ? AND execution_id = ?
""", job_id, execution_id)
            except Exception:
pass # Best effort
time.sleep(self.heartbeat_interval)
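A hedged usage sketch (the `job`, `db`, and index helpers are assumptions): while the handler runs, `last_heartbeat_at` keeps advancing, so the stuck-job detector above won't mistake a slow job for a crashed one.

def rebuild_search_index():
    # Hypothetical long-running work
    for batch in load_batches():   # assumed helper
        index_batch(batch)         # assumed helper

executor = HeartbeatJobExecutor(db, heartbeat_interval=30)
executor.execute_with_heartbeat(job, rebuild_search_index)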
Chapter 4: The Complete Distributed Cron System
4.1 Architecture
DISTRIBUTED CRON ARCHITECTURE

COORDINATION LAYER (etcd / ZooKeeper)
    Leader Election | Config Store | Fencing Tokens
                        │
                        ▼
SCHEDULER (leader only)
    Cron Parser | Job Registry | Trigger Generator
                        │
                        ▼
JOB QUEUE (Redis / RabbitMQ / SQS)
                        │
          ┌─────────────┼─────────────┐
          ▼             ▼             ▼
      Worker 1       Worker 2       Worker 3
      (each worker runs an Executor and a Heartbeat sender)
                        │
                        ▼
PERSISTENCE LAYER
    Job Store | Execution Logs | Audit Trail
4.2 Complete Implementation
"""
Distributed Cron System
Combines: Leader Election + Job Scheduling + At-Least-Once Execution
"""
import asyncio
import time
import uuid
from datetime import datetime, timedelta
from typing import Dict, List, Callable, Optional
from dataclasses import dataclass, field
from croniter import croniter
import structlog
logger = structlog.get_logger()
# =============================================================================
# Configuration
# =============================================================================
@dataclass
class CronConfig:
# Leader election
etcd_endpoints: List[str] = field(default_factory=lambda: ["localhost:2379"])
election_ttl: int = 10 # seconds
# Scheduling
tick_interval: int = 1 # seconds
lookahead_seconds: int = 60 # How far ahead to schedule
# Execution
default_timeout: int = 3600 # 1 hour
max_retries: int = 3
stuck_job_threshold: int = 300 # 5 minutes without heartbeat
# Workers
num_workers: int = 4
heartbeat_interval: int = 30
# =============================================================================
# Job Definitions
# =============================================================================
@dataclass
class JobDefinition:
name: str
cron: str
handler: str
timeout: int = 3600
max_retries: int = 3
enabled: bool = True
metadata: Dict = field(default_factory=dict)
@dataclass
class JobExecution:
id: str
job_name: str
scheduled_at: datetime
started_at: Optional[datetime] = None
completed_at: Optional[datetime] = None
status: str = "pending" # pending, running, completed, failed, dead
fencing_token: Optional[int] = None
worker_id: Optional[str] = None
error: Optional[str] = None
retry_count: int = 0
# =============================================================================
# Distributed Cron Scheduler
# =============================================================================
class DistributedCronScheduler:
"""
Distributed cron scheduler with leader election.
Only the leader schedules jobs. Workers execute them.
"""
def __init__(
self,
config: CronConfig,
etcd_client,
job_store,
job_queue,
job_registry: Dict[str, Callable]
):
self.config = config
self.etcd = etcd_client
self.job_store = job_store
self.job_queue = job_queue
self.job_registry = job_registry
self.is_leader = False
self.fencing_token = 0
self.node_id = str(uuid.uuid4())[:8]
self.leader_election = EtcdLeaderElection(
etcd_client,
"cron-scheduler",
ttl=config.election_ttl
)
async def start(self):
"""Start the distributed cron system."""
logger.info("Starting distributed cron", node_id=self.node_id)
# Start leader election in background
asyncio.create_task(self._run_election())
# Start workers
for i in range(self.config.num_workers):
asyncio.create_task(self._run_worker(f"{self.node_id}-worker-{i}"))
# Start stuck job detector
asyncio.create_task(self._run_stuck_job_detector())
# Keep running
while True:
await asyncio.sleep(1)
async def _run_election(self):
"""Run leader election."""
await self.leader_election.campaign(
candidate_id=self.node_id,
on_elected=self._on_elected,
on_demoted=self._on_demoted
)
def _on_elected(self):
"""Called when this node becomes leader."""
self.is_leader = True
self.fencing_token = int(time.time() * 1000) # Millisecond timestamp as token
logger.info(
"Became leader",
node_id=self.node_id,
fencing_token=self.fencing_token
)
# Start scheduler loop
asyncio.create_task(self._scheduler_loop())
def _on_demoted(self):
"""Called when this node loses leadership."""
self.is_leader = False
logger.info("Lost leadership", node_id=self.node_id)
async def _scheduler_loop(self):
"""Main scheduler loop (runs only on leader)."""
logger.info("Starting scheduler loop", node_id=self.node_id)
while self.is_leader:
try:
await self._schedule_due_jobs()
except Exception as e:
logger.error("Scheduler error", error=str(e))
await asyncio.sleep(self.config.tick_interval)
logger.info("Scheduler loop stopped", node_id=self.node_id)
async def _schedule_due_jobs(self):
"""Find and queue jobs that are due to run."""
now = datetime.utcnow()
# Get all job definitions
jobs = await self.job_store.get_all_jobs()
for job in jobs:
if not job.enabled:
continue
# Check if job is due
cron = croniter(job.cron, now - timedelta(seconds=self.config.lookahead_seconds))
next_run = cron.get_next(datetime)
if next_run <= now:
# Check if already scheduled
existing = await self.job_store.get_pending_execution(
job.name,
next_run
)
if not existing:
# Create execution record
execution = JobExecution(
id=str(uuid.uuid4()),
job_name=job.name,
scheduled_at=next_run,
fencing_token=self.fencing_token
)
await self.job_store.create_execution(execution)
# Queue for workers
await self.job_queue.enqueue({
'execution_id': execution.id,
'job_name': job.name,
'handler': job.handler,
'timeout': job.timeout,
'fencing_token': self.fencing_token
})
logger.info(
"Scheduled job",
job_name=job.name,
execution_id=execution.id,
scheduled_at=next_run
)
async def _run_worker(self, worker_id: str):
"""Worker loop that executes jobs."""
logger.info("Starting worker", worker_id=worker_id)
while True:
try:
# Get job from queue
task = await self.job_queue.dequeue(timeout=5)
if task:
await self._execute_job(worker_id, task)
except Exception as e:
logger.error("Worker error", worker_id=worker_id, error=str(e))
await asyncio.sleep(1)
async def _execute_job(self, worker_id: str, task: dict):
"""Execute a job task."""
execution_id = task['execution_id']
job_name = task['job_name']
handler_name = task['handler']
timeout = task['timeout']
fencing_token = task['fencing_token']
logger.info(
"Executing job",
worker_id=worker_id,
execution_id=execution_id,
job_name=job_name
)
# Claim the execution
claimed = await self.job_store.claim_execution(
execution_id,
worker_id,
fencing_token
)
if not claimed:
logger.warning(
"Could not claim execution",
execution_id=execution_id,
reason="Already claimed or stale token"
)
return
# Get handler
handler = self.job_registry.get(handler_name)
if not handler:
await self.job_store.fail_execution(
execution_id,
f"Unknown handler: {handler_name}"
)
return
# Start heartbeat
heartbeat_task = asyncio.create_task(
self._heartbeat_loop(execution_id, worker_id)
)
try:
# Execute with timeout
await asyncio.wait_for(
handler(),
timeout=timeout
)
# Mark success
await self.job_store.complete_execution(execution_id)
logger.info(
"Job completed",
execution_id=execution_id,
job_name=job_name
)
except asyncio.TimeoutError:
await self.job_store.fail_execution(
execution_id,
"Job timed out"
)
logger.error(
"Job timed out",
execution_id=execution_id,
job_name=job_name,
timeout=timeout
)
except Exception as e:
await self.job_store.fail_execution(
execution_id,
str(e)
)
logger.error(
"Job failed",
execution_id=execution_id,
job_name=job_name,
error=str(e)
)
finally:
heartbeat_task.cancel()
async def _heartbeat_loop(self, execution_id: str, worker_id: str):
"""Send heartbeats while job is running."""
while True:
try:
await self.job_store.update_heartbeat(execution_id, worker_id)
await asyncio.sleep(self.config.heartbeat_interval)
except asyncio.CancelledError:
break
except Exception as e:
logger.error("Heartbeat error", error=str(e))
async def _run_stuck_job_detector(self):
"""Detect and recover stuck jobs."""
while True:
try:
stuck_jobs = await self.job_store.find_stuck_executions(
threshold_seconds=self.config.stuck_job_threshold
)
for execution in stuck_jobs:
logger.warning(
"Found stuck job",
execution_id=execution.id,
job_name=execution.job_name,
started_at=execution.started_at
)
# Reset to failed for retry
await self.job_store.fail_execution(
execution.id,
"Job stuck - worker may have crashed"
)
# Re-queue if retries remaining
job = await self.job_store.get_job(execution.job_name)
if execution.retry_count < job.max_retries:
await self.job_queue.enqueue({
'execution_id': execution.id,
'job_name': execution.job_name,
'handler': job.handler,
'timeout': job.timeout,
'fencing_token': self.fencing_token,
'retry_count': execution.retry_count + 1
})
except Exception as e:
logger.error("Stuck job detector error", error=str(e))
await asyncio.sleep(60)
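To make the moving parts concrete, here is a hedged sketch of how a service might register a job and start the scheduler. `PostgresJobStore`, `RedisJobQueue`, and `upsert_job` are assumptions standing in for whatever implements the `job_store` and `job_queue` interfaces used above.

import asyncio
import etcd3

async def send_daily_report():
    ...  # idempotent job body goes here

async def main():
    config = CronConfig(num_workers=2)
    etcd_client = etcd3.client(host="localhost", port=2379)

    job_store = PostgresJobStore(dsn="postgresql://localhost/cron")  # assumed implementation
    job_queue = RedisJobQueue(url="redis://localhost:6379/0")        # assumed implementation

    # Map handler names (stored on each JobDefinition) to coroutine functions
    job_registry = {"app.jobs.send_daily_report": send_daily_report}

    await job_store.upsert_job(JobDefinition(
        name="daily-report",
        cron="0 9 * * *",
        handler="app.jobs.send_daily_report",
        timeout=1800,
    ))

    scheduler = DistributedCronScheduler(config, etcd_client, job_store, job_queue, job_registry)
    await scheduler.start()   # runs until the process is stopped

asyncio.run(main())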
Part II: The Design Challenge
Chapter 5: "The Leader Dies Mid-Job"
5.1 The Scenario
This is the classic distributed systems interview question:
Timeline:
10:00:00 - Leader (Server A) triggers daily report job
10:00:01 - Worker starts generating report
10:00:30 - Leader (Server A) crashes (power failure)
10:00:35 - etcd lease expires
10:00:36 - Server B becomes new leader
Questions:
1. What happens to the in-progress job?
2. Will the job run twice?
3. How do we ensure exactly-once semantics?
4. What if the job was half-finished (e.g., sent some emails)?
5.2 Analyzing the Failure Modes
Mode 1: Leader crash, job still running on worker
Leader A crashes, but worker is on different server.
Worker continues running the job.
New leader B doesn't know job is running.
New leader B might schedule the same job again!
Solution: Job execution is tracked in persistent storage, not just in leader's memory.
# Before scheduling, check if already running
existing = await self.job_store.get_running_execution(job_name)
if existing:
if existing.is_stale(): # No heartbeat for 5+ minutes
# Assume dead, reschedule
pass
else:
# Still running, skip
return
Mode 2: Leader crash, job on same server
Leader A is also running the job.
Both crash together.
Job is partially complete (e.g., 50 emails sent).
Solution: Make jobs idempotent and resumable.
class IdempotentEmailJob:
"""Send daily email report - idempotent implementation."""
async def run(self, execution_id: str):
# Get all users who should receive email
users = await self.db.get_email_recipients()
for user in users:
# Check if already sent (idempotency)
already_sent = await self.db.check_email_sent(
execution_id=execution_id,
user_id=user.id
)
if already_sent:
continue
# Send email
await self.email_service.send(user.email, self.report)
# Mark as sent (for idempotency)
await self.db.mark_email_sent(
execution_id=execution_id,
user_id=user.id
)
Mode 3: Split-brain during failover
Network partition:
Partition A: Old leader + some workers
Partition B: New leader + other workers
Both leaders think they're in charge!
Solution: Fencing tokens.
# Worker checks fencing token before executing
async def execute_job(self, task):
current_token = await self.get_current_fencing_token()
if task['fencing_token'] < current_token:
# Stale task from old leader
logger.warning("Rejecting stale task",
task_token=task['fencing_token'],
current_token=current_token)
return
# Safe to execute
await self._do_execute(task)
5.3 Complete Solution
class ResilientJobExecution:
"""
Job execution that handles leader failure gracefully.
"""
def __init__(self, job_store, fencing_store):
self.job_store = job_store
self.fencing_store = fencing_store
async def execute_with_resilience(
self,
job: JobDefinition,
execution: JobExecution,
handler: Callable
):
"""
Execute job with full resilience:
- Fencing token validation
- Idempotency support
- Progress tracking
- Graceful failure handling
"""
# Step 1: Validate fencing token
current_token = await self.fencing_store.get_current_token()
if execution.fencing_token < current_token:
raise StaleExecutionError(
f"Execution {execution.id} has stale token"
)
# Step 2: Create execution context for idempotency
context = ExecutionContext(
execution_id=execution.id,
job_name=job.name,
checkpoint_store=self.job_store
)
try:
# Step 3: Execute with progress tracking
await handler(context)
# Step 4: Mark complete
await self.job_store.complete_execution(
execution.id,
fencing_token=execution.fencing_token
)
except Exception as e:
# Step 5: Handle failure
await self._handle_failure(execution, e)
raise
async def _handle_failure(self, execution: JobExecution, error: Exception):
"""Handle job failure."""
# Update execution record
await self.job_store.fail_execution(
execution.id,
str(error),
fencing_token=execution.fencing_token
)
# Check if retryable
job = await self.job_store.get_job(execution.job_name)
if execution.retry_count < job.max_retries:
# Schedule retry
await self.job_store.schedule_retry(
execution.id,
delay_seconds=self._calculate_backoff(execution.retry_count)
)
else:
# Move to dead letter
await self.job_store.mark_dead(execution.id)
await self._alert_dead_job(execution, error)
class ExecutionContext:
"""
Context for idempotent job execution.
Provides checkpoint/resume capabilities.
"""
def __init__(self, execution_id: str, job_name: str, checkpoint_store):
self.execution_id = execution_id
self.job_name = job_name
self.checkpoint_store = checkpoint_store
async def checkpoint(self, key: str, data: dict):
"""Save progress checkpoint."""
await self.checkpoint_store.save_checkpoint(
self.execution_id,
key,
data
)
async def get_checkpoint(self, key: str) -> Optional[dict]:
"""Get saved checkpoint."""
return await self.checkpoint_store.get_checkpoint(
self.execution_id,
key
)
async def mark_step_complete(self, step: str):
"""Mark a step as complete (for idempotency)."""
await self.checkpoint_store.mark_step_complete(
self.execution_id,
step
)
async def is_step_complete(self, step: str) -> bool:
"""Check if step already completed."""
return await self.checkpoint_store.is_step_complete(
self.execution_id,
step
)
5.4 Example: Resumable Invoice Generation
async def generate_monthly_invoices(ctx: ExecutionContext):
"""
Generate invoices for all customers.
Resumable and idempotent.
"""
# Get last checkpoint (if resuming)
checkpoint = await ctx.get_checkpoint("progress")
last_customer_id = checkpoint.get("last_customer_id") if checkpoint else None
# Get customers to process
customers = await db.get_customers_for_invoicing(after_id=last_customer_id)
for customer in customers:
# Check if already processed (idempotency)
step_key = f"customer_{customer.id}"
if await ctx.is_step_complete(step_key):
continue
# Generate invoice
invoice = await generate_invoice(customer)
# Save invoice
await db.save_invoice(invoice)
# Send email
await send_invoice_email(customer, invoice)
# Mark step complete
await ctx.mark_step_complete(step_key)
# Checkpoint progress
await ctx.checkpoint("progress", {"last_customer_id": customer.id})
logger.info("Invoice generation complete",
execution_id=ctx.execution_id,
customers_processed=len(customers))
Chapter 6: Comparing Real-World Solutions
6.1 Celery Beat
What it is: Periodic task scheduler for Celery (Python).
How it works:
# celeryconfig.py
from celery.schedules import crontab
beat_schedule = {
'send-daily-report': {
'task': 'tasks.send_daily_report',
'schedule': crontab(hour=9, minute=0),
},
}
Single-node problem: By default, only ONE Celery Beat process should run.
Solution: RedBeat (the celery-redbeat package), which keeps the schedule and a scheduler lock in Redis
# Use Redis for distributed locking
from redbeat import RedBeatSchedulerEntry
# Only one beat will schedule tasks
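A hedged configuration sketch for RedBeat: the schedule and a scheduler lock live in Redis, so only one Beat instance schedules at a time while standbys wait to take over. The setting names below are from RedBeat's documentation as I recall them; verify them against the version you deploy.

# celeryconfig.py (RedBeat additions)
broker_url = "redis://localhost:6379/0"

# Store the schedule and the scheduler lock in Redis
redbeat_redis_url = "redis://localhost:6379/1"
redbeat_lock_timeout = 60   # seconds before a dead Beat's lock expires

# Start Beat with:  celery -A myapp beat -S redbeat.RedBeatScheduler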
Pros:
- Simple setup
- Integrates with Celery ecosystem
- Many community extensions
Cons:
- Not truly distributed (single beat process)
- Requires external solution for HA
- No built-in exactly-once guarantees
6.2 Kubernetes CronJobs
What it is: Kubernetes-native scheduled jobs.
apiVersion: batch/v1
kind: CronJob
metadata:
name: daily-report
spec:
schedule: "0 9 * * *"
concurrencyPolicy: Forbid # Don't run if previous still running
jobTemplate:
spec:
template:
spec:
containers:
- name: report
image: my-report-generator
restartPolicy: OnFailure
Key features:
- `concurrencyPolicy: Forbid` - prevents overlapping runs
- `startingDeadlineSeconds` - the window in which a missed run may still be started
- Automatic retries with `backoffLimit`
Pros:
- Built into Kubernetes
- Handles pod failures
- Good observability
Cons:
- At-least-once (not exactly-once)
- 1-minute minimum granularity
- Can miss jobs during cluster issues
6.3 Temporal (formerly Cadence)
What it is: Workflow orchestration platform with built-in durability.
from temporalio import workflow
from datetime import timedelta
@workflow.defn
class DailyReportWorkflow:
@workflow.run
async def run(self):
# This is durable - survives crashes
await workflow.execute_activity(
generate_report,
start_to_close_timeout=timedelta(hours=1)
)
await workflow.execute_activity(
send_emails,
start_to_close_timeout=timedelta(minutes=30)
)
# Schedule
schedule = Schedule(
action=ScheduleActionStartWorkflow(
DailyReportWorkflow.run,
),
spec=ScheduleSpec(cron_expressions=["0 9 * * *"]),
)
Key features:
- Durable execution: Workflow state survives crashes
- Exactly-once semantics: Built into the platform
- Automatic retries: With exponential backoff
- Visibility: Full execution history
How it achieves exactly-once:
- Event sourcing for workflow state
- Deterministic replay
- Automatic activity deduplication
Pros:
- True exactly-once semantics
- Handles long-running jobs elegantly
- Great debugging/visibility
Cons:
- Additional infrastructure
- Learning curve
- Overkill for simple cron jobs
6.4 Comparison Table
| Feature | Celery Beat | K8s CronJobs | Temporal |
|---|---|---|---|
| Deployment | Simple | Kubernetes required | Dedicated cluster |
| Exactly-once | No | No | Yes |
| Crash recovery | Manual | Automatic | Automatic |
| Long-running jobs | Timeout issues | Pod limits | Native support |
| Visibility | Basic | kubectl | Excellent UI |
| Complexity | Low | Medium | High |
| Best for | Simple Python apps | Cloud-native apps | Critical workflows |
6.5 When to Use What
DECISION FLOWCHART

Need scheduled jobs?
  │
  ├── Simple cron, single server? ──YES──▶ Use Linux cron
  │        NO
  ├── Using Kubernetes? ──YES──▶ Start with K8s CronJobs
  │        NO                    (need exactly-once? add idempotency in your job)
  ├── Python/Celery app? ──YES──▶ Use Celery Beat + Redis lock
  │        NO
  ├── Critical financial/compliance jobs? ──YES──▶ Use Temporal or similar
  │        NO
  └──▶ Build a custom solution using this day's patterns
Part III: Week 2 Synthesis
Chapter 7: Bringing It All Together
7.1 The Failure-First Mindset
This week, we've internalized a crucial mindset shift:
Before Week 2:
"How do I make this work?"
→ Design for the happy path
→ Handle errors as an afterthought
→ Hope failures are rare

After Week 2:
"How will this fail?"
→ Design for failures first
→ Make failures visible and recoverable
→ Assume failures are constant
7.2 The Patterns We've Learned
WEEK 2: COMPLETE PATTERN CATALOG

DAY 1: TIMEOUTS
├── Timeout budgets (total time allocation)
├── Cascading timeouts (shrinking budgets through the call chain)
├── Deadline propagation (pass a deadline, not a duration)
└── Connection vs read vs write timeouts

DAY 2: IDEMPOTENCY
├── Idempotency keys (client-generated unique IDs)
├── Request fingerprinting (hash of request content)
├── Idempotency store (remember processed requests)
└── TTL management (how long to remember)

DAY 3: CIRCUIT BREAKERS
├── Three states (Closed → Open → Half-Open)
├── Failure rate vs count thresholds
├── Gradual recovery (prevent thundering herd)
└── Fallback strategies (graceful degradation)

DAY 4: WEBHOOKS
├── At-least-once delivery (retry until acknowledged)
├── Exponential backoff with jitter
├── Dead letter queues (for failed deliveries)
└── Signature verification (security)

DAY 5: DISTRIBUTED CRON
├── Leader election (single scheduler)
├── Fencing tokens (prevent split-brain)
├── Job heartbeats (detect stuck jobs)
└── Checkpoint/resume (idempotent execution)
7.3 How They Connect
                 DISTRIBUTED CRON (Day 5)
                          │
          uses leader election + fencing tokens
                          │
             ┌────────────┴────────────┐
             ▼                         ▼
     TIMEOUTS (Day 1)           IDEMPOTENCY (Day 2)
     Job timeouts,              Jobs must be idempotent
     execution limits           so retries are safe
             └────────────┬────────────┘
                          ▼
              CIRCUIT BREAKERS (Day 3)
              Jobs calling external APIs
                          │
                          ▼
                  WEBHOOKS (Day 4)
              Jobs trigger async events
7.4 A Complete Example: Order Processing Job
Let's see all patterns working together:
class OrderProcessingJob:
"""
Daily job to process pending orders.
Demonstrates all Week 2 patterns.
"""
def __init__(
self,
db,
payment_client,
inventory_client,
webhook_service,
circuit_breaker,
idempotency_store
):
self.db = db
self.payment = payment_client
self.inventory = inventory_client
self.webhooks = webhook_service
self.circuit = circuit_breaker
self.idempotency = idempotency_store
async def run(self, ctx: ExecutionContext):
"""
Process all pending orders.
Patterns used:
- Idempotency: Skip already-processed orders
- Timeouts: Limit time per order
- Circuit breaker: Protect external calls
- Webhooks: Notify external systems
- Checkpointing: Resume after crash
"""
# Get checkpoint (for resume after crash) [Day 5]
checkpoint = await ctx.get_checkpoint("progress")
last_order_id = checkpoint.get("last_order_id") if checkpoint else None
# Get pending orders
orders = await self.db.get_pending_orders(after_id=last_order_id)
for order in orders:
# Idempotency check [Day 2]
idem_key = f"process_order_{order.id}"
if await self.idempotency.exists(idem_key):
continue
try:
# Process with timeout [Day 1]
await asyncio.wait_for(
self._process_order(order, ctx),
timeout=30
)
# Mark idempotent [Day 2]
await self.idempotency.set(idem_key, ttl=86400)
except asyncio.TimeoutError:
logger.error("Order processing timeout", order_id=order.id)
await ctx.checkpoint("progress", {"last_order_id": order.id})
raise
except CircuitOpenError:
# Circuit open, stop processing [Day 3]
logger.warning("Circuit open, pausing job")
await ctx.checkpoint("progress", {"last_order_id": order.id})
raise
# Checkpoint progress [Day 5]
await ctx.checkpoint("progress", {"last_order_id": order.id})
async def _process_order(self, order, ctx: ExecutionContext):
"""Process single order with all patterns."""
# Step 1: Check inventory (with circuit breaker) [Day 3]
try:
available = await self.circuit.call(
lambda: self.inventory.check(order.items)
)
except CircuitOpenError:
raise # Propagate to caller
if not available:
await self.db.mark_order_backordered(order.id)
return
# Step 2: Charge payment (with timeout) [Day 1]
try:
payment_result = await asyncio.wait_for(
self.payment.charge(
order.user_id,
order.total,
idempotency_key=f"order_{order.id}" # Day 2
),
timeout=10
)
except asyncio.TimeoutError:
logger.error("Payment timeout", order_id=order.id)
raise
# Step 3: Reserve inventory
await self.inventory.reserve(order.items, order.id)
# Step 4: Update order status
await self.db.mark_order_processing(order.id, payment_result.id)
# Step 5: Send webhook (async) [Day 4]
await self.webhooks.emit(
event_type="order.processing",
data={
"order_id": order.id,
"status": "processing",
"payment_id": payment_result.id
}
)
Part IV: Interview Questions
Chapter 8: Week 2 Interview Mastery
8.1 Distributed Cron Questions
Q: "How would you design a job scheduler that ensures jobs run exactly once across multiple servers?"
Strong Answer:
"This is a distributed coordination problem. Here's my approach:
Leader Election: First, I need a single scheduler to prevent duplicate scheduling. I'd use etcd or ZooKeeper for leader election. The leader has a lease that must be renewed; if it crashes, the lease expires and another node becomes leader.
Fencing Tokens: To prevent split-brain, each leader gets a monotonically increasing fencing token. Workers validate this token before executing. Stale tokens from old leaders are rejected.
At-Least-Once Execution: True exactly-once is impossible, so I implement at-least-once with idempotent jobs. Each execution has a unique ID. Jobs use this ID to check if work was already done.
Stuck Job Detection: Workers send heartbeats during execution. A background process finds jobs that started but haven't heartbeated recently – these are stuck. We reset them to 'failed' for retry.
Checkpointing: Long-running jobs checkpoint progress. If a job crashes at 50%, it resumes from the checkpoint rather than starting over.
Connection to Week 2: This combines everything from this week:
- Day 1: Job timeouts prevent infinite execution
- Day 2: Idempotent jobs handle retries safely
- Day 3: Circuit breakers for jobs calling external services
- Day 4: Jobs triggering webhooks for notifications
- Day 5: Leader election and fencing tokens"
8.2 "The Leader Dies" Question
Q: "The leader crashes while a job is running. What happens?"
Strong Answer:
"Three scenarios to consider:
Scenario 1: Job on different worker The job continues running. New leader checks job store before scheduling and sees execution is in-progress with recent heartbeat. It skips scheduling until that execution completes or times out.
Scenario 2: Job on same server as leader Both crash. The execution record shows 'running' but heartbeat stops. Stuck job detector (on new leader) finds it after threshold, marks it failed, and reschedules.
Scenario 3: Job was just scheduled The job message is in the queue, carrying the old leader's fencing token, and the execution record in the job store was created with that same token. A worker's claim against the store still succeeds, so the work is not lost; if the new leader has already superseded that execution with a higher token, the stale message is rejected at claim time.
Key protections:
- Persistent execution records: not just in-memory
- Heartbeats: distinguish 'running' from 'stuck'
- Fencing tokens: prevent stale leaders from interfering
- Idempotent jobs: safe to re-run if uncertain
I'd also have comprehensive alerting: alert on stuck jobs, alert on jobs exceeding retry limits, alert on leader failovers."
8.3 Week 2 Synthesis Question
Q: "Walk me through how you'd make a payment processing job reliable."
Strong Answer:
"I'll use all of Week 2's patterns:
Scheduling (Day 5): Job scheduled via distributed cron with leader election. Only one instance schedules. Fencing tokens prevent split-brain.
Execution Setup:
- Execution ID: unique per run
- Idempotency key: derived from job + date
- Timeout: 30 minutes max
- Heartbeat: every 30 seconds
Processing Loop: For each pending payment:
- Idempotency check (Day 2): Skip if `processed:{payment_id}` exists
- Call payment provider with circuit breaker (Day 3): If circuit open, pause job
- Timeout the call (Day 1): Max 10 seconds per payment
- Mark processed (Day 2): Set `processed:{payment_id}` with TTL
- Emit webhook (Day 4): `payment.completed` event
- Checkpoint (Day 5): Save `last_payment_id` for resume
Failure Handling:
- Timeout → log, checkpoint, continue
- Circuit open → checkpoint, pause, retry later
- Crash → resume from checkpoint; idempotency prevents duplicates
Monitoring:
- Track: success rate, latency, circuit state
- Alert: high failure rate, stuck jobs, circuit open
- Dashboard: pending payments, processing rate, DLQ depth"
Chapter 9: Week 2 Checklist
Concepts You Should Master
- Explain why timeouts are essential in distributed systems
- Implement timeout budgets that propagate through service calls
- Design idempotency key strategies for different use cases
- Explain the three states of a circuit breaker
- Choose between count-based and rate-based circuit breakers
- Design at-least-once webhook delivery with retries
- Implement HMAC signature verification
- Explain why exactly-once is impossible and what to do instead
- Implement leader election using etcd or ZooKeeper
- Explain fencing tokens and their role in preventing split-brain
- Design a distributed job scheduler
- Handle "leader dies mid-job" scenario
Implementations You Should Be Able to Code
- Timeout budget wrapper
- Idempotency store with Redis
- Circuit breaker with three states
- Webhook signature generation and verification
- Retry scheduler with exponential backoff
- Leader election client
- Job executor with heartbeats
- Stuck job detector
Trade-offs You Should Articulate
| Decision | Trade-off |
|---|---|
| Timeout duration | Too short = false failures; Too long = resource waste |
| Idempotency TTL | Too short = duplicates; Too long = storage cost |
| Circuit threshold | Too low = false opens; Too high = slow detection |
| Retry count | Too few = lost events; Too many = delayed failures |
| Leader lease TTL | Too short = flapping; Too long = slow failover |
| Heartbeat interval | Too frequent = overhead; Too infrequent = slow detection |
Part V: Resources and Further Learning
Books
- "Designing Data-Intensive Applications" by Martin Kleppmann
  - Chapter 8: The Trouble with Distributed Systems
  - Chapter 9: Consistency and Consensus
  - Essential reading for understanding distributed coordination
- "Database Internals" by Alex Petrov
  - Deep dive into consensus algorithms
  - Leader election implementations
- "Site Reliability Engineering" by Google
  - Chapter 22: Addressing Cascading Failures
  - Chapter 23: Managing Critical State
Papers
- "In Search of an Understandable Consensus Algorithm" (Ongaro & Ousterhout) - the Raft paper
  - https://raft.github.io/raft.pdf
  - Consensus designed for understandability
- "ZooKeeper: Wait-free Coordination for Internet-scale Systems"
- "Impossibility of Distributed Consensus with One Faulty Process" - the FLP result
  - Why consensus is fundamentally hard
Tools and Libraries
- etcd: https://etcd.io/ - Distributed key-value store
- ZooKeeper: https://zookeeper.apache.org/ - Coordination service
- Temporal: https://temporal.io/ - Workflow orchestration
- Celery: https://docs.celeryproject.org/ - Task queue for Python
- APScheduler: https://apscheduler.readthedocs.io/ - Python job scheduling
Online Resources
- Jepsen: https://jepsen.io/ - Distributed systems testing
- Fly.io Blog: Excellent distributed systems articles
- Cloudflare Blog: Real-world distributed systems stories
Exercises
Exercise 1: Implement Leader Election
Build a leader election system using Redis (a starter sketch follows the list):
- Use SETNX for atomic leader claim
- Implement lease renewal
- Handle leader failure detection
- Add fencing tokens
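A starter sketch for this exercise, under the assumption of a single local Redis (one possible shape, not a full solution): claim leadership with SET NX plus an expiry, renew the lease on a timer, and bump a separate counter to hand out fencing tokens.

import redis
from typing import Optional

r = redis.Redis()
NODE_ID = "node-1"

def try_become_leader(ttl: int = 10) -> Optional[int]:
    """Return a fencing token if we won the election, else None."""
    if r.set("cron:leader", NODE_ID, nx=True, ex=ttl):
        # INCR yields a monotonically increasing token across elections
        return int(r.incr("cron:leader_epoch"))
    return None

def renew_leadership(ttl: int = 10) -> bool:
    """Extend the lease only while we still own it. This check-then-expire pair
    is not atomic; a Lua script (as in Chapter 1) would close that gap."""
    if r.get("cron:leader") == NODE_ID.encode():
        return bool(r.expire("cron:leader", ttl))
    return False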
Exercise 2: Build Distributed Cron
Create a complete distributed cron system:
- Leader election for scheduler
- Job queue for workers
- At-least-once execution
- Stuck job detection
- Comprehensive logging
Exercise 3: Chaos Testing
Write chaos tests for your distributed cron:
- Kill leader mid-job
- Network partition simulation
- Clock skew injection
- Verify no duplicate executions
Week 2 Conclusion
What We've Accomplished
This week transformed how you think about distributed systems:
Week 1: Foundations of Scale
"How do I handle more traffic?"
Week 2: Failure-First Design
"How do I handle failures gracefully?"
You now have a complete toolkit for building reliable systems:
| Day | Pattern | Superpower |
|---|---|---|
| 1 | Timeouts | Never wait forever |
| 2 | Idempotency | Safe to retry anything |
| 3 | Circuit Breakers | Fail fast, protect the system |
| 4 | Webhooks | Reliable async communication |
| 5 | Distributed Cron | "Exactly-once" job execution (at-least-once + idempotency) |
Looking Ahead
Week 3 will build on this foundation as we dive into Building Blocks of Distributed Systems:
- Consistent hashing
- Replication strategies
- Distributed transactions
- CAP theorem in practice
The failure-first mindset you've developed this week will be essential as we tackle these more complex topics.
Congratulations on completing Week 2! 🎉
End of Week 2 – Day 5: Distributed Cron
You've completed the "Failure-First Design" week. Next up: Week 3 – Building Blocks of Distributed Systems.