Distributed Computing Frameworks
MapReduce
Programming model for processing large datasets across clusters (Google, 2004).
Execution Model
Input -> Split -> Map -> Shuffle/Sort -> Reduce -> Output
Map: (key1, value1) -> list(key2, value2)
Reduce: (key2, list(value2)) -> list(key3, value3)
Example: Word Count
def map(document):
    for word in document.split():
        emit(word, 1)

def reduce(word, counts):
    emit(word, sum(counts))
Execution Phases
- Input splitting: data divided into splits (typically 64-128 MB)
- Map phase: each split processed by a map task in parallel
- Combiner (optional): local reduce on map output to reduce network traffic
- Shuffle and sort: framework groups all values by key across the cluster
- Reduce phase: each unique key group processed by a reduce task
- Output: results written to distributed filesystem
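The phases above can be sketched in a single process. This is a conceptual illustration, not a distributed implementation: the map phase emits intermediate pairs, the shuffle groups them by key, and the reduce phase processes one key group at a time.

```python
from collections import defaultdict

def map_fn(document):
    # Map: document -> list of (word, 1) pairs
    return [(word, 1) for word in document.split()]

def reduce_fn(word, counts):
    # Reduce: (word, [counts]) -> (word, total)
    return (word, sum(counts))

def map_reduce(documents):
    # Map phase: each "split" (here, one document) produces intermediate pairs
    intermediate = []
    for doc in documents:
        intermediate.extend(map_fn(doc))
    # Shuffle/sort: group all intermediate values by key
    groups = defaultdict(list)
    for key, value in intermediate:
        groups[key].append(value)
    # Reduce phase: one reduce call per unique key
    return dict(reduce_fn(k, v) for k, v in sorted(groups.items()))

print(map_reduce(["the cat", "the dog"]))
# -> {'cat': 1, 'dog': 1, 'the': 2}
```

In a real cluster each loop body runs as an independent task, and the shuffle moves data across the network between map and reduce nodes.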
Fault Tolerance
- Map/reduce tasks are deterministic (same input always yields same output), so re-execution is safe
- Failed tasks are re-executed on other nodes
- Intermediate data written to local disk
- Speculative execution: duplicate slow tasks ("stragglers") on other nodes
Limitations
- High latency: disk I/O between every stage
- Only supports map and reduce operations (awkward for iterative algorithms)
- No support for streaming or interactive queries
- Led to development of more general frameworks (Spark, Flink)
Apache Spark
Unified analytics engine providing in-memory distributed computing.
Resilient Distributed Datasets (RDDs)
Immutable, partitioned collections that can be operated on in parallel.
rdd = sc.textFile("hdfs://data.txt")  # create from file
rdd2 = rdd.flatMap(lambda line: line.split()) \
          .map(lambda word: (word, 1)) \
          .reduceByKey(lambda a, b: a + b)
rdd2.collect()  # action triggers computation
Lazy Evaluation
Transformations are lazy (recorded as a DAG). Actions trigger execution.
| Transformations (lazy)   | Actions (eager)        |
|--------------------------|------------------------|
| map, flatMap, filter     | collect, count, reduce |
| union, join, groupByKey  | saveAsTextFile, take   |
| reduceByKey, sortByKey   | foreach, first         |
Fault Tolerance via Lineage
RDDs track their lineage (sequence of transformations). If a partition is lost, Spark recomputes it from the lineage rather than replicating data.
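A toy class can illustrate both ideas. This is a conceptual sketch, not Spark's implementation: transformations only record themselves in a lineage, and an action replays the lineage over the source data, which is also exactly how a lost partition would be recomputed.

```python
# Hypothetical ToyRDD: transformations are lazy (they extend the lineage);
# collect() is the action that actually runs the recorded DAG.

class ToyRDD:
    def __init__(self, source, lineage=()):
        self.source = source      # original data
        self.lineage = lineage    # recorded transformations

    def map(self, f):             # lazy: nothing computed here
        return ToyRDD(self.source, self.lineage + (("map", f),))

    def filter(self, f):          # lazy: nothing computed here
        return ToyRDD(self.source, self.lineage + (("filter", f),))

    def collect(self):            # action: replay the lineage
        data = list(self.source)
        for op, f in self.lineage:
            if op == "map":
                data = [f(x) for x in data]
            else:
                data = [x for x in data if f(x)]
        return data

rdd = ToyRDD(range(5)).map(lambda x: x * 2).filter(lambda x: x > 4)
print(rdd.collect())  # -> [6, 8]
```

Because the lineage is just data, recovery after a lost partition is re-running `collect()` on that partition's slice of the source rather than restoring a replica.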
DataFrames and Datasets
Higher-level API with schema information and optimizations.
df = spark.read.json("people.json")
df.filter(df.age > 21) \
  .groupBy("city") \
  .agg({"salary": "avg"}) \
  .show()
DataFrames are compiled into optimized execution plans, often faster than hand-written RDD code.
Catalyst Optimizer
Spark's query optimization engine for DataFrames/SQL.
Optimization pipeline:
1. Analysis: resolve references, types
2. Logical optimization: predicate pushdown, constant folding, column pruning, join reordering
3. Physical planning: select join strategies (broadcast, sort-merge, shuffle hash)
4. Code generation: Whole-Stage CodeGen compiles query stages to Java bytecode
Key optimizations:
- Predicate pushdown: push filters to data source (skip reading unnecessary data)
- Column pruning: read only needed columns from columnar formats (Parquet)
- Broadcast join: broadcast small table to all nodes to avoid shuffle
- Adaptive Query Execution (AQE): re-optimize plan at runtime based on statistics
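The effect of predicate pushdown and column pruning can be shown with a toy scan function (hypothetical names; Catalyst applies these rewrites at the query-plan level, not per row): the filter and the column list are applied while reading, so rejected rows and unused columns are never materialized downstream.

```python
# Toy row source with a pushed-down predicate and pruned column list.
rows = [
    {"name": "ann", "age": 34, "city": "Oslo", "salary": 50000},
    {"name": "bob", "age": 19, "city": "Lima", "salary": 30000},
    {"name": "eve", "age": 42, "city": "Oslo", "salary": 70000},
]

def scan(rows, predicate, columns):
    # Predicate pushdown: filter at the source, before any later operator.
    # Column pruning: keep only the columns the query actually needs.
    return [{c: r[c] for c in columns} for r in rows if predicate(r)]

print(scan(rows, lambda r: r["age"] > 21, ["name", "salary"]))
# -> [{'name': 'ann', 'salary': 50000}, {'name': 'eve', 'salary': 70000}]
```

With a columnar format like Parquet, pruning means the skipped columns are never even read from disk.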
Structured Streaming
Unified batch and streaming API treating streams as unbounded tables.
stream_df = spark.readStream \
    .format("kafka") \
    .option("subscribe", "topic") \
    .load()
result = stream_df.groupBy("key").count()
query = result.writeStream \
    .outputMode("update") \
    .format("console") \
    .trigger(processingTime="10 seconds") \
    .start()
Processing modes:
- Micro-batch: processes data in small batches (100ms+ latency)
- Continuous processing: experimental, ~1ms latency
Apache Flink
Stream-first distributed processing engine with true event-time semantics.
Core Concepts
Source -> Transformations -> Sink
DataStream API (streaming):
stream.keyBy(x -> x.userId)
      .window(TumblingEventTimeWindows.of(Time.minutes(5)))
      .reduce((a, b) -> a.merge(b))
      .addSink(sink);
Event Time vs Processing Time
| Concept         | Definition                                           |
|-----------------|------------------------------------------------------|
| Event time      | When the event actually occurred (embedded in data)  |
| Ingestion time  | When the event entered Flink                         |
| Processing time | When Flink processes the event                       |
Event-time processing is critical for correctness with out-of-order data.
Watermarks
A watermark W(t) asserts that no events with timestamp <= t will arrive in the future.
Event stream: [t=3] [t=1] [t=5] [W=5] [t=7] [t=6] [W=7]
                                  ^
                                  Window [0,5) can now close and emit results
Watermark strategies:
- Periodic: emit watermark every N ms based on observed max timestamp
- Punctuated: emit watermark based on special events in the stream
- Custom: application-defined logic
Late events (arriving after watermark) can be handled via:
- Allowed lateness: keep window state longer, update results
- Side outputs: route late events to a separate stream
- Drop: ignore late events
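A small simulation ties these pieces together. This is a conceptual sketch, not Flink's API: tumbling windows of size 5 are closed by watermarks, and events arriving behind the watermark are routed to a side-output list instead of being dropped.

```python
# Toy event-time tumbling windows with watermarks and a side output.
# Stream items are ("E", timestamp) for events and ("W", t) for watermarks.

def run(stream, size=5):
    open_windows, results, side_output = {}, [], []
    watermark = float("-inf")
    for kind, t in stream:
        if kind == "W":                        # watermark W(t)
            watermark = t
            for start in sorted(open_windows):
                if start + size <= watermark:  # window end <= W: fire and close
                    results.append((start, open_windows.pop(start)))
        elif t <= watermark:                   # event arrived behind the watermark
            side_output.append(t)              # route late event to side output
        else:
            start = (t // size) * size         # tumbling window assignment
            open_windows.setdefault(start, []).append(t)
    return results, side_output

stream = [("E", 3), ("E", 1), ("E", 5), ("W", 5),
          ("E", 4), ("E", 7), ("E", 6), ("W", 7)]
results, late = run(stream)
print(results)  # -> [(0, [3, 1])]  window [0,5) fired at W=5
print(late)     # -> [4]            t=4 arrived after W=5
```

With allowed lateness the window state for `[0,5)` would instead be kept past `W=5` so the late `t=4` could update the emitted result.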
State Management
Flink provides managed state with exactly-once guarantees.
Types:
- ValueState<T>: single value per key
- ListState<T>: list of values per key
- MapState<K, V>: map per key
- ReducingState<T>: aggregated value per key
State backends:
- HashMapStateBackend: in-memory (fast, limited by RAM)
- EmbeddedRocksDBStateBackend: on-disk (larger state, slower)
Checkpointing
Flink takes distributed snapshots using asynchronous barrier snapshotting, a variant of the Chandy-Lamport algorithm.
- Job manager injects checkpoint barrier into all sources
- Barriers flow through the DAG with data
- When an operator receives barriers from all inputs, it snapshots its state
- Upon completion, checkpoint is committed (state + source offsets)
Recovery: restore state from latest checkpoint, replay from saved source offsets.
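The barrier/recovery cycle can be sketched for a single operator (a hypothetical simulation, not Flink code): the operator sums events, snapshots (state, source offset) when a barrier arrives, and after a simulated crash restores the snapshot and replays from the saved offset, giving an exactly-once result.

```python
# Toy checkpoint/restore: a summing operator with barrier-triggered snapshots.

def process(events, state, start, crash_at=None):
    checkpoint = None
    for offset in range(start, len(events)):
        if crash_at == offset:
            return state, checkpoint, False   # simulate a failure mid-stream
        item = events[offset]
        if item == "BARRIER":
            checkpoint = (state, offset + 1)  # snapshot: state + source offset
        else:
            state += item                     # operator logic: running sum
    return state, checkpoint, True

events = [1, 2, "BARRIER", 3, 4]
state, ckpt, ok = process(events, state=0, start=0, crash_at=4)  # crash before 4
saved_state, saved_offset = ckpt              # restore from latest checkpoint
final, _, ok = process(events, saved_state, saved_offset)        # replay
print(final)  # -> 10 (1+2 not re-applied, 3+4 replayed exactly once)
```

The key invariant is that the snapshot pairs operator state with the source offset, so replay neither skips nor duplicates events.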
Flink vs Spark Streaming
| Aspect     | Flink          | Spark Structured Streaming |
|------------|----------------|----------------------------|
| Model      | True streaming | Micro-batch (default)      |
| Latency    | Milliseconds   | 100ms+ (micro-batch)       |
| State      | Managed, keyed | Limited stateful ops       |
| Event time | First-class    | Supported                  |
| Throughput | High           | Very high                  |
| Ecosystem  | Growing        | Mature                     |
Ray
Distributed computing framework for ML and general-purpose parallel/distributed Python.
import ray
ray.init()

@ray.remote
def compute(x):
    return x * x

# Launch tasks
futures = [compute.remote(i) for i in range(100)]
results = ray.get(futures)

# Actors (stateful)
@ray.remote
class Counter:
    def __init__(self):
        self.count = 0

    def increment(self):
        self.count += 1
        return self.count

counter = Counter.remote()
ray.get(counter.increment.remote())
Components
- Ray Core: task and actor scheduling, object store
- Ray Tune: hyperparameter tuning
- Ray Train: distributed training
- Ray Serve: model serving
- Ray Data: data loading and preprocessing
Object Store
Shared-memory object store (Apache Arrow-based) enables zero-copy reads between tasks on the same node. Large objects are stored in the object store and passed by reference.
Dask
Parallel computing library for Python, scales NumPy/Pandas workflows.
import dask.dataframe as dd
df = dd.read_parquet("s3://bucket/data/")
result = df.groupby("category").value.mean().compute()
Collections
- dask.array: parallel NumPy (blocked algorithms)
- dask.dataframe: parallel Pandas (partitioned by rows)
- dask.bag: parallel Python lists (for unstructured data)
- dask.delayed: custom task graphs
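The dask.delayed idea can be sketched in plain Python (a conceptual toy, not Dask itself): wrapping a function call records a node in a task graph instead of running it, and `compute()` walks the graph; real Dask additionally schedules independent nodes in parallel.

```python
# Toy deferred-execution graph in the style of dask.delayed.

class Delayed:
    def __init__(self, fn, *args):
        self.fn, self.args = fn, args   # graph node: function + dependencies

    def compute(self):
        # Recursively evaluate dependencies, then apply this node's function.
        args = [a.compute() if isinstance(a, Delayed) else a for a in self.args]
        return self.fn(*args)

def delayed(fn):
    return lambda *args: Delayed(fn, *args)

inc = delayed(lambda x: x + 1)
add = delayed(lambda x, y: x + y)

total = add(inc(1), inc(2))   # builds a graph; nothing has run yet
print(total.compute())        # -> 5
```

Because `inc(1)` and `inc(2)` have no dependency on each other, a real scheduler is free to run them on different threads or workers.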
Scheduler
- Single-machine: threaded or multiprocess scheduler
- Distributed: dask.distributed with a central scheduler and workers
Dask is lighter weight than Spark, integrates natively with the Python ecosystem, and works well from laptop to cluster.
MPI (Message Passing Interface)
The standard for distributed-memory parallel programming in HPC.
Core Operations
MPI_Init(&argc, &argv);
int rank, size;
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &size);
// Point-to-point
MPI_Send(buf, count, MPI_FLOAT, dest, tag, MPI_COMM_WORLD);
MPI_Recv(buf, count, MPI_FLOAT, source, tag, MPI_COMM_WORLD, &status);
// Non-blocking
MPI_Isend(buf, count, MPI_FLOAT, dest, tag, comm, &request);
MPI_Irecv(buf, count, MPI_FLOAT, source, tag, comm, &request);
MPI_Wait(&request, &status);
MPI_Finalize();
Collective Operations
MPI_Bcast(buf, count, type, root, comm);              // one-to-all
MPI_Reduce(sendbuf, recvbuf, count, type, op,         // all-to-one
           root, comm);
MPI_Allreduce(sendbuf, recvbuf, count, type, op,      // all-to-all reduce
              comm);
MPI_Scatter(sendbuf, sendcount, sendtype,             // distribute
            recvbuf, recvcount, recvtype, root, comm);
MPI_Gather(sendbuf, sendcount, sendtype,              // collect
           recvbuf, recvcount, recvtype, root, comm);
MPI_Alltoall(sendbuf, sendcount, sendtype,            // transpose
             recvbuf, recvcount, recvtype, comm);
MPI_Barrier(comm);                                    // synchronization
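The semantics of a few collectives can be simulated in one process (a hedged sketch with hypothetical helper names; real MPI runs one process per rank and these run as optimized tree or ring algorithms):

```python
# Single-process simulation of collective semantics, one list entry per rank.

def allreduce(values):           # MPI_Allreduce with op = sum
    total = sum(values)
    return [total] * len(values) # every rank ends up with the reduction

def scatter(sendbuf, size):      # MPI_Scatter: root distributes equal chunks
    chunk = len(sendbuf) // size
    return [sendbuf[i * chunk:(i + 1) * chunk] for i in range(size)]

def gather(chunks):              # MPI_Gather: root collects chunks back
    return [x for chunk in chunks for x in chunk]

ranks = [1, 2, 3, 4]             # one value per rank
print(allreduce(ranks))          # -> [10, 10, 10, 10]
print(scatter([0, 1, 2, 3, 4, 5, 6, 7], size=4))
# -> [[0, 1], [2, 3], [4, 5], [6, 7]]
```

Gather is the inverse of scatter, so `gather(scatter(buf, n))` returns the original buffer; allreduce is what makes data-parallel gradient averaging a one-line collective.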
Communicators and Topologies
MPI_Comm_split(MPI_COMM_WORLD, color, key, &new_comm); // sub-groups
MPI_Cart_create(comm, ndims, dims, periods, reorder,   // Cartesian topology
                &cart_comm);
One-Sided Communication (RMA)
MPI_Win_create(base, size, disp_unit, info, comm, &win);
MPI_Win_lock(MPI_LOCK_SHARED, target_rank, 0, win);
MPI_Get(origin, count, type, target_rank, offset, count, type, win);
MPI_Win_unlock(target_rank, win);
MPI_Win_free(&win);
Enables RDMA-style access without explicit matching sends/receives. Essential for irregular communication patterns.
MPI in Practice
- Dominant in scientific computing and HPC
- Typically combined with OpenMP for hybrid parallelism (MPI between nodes, OpenMP within nodes)
- Implementations: Open MPI, MPICH, Intel MPI
- Scales to millions of processes on supercomputers