Distributed Database Architecture

Overview

Distributed databases partition data across multiple nodes to achieve horizontal scalability, fault tolerance, and geographic distribution. The core challenges are data placement, query routing, rebalancing, and maintaining consistency across partitions.


Shared-Nothing Architecture

In shared-nothing systems, each node has its own CPU, memory, and storage, and nodes communicate only over the network. This eliminates contention on shared resources and allows throughput to scale near-linearly as nodes are added.

+--------+    +--------+    +--------+
| Node 1 |    | Node 2 |    | Node 3 |
| CPU/Mem|    | CPU/Mem|    | CPU/Mem|
| Disk   |    | Disk   |    | Disk   |
+--------+    +--------+    +--------+
     \            |            /
      \           |           /
       +-----Network-----+

Contrast with:
- Shared-disk: nodes share storage (Oracle RAC, Aurora)
- Shared-memory: nodes share RAM (SMP systems)

Shared-nothing is the dominant architecture for modern distributed databases (CockroachDB, TiDB, Cassandra, Spanner).


Sharding Strategies

Hash Sharding

Apply a hash function to the shard key to determine partition assignment. Provides uniform distribution but sacrifices range query efficiency.

# Naive hash sharding: modulo over a stable hash
import hashlib

def stable_hash(key):
    # Stable digest (stand-in for e.g. murmurhash3); Python's built-in
    # hash() is salted per process and unsafe for shard placement
    return int(hashlib.md5(str(key).encode()).hexdigest(), 16)

def get_shard(key, num_shards):
    return stable_hash(key) % num_shards  # simple modulo

# Problem: adding a shard remaps ~all keys
# Solution: consistent hashing with virtual nodes

from sortedcontainers import SortedDict

class ConsistentHashRing:
    def __init__(self, nodes, vnodes=150):
        self.ring = SortedDict()  # token -> node, sorted by token
        for node in nodes:
            for i in range(vnodes):
                token = stable_hash(f"{node}:{i}")
                self.ring[token] = node

    def get_node(self, key):
        # Walk clockwise to the first vnode token at or after the key's token
        token = stable_hash(key)
        idx = self.ring.bisect_right(token) % len(self.ring)
        return self.ring.values()[idx]

Range Sharding

Partition data by contiguous key ranges. Enables efficient range scans but risks hotspots if writes concentrate on recent ranges.

Shard 1: [A, F)     -> node-1
Shard 2: [F, N)     -> node-2
Shard 3: [N, T)     -> node-3
Shard 4: [T, Z]     -> node-4

Range query: WHERE name BETWEEN 'Alice' AND 'Dave'
  -> only hits Shard 1

Hotspot: auto-increment keys always write to last shard
  -> mitigate with key reversal, salting, or hash prefix
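The salting idea above can be sketched in a few lines: prefix a monotonically increasing id with a small stable hash bucket so that sequential inserts fan out across range shards. The bucket count and key format here are illustrative assumptions, not any particular system's convention.

```python
import hashlib

NUM_SALT_BUCKETS = 4  # assumption: roughly one bucket per range shard

def salted_key(seq_id):
    # Derive the bucket deterministically from the id itself, so a reader
    # can reconstruct the full key from the id alone
    bucket = int(hashlib.md5(str(seq_id).encode()).hexdigest(), 16) % NUM_SALT_BUCKETS
    return f"{bucket:02d}:{seq_id:010d}"

# Sequential ids no longer all land on the last range shard:
keys = [salted_key(i) for i in range(1, 6)]
```

The cost is that a range scan over the original id order now has to fan out to every salt bucket, which is exactly the trade-off the text describes.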

Directory-Based Sharding

A lookup service (directory) maps each key or key range to a shard. Maximum flexibility but introduces a single point of coordination.

Directory Table:
+------------------+--------+
| Key Range        | Shard  |
+------------------+--------+
| users:1-10000    | shard1 |
| users:10001-25000| shard2 |
| users:25001-50000| shard3 |
+------------------+--------+

Used by: MongoDB config servers, Vitess VSchema
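A minimal in-memory sketch of the directory lookup, mirroring the table above (a production directory such as MongoDB's config servers is replicated and cached by the routers; the class and boundary layout here are illustrative):

```python
import bisect

class ShardDirectory:
    def __init__(self, boundaries, shards):
        # boundaries[i] is the inclusive upper bound of shards[i];
        # keys above the last boundary are out of range
        self.boundaries = boundaries
        self.shards = shards

    def lookup(self, user_id):
        # Binary search the sorted boundaries for the owning shard
        idx = bisect.bisect_left(self.boundaries, user_id)
        return self.shards[idx]

directory = ShardDirectory([10000, 25000, 50000], ["shard1", "shard2", "shard3"])
```

Because the mapping is explicit data rather than a formula, moving a range to another shard is just an update to this table, which is the flexibility (and the coordination point) the text describes.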

Partition Rebalancing

When nodes are added or removed, data must be redistributed while maintaining availability.

Strategies

1. Fixed Number of Partitions (Elasticsearch, Riak):
   - Create many more partitions than nodes (e.g., 256 partitions, 8 nodes)
   - Each node owns ~32 partitions
   - Adding node 9: transfer some partitions from existing nodes
   - Partition boundaries never change, only assignment

2. Dynamic Splitting (HBase, TiKV, CockroachDB):
   - Start with one partition per table
   - Split when partition exceeds threshold (e.g., 512MB)
   - Merge when partitions shrink
   - Adapts to data distribution over time

3. Proportional to Node Count (Cassandra):
   - Fixed number of partitions per node
   - Adding a node splits existing partitions
   - Total partitions grow with cluster size
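Strategy 1 can be sketched as a pure reassignment computation: when a node joins, move just enough partitions to it that every node ends up near total/num_nodes, without ever changing partition boundaries. This toy version (names and greedy policy are illustrative) ignores replication and transfer scheduling:

```python
def rebalance(assignment, new_node):
    # assignment: dict of partition_id -> node; mutated in place
    nodes = sorted(set(assignment.values())) + [new_node]
    target = len(assignment) // len(nodes)
    counts = {n: 0 for n in nodes}
    for n in assignment.values():
        counts[n] += 1
    moved = {}
    for pid, node in sorted(assignment.items()):
        # Greedily take partitions from over-target nodes
        if counts[node] > target and counts[new_node] < target:
            counts[node] -= 1
            counts[new_node] += 1
            moved[pid] = new_node
    assignment.update(moved)
    return moved

# 256 partitions over 8 nodes (~32 each); node 9 takes its ~1/9 share
initial = {p: f"node-{p % 8}" for p in range(256)}
moves = rebalance(initial, "node-8")
```

Only the moved partitions generate network traffic; everything else stays put, which is why many systems deliberately over-provision the partition count up front.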

Rebalancing Process

1. Plan phase:   Determine which ranges move where
2. Snapshot:     Take consistent snapshot of source range
3. Transfer:     Stream data to target node
4. Catch-up:     Replay writes that occurred during transfer
5. Cutover:      Atomically update routing metadata
6. Cleanup:      Remove old data from source

Key requirement: serve reads and writes throughout the process
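A toy simulation of steps 2 through 5 shows why the catch-up log is what keeps the source writable during transfer: the snapshot is streamed first, writes that land mid-transfer are buffered, and replay brings the target up to date before the routing flip. All class and key names here are illustrative.

```python
class RangeMigration:
    def __init__(self, source_data):
        self.source = dict(source_data)
        self.catchup_log = []
        self.transferring = False

    def write(self, key, value):
        self.source[key] = value              # source keeps serving writes
        if self.transferring:
            self.catchup_log.append((key, value))

    def migrate(self):
        self.transferring = True
        snapshot = dict(self.source)          # step 2: consistent snapshot
        target = dict(snapshot)               # step 3: stream to target
        self.write("k2", "v2-new")            # a write arrives mid-transfer
        for key, value in self.catchup_log:   # step 4: replay catch-up log
            target[key] = value
        self.transferring = False             # step 5: cutover (routing flip)
        return target

m = RangeMigration({"k1": "v1", "k2": "v2"})
target = m.migrate()
```

Real systems iterate step 4 until the log is short enough that the final cutover pause is negligible; this sketch does a single replay pass.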

Distributed Query Execution

Query Routing

Approach 1: Client-side routing
  Client -> [partition map] -> correct node
  Used by: Cassandra drivers, Redis Cluster

Approach 2: Proxy/router layer
  Client -> Router -> correct node
  Used by: MongoDB (mongos), Vitess (vtgate), ProxySQL

Approach 3: Any-node routing
  Client -> any node -> forwards to correct node
  Used by: CockroachDB, YugabyteDB

Distributed Joins and Aggregations

-- Query spanning multiple shards
SELECT c.name, SUM(o.amount)
FROM customers c JOIN orders o ON c.id = o.customer_id
WHERE c.region = 'US'
GROUP BY c.name;

Execution Plan:
1. Gateway node receives query
2. Push down: WHERE c.region = 'US' to customer shards
3. Co-located join if customers and orders share shard key
4. Otherwise: broadcast join or hash-repartition join
   - Repartition: redistribute orders by customer_id
   - Each node performs local join on its partition
5. Partial aggregation at each shard (local GROUP BY + SUM)
6. Gateway merges partial results (final aggregation)
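Steps 5 and 6 work because SUM is decomposable: merging per-shard partial sums gives the same answer as one global sum. A minimal sketch with made-up data:

```python
from collections import defaultdict

def local_aggregate(rows):
    # rows: (customer_name, order_amount) pairs local to one shard
    partial = defaultdict(float)
    for name, amount in rows:
        partial[name] += amount
    return dict(partial)

def merge_partials(partials):
    # Gateway-side final aggregation over the shards' partial results
    merged = defaultdict(float)
    for partial in partials:
        for name, subtotal in partial.items():
            merged[name] += subtotal
    return dict(merged)

shard1 = local_aggregate([("alice", 10.0), ("bob", 5.0)])
shard2 = local_aggregate([("alice", 7.5)])
result = merge_partials([shard1, shard2])
```

COUNT and MIN/MAX decompose the same way; AVG needs the shards to ship (sum, count) pairs rather than precomputed averages.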

Secondary Indexes in Distributed Systems

Local (Partitioned) Secondary Indexes

Each partition maintains its own index over its local data. Writes are fast (single partition), but reads require scatter-gather across all partitions.

Partition 1: data [a-m]     Partition 2: data [n-z]
  local index: color=red      local index: color=red
  -> doc3, doc7                -> doc15, doc22

Query: WHERE color = 'red'
  -> must query ALL partitions (scatter-gather)
  -> merge results
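The scatter-gather read path above can be sketched directly: the coordinator fans the term out to every partition's local index and merges whatever comes back. Data and structure here are illustrative, mirroring the diagram.

```python
partitions = [
    {"index": {"red": ["doc3", "doc7"]}},    # partition 1 (keys a-m)
    {"index": {"red": ["doc15", "doc22"]}},  # partition 2 (keys n-z)
]

def query_local_index(term):
    results = []
    for p in partitions:                 # scatter: hit ALL partitions
        results.extend(p["index"].get(term, []))
    return sorted(results)               # gather: merge at the coordinator

matches = query_local_index("red")
```

Latency is governed by the slowest partition, and the cost grows with partition count even when only a few partitions hold matches, which is why local indexes favor write-heavy workloads.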

Global Secondary Indexes

The index itself is partitioned (by index term), covering all data partitions. Reads hit a single index partition, but writes must update a remote index partition.

Index Partition A: color [a-m]   Index Partition B: color [n-z]
  "blue"  -> [p1:doc2, p3:doc8]   "red"  -> [p1:doc3, p2:doc15]
  "green" -> [p2:doc11]           "white" -> [p1:doc7]

Query: WHERE color = 'red'
  -> single index partition lookup
  -> then fetch from data partitions p1 and p2

Trade-off: faster reads, but writes require distributed coordination
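The global-index read path is a two-step lookup: route to the one index partition owning the term, then touch only the data partitions its postings name. This sketch mirrors the diagram above; the routing rule and structures are illustrative.

```python
index_partitions = {
    "a-m": {"blue": [("p1", "doc2"), ("p3", "doc8")], "green": [("p2", "doc11")]},
    "n-z": {"red": [("p1", "doc3"), ("p2", "doc15")], "white": [("p1", "doc7")]},
}

def index_partition_for(term):
    # Route by the term's first letter, matching the diagram's split
    return "a-m" if term[0] <= "m" else "n-z"

def query_global_index(term):
    # One index-partition lookup yields (data_partition, doc_id) postings
    postings = index_partitions[index_partition_for(term)].get(term, [])
    # Only the data partitions named in the postings are fetched
    return sorted({part for part, _ in postings})

touched = query_global_index("red")
```

The write path is the mirror image: inserting a document on p1 with color 'red' must also update index partition "n-z", a cross-partition write that needs distributed coordination (or accepts asynchronous, eventually consistent index updates).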

Production Systems

CockroachDB

  • Architecture: Shared-nothing, range-partitioned (64MB default ranges)
  • Consensus: Raft per range for strong consistency
  • SQL: Full PostgreSQL wire protocol compatibility
  • Transactions: Serializable isolation via MVCC + timestamp ordering
  • Geo-partitioning: Pin ranges to specific regions for data residency

-- CockroachDB geo-partitioning
ALTER TABLE users PARTITION BY LIST (region) (
  PARTITION us_east VALUES IN ('us-east-1'),
  PARTITION eu_west VALUES IN ('eu-west-1')
);
ALTER PARTITION us_east OF TABLE users
  CONFIGURE ZONE USING constraints = '[+region=us-east-1]';

TiDB

  • Architecture: Compute (TiDB) separated from storage (TiKV + TiFlash)
  • TiKV: Distributed key-value using Raft + RocksDB per node
  • TiFlash: Columnar replicas for analytical queries (HTAP)
  • SQL: MySQL compatible, horizontal scaling of both compute and storage

YugabyteDB

  • Architecture: DocDB (distributed document store) using per-tablet Raft
  • APIs: PostgreSQL-compatible (YSQL) and Cassandra-compatible (YCQL)
  • Tablet splitting: Automatic range splitting as data grows
  • Colocation: Small tables share a single tablet to reduce overhead

Vitess

  • Purpose: Horizontal scaling layer for MySQL
  • VTGate: Query router that parses SQL and routes to correct shards
  • VTTablet: Manages a MySQL instance, handles connection pooling
  • VSchema: Declarative sharding configuration

// Vitess VSchema example
{
  "sharded": true,
  "vindexes": {
    "hash": { "type": "hash" }
  },
  "tables": {
    "users": {
      "column_vindexes": [
        { "column": "id", "name": "hash" }
      ]
    }
  }
}

Key Trade-offs

| Decision        | Option A                  | Option B                     |
|-----------------|---------------------------|------------------------------|
| Sharding        | Hash (uniform)            | Range (range scans)          |
| Indexes         | Local (fast writes)       | Global (fast reads)          |
| Consistency     | Strong (latency cost)     | Eventual (availability)      |
| Rebalancing     | Fixed partitions (simple) | Dynamic splitting (adaptive) |
| Compute-storage | Coupled (locality)        | Disaggregated (elasticity)   |