System Design Interview: Design a Distributed File System (HDFS/GFS)
Distributed file systems are foundational infrastructure, powering big data processing, cloud storage, and ML training pipelines. Understanding the principles behind HDFS and the Google File System (GFS) is essential for data engineering roles at Google, Meta, Amazon, Databricks, and Netflix. Variations of the same design also underlie consumer storage systems such as Dropbox and Google Drive.
Requirements Clarification
Functional Requirements
- Store large files (gigabytes to terabytes each)
- Read and write files sequentially (streaming, not random access)
- Replicate files for fault tolerance
- Automatic recovery when nodes fail
- Namespace operations: create, delete, rename, list directory
Non-Functional Requirements
- Scale: petabytes of data across thousands of machines (see the back-of-envelope estimate after this list)
- Fault tolerance: automatic re-replication when nodes fail
- High throughput for sequential reads/writes (not low latency)
- Simple consistency model: writes are append-only; no random updates
- Replication factor: 3x (default HDFS)
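A quick capacity estimate makes the scale target concrete. The numbers below are illustrative assumptions, not from any specific deployment:

    raw_data = 10 * 1024**5          # assume 10 PB of logical data
    replicated = raw_data * 3        # 3x replication -> 30 PB of raw capacity
    node_capacity = 24 * 1024**4     # assume 24 TB of disk per DataNode
    nodes = replicated / node_capacity
    print(int(nodes))                # ~1280 DataNodes, before any headroom

So "petabytes across thousands of machines" follows directly from replication overhead plus per-node disk limits.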
Architecture Overview
Client
 ├── metadata operations → NameNode
 └── chunk reads/writes → DataNodes

NameNode (master):
- Namespace (directory tree)
- File → chunks mapping
- DataNode locations for each chunk
- Stores no file data

DataNodes (workers):
- Store actual file chunks on local disk
- Serve chunk reads/writes to clients
- Send heartbeats to the NameNode
- Replicate chunks to peer DataNodes
Key Design: Chunking
CHUNK_SIZE = 64 * 1024 * 1024  # 64MB (GFS chunk size; modern HDFS defaults to 128MB blocks)
# GFS uses 64MB chunks; Colossus (GFS successor) uses smaller chunks

class FileChunk:
    def __init__(self, chunk_id: str, file_id: str, chunk_index: int):
        self.chunk_id = chunk_id
        self.file_id = file_id
        self.chunk_index = chunk_index
        self.size = 0                   # actual bytes in chunk
        self.checksum: str = ""         # MD5 or CRC32 for integrity
        self.version: int = 1           # incremented on mutation
        self.locations: list[str] = []  # DataNode IDs storing this chunk

class FileMetadata:
    def __init__(self, file_id: str, path: str, size: int):
        self.file_id = file_id
        self.path = path
        self.total_size = size
        self.chunk_ids: list[str] = []  # ordered list of chunk IDs
        self.replication_factor = 3
        self.created_at: float = 0
        self.modified_at: float = 0
        self.permissions: int = 0o644
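For intuition, here is how a file maps onto chunks. The file size is a hypothetical example:

    import math

    file_size = 350 * 1024 * 1024  # hypothetical 350MB file
    num_chunks = math.ceil(file_size / CHUNK_SIZE)
    # 350MB / 64MB -> 6 chunks: five full 64MB chunks plus one 30MB tail
    last_chunk_size = file_size - (num_chunks - 1) * CHUNK_SIZE
    print(num_chunks, last_chunk_size / (1024 * 1024))  # 6, 30.0

Only the last chunk is partially filled, which is why large chunks waste space when files are small.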
NameNode: Master Coordinator
import threading
import time
from collections import defaultdict

# generate_id / generate_chunk_id: assumed ID helpers (e.g., uuid4 hex), not shown

class NameNode:
    """
    Single master (HDFS); the SPOF is addressed by:
    - Secondary NameNode: checkpoints the namespace periodically
    - HDFS HA: Active/Standby NameNodes with a shared edit log (ZooKeeper for failover)
    - Google Colossus: replaced the single master with a distributed, Bigtable-backed namespace
    """
    def __init__(self, replication_factor: int = 3):
        self.replication_factor = replication_factor
        # In-memory namespace (persisted to disk as edit log + fsimage)
        self.namespace: dict[str, FileMetadata] = {}              # path -> FileMetadata
        self.chunk_map: dict[str, FileChunk] = {}                 # chunk_id -> FileChunk
        self.datanode_chunks: dict[str, set] = defaultdict(set)   # datanode_id -> chunk_ids
        self.datanode_info: dict[str, dict] = {}                  # datanode_id -> {capacity, free, last_heartbeat}
        self.lock = threading.RLock()

    def create_file(self, path: str, replication: int = None) -> FileMetadata:
        """Create a new file entry (no data yet)."""
        with self.lock:
            if path in self.namespace:
                raise FileExistsError(f"{path} already exists")
            file_id = generate_id()
            metadata = FileMetadata(file_id, path, 0)
            metadata.replication_factor = replication or self.replication_factor
            metadata.created_at = time.time()
            self.namespace[path] = metadata
            return metadata

    def allocate_chunk(self, file_path: str) -> tuple[str, list[str]]:
        """
        Allocate a new chunk for a file; return (chunk_id, [datanode_ids]).
        The client writes directly to the DataNodes, not through the NameNode.
        """
        with self.lock:
            metadata = self.namespace.get(file_path)
            if not metadata:
                raise FileNotFoundError(f"{file_path} not found")
            chunk_id = generate_chunk_id()
            chunk_index = len(metadata.chunk_ids)
            # Select DataNodes for this chunk (placement policy)
            datanodes = self._select_datanodes(metadata.replication_factor)
            chunk = FileChunk(chunk_id, metadata.file_id, chunk_index)
            chunk.locations = datanodes
            self.chunk_map[chunk_id] = chunk
            metadata.chunk_ids.append(chunk_id)
            return chunk_id, datanodes

    def _select_datanodes(self, count: int) -> list[str]:
        """
        Rack-aware placement policy (HDFS default):
        - 1st replica: on the same rack as the writer (minimize cross-rack traffic)
        - 2nd replica: on a different rack (fault isolation)
        - 3rd replica: same rack as the 2nd but a different node
        Simplified here: select live nodes by available capacity.
        """
        available = [
            (info['free'], node_id)
            for node_id, info in self.datanode_info.items()
            if time.time() - info['last_heartbeat'] < 30  # heard from in the last 30s
        ]
        available.sort(reverse=True)  # most free space first
        return [node_id for _, node_id in available[:count]]

    def get_chunk_locations(self, file_path: str) -> list[tuple[str, list[str]]]:
"""Return chunk_id, [datanode_ids] for each chunk of file (for reading)"""
with self.lock:
metadata = self.namespace.get(file_path)
if not metadata:
raise FileNotFoundError(f"{file_path} not found")
return [
(chunk_id, self.chunk_map[chunk_id].locations)
for chunk_id in metadata.chunk_ids
]
    def process_heartbeat(self, datanode_id: str, capacity: int,
                          free: int, chunk_reports: list[str]):
        """
        DataNodes send a heartbeat every 3 seconds.
        The NameNode tracks live nodes and the chunk inventory.
        Returns commands to execute (e.g., delete orphan chunks, re-replicate).
        """
        with self.lock:
            self.datanode_info[datanode_id] = {
                'capacity': capacity,
                'free': free,
                'last_heartbeat': time.time(),
            }
            self.datanode_chunks[datanode_id] = set(chunk_reports)
            return self._check_replication()
    def _check_replication(self) -> list[dict]:
        """Find under-replicated chunks and issue re-replication commands."""
        commands = []
        with self.lock:
            for chunk_id, chunk in self.chunk_map.items():
                alive_locations = [
                    loc for loc in chunk.locations
                    if time.time() - self.datanode_info.get(loc, {}).get('last_heartbeat', 0) < 30
                ]
                if len(alive_locations) < self.replication_factor:
                    # Command a live DataNode to copy this chunk to a new DataNode.
                    # (A production policy would exclude nodes already holding the chunk.)
                    target = self._select_datanodes(1)
                    if target and alive_locations:
                        commands.append({
                            'type': 'REPLICATE',
                            'chunk_id': chunk_id,
                            'source': alive_locations[0],
                            'target': target[0],
                        })
        return commands
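The rack-aware policy described in the `_select_datanodes` docstring can be sketched as follows. This is a minimal sketch assuming each entry in `datanode_info` carries a `rack` field; the `writer_rack` parameter is hypothetical:

    import random

    def select_rack_aware(datanode_info: dict, writer_rack: str, count: int = 3) -> list[str]:
        """Sketch of HDFS-style placement: 1st replica near the writer,
        2nd on a different rack, 3rd on the 2nd replica's rack."""
        by_rack: dict[str, list[str]] = {}
        for node_id, info in datanode_info.items():
            by_rack.setdefault(info['rack'], []).append(node_id)

        replicas: list[str] = []
        # 1st replica: a node on the writer's rack (fall back to any rack)
        local = by_rack.get(writer_rack) or random.choice(list(by_rack.values()))
        replicas.append(random.choice(local))
        # 2nd replica: a node on a different rack, if one exists
        remote_racks = [r for r in by_rack if r != writer_rack]
        if remote_racks and count > 1:
            remote_rack = random.choice(remote_racks)
            replicas.append(random.choice(by_rack[remote_rack]))
            # 3rd replica: another node on the same remote rack
            if count > 2:
                candidates = [n for n in by_rack[remote_rack] if n not in replicas]
                if candidates:
                    replicas.append(random.choice(candidates))
        return replicas[:count]  # a single-rack cluster yields fewer replicas; fallback omitted

This placement tolerates the loss of an entire rack while keeping two of three replica transfers within one rack.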
DataNode: Storage Worker
import os
import hashlib

class ChunkNotFoundError(Exception):
    pass

class DataNode:
    """
    Stores chunks as files on local disk.
    Serves read/write requests from clients.
    Replicates to other DataNodes in the pipeline.
    (NameNodeClient / DataNodeClient are RPC stubs, not shown.)
    """
    def __init__(self, node_id: str, storage_dir: str, namenode_address: str):
        self.node_id = node_id
        self.storage_dir = storage_dir
        self.namenode = NameNodeClient(namenode_address)
        os.makedirs(storage_dir, exist_ok=True)

    def write_chunk(self, chunk_id: str, data: bytes,
                    pipeline: list[str]) -> bool:
        """
        Write chunk data locally, then forward down the pipeline.
        Pipeline: [next_node, ...]; each node forwards to the next (chain replication).
        Ensures the write is replicated before acknowledging to the client.
        """
        # Write to local disk, with a checksum sidecar for later verification
        chunk_path = os.path.join(self.storage_dir, chunk_id)
        checksum = hashlib.md5(data).hexdigest()
        with open(chunk_path, 'wb') as f:
            f.write(data)
        with open(chunk_path + '.md5', 'w') as f:
            f.write(checksum)

        # Forward to the next DataNode in the pipeline
        if pipeline:
            next_node = pipeline[0]
            remaining_pipeline = pipeline[1:]
            datanode_client = DataNodeClient(next_node)
            if not datanode_client.write_chunk(chunk_id, data, remaining_pipeline):
                os.remove(chunk_path)  # Roll back on pipeline failure
                os.remove(chunk_path + '.md5')
                return False

        # Notify the NameNode that the chunk was written
        self.namenode.report_chunk(self.node_id, chunk_id, len(data), checksum)
        return True
    def read_chunk(self, chunk_id: str, offset: int = 0,
                   length: int = None) -> bytes:
        """Read chunk data, verifying integrity on full reads."""
        chunk_path = os.path.join(self.storage_dir, chunk_id)
        if not os.path.exists(chunk_path):
            raise ChunkNotFoundError(f"Chunk {chunk_id} not found")
        with open(chunk_path, 'rb') as f:
            if offset:
                f.seek(offset)
            data = f.read(length) if length else f.read()
        # A checksum mismatch means silent corruption: fail loudly so the
        # client falls back to another replica
        if offset == 0 and length is None:
            with open(chunk_path + '.md5') as f:
                if hashlib.md5(data).hexdigest() != f.read():
                    raise IOError(f"Checksum mismatch for chunk {chunk_id}")
        return data
    def send_heartbeat(self):
        """Send a heartbeat to the NameNode (called every 3 seconds)."""
        stat = os.statvfs(self.storage_dir)
        capacity = stat.f_blocks * stat.f_frsize
        free = stat.f_bavail * stat.f_frsize  # blocks available to non-root
        chunk_ids = [f for f in os.listdir(self.storage_dir)
                     if not f.endswith('.md5')]  # skip checksum sidecars
        commands = self.namenode.heartbeat(
            self.node_id, capacity, free, chunk_ids
        )
        self._execute_commands(commands)

    def _execute_commands(self, commands: list[dict]):
        """Execute NameNode commands: replicate, delete orphan chunks."""
        for cmd in commands:
            if cmd['type'] == 'REPLICATE':
                data = self.read_chunk(cmd['chunk_id'])
                target_client = DataNodeClient(cmd['target'])
                target_client.write_chunk(cmd['chunk_id'], data, [])
            elif cmd['type'] == 'DELETE':
                chunk_path = os.path.join(self.storage_dir, cmd['chunk_id'])
                if os.path.exists(chunk_path):
                    os.remove(chunk_path)
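A DataNode would typically run the heartbeat on a background thread. A minimal sketch, where the 3-second interval matches the HDFS default:

    import threading
    import time

    def start_heartbeat_loop(datanode: DataNode, interval: float = 3.0) -> threading.Thread:
        """Run send_heartbeat every `interval` seconds on a daemon thread."""
        def loop():
            while True:
                try:
                    datanode.send_heartbeat()
                except Exception:
                    pass  # NameNode unreachable; retry on the next tick
                time.sleep(interval)
        thread = threading.Thread(target=loop, daemon=True)
        thread.start()
        return thread

Swallowing the exception is deliberate: a DataNode must keep heartbeating through transient NameNode outages rather than crash.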
Client API
class DFSClient:
    """
    Client library for reading and writing files.
    Talks to the NameNode for metadata and to DataNodes for data.
    """
    def __init__(self, namenode_address: str):
        self.namenode = NameNodeClient(namenode_address)
        self.chunk_size = 64 * 1024 * 1024  # 64MB, matches CHUNK_SIZE

    def write(self, remote_path: str, data: bytes):
        """Write a file to the DFS, one chunk at a time."""
        self.namenode.create_file(remote_path)
        offset = 0
        while offset < len(data):
            chunk_data = data[offset:offset + self.chunk_size]
            chunk_id, datanodes = self.namenode.allocate_chunk(remote_path)
            # Write to the first DataNode; it forwards down the pipeline
            client = DataNodeClient(datanodes[0])
            if not client.write_chunk(chunk_id, chunk_data, datanodes[1:]):
                raise IOError(f"Failed to write chunk {chunk_id}")
            offset += self.chunk_size

    def read(self, remote_path: str) -> bytes:
"""Read entire file from DFS"""
chunk_locations = self.namenode.get_chunk_locations(remote_path)
result = []
for chunk_id, datanodes in chunk_locations:
# Try each DataNode; use first available (closest)
for datanode_addr in datanodes:
try:
client = DataNodeClient(datanode_addr)
chunk_data = client.read_chunk(chunk_id)
result.append(chunk_data)
break
except Exception:
continue # Try next replica
else:
raise Exception(f"All replicas unavailable for chunk {chunk_id}")
return b''.join(result)
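End to end, usage looks like this. The NameNode address and file path are hypothetical:

    import os

    client = DFSClient("namenode.internal:8020")  # hypothetical address

    # Write a 200MB payload -> split into 4 chunks of <= 64MB, each replicated 3x
    payload = os.urandom(200 * 1024 * 1024)
    client.write("/datasets/train.bin", payload)

    # Read it back; each chunk is fetched from the first live replica
    assert client.read("/datasets/train.bin") == payload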
Key Design Decisions
- Large chunk size (64MB GFS, 128MB HDFS): large chunks reduce NameNode memory (fewer entries) and cut network round-trips on sequential reads, but waste space for small files. The HDFS "small files problem": 1M small files means 1M NameNode entries, a memory bottleneck (see the estimate after this list).
- Single NameNode: simplifies consistency, since all metadata operations are serialized through one master. The SPOF is addressed with HDFS HA (Active/Standby + shared edit log via JournalNodes/ZooKeeper); Google Colossus distributed the namespace across multiple masters.
- Chain replication: the client writes to DataNode 1, which forwards to DN2, which forwards to DN3; acks flow back along the chain. Versus fan-out (client writes to all replicas): the chain adds latency but uses far less client bandwidth.
- Checksums: each chunk is stored with a checksum. On read, the DataNode verifies it; on mismatch it returns an error and the client falls back to the next replica. Silent data corruption is detected automatically.
- Heartbeat-based failure detection: a DataNode not heard from for ~10 minutes (HDFS default; the code above uses 30 seconds for brevity) is marked dead, and the NameNode re-replicates every chunk that lived on it.
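To make the small-files problem concrete, a back-of-envelope NameNode heap estimate. The ~150 bytes per object figure is the commonly cited HDFS rule of thumb; treat the exact numbers as assumptions:

    BYTES_PER_OBJECT = 150  # rule-of-thumb NameNode heap cost per file/block object

    # Scenario A: 1 PB stored as 128MB blocks -> ~8M block objects
    blocks = (1024**5) // (128 * 1024**2)           # 8,388,608 blocks
    print(blocks * BYTES_PER_OBJECT / 1024**3)      # ~1.2 GB of heap: fine

    # Scenario B: the same 1 PB as 100M small files (1 file + 1 block object each)
    small_objects = 100_000_000 * 2
    print(small_objects * BYTES_PER_OBJECT / 1024**3)  # ~28 GB of heap: a bottleneck

Same data volume, roughly 24x the metadata footprint: this is why HDFS deployments aggregate small files (e.g., into SequenceFiles) rather than storing them individually.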
HDFS vs GFS vs S3
| Property | HDFS | GFS/Colossus | S3 |
|---|---|---|---|
| Chunk size | 128MB blocks (default) | 64MB → variable | N/A (whole objects) |
| Consistency | Strong (single master) | Relaxed (concurrent appends) | Eventually consistent → strong (since 2020) |
| Mutation model | Append-only | Random writes + record appends | Put/replace whole object |
| Namespace | Hierarchical | Hierarchical | Flat (bucket/key) |
| Use case | MapReduce/Spark | Google internal | General object storage |