Distributed Coordination
Leader Election
Many distributed systems designate a single leader for coordination. The challenge: electing exactly one leader and handling leader failures.
Bully Algorithm
The process with the highest ID wins. When a process detects the leader is down, it initiates an election.
Processes: P1, P2, P3, P4, P5 (P5 was leader, crashed)
P3 detects P5 is down, starts election:
P3 ──Election──> P4 (sends to higher-ID processes)
P3 ──Election──> P5 (no response, crashed)
P4 ──OK──> P3 (P4 takes over election)
P4 ──Election──> P5 (no response)
P4: no higher process responds
P4 ──Coordinator──> P1, P2, P3 ("I am the leader")
Complexity: O(n^2) messages worst case
Problems: Assumes reliable failure detection. A slow process can be mistakenly declared dead, leading to two leaders.
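The election rule above can be sketched as a minimal in-memory simulation. The `bully_election` helper and the alive-set model are illustrative stand-ins for the real messaging layer, assuming every alive process answers Election messages promptly:

```python
def bully_election(initiator, alive_ids):
    """Return the new leader's ID: the highest alive process wins.

    `alive_ids` models which processes answer Election messages;
    crashed processes are simply absent from the set.
    """
    higher = [pid for pid in alive_ids if pid > initiator]
    if not higher:
        return initiator  # no higher process responds -> initiator is leader
    # The highest responder takes over and repeats the same check.
    return bully_election(max(higher), alive_ids)

# P3 detects the crash of P5; P1..P4 are still alive.
print(bully_election(3, {1, 2, 3, 4}))  # -> 4
```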
Ring Algorithm
Processes arranged in a logical ring. Election message circulates, collecting IDs.
Ring: P1 → P2 → P3 → P4 → P5 → P1
P3 starts election:
P3 sends [3] → P4
P4 adds ID: [3,4] → P5
P5 adds ID: [3,4,5] → P1
P1 adds ID: [3,4,5,1] → P2
P2 adds ID: [3,4,5,1,2] → P3
P3 sees its own ID, election complete
Leader = max(3,4,5,1,2) = P5
P3 sends "P5 is leader" around the ring
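The trace above reduces to collecting IDs once around the ring and taking the maximum. A sketch, with the ring modeled as a Python list in ring order (no real message passing):

```python
def ring_election(ring, start):
    """Circulate an election message around `ring` (IDs in ring order),
    starting from `start`, collecting each live ID until the message
    returns to the initiator; the highest collected ID wins."""
    collected = [start]
    i = (ring.index(start) + 1) % len(ring)
    while ring[i] != start:
        collected.append(ring[i])
        i = (i + 1) % len(ring)
    return max(collected)

print(ring_election([1, 2, 3, 4, 5], start=3))  # -> 5, matching the trace
```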
Raft Leader Election (Production-Grade)
See consensus chapter for full details. Key properties: randomized timeouts prevent split votes, term numbers prevent stale leaders.
Term 1: P1 is leader
│
P1 crashes
│
Term 2: P3 times out first (random 150-300ms)
│
P3: RequestVote(term=2) → P2, P4, P5
P2, P4: VoteGranted (plus P3's own vote → 3/5 majority)
P3 becomes leader of term 2
│
P1 recovers, still thinks it's leader (term 1)
P1 sends AppendEntries(term=1) → rejected (term < 2)
P1 steps down to follower
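The step-down at the end of the trace follows from Raft's term rule: reject any RPC with a stale term, and convert to follower on seeing a newer one. A sketch of just that rule (the `on_rpc_term` helper is illustrative, not Raft's actual RPC handler):

```python
def on_rpc_term(local_term, rpc_term, role):
    """Apply Raft's term rule to an incoming RPC.

    Returns (new_term, new_role, accepted): a stale sender is rejected;
    a newer term forces any node, including a leader, back to follower.
    """
    if rpc_term < local_term:
        return local_term, role, False      # reject stale RPC
    if rpc_term > local_term:
        return rpc_term, "follower", True   # adopt newer term, step down
    return local_term, role, True

# P1 recovers as leader of term 1 and contacts a term-2 node:
print(on_rpc_term(2, 1, "leader"))  # the receiver rejects: term 1 < term 2
# The rejection carries term 2; P1 applies the same rule and steps down:
print(on_rpc_term(1, 2, "leader"))  # -> (2, 'follower', True)
```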
Distributed Locking
ZooKeeper Recipes
ZooKeeper provides ephemeral sequential znodes. A lock is implemented by creating a sequential znode under a lock path and watching the predecessor.
Lock path: /locks/resource-1
Client A creates: /locks/resource-1/lock-0000000001 (ephemeral)
Client B creates: /locks/resource-1/lock-0000000002 (ephemeral)
Client C creates: /locks/resource-1/lock-0000000003 (ephemeral)
Client A: lowest sequence number → acquires lock
Client B: watches lock-0000000001 (A's node)
Client C: watches lock-0000000002 (B's node)
A releases (deletes node or session expires):
B gets notified → B is now lowest → acquires lock
This creates a fair, ordered queue. Each client watches only its predecessor (avoids herd effect).
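Who holds the lock and who watches whom falls out of a single sort over the sequence numbers. A sketch (`zk_lock_state` is an illustrative helper, not a ZooKeeper API):

```python
def zk_lock_state(znodes):
    """Given the sequential znodes under a lock path, return
    (holder, {waiter: watched_predecessor}).

    The lowest sequence number holds the lock; every other client
    watches only the node immediately before its own, so a release
    wakes exactly one waiter.
    """
    nodes = sorted(znodes)
    holder = nodes[0]
    watches = {nodes[i]: nodes[i - 1] for i in range(1, len(nodes))}
    return holder, watches

holder, watches = zk_lock_state([
    "lock-0000000002", "lock-0000000001", "lock-0000000003"])
print(holder)   # lock-0000000001 (Client A)
print(watches)  # each waiter -> its predecessor
```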
etcd Distributed Locking
Uses lease-based locking with revision-based ordering.
// Conceptual etcd lock implementation
STRUCTURE EtcdLock
client: EtcdClient
key: string
lease_id: integer
lease_ttl: integer
ASYNC PROCEDURE ACQUIRE(lock) → success or error
// 1. Create lease
lock.lease_id ← AWAIT lock.client.LEASE_GRANT(lock.lease_ttl)
// 2. Put key with lease (if not exists)
result ← AWAIT lock.client.TRANSACTION(
WHEN: VERSION(lock.key) = 0,
THEN: PUT(lock.key, "locked", lease ← lock.lease_id)
)
// 3. If the key already exists, wait for its deletion, then retry
IF NOT result.succeeded THEN
AWAIT lock.client.WATCH(lock.key, filter ← DELETE)
RETRY FROM step 2
// 4. Keep the lease alive in the background while the lock is held
// (a blocking AWAIT here would never return)
SPAWN lock.client.LEASE_KEEP_ALIVE(lock.lease_id)
ASYNC PROCEDURE RELEASE(lock) → success or error
AWAIT lock.client.DELETE(lock.key)
AWAIT lock.client.LEASE_REVOKE(lock.lease_id)
Redlock (Redis Distributed Lock)
Martin Kleppmann's critique notwithstanding, Redlock is widely used. It acquires locks on a majority of independent Redis instances.
5 independent Redis masters: R1, R2, R3, R4, R5
Lock acquisition:
1. Get current time T1
2. Try SET key value NX PX ttl on all 5 instances
3. Get current time T2
4. Lock acquired if both hold:
- Majority (≥3) of instances succeeded
- Elapsed time (T2 - T1) < ttl
5. Remaining validity = ttl - (T2 - T1)
R1: SET ✓ R2: SET ✓ R3: SET ✗
R4: SET ✓ R5: SET ✓
4 of 5 succeeded (≥ 3, a majority) → Lock acquired ✓
Unlock: DEL key on ALL instances (even those that failed)
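The acquisition check in steps 1-5 can be sketched as a pure function. This is a simplification: the original algorithm also subtracts a clock-drift allowance from the validity window, which is omitted here:

```python
def redlock_acquired(successes, total, t1_ms, t2_ms, ttl_ms):
    """Redlock validity check: a majority of independent instances
    granted the lock, and doing so took well under the TTL.

    Returns the remaining validity window in ms, or None if the lock
    was not acquired (caller must then DEL on all instances).
    """
    majority = total // 2 + 1
    elapsed = t2_ms - t1_ms
    if successes >= majority and elapsed < ttl_ms:
        return ttl_ms - elapsed
    return None

print(redlock_acquired(successes=4, total=5, t1_ms=0, t2_ms=30, ttl_ms=1000))  # -> 970
print(redlock_acquired(successes=2, total=5, t1_ms=0, t2_ms=30, ttl_ms=1000))  # -> None
```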
Criticism (Kleppmann): Redlock relies on timing assumptions. GC pauses, clock jumps, or network delays can cause safety violations. Use fencing tokens for safety-critical applications.
Fencing token approach:
Lock grants monotonically increasing token T
Client A acquires lock, gets token T=33
Client A pauses (GC)
Lock expires
Client B acquires lock, gets token T=34
Client A resumes, tries to write with T=33
Storage rejects write: T=33 < current fence T=34
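The storage-side check is the part that actually provides safety. A sketch (the `FencedStore` class is illustrative; any store that can compare tokens atomically works):

```python
class FencedStore:
    """Hypothetical storage that rejects writes carrying a stale fencing token."""

    def __init__(self):
        self.fence = 0   # highest token seen so far
        self.data = {}

    def write(self, token, key, value):
        if token < self.fence:
            return False          # stale holder, e.g. resumed after a GC pause
        self.fence = token        # remember the newest token
        self.data[key] = value
        return True

store = FencedStore()
print(store.write(33, "k", "from A"))  # True  -- A holds token 33
print(store.write(34, "k", "from B"))  # True  -- B acquired after expiry
print(store.write(33, "k", "late A"))  # False -- 33 < current fence 34
```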
Service Discovery
How services find each other in a dynamic environment.
┌─────────────────────────────────────┐
│ Service Registry │
│ (Consul / etcd / ZooKeeper) │
│ │
│ auth-service: │
│ - 10.0.1.5:8080 (healthy) │
│ - 10.0.1.6:8080 (healthy) │
│ - 10.0.1.7:8080 (unhealthy) │
│ │
│ order-service: │
│ - 10.0.2.3:9090 (healthy) │
└────────┬──────────────┬─────────────┘
│ │
Register/Heartbeat Query
│ │
┌────┘ └────┐
Service Instance Client/Gateway
Patterns:
- Client-side discovery: Client queries registry, picks instance, connects directly
- Server-side discovery: Load balancer queries registry, routes request
- DNS-based: Consul DNS, CoreDNS -- services resolved via DNS lookup
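Client-side discovery can be sketched with an in-memory stand-in for the registry. The `registry` dict mirrors the diagram above; a real client would query Consul/etcd/ZooKeeper over the network and re-query as health changes:

```python
import itertools

# Hypothetical in-memory registry, mirroring the diagram above.
registry = {
    "auth-service": [
        ("10.0.1.5:8080", "healthy"),
        ("10.0.1.6:8080", "healthy"),
        ("10.0.1.7:8080", "unhealthy"),
    ],
}

def healthy_instances(service):
    """Client-side discovery step 1: query the registry, keep healthy instances."""
    return [addr for addr, status in registry[service] if status == "healthy"]

# Step 2: the client picks an instance itself, here by round-robin.
rr = itertools.cycle(healthy_instances("auth-service"))
print(next(rr))  # 10.0.1.5:8080
print(next(rr))  # 10.0.1.6:8080
print(next(rr))  # 10.0.1.5:8080  (wraps around, skipping the unhealthy node)
```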
Membership and Failure Detection
SWIM (Scalable Weakly-consistent Infection-style Membership)
Gossip-based failure detection with O(1) message overhead per member per protocol period.
Protocol Period:
Node A randomly selects Node B:
A ──ping──> B
B ──ack───> A (B is alive ✓)
If B doesn't respond:
A asks k random nodes to probe B (indirect ping):
A ──ping-req(B)──> C
C ──ping──> B
B ──ack───> C
C ──ack───> A (B is alive, just A→B link failed)
If still no response:
A marks B as "suspected"
After timeout → A marks B as "failed"
Disseminate via piggybacked gossip
State Dissemination (piggyback on protocol messages):
Every ping/ack message carries a bounded list of
recent membership changes:
ping(to=B, piggyback=[
{node: C, status: alive, incarnation: 5},
{node: D, status: failed, incarnation: 3},
])
Infection-style: O(log n) rounds to reach all members
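One protocol period -- direct ping, then k indirect probes -- can be sketched as follows. The `reachable` callable stands in for the network; this is not SWIM's wire protocol, just the probing logic:

```python
import random

def swim_probe(a, b, reachable, peers, k=3):
    """One SWIM protocol period from node `a` probing node `b`.

    `reachable(x, y)` models whether a message from x reaches y and is
    answered. Returns 'alive' or 'suspected'; a real implementation
    escalates 'suspected' to 'failed' after a timeout.
    """
    if reachable(a, b):
        return "alive"                      # direct ping acked
    helpers = random.sample([p for p in peers if p not in (a, b)], k)
    if any(reachable(a, c) and reachable(c, b) for c in helpers):
        return "alive"                      # only the A->B link was bad
    return "suspected"                      # disseminate via gossip

# The A->B link is down, but B itself is up; C, D, E can still reach B.
link_down = {("A", "B")}
reachable = lambda x, y: (x, y) not in link_down
print(swim_probe("A", "B", reachable, peers=["A", "B", "C", "D", "E"]))  # alive
```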
Gossip Protocol (Epidemic Broadcast)
Each node periodically selects a random peer and exchanges state.
Round 1: A knows {x=1}
A tells B → B knows {x=1}
Round 2: A tells D → D knows {x=1}
B tells C → C knows {x=1}
Round 3: All 4 nodes know {x=1}
Convergence: O(log n) rounds for n nodes
Message complexity: O(n log n) total messages
STRUCTURE GossipNode
id: integer
members: map of integer → MemberState
peers: list of integer
fanout: integer // number of peers to contact per round
STRUCTURE MemberState
status: ALIVE | SUSPECTED | FAILED
incarnation: integer
last_updated: integer
PROCEDURE GOSSIP_ROUND(node) → list of (peer_id, state_snapshot)
targets ← RANDOM_SAMPLE(node.peers, count ← node.fanout)
messages ← empty list
FOR EACH peer IN targets DO
APPEND (peer, COPY(node.members)) TO messages
RETURN messages
PROCEDURE RECEIVE_GOSSIP(node, remote_state)
FOR EACH (id, remote) IN remote_state DO
IF id NOT IN node.members THEN
node.members[id] ← remote
ELSE
local ← node.members[id]
IF remote.incarnation > local.incarnation THEN
node.members[id] ← remote
ELSE IF remote.incarnation = local.incarnation
AND PRECEDENCE(remote.status) > PRECEDENCE(local.status) THEN
node.members[id] ← remote // at equal incarnation: FAILED > SUSPECTED > ALIVE
Failure Detection
Heartbeat-Based
Simple:
Every node sends heartbeat to monitor every T seconds
If no heartbeat for k*T seconds → suspected failure
Problem: all-to-all heartbeating costs O(n^2) messages; a single central monitor avoids that but becomes a SPOF
Gossip-based heartbeats:
Each node increments local heartbeat counter
Gossip propagates counters
If counter hasn't increased in T_fail → suspected
If suspected for T_cleanup → declared failed
Phi Accrual Failure Detector
Instead of binary alive/dead, outputs a suspicion level (phi) based on heartbeat arrival statistics.
Track heartbeat inter-arrival times:
Δ1=100ms, Δ2=102ms, Δ3=98ms, Δ4=150ms, Δ5=99ms
Mean μ, Variance σ² computed from recent samples
φ = -log10(P(Δ > t_now - t_last))
φ = 1 → P(mistake) = 10%
φ = 2 → P(mistake) = 1%
φ = 3 → P(mistake) = 0.1%
Application sets threshold:
Low threshold (φ=1): faster detection, more false positives
High threshold (φ=8): slower detection, fewer false positives
Used by Akka, Cassandra, and other systems requiring adaptive failure detection.
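Under the common simplifying assumption that inter-arrival times are normally distributed, phi can be computed from the mean and standard deviation of recent samples. A sketch (real implementations like Akka's use a sliding window and a minimum standard deviation floor):

```python
import math
import statistics

def phi(samples, since_last):
    """Phi accrual suspicion level, assuming normally distributed
    inter-arrival times: phi = -log10(P(interval > since_last))."""
    mu = statistics.mean(samples)
    sigma = statistics.stdev(samples)
    # Survival function of the normal distribution, via erfc.
    p_later = 0.5 * math.erfc((since_last - mu) / (sigma * math.sqrt(2)))
    return -math.log10(p_later)

samples = [100, 102, 98, 150, 99]  # ms, from the example above
print(round(phi(samples, since_last=110), 2))  # slightly late -> low phi
print(round(phi(samples, since_last=400), 2))  # very late -> high phi
```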
Coordination Service Comparison
| Feature | ZooKeeper | etcd | Consul |
|---|---|---|---|
| Consensus | Zab (Paxos-like) | Raft | Raft |
| Data model | Hierarchical (znodes) | Flat key-value | Key-value + services |
| Watch mechanism | One-time watches | Continuous watch streams | Blocking queries |
| Language | Java | Go | Go |
| Health checking | Application-level | Lease TTL | Built-in (HTTP, TCP, script) |
| Use case | Hadoop, Kafka, HBase | Kubernetes | Service mesh, discovery |
Real-World Applications
| Pattern | System | Implementation |
|---|---|---|
| Leader election | Kafka | ZooKeeper-based (moving to KRaft) |
| Distributed lock | Google Chubby | Paxos-based lock service |
| Service discovery | Kubernetes | etcd + CoreDNS |
| Membership | Cassandra | Gossip (modified SWIM) |
| Failure detection | Akka Cluster | Phi accrual detector |
| Configuration | Spring Cloud | Consul / ZooKeeper |
Key Takeaways
- Leader election requires careful handling of network partitions to avoid split-brain. Raft's term-based approach is the most robust practical solution.
- Distributed locks are fundamentally limited by FLP. Lease-based locks with fencing tokens are the safest practical approach.
- SWIM gossip provides scalable membership with O(1) per-node overhead per protocol period, making it suitable for large clusters.
- Phi accrual failure detectors adapt to network conditions automatically, avoiding the fixed-timeout problem of binary detectors.
- Use established coordination services (etcd, ZooKeeper, Consul) rather than building your own -- the subtleties of distributed coordination are a minefield.