Himanshu Kukreja

Week 2 - Day 5: Distributed Cron & Job Scheduling

System Design Mastery Series

🎯 Week 2 Finale: Bringing It All Together


Preface: The Week 2 Journey

We've come a long way this week. Let's see where we started and where we are now:

WEEK 2: FAILURE-FIRST DESIGN

  Day 1: Timeouts
  └── "Don't wait forever for slow services"
  └── Timeout budgets, cascading timeouts

  Day 2: Idempotency
  └── "Safe to retry without duplicate effects"
  └── Idempotency keys, request fingerprinting

  Day 3: Circuit Breakers
  └── "Stop calling broken services"
  └── Fail fast, protect the system

  Day 4: Webhooks
  └── "Reliable async delivery to external systems"
  └── At-least-once delivery, retries, DLQ

  Day 5: Distributed Cron (TODAY)
  └── "Run scheduled jobs exactly once across multiple servers"
  └── Leader election, fencing tokens, distributed coordination

Today's challenge is the culmination of everything we've learned. Distributed cron combines:

  • Timeouts (Day 1): Jobs that run too long
  • Idempotency (Day 2): Jobs that might run twice
  • Circuit Breakers (Day 3): External dependencies that fail
  • Async Processing (Day 4): Fire-and-forget execution

And it adds a new critical concept: distributed coordination.

The Fundamental Problem

On a single server, cron is simple:

# /etc/crontab
0 * * * * /scripts/send_daily_report.sh
*/5 * * * * /scripts/cleanup_old_sessions.sh
0 0 * * * /scripts/generate_invoices.sh

The OS guarantees:

  • Jobs run at the scheduled time
  • Only one instance of each job runs
  • Jobs survive server restarts (cron daemon restarts)

But in a distributed system:

┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│  Server 1   │     │  Server 2   │     │  Server 3   │
│  [cron]     │     │  [cron]     │     │  [cron]     │
│             │     │             │     │             │
│  10:00 AM:  │     │  10:00 AM:  │     │  10:00 AM:  │
│  Run job!   │     │  Run job!   │     │  Run job!   │
└─────────────┘     └─────────────┘     └─────────────┘
        │                  │                  │
        └──────────────────┼──────────────────┘
                           │
                           ▼
              JOB RUNS 3 TIMES! 💥

The distributed cron problem: How do you ensure a scheduled job runs exactly once across multiple servers, even when:

  • Servers crash
  • Networks partition
  • Clocks drift
  • Deployments happen

This is one of the hardest problems in distributed systems, and it's what we'll solve today.


Part I: Foundations

Chapter 1: Why Distributed Cron Is Hard

1.1 The Illusion of Simplicity

At first glance, the solution seems obvious:

Attempt 1: "Just use a database lock"

def run_scheduled_job(job_id):
    # Try to acquire lock
    if db.execute("UPDATE jobs SET locked = true WHERE id = ? AND locked = false", job_id):
        try:
            execute_job(job_id)
        finally:
            db.execute("UPDATE jobs SET locked = false WHERE id = ?", job_id)

Problem: What if the server crashes while holding the lock?

10:00:00 - Server 1 acquires lock
10:00:01 - Server 1 starts job
10:00:05 - Server 1 crashes (power failure)
          Lock is still held!
10:00:06 - Server 2 tries to run job β†’ blocked
10:01:00 - Server 3 tries to run job β†’ blocked
...
          Job never runs again until manual intervention

Attempt 2: "Add a lock timeout"

def run_scheduled_job(job_id):
    lock_until = datetime.now() + timedelta(minutes=5)
    
    if db.execute("""
        UPDATE jobs 
        SET locked_until = ? 
        WHERE id = ? AND (locked_until IS NULL OR locked_until < NOW())
    """, lock_until, job_id):
        try:
            execute_job(job_id)
        finally:
            db.execute("UPDATE jobs SET locked_until = NULL WHERE id = ?", job_id)

Problem: What if the job takes longer than the timeout?

10:00:00 - Server 1 acquires lock (expires at 10:05:00)
10:00:01 - Server 1 starts job (a slow report generation)
10:05:00 - Lock expires!
10:05:01 - Server 2 acquires lock, starts SAME job
10:08:00 - Server 1 finishes job
10:08:01 - Server 1 releases lock (but it's Server 2's lock now!)
10:10:00 - Server 2 finishes job

Result: Job ran twice, and lock state is corrupted

Attempt 3: "Use distributed locking with Redis"

def run_scheduled_job(job_id):
    lock = redis.set(f"lock:{job_id}", server_id, nx=True, ex=300)
    
    if lock:
        try:
            execute_job(job_id)
        finally:
            # Only release if we still own the lock
            if redis.get(f"lock:{job_id}") == server_id:
                redis.delete(f"lock:{job_id}")

Problem: This is still vulnerable to the "slow job" problem and also has a race condition in the finally block.

10:05:00 - Lock expires
10:05:01 - Server 2 acquires lock
10:05:02 - Server 1's GET returns server_id (stale data in flight)
10:05:03 - Server 1 DELETEs the lock (but it's Server 2's lock!)
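
A common fix for that unlock race is to make the check-and-delete atomic inside Redis itself, typically with a small Lua script. Here is a minimal sketch using redis-py; the lock:{job_id} key and server_id value mirror the snippet above:

import redis

# Delete the lock only if it still holds our server_id.
# Redis runs the script atomically, so no other client can
# acquire the lock between the GET and the DEL.
RELEASE_LOCK = """
if redis.call("GET", KEYS[1]) == ARGV[1] then
    return redis.call("DEL", KEYS[1])
else
    return 0
end
"""

def release_lock(client: redis.Redis, job_id: str, server_id: str) -> bool:
    return client.eval(RELEASE_LOCK, 1, f"lock:{job_id}", server_id) == 1

This closes the release race, but it does not solve the "slow job" problem: if the lock expires while the job is still running, another server can start the same job. That is what fencing tokens (Chapter 2) are for.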

1.2 The Core Challenges

These attempts reveal the fundamental challenges:

  • Mutual Exclusion: only one server should run a job at a time
  • Deadlock Prevention: crashed servers shouldn't hold locks forever
  • Split Brain: network partitions can cause multiple leaders
  • Clock Skew: servers' clocks may disagree on "now"
  • Exactly-Once Execution: jobs should run once, not zero times, not twice
  • Failure Recovery: crashed jobs should be detected and handled

1.3 The Fundamental Impossibility

The FLP Impossibility Result (Fischer, Lynch, Paterson, 1985) proves that in an asynchronous distributed system where even a single process may fail, no deterministic consensus algorithm can guarantee it always reaches a decision.

In practical terms: You cannot have perfect distributed cron.

But you can get very close with the right trade-offs:

Perfect: Job runs exactly once, on time, every time
         (Impossible in distributed systems)

Practical: Job runs at-least-once, we use idempotency to handle duplicates
           Job may be slightly delayed during failures
           We detect and alert on missed executions
           
This is what production systems actually do.
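
The "detect and alert on missed executions" piece can be as simple as a periodic query. A minimal sketch, assuming a scheduled_jobs table with status and next_run_at columns like the one used later in this post:

from datetime import datetime, timedelta

def find_missed_jobs(db, grace_seconds: int = 120):
    """Jobs whose scheduled time passed without any execution starting."""
    cutoff = datetime.utcnow() - timedelta(seconds=grace_seconds)
    return db.query("""
        SELECT id, name, next_run_at
        FROM scheduled_jobs
        WHERE status = 'pending'
          AND next_run_at < ?
    """, cutoff)

Anything this returns is a candidate for an alert: either the scheduler isn't scheduling, or the workers aren't keeping up.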

Chapter 2: Leader Election

The foundation of distributed cron is leader election: choosing one server to be responsible for scheduling and triggering jobs.

2.1 What Is Leader Election?

LEADER ELECTION

Before Election:           After Election:

┌─────────┐                ┌─────────┐
│Server 1 │                │Server 1 │ ← LEADER
│(wants   │                │(runs    │   (schedules jobs)
│ to lead)│                │ cron)   │
└─────────┘                └─────────┘

┌─────────┐                ┌─────────┐
│Server 2 │                │Server 2 │ ← FOLLOWER
│(wants   │                │(standby)│   (ready to take over)
│ to lead)│                └─────────┘
└─────────┘
                           ┌─────────┐
┌─────────┐                │Server 3 │ ← FOLLOWER
│Server 3 │                │(standby)│   (ready to take over)
│(wants   │                └─────────┘
│ to lead)│
└─────────┘

Only the leader runs the cron scheduler. Followers monitor the leader and are ready to take over if it fails.

2.2 The Split-Brain Problem

The most dangerous scenario in leader election:

Network Partition:
                   ┌─────────────────────┐
                   │     PARTITION       │
                   └─────────────────────┘
                              │
        ┌─────────────────────┼─────────────────────┐
        │                     │                     │
        ▼                     │                     ▼
┌───────────────┐             │             ┌───────────────┐
│   Servers     │             │             │   Servers     │
│   1, 2, 3     │             │             │   4, 5        │
│               │             │             │               │
│ "Server 1 is  │             │             │ "We can't see │
│  still leader"│             │             │  Server 1...  │
│               │             │             │  Elect new    │
│               │             │             │  leader: 4!"  │
└───────────────┘             │             └───────────────┘
        │                     │                     │
        ▼                     │                     ▼
   Server 1                   │                Server 4
   runs jobs                  │                runs jobs
                              │
                    TWO LEADERS! 💥

When the partition heals, you have two servers that both think they're the leader, both running jobs.

2.3 Fencing Tokens

The solution to split-brain is fencing tokens (also called epoch numbers or generation IDs):

FENCING TOKENS

Every time a new leader is elected, increment a global counter.
The leader includes this token in all operations.
Resources reject operations with old tokens.

Timeline:

Token 1: Server A becomes leader
         Server A: "Run job with token=1"
         Database: "OK, token 1 accepted"

Token 2: Network partition, Server B becomes leader
         Server B: "Run job with token=2"
         Database: "OK, token 2 accepted"

[Partition heals]
Server A (stale): "Run job with token=1"
Database: "REJECTED - token 1 < current token 2"

Server A realizes it's no longer leader and steps down.

Implementation:

class FencedJobExecutor:
    def __init__(self, db):
        self.db = db
    
    def execute_job(self, job_id: str, fencing_token: int, job_func):
        """Execute job only if fencing token is current."""
        
        # Atomically check and update token
        result = self.db.execute("""
            UPDATE jobs 
            SET last_execution_token = ?,
                last_execution_started = NOW()
            WHERE id = ? 
            AND (last_execution_token IS NULL OR last_execution_token < ?)
            RETURNING id
        """, fencing_token, job_id, fencing_token)
        
        if not result:
            raise StaleLeaderError(f"Token {fencing_token} is stale")
        
        try:
            job_func()
            
            # Mark completion with token
            self.db.execute("""
                UPDATE jobs 
                SET last_execution_completed = NOW()
                WHERE id = ? AND last_execution_token = ?
            """, job_id, fencing_token)
            
        except Exception as e:
            # Mark failure with token
            self.db.execute("""
                UPDATE jobs 
                SET last_execution_error = ?
                WHERE id = ? AND last_execution_token = ?
            """, str(e), job_id, fencing_token)
            raise

2.4 Why ZooKeeper and etcd Exist

Implementing leader election correctly is so hard that dedicated systems exist just for this:

ZooKeeper (Apache):

  • Created by Yahoo for Hadoop
  • Uses ZAB (ZooKeeper Atomic Broadcast) consensus
  • Provides: leader election, distributed locks, configuration management
  • Used by: Kafka, HBase, Solr

etcd (CoreOS/CNCF):

  • Created for Kubernetes
  • Uses Raft consensus
  • Provides: key-value store, leader election, watch notifications
  • Used by: Kubernetes, CoreDNS, many cloud-native tools

Consul (HashiCorp):

  • Uses Raft consensus
  • Provides: service discovery, health checking, leader election
  • Used by: Many HashiCorp tools, service meshes

Why use these instead of building your own?

  • Development time: months of development (DIY) vs. days of integration (dedicated system)
  • Correctness: subtle bugs like split-brain (DIY) vs. battle-tested (dedicated system)
  • Tooling: no monitoring or tooling (DIY) vs. rich observability (dedicated system)
  • Maintenance: your team maintains it (DIY) vs. the community maintains it (dedicated system)

The rule: Unless you're building a distributed database or similar infrastructure, use an existing coordination service.

2.5 Leader Election with etcd

import etcd3
from threading import Thread, Event
import time

class EtcdLeaderElection:
    """Leader election using etcd leases."""
    
    def __init__(self, etcd_client, election_name: str, ttl: int = 10):
        self.client = etcd_client
        self.election_name = election_name
        self.ttl = ttl
        self.lease = None
        self.is_leader = False
        self.leader_key = f"/elections/{election_name}/leader"
        self.stop_event = Event()
    
    def campaign(self, candidate_id: str, on_elected, on_demoted):
        """Campaign to become leader."""
        
        while not self.stop_event.is_set():
            try:
                # Create a lease that must be renewed
                self.lease = self.client.lease(self.ttl)
                
                # Try to become leader
                success, _ = self.client.transaction(
                    compare=[
                        self.client.transactions.create(self.leader_key) == 0
                    ],
                    success=[
                        self.client.transactions.put(
                            self.leader_key, 
                            candidate_id,
                            lease=self.lease
                        )
                    ],
                    failure=[]
                )
                
                if success:
                    self.is_leader = True
                    on_elected()
                    self._maintain_leadership(on_demoted)
                else:
                    # Someone else is leader, watch for changes
                    self._wait_for_leader_change()
            
            except Exception as e:
                if self.is_leader:
                    self.is_leader = False
                    on_demoted()
                time.sleep(1)
    
    def _maintain_leadership(self, on_demoted):
        """Keep refreshing lease while leader."""
        
        while self.is_leader and not self.stop_event.is_set():
            try:
                self.lease.refresh()
                time.sleep(self.ttl / 3)
            except Exception:
                self.is_leader = False
                on_demoted()
                break
    
    def _wait_for_leader_change(self):
        """Watch for leader key deletion."""
        
        events, cancel = self.client.watch(self.leader_key)
        
        for event in events:
            if isinstance(event, etcd3.events.DeleteEvent):
                cancel()
                return  # Leader gone, try to become leader
            
            if self.stop_event.is_set():
                cancel()
                return
    
    def resign(self):
        """Voluntarily give up leadership."""
        
        if self.is_leader and self.lease:
            self.lease.revoke()
            self.is_leader = False
    
    def stop(self):
        """Stop the election process."""
        self.stop_event.set()
        self.resign()

Chapter 3: Job Execution Guarantees

3.1 The Spectrum of Guarantees

JOB EXECUTION GUARANTEES

◄────────────────────────────────────────────────────────────────►
AT-MOST-ONCE               AT-LEAST-ONCE               EXACTLY-ONCE
"Fire and forget"          "Retry until done"          "Magic" (myth)

AT-MOST-ONCE: simple, fast, but may lose jobs.
  Use for: metrics/logs, cache warming, best-effort work.

AT-LEAST-ONCE: reliable, but may duplicate; needs idempotent jobs.
  Use for: most jobs, email sending, report generation.

EXACTLY-ONCE: requires idempotency, complex, expensive.
  Use for: financial operations, critical audit trails, anywhere exactly-once semantics are truly needed.

3.2 Implementing At-Least-Once Job Execution

from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
from typing import Callable, Optional
import uuid

class JobStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    DEAD = "dead"  # Exceeded all retries

@dataclass
class ScheduledJob:
    id: str
    name: str
    cron_expression: str
    handler: str  # e.g., "app.jobs.send_daily_report"
    
    # Execution tracking
    status: JobStatus = JobStatus.PENDING
    last_run_at: Optional[datetime] = None
    next_run_at: Optional[datetime] = None
    
    # Current execution
    execution_id: Optional[str] = None
    execution_started_at: Optional[datetime] = None
    execution_token: Optional[int] = None
    
    # Retry configuration
    max_retries: int = 3
    retry_count: int = 0
    timeout_seconds: int = 3600  # 1 hour default
    
    # Failure tracking
    last_error: Optional[str] = None
    consecutive_failures: int = 0

class AtLeastOnceJobExecutor:
    """
    Job executor with at-least-once guarantee.
    
    Guarantees:
    - Jobs will run at least once (may run multiple times on failures)
    - Failed jobs will be retried
    - Stuck jobs will be detected and re-run
    
    Requirement: job handlers must be idempotent (that's the caller's responsibility).
    """
    
    def __init__(self, db, job_registry, fencing_token: int):
        self.db = db
        self.job_registry = job_registry
        self.fencing_token = fencing_token
    
    def execute_job(self, job: ScheduledJob) -> bool:
        """
        Execute a scheduled job with at-least-once semantics.
        Returns True if job completed successfully.
        """
        
        execution_id = str(uuid.uuid4())
        
        # Try to claim the job
        claimed = self._claim_job(job.id, execution_id)
        if not claimed:
            return False  # Another instance is running it
        
        try:
            # Get the handler function
            handler = self.job_registry.get_handler(job.handler)
            if not handler:
                raise ValueError(f"Unknown job handler: {job.handler}")
            
            # Execute with timeout
            self._execute_with_timeout(handler, job.timeout_seconds)
            
            # Mark success
            self._mark_completed(job.id, execution_id)
            return True
        
        except TimeoutError:
            self._mark_failed(job.id, execution_id, "Job timed out")
            return False
        
        except Exception as e:
            self._mark_failed(job.id, execution_id, str(e))
            return False
    
    def _claim_job(self, job_id: str, execution_id: str) -> bool:
        """
        Atomically claim a job for execution.
        Uses fencing token to prevent stale leaders from claiming.
        """
        
        result = self.db.execute("""
            UPDATE scheduled_jobs
            SET status = 'running',
                execution_id = ?,
                execution_started_at = NOW(),
                execution_token = ?
            WHERE id = ?
            AND status IN ('pending', 'failed')
            AND (execution_token IS NULL OR execution_token < ?)
            RETURNING id
        """, execution_id, self.fencing_token, job_id, self.fencing_token)
        
        return result is not None
    
    def _execute_with_timeout(self, handler: Callable, timeout: int):
        """Execute handler with a timeout.
        
        Note: signal.alarm() is Unix-only and works only in the main thread;
        threaded or async workers need a different timeout mechanism.
        """
        
        import signal
        
        def timeout_handler(signum, frame):
            raise TimeoutError("Job execution timed out")
        
        signal.signal(signal.SIGALRM, timeout_handler)
        signal.alarm(timeout)
        
        try:
            handler()
        finally:
            signal.alarm(0)
    
    def _mark_completed(self, job_id: str, execution_id: str):
        """Mark job as completed."""
        
        self.db.execute("""
            UPDATE scheduled_jobs
            SET status = 'completed',
                last_run_at = NOW(),
                next_run_at = ?,
                retry_count = 0,
                consecutive_failures = 0,
                last_error = NULL
            WHERE id = ? AND execution_id = ?
        """, self._calculate_next_run(job_id), job_id, execution_id)
    
    def _mark_failed(self, job_id: str, execution_id: str, error: str):
        """Mark job as failed, potentially scheduling retry."""
        
        job = self.db.get_job(job_id)
        
        if job.retry_count < job.max_retries:
            # Schedule retry
            retry_delay = self._calculate_retry_delay(job.retry_count)
            
            self.db.execute("""
                UPDATE scheduled_jobs
                SET status = 'pending',
                    next_run_at = NOW() + INTERVAL ? SECOND,
                    retry_count = retry_count + 1,
                    consecutive_failures = consecutive_failures + 1,
                    last_error = ?
                WHERE id = ? AND execution_id = ?
            """, retry_delay, error, job_id, execution_id)
        else:
            # Move to dead status
            self.db.execute("""
                UPDATE scheduled_jobs
                SET status = 'dead',
                    last_error = ?,
                    consecutive_failures = consecutive_failures + 1
                WHERE id = ? AND execution_id = ?
            """, error, job_id, execution_id)
            
            # Alert operations
            self._alert_dead_job(job, error)
    
    def _calculate_retry_delay(self, retry_count: int) -> int:
        """Exponential backoff for retries."""
        base_delay = 60  # 1 minute
        max_delay = 3600  # 1 hour
        delay = base_delay * (2 ** retry_count)
        return min(delay, max_delay)
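
One refinement worth adding (not shown above) is jitter, so that a batch of jobs that failed together doesn't retry in lockstep. A minimal sketch of a full-jitter variant:

import random

def calculate_retry_delay_with_jitter(retry_count: int) -> int:
    """Exponential backoff with full jitter to spread out retries."""
    base_delay = 60   # 1 minute
    max_delay = 3600  # 1 hour
    ceiling = min(base_delay * (2 ** retry_count), max_delay)
    return random.randint(0, ceiling)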

3.3 Detecting and Recovering Stuck Jobs

What happens when a job starts but the server crashes mid-execution?

class StuckJobDetector:
    """
    Detect and recover jobs that are stuck in 'running' state.
    
    A job is considered stuck if:
    - Status is 'running'
    - Started more than timeout_seconds ago
    - No heartbeat received recently
    """
    
    def __init__(self, db, alert_service):
        self.db = db
        self.alert_service = alert_service
    
    def find_stuck_jobs(self) -> list:
        """Find jobs that appear to be stuck."""
        
        return self.db.query("""
            SELECT * FROM scheduled_jobs
            WHERE status = 'running'
            AND execution_started_at < NOW() - INTERVAL timeout_seconds SECOND
            AND (last_heartbeat_at IS NULL 
                 OR last_heartbeat_at < NOW() - INTERVAL 60 SECOND)
        """)
    
    def recover_stuck_jobs(self):
        """Reset stuck jobs to pending for re-execution."""
        
        stuck_jobs = self.find_stuck_jobs()
        
        for job in stuck_jobs:
            # Log the stuck job
            logger.warning(
                "Recovering stuck job",
                job_id=job.id,
                execution_id=job.execution_id,
                started_at=job.execution_started_at
            )
            
            # Check if it's a chronic problem
            if job.consecutive_failures >= 3:
                self.alert_service.alert(
                    severity="high",
                    message=f"Job {job.name} has failed {job.consecutive_failures} times",
                    job_id=job.id
                )
            
            # Mark as failed so it can be claimed and re-run on the next scheduling pass
            self.db.execute("""
                UPDATE scheduled_jobs
                SET status = 'failed',
                    last_error = 'Job stuck - server may have crashed',
                    consecutive_failures = consecutive_failures + 1
                WHERE id = ? AND execution_id = ?
            """, job.id, job.execution_id)
    
    def run_detector(self, interval: int = 60):
        """Run stuck job detection periodically."""
        
        while True:
            try:
                self.recover_stuck_jobs()
            except Exception as e:
                logger.error("Stuck job detector error", error=str(e))
            
            time.sleep(interval)

3.4 Job Heartbeats

For long-running jobs, use heartbeats to signal progress:

class HeartbeatJobExecutor:
    """
    Job executor with heartbeat support for long-running jobs.
    """
    
    def __init__(self, db, heartbeat_interval: int = 30):
        self.db = db
        self.heartbeat_interval = heartbeat_interval
        self.heartbeat_thread = None
        self.stop_heartbeat = False
    
    def execute_with_heartbeat(self, job: ScheduledJob, handler: Callable):
        """Execute job while sending heartbeats."""
        
        self.stop_heartbeat = False
        
        # Start heartbeat thread
        self.heartbeat_thread = Thread(
            target=self._heartbeat_loop,
            args=(job.id, job.execution_id)
        )
        self.heartbeat_thread.start()
        
        try:
            handler()
        finally:
            self.stop_heartbeat = True
            self.heartbeat_thread.join()
    
    def _heartbeat_loop(self, job_id: str, execution_id: str):
        """Send periodic heartbeats."""
        
        while not self.stop_heartbeat:
            try:
                self.db.execute("""
                    UPDATE scheduled_jobs
                    SET last_heartbeat_at = NOW()
                    WHERE id = ? AND execution_id = ?
                """, job_id, execution_id)
            except Exception:
                pass  # Best effort
            
            time.sleep(self.heartbeat_interval)

Chapter 4: The Complete Distributed Cron System

4.1 Architecture

DISTRIBUTED CRON ARCHITECTURE

COORDINATION LAYER (etcd / ZooKeeper)
  ├── Leader Election
  ├── Config Store
  └── Fencing Tokens
        │
        ▼
SCHEDULER (runs on the leader only)
  ├── Cron Parser
  ├── Job Registry
  └── Trigger Generator
        │
        ▼
JOB QUEUE (Redis / RabbitMQ / SQS)
        │
        ▼
WORKERS (Worker 1, Worker 2, Worker 3, ... each with an Executor and a Heartbeat loop)

PERSISTENCE LAYER
  ├── Job Store
  ├── Execution Logs
  └── Audit Trail

4.2 Complete Implementation

"""
Distributed Cron System
Combines: Leader Election + Job Scheduling + At-Least-Once Execution
"""

import asyncio
import time
import uuid
from datetime import datetime, timedelta
from typing import Dict, List, Callable, Optional
from dataclasses import dataclass, field
from croniter import croniter
import structlog

logger = structlog.get_logger()

# =============================================================================
# Configuration
# =============================================================================

@dataclass
class CronConfig:
    # Leader election
    etcd_endpoints: List[str] = field(default_factory=lambda: ["localhost:2379"])
    election_ttl: int = 10  # seconds
    
    # Scheduling
    tick_interval: int = 1  # seconds
    lookahead_seconds: int = 60  # How far ahead to schedule
    
    # Execution
    default_timeout: int = 3600  # 1 hour
    max_retries: int = 3
    stuck_job_threshold: int = 300  # 5 minutes without heartbeat
    
    # Workers
    num_workers: int = 4
    heartbeat_interval: int = 30

# =============================================================================
# Job Definitions
# =============================================================================

@dataclass
class JobDefinition:
    name: str
    cron: str
    handler: str
    timeout: int = 3600
    max_retries: int = 3
    enabled: bool = True
    metadata: Dict = field(default_factory=dict)

@dataclass
class JobExecution:
    id: str
    job_name: str
    scheduled_at: datetime
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None
    status: str = "pending"  # pending, running, completed, failed, dead
    fencing_token: Optional[int] = None
    worker_id: Optional[str] = None
    error: Optional[str] = None
    retry_count: int = 0

# =============================================================================
# Distributed Cron Scheduler
# =============================================================================

class DistributedCronScheduler:
    """
    Distributed cron scheduler with leader election.
    
    Only the leader schedules jobs. Workers execute them.
    """
    
    def __init__(
        self,
        config: CronConfig,
        etcd_client,
        job_store,
        job_queue,
        job_registry: Dict[str, Callable]
    ):
        self.config = config
        self.etcd = etcd_client
        self.job_store = job_store
        self.job_queue = job_queue
        self.job_registry = job_registry
        
        self.is_leader = False
        self.fencing_token = 0
        self.node_id = str(uuid.uuid4())[:8]
        
        self.leader_election = EtcdLeaderElection(
            etcd_client,
            "cron-scheduler",
            ttl=config.election_ttl
        )
    
    async def start(self):
        """Start the distributed cron system."""
        
        logger.info("Starting distributed cron", node_id=self.node_id)
        
        # Start leader election in background
        asyncio.create_task(self._run_election())
        
        # Start workers
        for i in range(self.config.num_workers):
            asyncio.create_task(self._run_worker(f"{self.node_id}-worker-{i}"))
        
        # Start stuck job detector
        asyncio.create_task(self._run_stuck_job_detector())
        
        # Keep running
        while True:
            await asyncio.sleep(1)
    
    async def _run_election(self):
        """Run leader election.
        
        The etcd campaign from Chapter 2 is blocking, so run it in a thread
        and hop back onto the event loop for the callbacks.
        """
        
        loop = asyncio.get_running_loop()
        
        await asyncio.to_thread(
            self.leader_election.campaign,
            candidate_id=self.node_id,
            on_elected=lambda: loop.call_soon_threadsafe(self._on_elected),
            on_demoted=lambda: loop.call_soon_threadsafe(self._on_demoted)
        )
    
    def _on_elected(self):
        """Called when this node becomes leader."""
        
        self.is_leader = True
        self.fencing_token = int(time.time() * 1000)  # Millisecond timestamp as token
        
        logger.info(
            "Became leader",
            node_id=self.node_id,
            fencing_token=self.fencing_token
        )
        
        # Start scheduler loop
        asyncio.create_task(self._scheduler_loop())
    
    def _on_demoted(self):
        """Called when this node loses leadership."""
        
        self.is_leader = False
        
        logger.info("Lost leadership", node_id=self.node_id)
    
    async def _scheduler_loop(self):
        """Main scheduler loop (runs only on leader)."""
        
        logger.info("Starting scheduler loop", node_id=self.node_id)
        
        while self.is_leader:
            try:
                await self._schedule_due_jobs()
            except Exception as e:
                logger.error("Scheduler error", error=str(e))
            
            await asyncio.sleep(self.config.tick_interval)
        
        logger.info("Scheduler loop stopped", node_id=self.node_id)
    
    async def _schedule_due_jobs(self):
        """Find and queue jobs that are due to run."""
        
        now = datetime.utcnow()
        
        # Get all job definitions
        jobs = await self.job_store.get_all_jobs()
        
        for job in jobs:
            if not job.enabled:
                continue
            
            # Check if job is due
            cron = croniter(job.cron, now - timedelta(seconds=self.config.lookahead_seconds))
            next_run = cron.get_next(datetime)
            
            if next_run <= now:
                # Check if already scheduled
                existing = await self.job_store.get_pending_execution(
                    job.name, 
                    next_run
                )
                
                if not existing:
                    # Create execution record
                    execution = JobExecution(
                        id=str(uuid.uuid4()),
                        job_name=job.name,
                        scheduled_at=next_run,
                        fencing_token=self.fencing_token
                    )
                    
                    await self.job_store.create_execution(execution)
                    
                    # Queue for workers
                    await self.job_queue.enqueue({
                        'execution_id': execution.id,
                        'job_name': job.name,
                        'handler': job.handler,
                        'timeout': job.timeout,
                        'fencing_token': self.fencing_token
                    })
                    
                    logger.info(
                        "Scheduled job",
                        job_name=job.name,
                        execution_id=execution.id,
                        scheduled_at=next_run
                    )
    
    async def _run_worker(self, worker_id: str):
        """Worker loop that executes jobs."""
        
        logger.info("Starting worker", worker_id=worker_id)
        
        while True:
            try:
                # Get job from queue
                task = await self.job_queue.dequeue(timeout=5)
                
                if task:
                    await self._execute_job(worker_id, task)
            
            except Exception as e:
                logger.error("Worker error", worker_id=worker_id, error=str(e))
                await asyncio.sleep(1)
    
    async def _execute_job(self, worker_id: str, task: dict):
        """Execute a job task."""
        
        execution_id = task['execution_id']
        job_name = task['job_name']
        handler_name = task['handler']
        timeout = task['timeout']
        fencing_token = task['fencing_token']
        
        logger.info(
            "Executing job",
            worker_id=worker_id,
            execution_id=execution_id,
            job_name=job_name
        )
        
        # Claim the execution
        claimed = await self.job_store.claim_execution(
            execution_id,
            worker_id,
            fencing_token
        )
        
        if not claimed:
            logger.warning(
                "Could not claim execution",
                execution_id=execution_id,
                reason="Already claimed or stale token"
            )
            return
        
        # Get handler
        handler = self.job_registry.get(handler_name)
        if not handler:
            await self.job_store.fail_execution(
                execution_id,
                f"Unknown handler: {handler_name}"
            )
            return
        
        # Start heartbeat
        heartbeat_task = asyncio.create_task(
            self._heartbeat_loop(execution_id, worker_id)
        )
        
        try:
            # Execute with timeout
            await asyncio.wait_for(
                handler(),
                timeout=timeout
            )
            
            # Mark success
            await self.job_store.complete_execution(execution_id)
            
            logger.info(
                "Job completed",
                execution_id=execution_id,
                job_name=job_name
            )
        
        except asyncio.TimeoutError:
            await self.job_store.fail_execution(
                execution_id,
                "Job timed out"
            )
            
            logger.error(
                "Job timed out",
                execution_id=execution_id,
                job_name=job_name,
                timeout=timeout
            )
        
        except Exception as e:
            await self.job_store.fail_execution(
                execution_id,
                str(e)
            )
            
            logger.error(
                "Job failed",
                execution_id=execution_id,
                job_name=job_name,
                error=str(e)
            )
        
        finally:
            heartbeat_task.cancel()
    
    async def _heartbeat_loop(self, execution_id: str, worker_id: str):
        """Send heartbeats while job is running."""
        
        while True:
            try:
                await self.job_store.update_heartbeat(execution_id, worker_id)
                await asyncio.sleep(self.config.heartbeat_interval)
            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error("Heartbeat error", error=str(e))
    
    async def _run_stuck_job_detector(self):
        """Detect and recover stuck jobs."""
        
        while True:
            try:
                stuck_jobs = await self.job_store.find_stuck_executions(
                    threshold_seconds=self.config.stuck_job_threshold
                )
                
                for execution in stuck_jobs:
                    logger.warning(
                        "Found stuck job",
                        execution_id=execution.id,
                        job_name=execution.job_name,
                        started_at=execution.started_at
                    )
                    
                    # Reset to failed for retry
                    await self.job_store.fail_execution(
                        execution.id,
                        "Job stuck - worker may have crashed"
                    )
                    
                    # Re-queue if retries remaining
                    job = await self.job_store.get_job(execution.job_name)
                    if execution.retry_count < job.max_retries:
                        await self.job_queue.enqueue({
                            'execution_id': execution.id,
                            'job_name': execution.job_name,
                            'handler': job.handler,
                            'timeout': job.timeout,
                            'fencing_token': self.fencing_token,
                            'retry_count': execution.retry_count + 1
                        })
            
            except Exception as e:
                logger.error("Stuck job detector error", error=str(e))
            
            await asyncio.sleep(60)
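
To make the moving parts concrete, here is a rough sketch of how this scheduler might be wired up. MyJobStore and MyJobQueue are hypothetical adapters; they just need to implement the calls used above (get_all_jobs, claim_execution, enqueue, dequeue, and so on):

import asyncio
import etcd3

async def send_daily_report():
    ...  # your job logic; ideally idempotent

async def main():
    scheduler = DistributedCronScheduler(
        config=CronConfig(num_workers=2),
        etcd_client=etcd3.client(host="localhost", port=2379),
        job_store=MyJobStore(),   # hypothetical persistence adapter
        job_queue=MyJobQueue(),   # hypothetical queue adapter (e.g. Redis-backed)
        job_registry={"app.jobs.send_daily_report": send_daily_report},
    )
    await scheduler.start()

if __name__ == "__main__":
    asyncio.run(main())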

Part II: The Design Challenge

Chapter 5: "The Leader Dies Mid-Job"

5.1 The Scenario

This is the classic distributed systems interview question:

Timeline:
  10:00:00 - Leader (Server A) triggers daily report job
  10:00:01 - Worker starts generating report
  10:00:30 - Leader (Server A) crashes (power failure)
  10:00:35 - etcd lease expires
  10:00:36 - Server B becomes new leader
  
Questions:
  1. What happens to the in-progress job?
  2. Will the job run twice?
  3. How do we ensure exactly-once semantics?
  4. What if the job was half-finished (e.g., sent some emails)?

5.2 Analyzing the Failure Modes

Mode 1: Leader crash, job still running on worker

Leader A crashes, but worker is on different server.
Worker continues running the job.
New leader B doesn't know job is running.
New leader B might schedule the same job again!

Solution: Job execution is tracked in persistent storage, not just in leader's memory.

# Before scheduling, check if already running
existing = await self.job_store.get_running_execution(job_name)
if existing:
    if existing.is_stale():  # No heartbeat for 5+ minutes
        # Assume dead, reschedule
        pass
    else:
        # Still running, skip
        return
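
The is_stale() check isn't defined in the snippet; a possible implementation, assuming the execution row carries last_heartbeat_at and started_at timestamps:

from datetime import datetime, timedelta

STALE_AFTER = timedelta(minutes=5)

def is_stale(execution) -> bool:
    """Treat the execution as dead if we haven't heard from it recently."""
    last_signal = execution.last_heartbeat_at or execution.started_at
    return last_signal is not None and datetime.utcnow() - last_signal > STALE_AFTER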

Mode 2: Leader crash, job on same server

Leader A is also running the job.
Both crash together.
Job is partially complete (e.g., 50 emails sent).

Solution: Make jobs idempotent and resumable.

class IdempotentEmailJob:
    """Send daily email report - idempotent implementation."""
    
    async def run(self, execution_id: str):
        # Get all users who should receive email
        users = await self.db.get_email_recipients()
        
        for user in users:
            # Check if already sent (idempotency)
            already_sent = await self.db.check_email_sent(
                execution_id=execution_id,
                user_id=user.id
            )
            
            if already_sent:
                continue
            
            # Send email
            await self.email_service.send(user.email, self.report)
            
            # Mark as sent (for idempotency)
            await self.db.mark_email_sent(
                execution_id=execution_id,
                user_id=user.id
            )

Mode 3: Split-brain during failover

Network partition:
  Partition A: Old leader + some workers
  Partition B: New leader + other workers

Both leaders think they're in charge!

Solution: Fencing tokens.

# Worker checks fencing token before executing
async def execute_job(self, task):
    current_token = await self.get_current_fencing_token()
    
    if task['fencing_token'] < current_token:
        # Stale task from old leader
        logger.warning("Rejecting stale task", 
            task_token=task['fencing_token'],
            current_token=current_token)
        return
    
    # Safe to execute
    await self._do_execute(task)

5.3 Complete Solution

class ResilientJobExecution:
    """
    Job execution that handles leader failure gracefully.
    """
    
    def __init__(self, job_store, fencing_store):
        self.job_store = job_store
        self.fencing_store = fencing_store
    
    async def execute_with_resilience(
        self,
        job: JobDefinition,
        execution: JobExecution,
        handler: Callable
    ):
        """
        Execute job with full resilience:
        - Fencing token validation
        - Idempotency support
        - Progress tracking
        - Graceful failure handling
        """
        
        # Step 1: Validate fencing token
        current_token = await self.fencing_store.get_current_token()
        if execution.fencing_token < current_token:
            raise StaleExecutionError(
                f"Execution {execution.id} has stale token"
            )
        
        # Step 2: Create execution context for idempotency
        context = ExecutionContext(
            execution_id=execution.id,
            job_name=job.name,
            checkpoint_store=self.job_store
        )
        
        try:
            # Step 3: Execute with progress tracking
            await handler(context)
            
            # Step 4: Mark complete
            await self.job_store.complete_execution(
                execution.id,
                fencing_token=execution.fencing_token
            )
            
        except Exception as e:
            # Step 5: Handle failure
            await self._handle_failure(execution, e)
            raise
    
    async def _handle_failure(self, execution: JobExecution, error: Exception):
        """Handle job failure."""
        
        # Update execution record
        await self.job_store.fail_execution(
            execution.id,
            str(error),
            fencing_token=execution.fencing_token
        )
        
        # Check if retryable
        job = await self.job_store.get_job(execution.job_name)
        
        if execution.retry_count < job.max_retries:
            # Schedule retry
            await self.job_store.schedule_retry(
                execution.id,
                delay_seconds=self._calculate_backoff(execution.retry_count)
            )
        else:
            # Move to dead letter
            await self.job_store.mark_dead(execution.id)
            await self._alert_dead_job(execution, error)


class ExecutionContext:
    """
    Context for idempotent job execution.
    Provides checkpoint/resume capabilities.
    """
    
    def __init__(self, execution_id: str, job_name: str, checkpoint_store):
        self.execution_id = execution_id
        self.job_name = job_name
        self.checkpoint_store = checkpoint_store
    
    async def checkpoint(self, key: str, data: dict):
        """Save progress checkpoint."""
        await self.checkpoint_store.save_checkpoint(
            self.execution_id,
            key,
            data
        )
    
    async def get_checkpoint(self, key: str) -> Optional[dict]:
        """Get saved checkpoint."""
        return await self.checkpoint_store.get_checkpoint(
            self.execution_id,
            key
        )
    
    async def mark_step_complete(self, step: str):
        """Mark a step as complete (for idempotency)."""
        await self.checkpoint_store.mark_step_complete(
            self.execution_id,
            step
        )
    
    async def is_step_complete(self, step: str) -> bool:
        """Check if step already completed."""
        return await self.checkpoint_store.is_step_complete(
            self.execution_id,
            step
        )

5.4 Example: Resumable Invoice Generation

async def generate_monthly_invoices(ctx: ExecutionContext):
    """
    Generate invoices for all customers.
    Resumable and idempotent.
    """
    
    # Get last checkpoint (if resuming)
    checkpoint = await ctx.get_checkpoint("progress")
    last_customer_id = checkpoint.get("last_customer_id") if checkpoint else None
    
    # Get customers to process
    customers = await db.get_customers_for_invoicing(after_id=last_customer_id)
    
    for customer in customers:
        # Check if already processed (idempotency)
        step_key = f"customer_{customer.id}"
        if await ctx.is_step_complete(step_key):
            continue
        
        # Generate invoice
        invoice = await generate_invoice(customer)
        
        # Save invoice
        await db.save_invoice(invoice)
        
        # Send email
        await send_invoice_email(customer, invoice)
        
        # Mark step complete
        await ctx.mark_step_complete(step_key)
        
        # Checkpoint progress
        await ctx.checkpoint("progress", {"last_customer_id": customer.id})
    
    logger.info("Invoice generation complete", 
        execution_id=ctx.execution_id,
        customers_processed=len(customers))
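
Wiring this handler into the Section 5.3 executor is then a single call (a sketch; the job, execution, and store objects are assumed to come from the scheduler and job store introduced earlier):

executor = ResilientJobExecution(job_store, fencing_store)

await executor.execute_with_resilience(
    job=invoice_job,              # JobDefinition for the invoice job
    execution=invoice_execution,  # JobExecution created by the leader for this run
    handler=generate_monthly_invoices
)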

Chapter 6: Comparing Real-World Solutions

6.1 Celery Beat

What it is: Periodic task scheduler for Celery (Python).

How it works:

# celeryconfig.py
from celery.schedules import crontab

beat_schedule = {
    'send-daily-report': {
        'task': 'tasks.send_daily_report',
        'schedule': crontab(hour=9, minute=0),
    },
}

Single-node problem: By default, only ONE Celery Beat process should run.

Solution: RedBeat (the celery-redbeat package), which keeps the schedule in Redis and uses a Redis lock so only one beat instance schedules tasks at a time.

# Use Redis for distributed locking
from redbeat import RedBeatSchedulerEntry

# Only one beat will schedule tasks
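
A minimal configuration sketch (assuming the celery-redbeat package; the Redis URL and lock timeout are illustrative values):

# celeryconfig.py (RedBeat additions)
beat_scheduler = "redbeat.RedBeatScheduler"      # replace the default file-based scheduler
redbeat_redis_url = "redis://localhost:6379/1"   # schedule entries and the lock live here
redbeat_lock_timeout = 30                        # seconds before a dead beat's lock expires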

Pros:

  • Simple setup
  • Integrates with Celery ecosystem
  • Many community extensions

Cons:

  • Not truly distributed (single beat process)
  • Requires external solution for HA
  • No built-in exactly-once guarantees

6.2 Kubernetes CronJobs

What it is: Kubernetes-native scheduled jobs.

apiVersion: batch/v1
kind: CronJob
metadata:
  name: daily-report
spec:
  schedule: "0 9 * * *"
  concurrencyPolicy: Forbid      # Don't run if previous still running
  startingDeadlineSeconds: 300   # Skip this run if it can't start within 5 minutes
  jobTemplate:
    spec:
      backoffLimit: 3            # Retry failed pods up to 3 times
      template:
        spec:
          containers:
          - name: report
            image: my-report-generator
          restartPolicy: OnFailure

Key features:

  • concurrencyPolicy: Forbid - prevents overlapping runs
  • startingDeadlineSeconds - how late a missed run may still be started before it is skipped
  • Automatic retries with backoffLimit

Pros:

  • Built into Kubernetes
  • Handles pod failures
  • Good observability

Cons:

  • At-least-once (not exactly-once)
  • 1-minute minimum granularity
  • Can miss jobs during cluster issues

6.3 Temporal (a fork of Uber's Cadence by its original creators)

What it is: Workflow orchestration platform with built-in durability.

from datetime import timedelta

from temporalio import workflow
from temporalio.client import Schedule, ScheduleActionStartWorkflow, ScheduleSpec

@workflow.defn
class DailyReportWorkflow:
    @workflow.run
    async def run(self):
        # This is durable - survives crashes
        await workflow.execute_activity(
            generate_report,
            start_to_close_timeout=timedelta(hours=1)
        )
        await workflow.execute_activity(
            send_emails,
            start_to_close_timeout=timedelta(minutes=30)
        )

# Schedule definition (the workflow id and task queue below are placeholders)
schedule = Schedule(
    action=ScheduleActionStartWorkflow(
        DailyReportWorkflow.run,
        id="daily-report",
        task_queue="reports",
    ),
    spec=ScheduleSpec(cron_expressions=["0 9 * * *"]),
)
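
Registering the schedule then happens through a connected client (a sketch; the server address and schedule id are illustrative):

from temporalio.client import Client

async def register_schedule():
    client = await Client.connect("localhost:7233")
    await client.create_schedule("daily-report-schedule", schedule)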

Key features:

  • Durable execution: Workflow state survives crashes
  • Exactly-once semantics: Built into the platform
  • Automatic retries: With exponential backoff
  • Visibility: Full execution history

How it achieves exactly-once:

  1. Event sourcing for workflow state
  2. Deterministic replay
  3. Automatic activity deduplication

Pros:

  • True exactly-once semantics
  • Handles long-running jobs elegantly
  • Great debugging/visibility

Cons:

  • Additional infrastructure
  • Learning curve
  • Overkill for simple cron jobs

6.4 Comparison Table

Feature           | Celery Beat        | K8s CronJobs        | Temporal
------------------|--------------------|---------------------|-------------------
Deployment        | Simple             | Kubernetes required | Dedicated cluster
Exactly-once      | No                 | No                  | Yes
Crash recovery    | Manual             | Automatic           | Automatic
Long-running jobs | Timeout issues     | Pod limits          | Native support
Visibility        | Basic              | kubectl             | Excellent UI
Complexity        | Low                | Medium              | High
Best for          | Simple Python apps | Cloud-native apps   | Critical workflows

6.5 When to Use What

β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚                        DECISION FLOWCHART                                β”‚
β”‚                                                                          β”‚
β”‚  Need scheduled jobs?                                                   β”‚
β”‚         β”‚                                                                β”‚
β”‚         β–Ό                                                                β”‚
β”‚  Simple cron, single server? ────YES────▢ Use Linux cron               β”‚
β”‚         β”‚                                                                β”‚
β”‚         NO                                                               β”‚
β”‚         β–Ό                                                                β”‚
β”‚  Using Kubernetes? ────YES────▢ Start with K8s CronJobs                 β”‚
β”‚         β”‚                        β”‚                                       β”‚
β”‚         β”‚                        └──▢ Need exactly-once? ──▢ Add        β”‚
β”‚         β”‚                             idempotency in your job            β”‚
β”‚         NO                                                               β”‚
β”‚         β–Ό                                                                β”‚
β”‚  Python/Celery app? ────YES────▢ Use Celery Beat + Redis lock          β”‚
β”‚         β”‚                                                                β”‚
β”‚         NO                                                               β”‚
β”‚         β–Ό                                                                β”‚
β”‚  Critical financial/                                                     β”‚
β”‚  compliance jobs? ────YES────▢ Use Temporal or similar                  β”‚
β”‚         β”‚                                                                β”‚
β”‚         NO                                                               β”‚
β”‚         β–Ό                                                                β”‚
β”‚  Build custom solution using                                            β”‚
β”‚  this day's patterns                                                    β”‚
β”‚                                                                          β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

Part III: Week 2 Synthesis

Chapter 7: Bringing It All Together

7.1 The Failure-First Mindset

This week, we've internalized a crucial mindset shift:

Before Week 2:
  "How do I make this work?"
  → Design for the happy path
  → Handle errors as an afterthought
  → Hope failures are rare

After Week 2:
  "How will this fail?"
  → Design for failures first
  → Make failures visible and recoverable
  → Assume failures are constant

7.2 The Patterns We've Learned

β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚                     WEEK 2: COMPLETE PATTERN CATALOG                     β”‚
β”‚                                                                          β”‚
β”‚  DAY 1: TIMEOUTS                                                        β”‚
β”‚  β”œβ”€β”€ Timeout budgets (total time allocation)                            β”‚
β”‚  β”œβ”€β”€ Cascading timeouts (shrinking budgets through call chain)          β”‚
β”‚  β”œβ”€β”€ Deadline propagation (pass deadline, not duration)                 β”‚
β”‚  └── Connection vs read vs write timeouts                               β”‚
β”‚                                                                          β”‚
β”‚  DAY 2: IDEMPOTENCY                                                     β”‚
β”‚  β”œβ”€β”€ Idempotency keys (client-generated unique IDs)                     β”‚
β”‚  β”œβ”€β”€ Request fingerprinting (hash of request content)                   β”‚
β”‚  β”œβ”€β”€ Idempotency store (remember processed requests)                    β”‚
β”‚  └── TTL management (how long to remember)                              β”‚
β”‚                                                                          β”‚
β”‚  DAY 3: CIRCUIT BREAKERS                                                β”‚
β”‚  β”œβ”€β”€ Three states (Closed β†’ Open β†’ Half-Open)                           β”‚
β”‚  β”œβ”€β”€ Failure rate vs count thresholds                                   β”‚
β”‚  β”œβ”€β”€ Gradual recovery (prevent thundering herd)                         β”‚
β”‚  └── Fallback strategies (graceful degradation)                         β”‚
β”‚                                                                          β”‚
β”‚  DAY 4: WEBHOOKS                                                        β”‚
β”‚  β”œβ”€β”€ At-least-once delivery (retry until acknowledged)                  β”‚
β”‚  β”œβ”€β”€ Exponential backoff with jitter                                    β”‚
β”‚  β”œβ”€β”€ Dead letter queues (for failed deliveries)                         β”‚
β”‚  └── Signature verification (security)                                  β”‚
β”‚                                                                          β”‚
β”‚  DAY 5: DISTRIBUTED CRON                                                β”‚
β”‚  β”œβ”€β”€ Leader election (single scheduler)                                 β”‚
β”‚  β”œβ”€β”€ Fencing tokens (prevent split-brain)                               β”‚
β”‚  β”œβ”€β”€ Job heartbeats (detect stuck jobs)                                 β”‚
β”‚  └── Checkpoint/resume (idempotent execution)                           β”‚
β”‚                                                                          β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

7.3 How They Connect

                    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
                    β”‚  DISTRIBUTED    β”‚
                    β”‚     CRON        β”‚
                    β”‚    (Day 5)      β”‚
                    β””β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                             β”‚
           Uses leader election + fencing tokens
                             β”‚
                             β–Ό
        β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
        β”‚                                          β”‚
        β–Ό                                          β–Ό
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”                         β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚   TIMEOUTS    β”‚                         β”‚  IDEMPOTENCY  β”‚
β”‚   (Day 1)     β”‚                         β”‚    (Day 2)    β”‚
β”‚               β”‚                         β”‚               β”‚
β”‚ Job timeouts, β”‚                         β”‚ Jobs must be  β”‚
β”‚ execution     β”‚                         β”‚ idempotent    β”‚
β”‚ limits        β”‚                         β”‚ for retries   β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”˜                         β””β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”˜
        β”‚                                          β”‚
        β”‚         β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”               β”‚
        β”‚         β”‚   CIRCUIT     β”‚               β”‚
        └────────▢│   BREAKERS    β”‚β—€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                  β”‚   (Day 3)     β”‚
                  β”‚               β”‚
                  β”‚ Jobs calling  β”‚
                  β”‚ external APIs β”‚
                  β””β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”˜
                          β”‚
                          β–Ό
                  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
                  β”‚   WEBHOOKS    β”‚
                  β”‚   (Day 4)     β”‚
                  β”‚               β”‚
                  β”‚ Jobs trigger  β”‚
                  β”‚ async events  β”‚
                  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

7.4 A Complete Example: Order Processing Job

Let's see all patterns working together:

class OrderProcessingJob:
    """
    Daily job to process pending orders.
    Demonstrates all Week 2 patterns.
    """
    
    def __init__(
        self,
        db,
        payment_client,
        inventory_client,
        webhook_service,
        circuit_breaker,
        idempotency_store
    ):
        self.db = db
        self.payment = payment_client
        self.inventory = inventory_client
        self.webhooks = webhook_service
        self.circuit = circuit_breaker
        self.idempotency = idempotency_store
    
    async def run(self, ctx: ExecutionContext):
        """
        Process all pending orders.
        
        Patterns used:
        - Idempotency: Skip already-processed orders
        - Timeouts: Limit time per order
        - Circuit breaker: Protect external calls
        - Webhooks: Notify external systems
        - Checkpointing: Resume after crash
        """
        
        # Get checkpoint (for resume after crash) [Day 5]
        checkpoint = await ctx.get_checkpoint("progress")
        last_order_id = checkpoint.get("last_order_id") if checkpoint else None
        
        # Get pending orders
        orders = await self.db.get_pending_orders(after_id=last_order_id)
        
        for order in orders:
            # Idempotency check [Day 2]
            idem_key = f"process_order_{order.id}"
            if await self.idempotency.exists(idem_key):
                continue
            
            try:
                # Process with timeout [Day 1]
                await asyncio.wait_for(
                    self._process_order(order, ctx),
                    timeout=30
                )
                
                # Mark idempotent [Day 2]
                await self.idempotency.set(idem_key, ttl=86400)
                
            except asyncio.TimeoutError:
                logger.error("Order processing timeout", order_id=order.id)
                # Advance the checkpoint past this order: a retry of this
                # execution skips it, and the next scheduled run picks it up
                # again because it was never marked processing.
                await ctx.checkpoint("progress", {"last_order_id": order.id})
                raise
            
            except CircuitOpenError:
                # Circuit open, stop processing [Day 3]
                logger.warning("Circuit open, pausing job")
                await ctx.checkpoint("progress", {"last_order_id": order.id})
                raise
            
            # Checkpoint progress [Day 5]
            await ctx.checkpoint("progress", {"last_order_id": order.id})
    
    async def _process_order(self, order, ctx: ExecutionContext):
        """Process single order with all patterns."""
        
        # Step 1: Check inventory (with circuit breaker) [Day 3]
        try:
            available = await self.circuit.call(
                lambda: self.inventory.check(order.items)
            )
        except CircuitOpenError:
            raise  # Propagate to caller
        
        if not available:
            await self.db.mark_order_backordered(order.id)
            return
        
        # Step 2: Charge payment (with timeout) [Day 1]
        try:
            payment_result = await asyncio.wait_for(
                self.payment.charge(
                    order.user_id,
                    order.total,
                    idempotency_key=f"order_{order.id}"  # Day 2
                ),
                timeout=10
            )
        except asyncio.TimeoutError:
            logger.error("Payment timeout", order_id=order.id)
            raise
        
        # Step 3: Reserve inventory
        await self.inventory.reserve(order.items, order.id)
        
        # Step 4: Update order status
        await self.db.mark_order_processing(order.id, payment_result.id)
        
        # Step 5: Send webhook (async) [Day 4]
        await self.webhooks.emit(
            event_type="order.processing",
            data={
                "order_id": order.id,
                "status": "processing",
                "payment_id": payment_result.id
            }
        )

Part IV: Interview Questions

Chapter 8: Week 2 Interview Mastery

8.1 Distributed Cron Questions

Q: "How would you design a job scheduler that ensures jobs run exactly once across multiple servers?"

Strong Answer:

"This is a distributed coordination problem. Here's my approach:

Leader Election: First, I need a single scheduler to prevent duplicate scheduling. I'd use etcd or ZooKeeper for leader election. The leader has a lease that must be renewed; if it crashes, the lease expires and another node becomes leader.

Fencing Tokens: To prevent split-brain, each leader gets a monotonically increasing fencing token. Workers validate this token before executing. Stale tokens from old leaders are rejected.

At-Least-Once Execution: True exactly-once is impossible, so I implement at-least-once with idempotent jobs. Each execution has a unique ID. Jobs use this ID to check if work was already done.

Stuck Job Detection: Workers send heartbeats during execution. A background process finds jobs that started but haven't heartbeated recently; these are stuck. We reset them to 'failed' for retry.

Checkpointing: Long-running jobs checkpoint progress. If a job crashes at 50%, it resumes from the checkpoint rather than starting over.

Connection to Week 2: This combines everything from this week:

  • Day 1: Job timeouts prevent infinite execution
  • Day 2: Idempotent jobs handle retries safely
  • Day 3: Circuit breakers for jobs calling external services
  • Day 4: Jobs triggering webhooks for notifications
  • Day 5: Leader election and fencing tokens"

8.2 "The Leader Dies" Question

Q: "The leader crashes while a job is running. What happens?"

Strong Answer:

"Three scenarios to consider:

Scenario 1: Job running on a different worker. The job keeps running. Before scheduling, the new leader checks the job store, sees an in-progress execution with a recent heartbeat, and skips scheduling until that execution completes or times out.

Scenario 2: Job running on the same server as the leader. Both crash together. The execution record still says 'running' but heartbeats stop. The stuck-job detector (now on the new leader) finds it after the heartbeat threshold, marks it failed, and reschedules it.

Scenario 3: Job was just scheduled. The job message is already in the queue, stamped with the old leader's fencing token. Once the new leader takes over and the current token increases, workers reject that stale task (the fencing check from earlier), and the job is re-scheduled under the new token; if a duplicate slips through anyway, idempotency makes it harmless.

Key protections:

  1. Persistent execution records - not just in-memory
  2. Heartbeats - distinguish 'running' from 'stuck'
  3. Fencing tokens - prevent stale leaders from interfering
  4. Idempotent jobs - safe to re-run if uncertain

I'd also have comprehensive alerting: alert on stuck jobs, alert on jobs exceeding retry limits, alert on leader failovers."
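
The stuck-job detector mentioned above is small in practice. A minimal sketch, reusing fail_execution from Section 5.3; find_stale_executions is an assumed query helper that returns running executions whose last heartbeat is older than the threshold:

async def detect_stuck_jobs(job_store, heartbeat_timeout: int = 120):
    # Executions that claim to be running but stopped heartbeating are stuck:
    # fail them so the normal retry / dead-letter path takes over.
    stale = await job_store.find_stale_executions(older_than_seconds=heartbeat_timeout)
    for execution in stale:
        logger.warning("Stuck job detected",
            execution_id=execution.id,
            job_name=execution.job_name)
        await job_store.fail_execution(
            execution.id,
            "heartbeat timeout",
            fencing_token=execution.fencing_token
        )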

8.3 Week 2 Synthesis Question

Q: "Walk me through how you'd make a payment processing job reliable."

Strong Answer:

"I'll use all of Week 2's patterns:

Scheduling (Day 5): Job scheduled via distributed cron with leader election. Only one instance schedules. Fencing tokens prevent split-brain.

Execution Setup:

- Execution ID: unique per run
- Idempotency key: derived from job + date
- Timeout: 30 minutes max
- Heartbeat: every 30 seconds

Processing Loop: For each pending payment:

  1. Idempotency check (Day 2): Skip if processed:{payment_id} exists
  2. Call payment provider with circuit breaker (Day 3): If circuit open, pause job
  3. Timeout the call (Day 1): Max 10 seconds per payment
  4. Mark processed (Day 2): Set processed:{payment_id} with TTL
  5. Emit webhook (Day 4): payment.completed event
  6. Checkpoint (Day 5): Save last_payment_id for resume

Failure Handling:

  • Timeout → Log, checkpoint, continue
  • Circuit open → Checkpoint, pause, retry later
  • Crash → Resume from checkpoint, idempotency prevents duplicates

Monitoring:

  • Track: success rate, latency, circuit state
  • Alert: high failure rate, stuck jobs, circuit open
  • Dashboard: pending payments, processing rate, DLQ depth"

Chapter 9: Week 2 Checklist

Concepts You Should Master

  • Explain why timeouts are essential in distributed systems
  • Implement timeout budgets that propagate through service calls
  • Design idempotency key strategies for different use cases
  • Explain the three states of a circuit breaker
  • Choose between count-based and rate-based circuit breakers
  • Design at-least-once webhook delivery with retries
  • Implement HMAC signature verification
  • Explain why exactly-once is impossible and what to do instead
  • Implement leader election using etcd or ZooKeeper
  • Explain fencing tokens and their role in preventing split-brain
  • Design a distributed job scheduler
  • Handle "leader dies mid-job" scenario

Implementations You Should Be Able to Code

  • Timeout budget wrapper
  • Idempotency store with Redis
  • Circuit breaker with three states
  • Webhook signature generation and verification
  • Retry scheduler with exponential backoff
  • Leader election client
  • Job executor with heartbeats
  • Stuck job detector

Trade-offs You Should Articulate

Decision           | Trade-off
-------------------|-----------------------------------------------------------
Timeout duration   | Too short = false failures; too long = resource waste
Idempotency TTL    | Too short = duplicates; too long = storage cost
Circuit threshold  | Too low = false opens; too high = slow detection
Retry count        | Too few = lost events; too many = delayed failures
Leader lease TTL   | Too short = flapping; too long = slow failover
Heartbeat interval | Too frequent = overhead; too infrequent = slow detection

Part V: Resources and Further Learning

Books

  • "Designing Data-Intensive Applications" by Martin Kleppmann

    • Chapter 8: The Trouble with Distributed Systems
    • Chapter 9: Consistency and Consensus
    • Essential reading for understanding distributed coordination
  • "Database Internals" by Alex Petrov

    • Deep dive into consensus algorithms
    • Leader election implementations
  • "Site Reliability Engineering" by Google

    • Chapter 22: Addressing Cascading Failures
    • Chapter 23: Managing Critical State

Online Resources

  • Jepsen: https://jepsen.io/ - Distributed systems testing
  • Fly.io Blog: Excellent distributed systems articles
  • Cloudflare Blog: Real-world distributed systems stories

Exercises

Exercise 1: Implement Leader Election

Build a leader election system using Redis (a starter sketch follows this list):

  • Use SETNX for atomic leader claim
  • Implement lease renewal
  • Handle leader failure detection
  • Add fencing tokens
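
A starting point for the claim-and-renew loop (a sketch, assuming redis-py's asyncio client; key names and TTLs are illustrative):

from typing import Optional
import redis.asyncio as redis

LEADER_KEY = "cron:leader"
TOKEN_KEY = "cron:fencing_token"
LEASE_TTL = 10  # seconds

async def try_become_leader(r: redis.Redis, node_id: str) -> Optional[int]:
    # SET NX EX = atomic "claim the key only if it doesn't exist, with a TTL"
    claimed = await r.set(LEADER_KEY, node_id, nx=True, ex=LEASE_TTL)
    if not claimed:
        return None
    # Won the election: take the next fencing token
    return await r.incr(TOKEN_KEY)

async def renew_lease(r: redis.Redis, node_id: str) -> bool:
    # Renew only if we still own the key; check + expire must be atomic,
    # so both happen inside one Lua script.
    script = """
    if redis.call('get', KEYS[1]) == ARGV[1] then
        return redis.call('expire', KEYS[1], ARGV[2])
    end
    return 0
    """
    return bool(await r.eval(script, 1, LEADER_KEY, node_id, LEASE_TTL))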

Exercise 2: Build Distributed Cron

Create a complete distributed cron system:

  • Leader election for scheduler
  • Job queue for workers
  • At-least-once execution
  • Stuck job detection
  • Comprehensive logging

Exercise 3: Chaos Testing

Write chaos tests for your distributed cron:

  • Kill leader mid-job
  • Network partition simulation
  • Clock skew injection
  • Verify no duplicate executions

Week 2 Conclusion

What We've Accomplished

This week transformed how you think about distributed systems:

Week 1: Foundations of Scale
  "How do I handle more traffic?"

Week 2: Failure-First Design
  "How do I handle failures gracefully?"

You now have a complete toolkit for building reliable systems:

Day | Pattern          | Superpower
----|------------------|-------------------------------
1   | Timeouts         | Never wait forever
2   | Idempotency      | Safe to retry anything
3   | Circuit Breakers | Fail fast, protect the system
4   | Webhooks         | Reliable async communication
5   | Distributed Cron | Exactly-once job execution

Looking Ahead

Week 3 will build on this foundation as we dive into Building Blocks of Distributed Systems:

  • Consistent hashing
  • Replication strategies
  • Distributed transactions
  • CAP theorem in practice

The failure-first mindset you've developed this week will be essential as we tackle these more complex topics.

Congratulations on completing Week 2! 🎉


End of Week 2 - Day 5: Distributed Cron

You've completed the "Failure-First Design" week. Next up: Week 3 - Building Blocks of Distributed Systems.