MapReduce: Simplified Data Processing on Large Clusters
Paper Overview
Authors: Jeffrey Dean, Sanjay Ghemawat (Google)
Published: OSDI 2004
Citation: One of the most cited papers in computer science
TL;DR
MapReduce is a programming model for processing large datasets in parallel across a distributed cluster. Users define map functions that process key-value pairs to generate intermediate pairs, and reduce functions that merge intermediate values with the same key. The runtime handles parallelization, fault tolerance, and data distribution automatically. This abstraction enabled Google to run thousands of different computations on commodity hardware while hiding the complexity of distributed systems.
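To make the model concrete, here is a minimal single-process sketch of the word-count job. It is an illustration only: `run_local` is a hypothetical helper that stands in for the distributed runtime, and it does none of the parallelization or fault handling the real system provides.

```python
from collections import defaultdict
from typing import Callable, Dict, Iterable, Iterator, List, Tuple


def word_count_map(doc_id: str, text: str) -> Iterator[Tuple[str, int]]:
    # Emit (word, 1) for every word in the document.
    for word in text.split():
        yield word, 1


def word_count_reduce(word: str, counts: List[int]) -> Iterator[int]:
    # Merge all intermediate values that share a key.
    yield sum(counts)


def run_local(
    inputs: Iterable[Tuple[str, str]],
    map_fn: Callable,
    reduce_fn: Callable,
) -> Dict[str, list]:
    # Hypothetical stand-in for the runtime: group intermediate values by key...
    groups = defaultdict(list)
    for key, value in inputs:
        for out_key, out_value in map_fn(key, value):
            groups[out_key].append(out_value)
    # ...then reduce each key, visiting keys in sorted order.
    return {key: list(reduce_fn(key, groups[key])) for key in sorted(groups)}


result = run_local(
    [("doc1", "hello world hello")], word_count_map, word_count_reduce
)
print(result)  # {'hello': [2], 'world': [1]}
```

The user-supplied part is just the two small functions; everything in `run_local` is what the framework takes off the programmer's hands.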
Problem Statement
Before MapReduce, Google engineers had to write custom distributed systems for each computation:
- Crawling the web
- Building the search index
- Analyzing logs
- Computing PageRank
Each system needed to handle:
- Parallelization across hundreds of machines
- Data distribution and load balancing
- Fault tolerance for machine failures
- Network optimization
This was complex, error-prone, and duplicated effort.
Core Abstraction
┌─────────────────────────────────────────────────────────────────────────┐
│ MapReduce Programming Model │
│ │
│ map(key1, value1) → list(key2, value2) │
│ reduce(key2, list(value2)) → list(value2) │
│ │
│ ┌──────────────────────────────────────────────────────────────────┐ │
│ │ Example: Word Count │ │
│ │ │ │
│ │ Input: "hello world hello" │ │
│ │ │ │
│ │ map("doc1", "hello world hello"): │ │
│ │ emit("hello", 1) │ │
│ │ emit("world", 1) │ │
│ │ emit("hello", 1) │ │
│ │ │ │
│ │ reduce("hello", [1, 1]): │ │
│ │ emit("hello", 2) │ │
│ │ │ │
│ │ reduce("world", [1]): │ │
│ │ emit("world", 1) │ │
│ └──────────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────────┘
Execution Flow
┌─────────────────────────────────────────────────────────────────────────┐
│ MapReduce Execution Flow │
│ │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ Input Files (GFS) │ │
│ │ ┌────────┐ ┌────────┐ ┌────────┐ ┌────────┐ ┌────────┐ │ │
│ │ │Split 0 │ │Split 1 │ │Split 2 │ │Split 3 │ │Split 4 │ │ │
│ │ └────────┘ └────────┘ └────────┘ └────────┘ └────────┘ │ │
│ └─────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ Map Phase │ │
│ │ ┌────────┐ ┌────────┐ ┌────────┐ ┌────────┐ ┌────────┐ │ │
│ │ │ Map 0 │ │ Map 1 │ │ Map 2 │ │ Map 3 │ │ Map 4 │ │ │
│ │ │Worker │ │Worker │ │Worker │ │Worker │ │Worker │ │ │
│ │ └────────┘ └────────┘ └────────┘ └────────┘ └────────┘ │ │
│ │ │ │ │ │ │ │ │
│ │ ▼ ▼ ▼ ▼ ▼ │ │
│ │ ┌────────┐ ┌────────┐ ┌────────┐ ┌────────┐ ┌────────┐ │ │
│ │ │Local │ │Local │ │Local │ │Local │ │Local │ │ │
│ │ │Disk │ │Disk │ │Disk │ │Disk │ │Disk │ │ │
│ │ │(R parts)│ │(R parts)│ │(R parts)│ │(R parts)│ │(R parts)│ │ │
│ │ └────────┘ └────────┘ └────────┘ └────────┘ └────────┘ │ │
│ └─────────────────────────────────────────────────────────────────┘ │
│ │ │
│ │ Shuffle (partition by key hash) │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ Reduce Phase │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │ Reduce 0 │ │ Reduce 1 │ │ Reduce 2 │ │ │
│ │ │ Worker │ │ Worker │ │ Worker │ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ │ │
│ │ │ │ │ │ │
│ │ ▼ ▼ ▼ │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │Output 0 │ │Output 1 │ │Output 2 │ │ │
│ │ │ (GFS) │ │ (GFS) │ │ (GFS) │ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ │ │
│ └─────────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────────┘
Execution Steps
- Split input: Divide input into M splits (typically 16-64 MB each)
- Fork processes: Master assigns map/reduce tasks to workers
- Map phase: Workers read splits, apply map function, buffer output in memory
- Partition: Buffered output partitioned into R regions by hash(key) mod R
- Write locally: Partitioned data written to local disk
- Shuffle: Reduce workers fetch their partition from all map workers
- Sort: Reduce workers sort by key (external sort if needed)
- Reduce phase: Iterate over sorted data, apply reduce function
- Output: Write reduce output to GFS
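The map, partition, shuffle, sort, and reduce steps above can be sketched in one process as follows. This is a simplified illustration, not the paper's implementation: the function names are hypothetical, in-memory lists stand in for local disk and GFS, and `zlib.crc32` is an assumed choice of hash (picked because it gives the same value in every process).

```python
import zlib
from itertools import groupby
from operator import itemgetter
from typing import List, Tuple

Pair = Tuple[str, int]


def partition_for(key: str, num_reducers: int) -> int:
    # hash(key) mod R, using a hash that is stable across processes.
    return zlib.crc32(key.encode("utf-8")) % num_reducers


def run_map_task(split_text: str, num_reducers: int) -> List[List[Pair]]:
    # Map + partition: each map task produces R regions of output.
    regions: List[List[Pair]] = [[] for _ in range(num_reducers)]
    for word in split_text.split():
        regions[partition_for(word, num_reducers)].append((word, 1))
    return regions


def run_reduce_task(partition: int, map_outputs: List[List[List[Pair]]]) -> List[Pair]:
    # Shuffle: fetch this partition's region from every map task.
    fetched = [pair for regions in map_outputs for pair in regions[partition]]
    # Sort by key so equal keys are adjacent, then reduce each group.
    fetched.sort(key=itemgetter(0))
    return [
        (key, sum(count for _, count in group))
        for key, group in groupby(fetched, key=itemgetter(0))
    ]


splits = ["a b a", "b c", "a c c"]  # M = 3 input splits
R = 2                               # R = 2 reduce tasks
map_outputs = [run_map_task(s, R) for s in splits]
outputs = [run_reduce_task(r, map_outputs) for r in range(R)]
```

Because the partition depends only on the key, every occurrence of a given key ends up in the same reduce task no matter which map task emitted it.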
Implementation Details
Master Data Structures
from dataclasses import dataclass, field
from enum import Enum
from typing import Dict, List, Optional
import time

class TaskState(Enum):
    IDLE = "idle"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"

class TaskType(Enum):
    MAP = "map"
    REDUCE = "reduce"

@dataclass
class MapTask:
    task_id: int
    input_split: str  # GFS file path
    state: TaskState = TaskState.IDLE
    worker_id: Optional[str] = None
    start_time: Optional[float] = None
    output_locations: List[str] = field(default_factory=list)  # R locations

@dataclass
class ReduceTask:
    task_id: int
    partition: int  # Partition number (0 to R-1)
    state: TaskState = TaskState.IDLE
    worker_id: Optional[str] = None
    start_time: Optional[float] = None
    input_locations: List[str] = field(default_factory=list)  # From all mappers
class MapReduceMaster:
    """
    Master coordinates the entire MapReduce job.
    Tracks task states, handles failures, and manages workers.
    """

    def __init__(self, input_files: List[str], num_reducers: int):
        self.num_mappers = len(input_files)
        self.num_reducers = num_reducers
        # Initialize map tasks (one per input split)
        self.map_tasks: Dict[int, MapTask] = {
            i: MapTask(task_id=i, input_split=f)
            for i, f in enumerate(input_files)
        }
        # Initialize reduce tasks (one per partition)
        self.reduce_tasks: Dict[int, ReduceTask] = {
            i: ReduceTask(task_id=i, partition=i)
            for i in range(num_reducers)
        }
        # Worker tracking
        self.workers: Dict[str, float] = {}  # worker_id -> last_heartbeat
        self.worker_timeout = 10.0  # seconds
    def get_task(self, worker_id: str) -> Optional[Dict]:
        """Assign a task to a worker"""
        self.workers[worker_id] = time.time()
        # Prefer map tasks first (reduce can't start until maps complete)
        for task_id, task in self.map_tasks.items():
            if task.state == TaskState.IDLE:
                task.state = TaskState.IN_PROGRESS
                task.worker_id = worker_id
                task.start_time = time.time()
                return {
                    "type": TaskType.MAP,
                    "task_id": task_id,
                    "input_split": task.input_split,
                    "num_reducers": self.num_reducers
                }
        # All maps done? Assign reduce tasks
        if self._all_maps_complete():
            for task_id, task in self.reduce_tasks.items():
                if task.state == TaskState.IDLE:
                    task.state = TaskState.IN_PROGRESS
                    task.worker_id = worker_id
                    task.start_time = time.time()
                    # Collect this partition's region from every mapper
                    input_locations = []
                    for map_task in self.map_tasks.values():
                        input_locations.append(
                            map_task.output_locations[task.partition]
                        )
                    return {
                        "type": TaskType.REDUCE,
                        "task_id": task_id,
                        "partition": task.partition,
                        "input_locations": input_locations
                    }
        return None  # No tasks available
    def task_completed(
        self,
        worker_id: str,
        task_type: TaskType,
        task_id: int,
        output_locations: List[str]
    ):
        """Handle task completion notification"""
        # Only accept completion from the worker that currently owns the task
        if task_type == TaskType.MAP:
            task = self.map_tasks[task_id]
            if task.worker_id == worker_id:
                task.state = TaskState.COMPLETED
                task.output_locations = output_locations
        else:
            task = self.reduce_tasks[task_id]
            if task.worker_id == worker_id:
                task.state = TaskState.COMPLETED
    def check_timeouts(self):
        """Re-assign tasks from failed workers"""
        now = time.time()
        # Check for worker timeouts
        failed_workers = set()
        for worker_id, last_heartbeat in self.workers.items():
            if now - last_heartbeat > self.worker_timeout:
                failed_workers.add(worker_id)
        # Re-assign map tasks
        for task in self.map_tasks.values():
            if task.state == TaskState.IN_PROGRESS:
                if task.worker_id in failed_workers:
                    task.state = TaskState.IDLE
                    task.worker_id = None
        # Re-assign reduce tasks
        for task in self.reduce_tasks.values():
            if task.state == TaskState.IN_PROGRESS:
                if task.worker_id in failed_workers:
                    task.state = TaskState.IDLE
                    task.worker_id = None
        # Also re-run completed map tasks if worker failed
        # (reduce workers may not have fetched data yet)
        for task in self.map_tasks.values():
            if task.state == TaskState.COMPLETED:
                if task.worker_id in failed_workers:
                    task.state = TaskState.IDLE
                    task.worker_id = None
    def _all_maps_complete(self) -> bool:
        return all(t.state == TaskState.COMPLETED for t in self.map_tasks.values())

    def is_done(self) -> bool:
        return all(t.state == TaskState.COMPLETED for t in self.reduce_tasks.values())
Worker Implementation
import zlib

class MapReduceWorker:
    """
    Worker executes map and reduce tasks.
    """

    def __init__(self, worker_id: str, master_address: str):
        self.worker_id = worker_id
        self.master = master_address
        self.map_fn = None
        self.reduce_fn = None

    def run(self):
        """Main worker loop"""
        while True:
            # Request task from master
            task = self.request_task()
            if task is None:
                # No tasks available, sleep and retry
                time.sleep(1)
                continue
            if task["type"] == TaskType.MAP:
                self.execute_map(task)
            else:
                self.execute_reduce(task)

    def execute_map(self, task: Dict):
        """Execute a map task"""
        task_id = task["task_id"]
        input_file = task["input_split"]
        num_reducers = task["num_reducers"]
        # Read input
        content = self.read_gfs(input_file)
        # Apply map function
        intermediate = []
        for key, value in self.parse_input(content):
            for output_key, output_value in self.map_fn(key, value):
                intermediate.append((output_key, output_value))
        # Partition by reduce task. Python's built-in hash() is randomized
        # per process for strings, so different workers would disagree on a
        # key's partition; crc32 gives the same value on every worker.
        partitions = [[] for _ in range(num_reducers)]
        for key, value in intermediate:
            partition = zlib.crc32(str(key).encode("utf-8")) % num_reducers
            partitions[partition].append((key, value))
        # Write partitions to local disk
        output_locations = []
        for partition_id, partition_data in enumerate(partitions):
            location = f"/tmp/mr-{task_id}-{partition_id}"
            self.write_local(location, partition_data)
            output_locations.append(f"{self.worker_id}:{location}")
        # Notify master
        self.notify_complete(TaskType.MAP, task_id, output_locations)
    def execute_reduce(self, task: Dict):
        """Execute a reduce task"""
        task_id = task["task_id"]
        input_locations = task["input_locations"]
        # Fetch all intermediate data for this partition
        intermediate = []
        for location in input_locations:
            # Split on the last colon: the local path contains none,
            # but a worker id such as "host:port" might
            worker, path = location.rsplit(":", 1)
            data = self.fetch_from_worker(worker, path)
            intermediate.extend(data)
        # Sort by key
        intermediate.sort(key=lambda x: x[0])
        # Group by key and apply reduce
        output = []
        i = 0
        while i < len(intermediate):
            key = intermediate[i][0]
            values = []
            # Collect all values for this key
            while i < len(intermediate) and intermediate[i][0] == key:
                values.append(intermediate[i][1])
                i += 1
            # Apply reduce function
            for output_value in self.reduce_fn(key, values):
                output.append((key, output_value))
        # Write output to GFS
        output_file = f"/output/part-{task_id:05d}"
        self.write_gfs(output_file, output)
        # Notify master
        self.notify_complete(TaskType.REDUCE, task_id, [output_file])
Fault Tolerance
Worker Failure
┌─────────────────────────────────────────────────────────────────────────┐
│ Handling Worker Failures │
│ │
│ Map Worker Fails: │
│ ┌──────────────────────────────────────────────────────────────────┐ │
│ │ 1. Master detects via missing heartbeats │ │
│ │ 2. All map tasks on that worker reset to IDLE │ │
│ │ 3. Tasks re-assigned to other workers │ │
│ │ 4. Reduce workers notified of new intermediate locations │ │
│ │ │ │
│ │ Note: Completed map tasks ALSO re-executed because output │ │
│ │ is on local disk (inaccessible if worker dead) │ │
│ └──────────────────────────────────────────────────────────────────┘ │
│ │
│ Reduce Worker Fails: │
│ ┌──────────────────────────────────────────────────────────────────┐ │
│ │ 1. Master detects via missing heartbeats │ │
│ │ 2. Only IN_PROGRESS reduce tasks reset to IDLE │ │
│ │ 3. Tasks re-assigned to other workers │ │
│ │ │ │
│ │ Note: Completed reduce tasks NOT re-executed because output │ │
│ │ is on GFS (globally accessible) │ │
│ └──────────────────────────────────────────────────────────────────┘ │
│ │
│ Master Failure: │
│ ┌──────────────────────────────────────────────────────────────────┐ │
│ │ Original paper: Abort job, user restarts │ │
│ │ Later systems: Checkpointing, standby master │ │
│ └──────────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────────┘
Deterministic Execution
For fault tolerance to work correctly:
- Map and Reduce functions must be deterministic
- Same input → same output, always
- Re-execution produces identical results
- Non-deterministic functions can cause inconsistencies
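A small sketch of why this matters: if a task is re-executed after a failure, its output should match what the first attempt produced (or would have produced). The helper `reexecution_is_safe` and both map functions below are hypothetical illustrations, and the hidden counter stands in for any attempt-dependent input such as a timestamp or random number.

```python
import itertools

_attempts = itertools.count()  # hidden mutable state shared across calls


def deterministic_map(key, value):
    # Output depends only on the input record.
    return [(word, 1) for word in value.split()]


def stateful_map(key, value):
    # Output depends on how many times the function has run before,
    # so a re-executed task would not reproduce the original output.
    attempt = next(_attempts)
    return [(word, attempt) for word in value.split()]


def reexecution_is_safe(map_fn, key, value, attempts=5):
    # Run the task repeatedly and check every attempt matches the first.
    first = map_fn(key, value)
    return all(map_fn(key, value) == first for _ in range(attempts))


print(reexecution_is_safe(deterministic_map, "doc1", "hello world hello"))  # True
print(reexecution_is_safe(stateful_map, "doc1", "hello world hello"))       # False
```

With the deterministic function, a reduce worker sees the same data whether it reads the original or the re-executed map output; with the stateful one, two reduce workers could observe different versions of the same map task.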
Optimizations
Locality Optimization
┌─────────────────────────────────────────────────────────────────────────┐
│ Data Locality Optimization │
│ │
│ GFS stores data in 64MB chunks, replicated 3x │
│ │
│ ┌────────────────────────────────────────────────────────────────┐ │
│ │ GFS Chunk Locations for Input Split │ │
│ │ │ │
│ │ Split "input-0": replicas on [Machine A, Machine B, Machine C]│ │
│ │ │ │
│ │ Master scheduling priority: │ │
│ │ 1. Schedule map task on machine with replica (local read) │ │
│ │ 2. Schedule on machine on same rack (rack-local) │ │
│ │ 3. Schedule anywhere (remote read) │ │
│ │ │ │
│ │ Result: Most reads are local, minimizing network bandwidth │ │
│ └────────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────────┘
Combiner Function
# Without combiner: Each mapper emits ("the", 1) thousands of times
# Lots of network traffic during shuffle
def word_count_map(key, value):
    for word in value.split():
        emit(word, 1)

def word_count_reduce(key, values):
    emit(key, sum(values))

# With combiner: Partial aggregation on mapper before shuffle
def word_count_combiner(key, values):
    # Same as reduce, runs locally on mapper
    emit(key, sum(values))

# Mapper output: ("the", 4523) instead of thousands of ("the", 1)
# Much less data transferred during shuffle
Backup Tasks
┌─────────────────────────────────────────────────────────────────────────┐
│ Handling Stragglers │
│ │
│ Problem: One slow machine can delay entire job │
│ - Bad disk │
│ - Competing workloads │
│ - CPU throttling │
│ │
│ Solution: Backup tasks │
│ ┌──────────────────────────────────────────────────────────────────┐ │
│ │ When job is almost complete (e.g., 90% of tasks done): │ │
│ │ 1. Identify still-running tasks │ │
│ │ 2. Schedule backup executions on other machines │ │
│ │ 3. First to complete wins, other killed │ │
│ │ │ │
│ │ Cost: Few percent more resources │ │
│ │ Benefit: Significantly reduced tail latency │ │
│ └──────────────────────────────────────────────────────────────────┘ │
│ │
│ Example from paper: │
│ - Sort 1TB without backup tasks: 1283 seconds (44% longer) │
│ - Sort 1TB with backup tasks: 891 seconds │
└─────────────────────────────────────────────────────────────────────────┘
Example Applications
Distributed Grep
import re

def grep_map(key, value):
    # key: filename, value: file contents
    # pattern: the regular expression supplied by the user for this job
    for line in value.split('\n'):
        if re.search(pattern, line):
            emit(line, "1")

def grep_reduce(key, values):
    emit(key, None)  # Just output matching lines
URL Access Count
def url_map(key, value):
    # key: log line number, value: log entry
    parsed = parse_log_entry(value)
    emit(parsed.url, 1)

def url_reduce(key, values):
    emit(key, sum(values))
Inverted Index
def index_map(key, value):
    # key: document ID, value: document content
    for word in value.split():
        emit(word, key)

def index_reduce(key, values):
    # key: word, values: list of document IDs
    emit(key, sorted(set(values)))
PageRank Iteration
def pagerank_map(key, value):
    # key: page URL, value: (current_rank, outlinks)
    current_rank, outlinks = value
    # Distribute rank evenly to outlinks; a page with no outlinks
    # contributes nothing (this also avoids dividing by zero)
    if outlinks:
        contribution = current_rank / len(outlinks)
        for outlink in outlinks:
            emit(outlink, contribution)
    # Emit graph structure so the next iteration still has it
    emit(key, outlinks)

def pagerank_reduce(key, values):
    # Separate contributions from graph structure
    outlinks = None
    total_contribution = 0
    for value in values:
        if isinstance(value, list):
            outlinks = value
        else:
            total_contribution += value
    # PageRank formula (unnormalized form, damping factor 0.85)
    new_rank = 0.15 + 0.85 * total_contribution
    emit(key, (new_rank, outlinks))
Performance Results (from paper)
Grep
- 10^10 100-byte records (1 TB)
- Pattern occurs in 92,337 records
- M = 15,000 map tasks, R = 1 reduce task
- 1,764 machines
- Time: 150 seconds (including ~60s startup overhead)
- Peak: 30 GB/s aggregate read rate
Sort
- 10^10 100-byte records (1 TB)
- M = 15,000 map tasks, R = 4,000 reduce tasks
- 1,764 machines
- Time: 891 seconds (with backup tasks)
- Three phases visible: map input read finishes by ~200 s, shuffle finishes by ~600 s, output writes finish by ~850 s
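The figures above support a little arithmetic. The sketch below uses only numbers quoted in this summary (10^10 records of 100 bytes, 150 s for grep, 891 s and 1283 s for sort with and without backup tasks); the variable names are illustrative.

```python
# Data volume: 10^10 records x 100 bytes = 10^12 bytes (1 TB)
total_bytes = 10**10 * 100

grep_s = 150
sort_with_backup_s = 891
sort_without_backup_s = 1283

# Average end-to-end throughput, in GB/s (1 GB = 10^9 bytes)
grep_gb_per_s = total_bytes / grep_s / 1e9
sort_gb_per_s = total_bytes / sort_with_backup_s / 1e9

# Effect of backup tasks, stated both ways
extra_time_without = sort_without_backup_s / sort_with_backup_s - 1
time_saved_with = 1 - sort_with_backup_s / sort_without_backup_s

print(f"grep average:  {grep_gb_per_s:.2f} GB/s")              # 6.67 GB/s
print(f"sort average:  {sort_gb_per_s:.2f} GB/s")              # 1.12 GB/s
print(f"no backup tasks: {extra_time_without:.0%} longer")     # 44% longer
print(f"backup tasks:    {time_saved_with:.0%} less time")     # 31% less time
```

Note the gap between grep's average rate (about 6.7 GB/s over the whole run) and its 30 GB/s peak: the average includes the roughly one minute of startup overhead and the ramp-up as workers are assigned.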
Limitations
- Only batch processing - Not suitable for real-time or interactive queries
- Disk-based shuffle - Expensive I/O between phases
- Limited to map + reduce - Complex algorithms need multiple jobs
- No iteration support - Each iteration = full job restart
- High latency - Startup overhead, materialization between phases
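The iteration limitation is easiest to see in code. In the sketch below, each PageRank iteration is a separate job whose full output becomes the next job's input. `run_job` is a hypothetical in-process stand-in for submitting a MapReduce job; on a real cluster each call would pay startup cost and write its output to the distributed file system before the next iteration could begin.

```python
from collections import defaultdict


def run_job(records, map_fn, reduce_fn):
    # Hypothetical stand-in for one complete MapReduce job.
    groups = defaultdict(list)
    for key, value in records:
        for out_key, out_value in map_fn(key, value):
            groups[out_key].append(out_value)
    output = []
    for key in sorted(groups):
        output.extend(reduce_fn(key, groups[key]))
    return output


def pagerank_map(page, state):
    rank, outlinks = state
    yield page, outlinks  # carry the graph structure forward
    for target in outlinks:
        yield target, rank / len(outlinks)


def pagerank_reduce(page, values):
    outlinks, total = [], 0.0
    for value in values:
        if isinstance(value, list):
            outlinks = value
        else:
            total += value
    yield page, (0.15 + 0.85 * total, outlinks)


# Tiny illustrative graph: every page has at least one outlink.
graph = [("a", (1.0, ["b", "c"])), ("b", (1.0, ["c"])), ("c", (1.0, ["a"]))]

jobs_run = 0
for _ in range(10):
    # One iteration = one full job; nothing is kept in memory between jobs.
    graph = run_job(graph, pagerank_map, pagerank_reduce)
    jobs_run += 1

for page, (rank, _) in graph:
    print(page, round(rank, 3))
```

Ten iterations mean ten jobs, which is the overhead that in-memory, DAG-based systems were later designed to avoid.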
Legacy and Influence
Direct Descendants
- Hadoop MapReduce - Open source implementation
- Apache Spark - In-memory, DAG-based (addressed many limitations)
- Apache Flink - Stream processing engine that also handles batch workloads
Broader Impact
- Popularized functional programming patterns in data processing
- Demonstrated viability of commodity hardware for big data
- Inspired cloud computing models (EMR, Dataproc)
- Influenced higher-level query layers on Hadoop (Hive's SQL dialect, Pig's dataflow language)
Key Takeaways
Simple abstraction, powerful results - Two functions (map, reduce) express surprisingly many computations.
Automatic parallelization - Users write sequential code; framework handles distribution.
Fault tolerance through re-execution - Deterministic functions plus task state tracked by the master mean failed work is simply rerun.
Data locality matters - Schedule computation near data to minimize network I/O.
Backup tasks for stragglers - Small resource cost for large latency improvement.
Commodity hardware works - Expect failures, design for them, scale out not up.
Intermediate data on local disk - Reduces network traffic, enables recovery without re-running entire pipeline.