Distributed Coordination

Leader Election

Many distributed systems designate a single leader for coordination. The challenge: electing exactly one leader and handling leader failures.

Bully Algorithm

The process with the highest ID wins. When a process detects the leader is down, it initiates an election.

Processes: P1, P2, P3, P4, P5 (P5 was leader, crashed)

P3 detects P5 is down, starts election:

P3 ──Election──> P4     (sends to higher-ID processes)
P3 ──Election──> P5     (no response, crashed)

P4 ──OK──> P3            (P4 takes over election)
P4 ──Election──> P5      (no response)

P4: no higher process responds
P4 ──Coordinator──> P1, P2, P3   ("I am the leader")

Complexity: O(n^2) messages worst case

Problems: Assumes reliable failure detection. A slow process can be mistakenly declared dead, leading to two leaders.
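The outcome of a bully election can be modeled in a few lines of Python. This is a toy simulation of the message flow above, not a networked implementation: in the real protocol every responder runs its own election concurrently, but the result is always the highest live ID.

```python
# Toy model of the bully algorithm's outcome. A process defers to any
# live higher-ID process; the highest live process declares itself leader.

def bully_elect(initiator, alive):
    """Return the elected leader among the set of alive process IDs."""
    higher = [p for p in alive if p > initiator]
    if not higher:
        return initiator            # no higher process answered: I win
    # In the real protocol every higher responder takes over in parallel;
    # modeling the takeover recursively yields the same final leader.
    return bully_elect(min(higher), alive)

alive = {1, 2, 3, 4}                # P5 crashed
print(bully_elect(3, alive))        # 4
```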

Ring Algorithm

Processes arranged in a logical ring. Election message circulates, collecting IDs.

Ring: P1 → P2 → P3 → P4 → P5 → P1

P3 starts election:
  P3 sends [3] → P4
  P4 adds ID: [3,4] → P5
  P5 adds ID: [3,4,5] → P1
  P1 adds ID: [3,4,5,1] → P2
  P2 adds ID: [3,4,5,1,2] → P3
  P3 sees its own ID, election complete
  Leader = max(3,4,5,1,2) = P5

  P3 sends "P5 is leader" around the ring
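The circulation above can be sketched as a small simulation (a single pass around the ring, collecting IDs until the message returns to the initiator):

```python
# Toy model of the ring election: the election message visits each
# successor once, collecting IDs; the maximum collected ID wins.

def ring_elect(ring, start):
    """ring maps each process ID to its successor; returns the leader ID."""
    collected = [start]
    current = ring[start]
    while current != start:         # stop when the message returns home
        collected.append(current)
        current = ring[current]
    return max(collected)

ring = {1: 2, 2: 3, 3: 4, 4: 5, 5: 1}   # P1 → P2 → P3 → P4 → P5 → P1
print(ring_elect(ring, 3))               # 5
```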

Raft Leader Election (Production-Grade)

See consensus chapter for full details. Key properties: randomized timeouts prevent split votes, term numbers prevent stale leaders.

Term 1: P1 is leader
         │
    P1 crashes
         │
Term 2: P3 times out first (random 150-300ms)
         │
    P3: RequestVote(term=2) → P2, P4, P5
    P2, P4: VoteGranted (majority = 3/5)
    P3 becomes leader of term 2
         │
    P1 recovers, still thinks it's leader (term 1)
    P1 sends AppendEntries(term=1) → rejected (term < 2)
    P1 steps down to follower
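The term rule that forces the stale leader to step down can be sketched as follows (a simplified model of Raft's term comparison, not a full implementation):

```python
# Sketch of Raft's term rule: reject messages from lower terms, and
# step down to follower on seeing a higher term.

def handle_append_entries(node, msg_term):
    """Returns True if the AppendEntries message is accepted."""
    if msg_term < node["term"]:
        return False                     # stale leader: reject
    if msg_term > node["term"]:
        node["term"] = msg_term
        node["role"] = "follower"        # newer term seen: step down
    return True

follower = {"term": 2, "role": "follower"}
print(handle_append_entries(follower, 1))   # False: P1's term-1 message

stale_leader = {"term": 1, "role": "leader"}
handle_append_entries(stale_leader, 2)
print(stale_leader["role"])                 # follower
```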

Distributed Locking

ZooKeeper Recipes

ZooKeeper provides ephemeral sequential znodes. A lock is implemented by creating a sequential znode under a lock path and watching the predecessor.

Lock path: /locks/resource-1

Client A creates: /locks/resource-1/lock-0000000001  (ephemeral sequential)
Client B creates: /locks/resource-1/lock-0000000002  (ephemeral sequential)
Client C creates: /locks/resource-1/lock-0000000003  (ephemeral sequential)

Client A: lowest sequence number → acquires lock
Client B: watches lock-0000000001 (A's node)
Client C: watches lock-0000000002 (B's node)

A releases (deletes node or session expires):
  B gets notified → B is now lowest → acquires lock

This creates a fair, ordered queue. Each client watches only its predecessor (avoids herd effect).
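The queue discipline above is easy to model without a real ZooKeeper: given each client's sequence number, the lowest holds the lock and everyone else watches exactly one predecessor.

```python
# Toy model of the ZooKeeper lock queue: lowest sequence number holds
# the lock; each waiter watches only its immediate predecessor.

def lock_order(clients):
    """clients: {name: sequence_number}. Returns (holder, watch_map)."""
    ordered = sorted(clients, key=clients.get)
    holder = ordered[0]
    # each non-holder watches the client just ahead of it in the queue
    watches = {ordered[i]: ordered[i - 1] for i in range(1, len(ordered))}
    return holder, watches

holder, watches = lock_order({"A": 1, "B": 2, "C": 3})
print(holder)    # A
print(watches)   # {'B': 'A', 'C': 'B'}
```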

etcd Distributed Locking

etcd locks are lease-based: the lock key is attached to a lease whose TTL bounds how long a crashed or partitioned client can hold the lock, and the official lock recipe orders competing waiters by key revision.

// Conceptual etcd lock implementation
STRUCTURE EtcdLock
    client: EtcdClient
    key: string
    lease_id: integer
    lease_ttl: integer

ASYNC PROCEDURE ACQUIRE(lock) → success or error
    // 1. Create lease
    lock.lease_id ← AWAIT lock.client.LEASE_GRANT(lock.lease_ttl)

    // 2. Put key with lease (if not exists)
    result ← AWAIT lock.client.TRANSACTION(
        WHEN: VERSION(lock.key) = 0,
        THEN: PUT(lock.key, "locked", lease ← lock.lease_id)
    )

    // 3. If key already exists, watch for deletion
    IF NOT result.succeeded THEN
        AWAIT lock.client.WATCH(lock.key, filter ← DELETE)
        // Retry after deletion event

    // 4. Keep the lease alive in the background while the lock is held
    //    (a blocking AWAIT here would never return)
    SPAWN lock.client.LEASE_KEEP_ALIVE(lock.lease_id)

ASYNC PROCEDURE RELEASE(lock) → success or error
    AWAIT lock.client.DELETE(lock.key)
    AWAIT lock.client.LEASE_REVOKE(lock.lease_id)

Redlock (Redis Distributed Lock)

Martin Kleppmann's critique notwithstanding, Redlock is widely used. It acquires locks on a majority of independent Redis instances.

5 independent Redis masters: R1, R2, R3, R4, R5

Lock acquisition:
  1. Get current time T1
  2. Try SET key value NX PX ttl on all 5 instances
  3. Get current time T2
  4. Lock acquired if:
     - Majority (≥3) succeeded
     - Elapsed time (T2 - T1) < ttl
     - Remaining validity = ttl - (T2 - T1)

     R1: SET ✓    R2: SET ✓    R3: SET ✗
     R4: SET ✓    R5: SET ✓

     4 of 5 succeeded (≥ 3 majority) → Lock acquired ✓

Unlock: DEL key on ALL instances (even those that failed)
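The acquisition check from the steps above reduces to a small predicate. A sketch (function names are hypothetical, not Redis client API):

```python
# Sketch of the Redlock validity check: the lock holds only if a
# majority of instances acknowledged it before the TTL ran out.

def redlock_valid(acks, n_instances, elapsed_ms, ttl_ms):
    """Returns (acquired?, remaining validity in ms)."""
    majority = n_instances // 2 + 1
    validity = ttl_ms - elapsed_ms          # remaining time on the lock
    return acks >= majority and validity > 0, max(validity, 0)

ok, remaining = redlock_valid(acks=4, n_instances=5,
                              elapsed_ms=30, ttl_ms=10_000)
print(ok, remaining)   # True 9970
```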

Criticism (Kleppmann): Redlock relies on timing assumptions. GC pauses, clock jumps, or network delays can cause safety violations. Use fencing tokens for safety-critical applications.

Fencing token approach:
  Lock grants monotonically increasing token T

  Client A acquires lock, gets token T=33
  Client A pauses (GC)
  Lock expires
  Client B acquires lock, gets token T=34
  Client A resumes, tries to write with T=33
  Storage rejects write: T=33 < current fence T=34
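The storage-side check is the whole trick: the store remembers the highest token it has seen and refuses anything older. A minimal sketch:

```python
# Sketch of fencing-token enforcement on the storage side: writes carry
# the token granted with the lock; stale tokens are rejected.

class FencedStore:
    def __init__(self):
        self.highest_token = 0
        self.data = {}

    def write(self, token, key, value):
        if token < self.highest_token:
            return False              # stale holder (e.g. resumed after GC)
        self.highest_token = token
        self.data[key] = value
        return True

store = FencedStore()
print(store.write(33, "x", "from A"))   # True: A holds token 33
print(store.write(34, "x", "from B"))   # True: B acquired after expiry
print(store.write(33, "x", "late A"))   # False: A's token is now stale
```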

Service Discovery

How services find each other in a dynamic environment.

  ┌─────────────────────────────────────┐
  │       Service Registry              │
  │   (Consul / etcd / ZooKeeper)       │
  │                                     │
  │   auth-service:                     │
  │     - 10.0.1.5:8080 (healthy)      │
  │     - 10.0.1.6:8080 (healthy)      │
  │     - 10.0.1.7:8080 (unhealthy)    │
  │                                     │
  │   order-service:                    │
  │     - 10.0.2.3:9090 (healthy)      │
  └────────┬──────────────┬─────────────┘
           │              │
    Register/Heartbeat   Query
           │              │
      ┌────┘              └────┐
  Service Instance        Client/Gateway

Patterns:

  • Client-side discovery: Client queries registry, picks instance, connects directly
  • Server-side discovery: Load balancer queries registry, routes request
  • DNS-based: Consul DNS, CoreDNS -- services resolved via DNS lookup
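Client-side discovery can be illustrated with a toy in-memory registry (the class and method names here are illustrative, not any real Consul/etcd client API):

```python
# Toy in-memory service registry for client-side discovery: instances
# register with a health flag; the client picks among healthy ones.

import random

class Registry:
    def __init__(self):
        self.services = {}        # service name -> {address: healthy?}

    def register(self, name, address, healthy=True):
        self.services.setdefault(name, {})[address] = healthy

    def healthy_instances(self, name):
        return [a for a, ok in self.services.get(name, {}).items() if ok]

reg = Registry()
reg.register("auth-service", "10.0.1.5:8080")
reg.register("auth-service", "10.0.1.6:8080")
reg.register("auth-service", "10.0.1.7:8080", healthy=False)

candidates = reg.healthy_instances("auth-service")
print(candidates)                  # ['10.0.1.5:8080', '10.0.1.6:8080']
target = random.choice(candidates)  # client-side load-balancing pick
```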

Membership and Failure Detection

SWIM (Scalable Weakly-consistent Infection-style Membership)

Gossip-based failure detection with O(1) message overhead per member per protocol period.

Protocol Period:

  Node A randomly selects Node B:
    A ──ping──> B
    B ──ack───> A     (B is alive ✓)

  If B doesn't respond:
    A asks k random nodes to probe B (indirect ping):
    A ──ping-req(B)──> C
    C ──ping──> B
    B ──ack───> C
    C ──ack───> A     (B is alive, just A→B link failed)

  If still no response:
    A marks B as "suspected"
    After timeout → A marks B as "failed"
    Disseminate via piggybacked gossip

State Dissemination (piggyback on protocol messages):

  Every ping/ack message carries a bounded list of
  recent membership changes:

  ping(to=B, piggyback=[
    {node: C, status: alive, incarnation: 5},
    {node: D, status: failed, incarnation: 3},
  ])

  Infection-style: O(log n) rounds to reach all members
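One protocol period, with the direct probe and the indirect ping-req fallback, can be sketched as a toy reachability model (it simplifies by assuming the ack path back to A works whenever the forward links do):

```python
# Toy model of one SWIM protocol period: direct ping, then indirect
# probes through k helpers, then suspicion if nothing answers.

def probe(target, reachable, helpers):
    """reachable[(a, b)] is True if a can reach b. Returns target status."""
    if reachable.get(("A", target), False):
        return "alive"                      # direct ack
    for h in helpers:                       # ping-req through k peers
        # simplification: assumes the ack path back to A is symmetric
        if reachable.get(("A", h), False) and reachable.get((h, target), False):
            return "alive"                  # indirect ack came back
    return "suspected"

# A→B link is down, but C can still reach B:
reachable = {("A", "C"): True, ("C", "B"): True}
print(probe("B", reachable, helpers=["C"]))   # alive
print(probe("B", {}, helpers=["C"]))          # suspected
```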

Gossip Protocol (Epidemic Broadcast)

Each node periodically selects a random peer and exchanges state.

Round 1:  A knows {x=1}
          A tells B → B knows {x=1}

Round 2:  A tells D → D knows {x=1}
          B tells C → C knows {x=1}

Round 3:  All 4 nodes know {x=1}

Convergence: O(log n) rounds for n nodes
Message complexity: O(n log n) total messages

STRUCTURE GossipNode
    id: integer
    members: map of integer → MemberState
    peers: list of integer
    fanout: integer   // number of peers to contact per round

STRUCTURE MemberState
    status: ALIVE | SUSPECTED | FAILED
    incarnation: integer
    last_updated: integer

PROCEDURE GOSSIP_ROUND(node) → list of (peer_id, state_snapshot)
    targets ← RANDOM_SAMPLE(node.peers, count ← node.fanout)
    messages ← empty list
    FOR EACH peer IN targets DO
        APPEND (peer, COPY(node.members)) TO messages
    RETURN messages

PROCEDURE RECEIVE_GOSSIP(node, remote_state)
    FOR EACH (id, remote) IN remote_state DO
        IF id NOT IN node.members THEN
            node.members[id] ← remote
        ELSE
            local ← node.members[id]
            IF remote.incarnation > local.incarnation THEN
                node.members[id] ← remote
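The O(log n) convergence claim is easy to check empirically with a small simulation (fanout 1, a single rumor, a fixed random seed for reproducibility):

```python
# Small epidemic-broadcast simulation: each round, every informed node
# tells one random peer. The informed set at most doubles per round, so
# convergence takes at least log2(n) rounds, and usually not many more.

import random

def gossip_rounds(n, seed=0):
    random.seed(seed)
    informed = {0}                          # node 0 knows {x=1}
    rounds = 0
    while len(informed) < n:
        for node in list(informed):
            informed.add(random.randrange(n))   # tell one random peer
        rounds += 1
    return rounds

print(gossip_rounds(100))   # typically around 10 rounds for n=100
```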

Failure Detection

Heartbeat-Based

Simple:
  Every node sends a heartbeat to a monitor every T seconds
  If no heartbeat for k*T seconds → suspected failure

  Problems: all-to-all monitoring costs n^2 messages;
  a single central monitor is a single point of failure

Gossip-based heartbeats:
  Each node increments local heartbeat counter
  Gossip propagates counters
  If counter hasn't increased in T_fail → suspected
  If suspected for T_cleanup → declared failed

Phi Accrual Failure Detector

Instead of binary alive/dead, outputs a suspicion level (phi) based on heartbeat arrival statistics.

Track heartbeat inter-arrival times:
  Δ1=100ms, Δ2=102ms, Δ3=98ms, Δ4=150ms, Δ5=99ms

  Mean μ, Variance σ² computed from recent samples

  φ = -log10(P(Δ > t_now - t_last))

  φ = 1 → P(mistake) = 10%
  φ = 2 → P(mistake) = 1%
  φ = 3 → P(mistake) = 0.1%

  Application sets threshold:
    Low threshold (φ=1): faster detection, more false positives
    High threshold (φ=8): slower detection, fewer false positives

Used by Akka, Cassandra, and other systems requiring adaptive failure detection.
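Under the common simplifying assumption that inter-arrival times are normally distributed (the original paper and some implementations use other distributions), phi can be computed directly from the recent samples:

```python
# Sketch of the phi accrual computation under a normal approximation:
# phi = -log10(P(delta > time since last heartbeat)).

import math
import statistics

def phi(intervals, time_since_last):
    mu = statistics.mean(intervals)
    sigma = statistics.stdev(intervals)
    # complementary CDF of the normal distribution: P(delta > t)
    z = (time_since_last - mu) / sigma
    p_later = 0.5 * math.erfc(z / math.sqrt(2))
    return -math.log10(max(p_later, 1e-12))   # cap to avoid log(0)

samples = [100, 102, 98, 150, 99]    # recent inter-arrival times (ms)
print(phi(samples, 110) < 1)         # True: well within normal range
print(phi(samples, 400) > 8)         # True: far beyond typical intervals
```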

Coordination Service Comparison

| Feature | ZooKeeper | etcd | Consul |
|---|---|---|---|
| Consensus | Zab (Paxos-like) | Raft | Raft |
| Data model | Hierarchical (znodes) | Flat key-value | Key-value + services |
| Watch mechanism | One-time watches | Continuous watch streams | Blocking queries |
| Language | Java | Go | Go |
| Health checking | Application-level | Lease TTL | Built-in (HTTP, TCP, script) |
| Use case | Hadoop, Kafka, HBase | Kubernetes | Service mesh, discovery |

Real-World Applications

| Pattern | System | Implementation |
|---|---|---|
| Leader election | Kafka | ZooKeeper-based (moving to KRaft) |
| Distributed lock | Google Chubby | Paxos-based lock service |
| Service discovery | Kubernetes | etcd + CoreDNS |
| Membership | Cassandra | Gossip (modified SWIM) |
| Failure detection | Akka Cluster | Phi accrual detector |
| Configuration | Spring Cloud | Consul / ZooKeeper |

Key Takeaways

  • Leader election requires careful handling of network partitions to avoid split-brain. Raft's term-based approach is the most robust practical solution.
  • Distributed locks inherit the FLP impossibility result: in an asynchronous system no lock service can guarantee both safety and liveness. Lease-based locks with fencing tokens are the safest practical approach.
  • SWIM gossip provides scalable membership with O(1) per-node overhead per protocol period, making it suitable for large clusters.
  • Phi accrual failure detectors adapt to network conditions automatically, avoiding the fixed-timeout problem of binary detectors.
  • Use established coordination services (etcd, ZooKeeper, Consul) rather than building your own -- the subtleties of distributed coordination are a minefield.