
Distributed Computing Frameworks

MapReduce

Programming model for processing large datasets across clusters (Google, 2004).

Execution Model

Input -> Split -> Map -> Shuffle/Sort -> Reduce -> Output

Map:    (key1, value1) -> list(key2, value2)
Reduce: (key2, list(value2)) -> list(key3, value3)

Example: Word Count

# emit(key, value) is supplied by the framework; map and reduce are user-defined
def map(document):
    for word in document.split():
        emit(word, 1)

def reduce(word, counts):
    emit(word, sum(counts))

Execution Phases

  1. Input splitting: data divided into splits (typically 64-128 MB)
  2. Map phase: each split processed by a map task in parallel
  3. Combiner (optional): local reduce on map output to reduce network traffic
  4. Shuffle and sort: framework groups all values by key across the cluster
  5. Reduce phase: each unique key group processed by a reduce task
  6. Output: results written to distributed filesystem
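The phases above can be sketched in plain Python (a single-process simulation, not a real cluster; the combiner and distributed filesystem are omitted, and `map_fn`/`reduce_fn` are the word-count functions from the example):

```python
from collections import defaultdict

def mapreduce(documents, map_fn, reduce_fn):
    # Map phase: run map_fn over every input split
    intermediate = []
    for doc in documents:
        intermediate.extend(map_fn(doc))

    # Shuffle/sort: group all intermediate values by key
    groups = defaultdict(list)
    for key, value in intermediate:
        groups[key].append(value)

    # Reduce phase: one call per unique key
    return {key: reduce_fn(key, values) for key, values in sorted(groups.items())}

# Word count expressed in this model (returning pairs instead of emit())
def map_fn(document):
    return [(word, 1) for word in document.split()]

def reduce_fn(word, counts):
    return sum(counts)

result = mapreduce(["a b a", "b c"], map_fn, reduce_fn)
# result == {"a": 2, "b": 2, "c": 1}
```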

Fault Tolerance

  • Map/reduce tasks are assumed deterministic (same input always yields the same output), so re-execution is safe
  • Failed tasks are re-executed on other nodes
  • Intermediate data written to local disk
  • Speculative execution: duplicate slow tasks ("stragglers") on other nodes

Limitations

  • High latency: disk I/O between every stage
  • Only supports map and reduce operations (awkward for iterative algorithms)
  • No support for streaming or interactive queries
  • Led to development of more general frameworks (Spark, Flink)

Apache Spark

Unified analytics engine providing in-memory distributed computing.

Resilient Distributed Datasets (RDDs)

Immutable, partitioned collections that can be operated on in parallel.

rdd = sc.textFile("hdfs://data.txt")     # create from file
rdd2 = rdd.flatMap(lambda line: line.split()) \
           .map(lambda word: (word, 1)) \
           .reduceByKey(lambda a, b: a + b)
rdd2.collect()  # action triggers computation

Lazy Evaluation

Transformations are lazy (recorded as a DAG). Actions trigger execution.

| Transformations (lazy)  | Actions (eager)        |
|-------------------------|------------------------|
| map, flatMap, filter    | collect, count, reduce |
| union, join, groupByKey | saveAsTextFile, take   |
| reduceByKey, sortByKey  | foreach, first         |

Fault Tolerance via Lineage

RDDs track their lineage (sequence of transformations). If a partition is lost, Spark recomputes it from the lineage rather than replicating data.
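A minimal sketch of the lineage idea in plain Python (not Spark's implementation): each dataset remembers its parent and the transformation that produced it, so a lost partition can be rebuilt by replaying the chain from the source.

```python
class LineageRDD:
    def __init__(self, source=None, parent=None, transform=None):
        self.source = source        # base data (for the root dataset)
        self.parent = parent        # upstream dataset in the lineage chain
        self.transform = transform  # function applied to the parent's output

    def map(self, fn):
        return LineageRDD(parent=self, transform=lambda data: [fn(x) for x in data])

    def filter(self, pred):
        return LineageRDD(parent=self, transform=lambda data: [x for x in data if pred(x)])

    def compute(self):
        # Recompute by walking the lineage back to the source; this is
        # effectively what happens when a cached partition is lost.
        if self.parent is None:
            return list(self.source)
        return self.transform(self.parent.compute())

root = LineageRDD(source=[1, 2, 3, 4])
squared_evens = root.filter(lambda x: x % 2 == 0).map(lambda x: x * x)
print(squared_evens.compute())  # [4, 16]
```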

DataFrames and Datasets

Higher-level API with schema information and optimizations.

df = spark.read.json("people.json")
df.filter(df.age > 21) \
  .groupBy("city") \
  .agg({"salary": "avg"}) \
  .show()

DataFrames are compiled into optimized execution plans, often faster than hand-written RDD code.

Catalyst Optimizer

Spark's query optimization engine for DataFrames/SQL.

Optimization pipeline:
  1. Analysis: resolve references, types
  2. Logical optimization: predicate pushdown, constant folding,
     column pruning, join reordering
  3. Physical planning: select join strategies (broadcast, sort-merge, shuffle hash)
  4. Code generation: Whole-Stage CodeGen compiles query stages to Java bytecode

Key optimizations:

  • Predicate pushdown: push filters to data source (skip reading unnecessary data)
  • Column pruning: read only needed columns from columnar formats (Parquet)
  • Broadcast join: broadcast small table to all nodes to avoid shuffle
  • Adaptive Query Execution (AQE): re-optimize plan at runtime based on statistics
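To make predicate pushdown concrete, here is a toy comparison in plain Python (nothing like Catalyst's actual rule engine): applying the filter before projecting avoids building intermediate rows that would be discarded anyway.

```python
rows = [{"age": a, "city": "x", "salary": a * 1000} for a in range(100)]

# Unoptimized: project every row, then filter (100 intermediate rows built)
projected = [{"city": r["city"], "salary": r["salary"], "age": r["age"]} for r in rows]
unoptimized = [r for r in projected if r["age"] > 90]

# Predicate pushed below the projection: only surviving rows are materialized
pushed = [{"city": r["city"], "salary": r["salary"], "age": r["age"]}
          for r in rows if r["age"] > 90]

assert unoptimized == pushed  # same answer, far less intermediate work
```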

Structured Streaming

Unified batch and streaming API treating streams as unbounded tables.

stream_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "host:9092") \
    .option("subscribe", "topic") \
    .load()

result = stream_df.groupBy("key").count()

query = result.writeStream \
    .outputMode("update") \
    .format("console") \
    .trigger(processingTime="10 seconds") \
    .start()

Processing modes:

  • Micro-batch: processes data in small batches (100ms+ latency)
  • Continuous processing: experimental, ~1ms latency

Apache Flink

Stream-first distributed processing engine with true event-time semantics.

Core Concepts

Source -> Transformations -> Sink

DataStream API (streaming):
  stream.keyBy(x -> x.userId)
        .window(TumblingEventTimeWindows.of(Time.minutes(5)))
        .reduce((a, b) -> a.merge(b))
        .addSink(sink);

Event Time vs Processing Time

| Concept | Definition |
|---------|------------|
| Event time | When the event actually occurred (embedded in data) |
| Ingestion time | When the event entered Flink |
| Processing time | When Flink processes the event |

Event-time processing is critical for correctness with out-of-order data.

Watermarks

A watermark W(t) asserts that no events with timestamp <= t will arrive in the future.

Event stream:  [t=3] [t=1] [t=5] [W=5] [t=7] [t=6] [W=7]
                                    ^
                    Window [0,5) can now close and emit results

Watermark strategies:
  - Periodic: emit watermark every N ms based on observed max timestamp
  - Punctuated: emit watermark based on special events in the stream
  - Custom: application-defined logic

Late events (arriving after watermark) can be handled via:

  • Allowed lateness: keep window state longer, update results
  • Side outputs: route late events to a separate stream
  • Drop: ignore late events
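The watermark mechanics above can be sketched in plain Python (tumbling windows keyed by window start; `("W", t)` entries mark watermarks, and late events are simply dropped here, the third of the options above):

```python
from collections import defaultdict

def tumbling_windows(events, size=5):
    """events: list of ('E', timestamp) data events and ('W', t) watermarks."""
    open_windows = defaultdict(int)   # window start -> event count
    closed = {}
    watermark = float("-inf")
    for kind, t in events:
        if kind == "E":
            if t <= watermark:
                continue              # late event: drop (could go to a side output)
            open_windows[t - t % size] += 1
        else:                         # watermark: close every finished window
            watermark = t
            for start in [s for s in open_windows if s + size <= t]:
                closed[start] = open_windows.pop(start)
    return closed

result = tumbling_windows(
    [("E", 3), ("E", 1), ("E", 5), ("W", 5), ("E", 7), ("E", 2), ("W", 10)])
# Window [0,5) closes at W=5 with 2 events; ("E", 2) arrives late and is dropped
```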

State Management

Flink provides managed state with exactly-once guarantees.

Types:
  - ValueState<T>: single value per key
  - ListState<T>: list of values per key
  - MapState<K, V>: map per key
  - ReducingState<T>: aggregated value per key

State backends:
  - HashMapStateBackend: in-memory (fast, limited by RAM)
  - EmbeddedRocksDBStateBackend: on-disk (larger state, slower)
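A rough sketch of what keyed ValueState gives an operator (plain Python stand-in; Flink scopes state to the current key automatically and persists it through the configured state backend):

```python
class KeyedValueState:
    """Toy stand-in for ValueState<T>: one slot per key, default on first read."""
    def __init__(self, default=0):
        self._store = {}          # in a real backend this lives on heap or in RocksDB
        self._default = default

    def value(self, key):
        return self._store.get(key, self._default)

    def update(self, key, value):
        self._store[key] = value

# A keyed "running count" operator built on the state
state = KeyedValueState()
outputs = []
for user, _event in [("alice", "click"), ("bob", "click"), ("alice", "click")]:
    count = state.value(user) + 1
    state.update(user, count)
    outputs.append((user, count))
# outputs == [("alice", 1), ("bob", 1), ("alice", 2)]
```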

Checkpointing

Flink uses a variant of the Chandy-Lamport algorithm (asynchronous barrier snapshotting) for distributed snapshots.

  1. Job manager injects checkpoint barrier into all sources
  2. Barriers flow through the DAG with data
  3. When an operator receives barriers from all inputs, it snapshots its state
  4. Upon completion, checkpoint is committed (state + source offsets)

Recovery: restore state from latest checkpoint, replay from saved source offsets.
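The recovery path can be simulated in a few lines (a toy single-operator job: checkpoints capture operator state together with the source offset, so replaying from the saved offset reproduces the exact result):

```python
def run_with_checkpoints(source, checkpoint_every=3):
    state, checkpoint = 0, (0, 0)
    for i, value in enumerate(source):
        state += value
        if (i + 1) % checkpoint_every == 0:
            checkpoint = (state, i + 1)   # snapshot state + source offset together
    return state, checkpoint

def recover_and_finish(source, checkpoint):
    state, offset = checkpoint            # restore from last committed checkpoint
    for value in source[offset:]:         # replay from the saved source offset
        state += value
    return state

source = [1, 2, 3, 4, 5, 6, 7]
final, ckpt = run_with_checkpoints(source)        # last checkpoint after 6 elements
assert recover_and_finish(source, ckpt) == final == sum(source)
```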

Flink vs. Spark Structured Streaming:

| Aspect | Flink | Spark Structured Streaming |
|--------|-------|----------------------------|
| Model | True streaming | Micro-batch (default) |
| Latency | Milliseconds | 100ms+ (micro-batch) |
| State | Managed, keyed | Limited stateful ops |
| Event time | First-class | Supported |
| Throughput | High | Very high |
| Ecosystem | Growing | Mature |

Ray

Distributed computing framework for ML and general-purpose parallel/distributed Python.

import ray

ray.init()

@ray.remote
def compute(x):
    return x * x

# Launch tasks
futures = [compute.remote(i) for i in range(100)]
results = ray.get(futures)

# Actors (stateful)
@ray.remote
class Counter:
    def __init__(self):
        self.count = 0
    def increment(self):
        self.count += 1
        return self.count

counter = Counter.remote()
ray.get(counter.increment.remote())

Components

  • Ray Core: task and actor scheduling, object store
  • Ray Tune: hyperparameter tuning
  • Ray Train: distributed training
  • Ray Serve: model serving
  • Ray Data: data loading and preprocessing

Object Store

Shared-memory object store (Apache Arrow-based) enables zero-copy reads between tasks on the same node. Large objects are stored in the object store and passed by reference.

Dask

Parallel computing library for Python, scales NumPy/Pandas workflows.

import dask.dataframe as dd

df = dd.read_parquet("s3://bucket/data/")
result = df.groupby("category").value.mean().compute()

Collections

  • dask.array: parallel NumPy (blocked algorithms)
  • dask.dataframe: parallel Pandas (partitioned by rows)
  • dask.bag: parallel Python lists (for unstructured data)
  • dask.delayed: custom task graphs
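The `dask.delayed` idea, a deferred call that records a task graph and only runs on `.compute()`, can be emulated in a few lines of plain Python (real Dask also hashes tasks, schedules independent branches in parallel, etc.):

```python
class Delayed:
    def __init__(self, fn, args):
        self.fn, self.args = fn, args   # one node in the task graph

    def compute(self):
        # Resolve dependencies recursively, then apply this node's function
        resolved = [a.compute() if isinstance(a, Delayed) else a for a in self.args]
        return self.fn(*resolved)

def delayed(fn):
    return lambda *args: Delayed(fn, args)

inc = delayed(lambda x: x + 1)
add = delayed(lambda x, y: x + y)

graph = add(inc(1), inc(2))   # nothing has run yet; this is just a graph of nodes
print(graph.compute())        # 5
```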

Scheduler

  • Single-machine: threaded or multiprocess scheduler
  • Distributed: dask.distributed with a central scheduler and workers

Dask is lighter weight than Spark, integrates natively with the Python ecosystem, and works well from laptop to cluster.

MPI (Message Passing Interface)

The standard for distributed-memory parallel programming in HPC.

Core Operations

#include <mpi.h>

MPI_Init(&argc, &argv);

int rank, size;
MPI_Comm_rank(MPI_COMM_WORLD, &rank);   // this process's id in [0, size)
MPI_Comm_size(MPI_COMM_WORLD, &size);   // total number of processes

// Point-to-point (blocking)
MPI_Status status;
MPI_Send(buf, count, MPI_FLOAT, dest, tag, MPI_COMM_WORLD);
MPI_Recv(buf, count, MPI_FLOAT, source, tag, MPI_COMM_WORLD, &status);

// Non-blocking: returns immediately, complete with MPI_Wait
MPI_Request request;
MPI_Isend(buf, count, MPI_FLOAT, dest, tag, comm, &request);
MPI_Irecv(buf, count, MPI_FLOAT, source, tag, comm, &request);
MPI_Wait(&request, &status);

MPI_Finalize();

Collective Operations

MPI_Bcast(buf, count, type, root, comm);          // one-to-all
MPI_Reduce(sendbuf, recvbuf, count, type, op,     // all-to-one
           root, comm);
MPI_Allreduce(sendbuf, recvbuf, count, type, op,  // all-to-all reduce
              comm);
MPI_Scatter(sendbuf, sendcount, sendtype,          // distribute
            recvbuf, recvcount, recvtype, root, comm);
MPI_Gather(sendbuf, sendcount, sendtype,           // collect
           recvbuf, recvcount, recvtype, root, comm);
MPI_Alltoall(sendbuf, sendcount, sendtype,         // transpose
             recvbuf, recvcount, recvtype, comm);
MPI_Barrier(comm);                                  // synchronization
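A single-process sketch of what MPI_Allreduce computes, using the classic recursive-doubling schedule (plain Python over a list of per-rank values; assumes a power-of-two number of ranks — real MPI implementations pick among several algorithms):

```python
def allreduce_sum(values):
    """Simulate MPI_Allreduce with op=MPI_SUM: every rank ends with the global sum."""
    size = len(values)
    assert size & (size - 1) == 0, "recursive doubling needs a power-of-two rank count"
    vals = list(values)
    step = 1
    while step < size:
        # Each rank exchanges its partial sum with the partner at distance `step`,
        # so the result converges in log2(size) rounds
        vals = [vals[rank] + vals[rank ^ step] for rank in range(size)]
        step *= 2
    return vals

print(allreduce_sum([1, 2, 3, 4]))  # [10, 10, 10, 10]
```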

Communicators and Topologies

MPI_Comm_split(MPI_COMM_WORLD, color, key, &new_comm);  // sub-groups
MPI_Cart_create(comm, ndims, dims, periods, reorder,     // Cartesian topology
                &cart_comm);

One-Sided Communication (RMA)

MPI_Win_create(base, size, disp_unit, info, comm, &win);
MPI_Win_lock(MPI_LOCK_SHARED, target_rank, 0, win);
MPI_Get(origin, count, type, target_rank, offset, count, type, win);
MPI_Win_unlock(target_rank, win);
MPI_Win_free(&win);

Enables RDMA-style access without explicit matching sends/receives. Essential for irregular communication patterns.

MPI in Practice

  • Dominant in scientific computing and HPC
  • Typically combined with OpenMP for hybrid parallelism (MPI between nodes, OpenMP within nodes)
  • Implementations: Open MPI, MPICH, Intel MPI
  • Scales to millions of processes on supercomputers