System Design Interview: Design a Distributed Messaging System (Apache Kafka)
Understanding distributed messaging systems is essential for senior engineering roles. Kafka's design comes up in interviews at LinkedIn (where Kafka was created), Uber, Airbnb, Databricks, and any company running large-scale event streaming. This post covers the architecture that lets Kafka handle millions of messages per second while still providing durability guarantees.
Requirements
Functional Requirements
- Publish messages to named topics
- Multiple consumers read messages independently (consumer groups)
- Durable persistence: messages retained for configurable period
- Ordered delivery within a partition
- At-least-once delivery guarantee
- Horizontal scalability for both producers and consumers
Non-Functional Requirements
- Throughput: millions of messages per second
- Latency: single-digit milliseconds end-to-end
- Durability: messages replicated across multiple brokers
- Fault tolerance: survive broker failures without data loss
- Consumer groups: multiple independent consumers reading same topic
Core Architecture
Producers
↓ (write to leader)
Kafka Broker Cluster
[Topic A, Partition 0 — Leader on Broker 1]
[Topic A, Partition 1 — Leader on Broker 2]
[Topic A, Partition 2 — Leader on Broker 3]
↑ (replicate to followers)
Broker 1 ←→ Broker 2 ←→ Broker 3
↓ (consumers pull)
Consumer Groups
[Group X: Consumer A (partition 0), Consumer B (partition 1,2)]
[Group Y: Consumer C (all partitions)]
ZooKeeper / KRaft (metadata, leader election)
Topic, Partition, and Offset
class KafkaMessage:
    def __init__(self, key: bytes, value: bytes,
                 timestamp: int, headers: dict = None):
        self.key = key                # determines partition (hash(key) % num_partitions)
        self.value = value            # payload (bytes — schema-agnostic)
        self.timestamp = timestamp
        self.headers = headers or {}
        self.offset: int = -1         # assigned by broker when written

class Partition:
    """
    Ordered, immutable log of messages.
    New messages are appended to the end (offset increments).
    Consumers read by specifying an offset.
    Retention: messages deleted after retention.ms or retention.bytes is exceeded.
    """
    def __init__(self, topic: str, partition_id: int):
        self.topic = topic
        self.partition_id = partition_id
        self.log: list[KafkaMessage] = []       # append-only
        self.leader_broker_id: int = -1
        self.follower_broker_ids: list[int] = []
        self.hw: int = 0   # High Watermark: offset up to which all ISR replicas have the data

    def append(self, message: KafkaMessage) -> int:
        """Returns offset of appended message"""
        message.offset = len(self.log)
        self.log.append(message)
        return message.offset

    def read(self, offset: int, max_count: int = 100) -> list:
        """Read messages starting from offset"""
        return self.log[offset:offset + max_count]
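The log abstraction is what makes replay possible: two consumers can read the same partition at different offsets without affecting each other. A minimal, self-contained sketch of that behavior (names are illustrative, not Kafka APIs):

```python
class MiniPartition:
    """Toy append-only log: offsets are simply list indices."""
    def __init__(self):
        self.log = []

    def append(self, value) -> int:
        offset = len(self.log)
        self.log.append(value)
        return offset

    def read(self, offset: int, max_count: int = 100) -> list:
        return self.log[offset:offset + max_count]

p = MiniPartition()
for v in (b"a", b"b", b"c"):
    p.append(v)

# Two independent readers at different offsets see consistent views:
assert p.read(0) == [b"a", b"b", b"c"]   # replay from the beginning
assert p.read(2) == [b"c"]               # catch up from offset 2
```

Nothing is deleted on read, which is exactly why Kafka supports replay while a classic queue does not.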
Partition Assignment
class DefaultPartitioner:
    """
    Kafka's default partitioning strategy.
    With key: deterministic hash — the same key always maps to the same partition.
    Without key: distributed across partitions (sticky/round-robin).
    """
    def partition(self, key: bytes, num_partitions: int) -> int:
        if key is None:
            # No key: round-robin (sticky partitioner since Kafka 2.4)
            return self._round_robin(num_partitions)
        # Deterministic hash: same key always goes to the same partition,
        # which guarantees ordering for messages with the same key
        return murmur2_hash(key) % num_partitions

# Key insight: all messages with the same key go to the same partition
# → ordering guaranteed per key within a topic
# Example: order events for the same order_id
producer.send('order_events',
              key=str(order_id).encode(),   # same order → same partition → ordered
              value=serialize(event))
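The guarantee hinges only on the hash being deterministic. A runnable sketch with a stand-in hash (Kafka itself uses murmur2; md5 here just keeps the example dependency-free):

```python
import hashlib

def partition_for(key: bytes, num_partitions: int) -> int:
    # Stand-in for murmur2: any deterministic hash gives the property
    # that equal keys always map to the same partition.
    digest = hashlib.md5(key).digest()
    return int.from_bytes(digest[:4], "big") % num_partitions

p1 = partition_for(b"order-42", 6)
p2 = partition_for(b"order-42", 6)
assert p1 == p2      # same key → same partition → per-key ordering
assert 0 <= p1 < 6   # always a valid partition index
```

Note the flip side: changing `num_partitions` remaps keys, which is why adding partitions to a keyed topic breaks existing ordering assumptions.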
Producer: Durability Guarantees
class KafkaProducerConfig:
    """
    acks controls the durability vs throughput tradeoff:
      acks=0:   fire and forget (fastest, may lose messages)
      acks=1:   leader acknowledges (lost if leader fails before replication)
      acks=all: all in-sync replicas acknowledge (safest, slower)
    """
    # High throughput (some data loss acceptable: metrics, logs)
    fast_config = {
        'acks': 1,
        'batch.size': 16384,           # 16KB batch
        'linger.ms': 5,                # wait up to 5ms to accumulate a batch
        'compression.type': 'snappy',
        'buffer.memory': 33554432,     # 32MB producer buffer
    }

    # High durability (financial transactions, audit logs).
    # Note: min.insync.replicas is a broker/topic setting; it is shown here
    # because it only takes effect when paired with acks=all.
    safe_config = {
        'acks': 'all',
        'min.insync.replicas': 2,      # at least 2 replicas must ack
        'retries': 3,
        'enable.idempotence': True,    # dedups retries (exactly-once per partition; full EOS needs transactions)
        'compression.type': 'lz4',
    }
class Producer:
    def send(self, topic: str, key: bytes, value: bytes,
             callback=None) -> None:
        """
        Asynchronous send: messages batched in a memory buffer.
        Batch sent when: batch.size reached OR linger.ms elapsed.
        On failure: automatic retry with exponential backoff.
        """
        message = KafkaMessage(key=key, value=value,
                               timestamp=current_time_ms())
        partition = self.partitioner.partition(key, self.num_partitions(topic))
        self.buffer[topic][partition].append(message)
        if self._should_flush(topic, partition):
            self._flush(topic, partition, callback)
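The flush condition can be made concrete with a toy buffer; `batch_size` and `linger_ms` here mirror the `batch.size` / `linger.ms` configs above (this is a sketch of the logic, not the client's actual implementation):

```python
import time

class BatchBuffer:
    """Toy producer buffer: flush when either threshold is hit."""
    def __init__(self, batch_size: int = 3, linger_ms: float = 50):
        self.batch_size = batch_size
        self.linger_ms = linger_ms
        self.messages = []
        self.first_append = None

    def append(self, msg: bytes):
        if self.first_append is None:
            self.first_append = time.monotonic()
        self.messages.append(msg)

    def should_flush(self) -> bool:
        if not self.messages:
            return False
        if len(self.messages) >= self.batch_size:        # batch.size reached
            return True
        elapsed_ms = (time.monotonic() - self.first_append) * 1000
        return elapsed_ms >= self.linger_ms              # linger.ms elapsed

buf = BatchBuffer(batch_size=3, linger_ms=50)
buf.append(b"m1")
assert not buf.should_flush()   # neither threshold hit yet
buf.append(b"m2")
buf.append(b"m3")
assert buf.should_flush()       # size threshold reached
```

The linger timer is why a larger `linger.ms` trades a little latency for bigger batches and higher throughput.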
Consumer Group: Parallel Processing
class ConsumerGroup:
    """
    Multiple consumers share work by splitting partitions.
    Rule: each partition is assigned to exactly ONE consumer in the group.
    If consumers > partitions: some consumers are idle.
    Rebalancing: triggered when a consumer joins/leaves.
    Kafka's Group Coordinator (a broker) manages assignments.
    """
    def assign_partitions(self, consumers: list[str],
                          partitions: list[int]) -> dict:
        """
        Range assignor (default): assign contiguous partitions.
        Round-robin assignor: distribute evenly across consumers.
        Sticky assignor: minimize partition movement on rebalance.
        """
        assignment = {}
        # Round-robin for balanced distribution
        for i, partition in enumerate(partitions):
            consumer = consumers[i % len(consumers)]
            if consumer not in assignment:
                assignment[consumer] = []
            assignment[consumer].append(partition)
        return assignment
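A self-contained variant of the round-robin assignor above (pre-seeding every consumer so idle ones show up with empty lists) makes the split, and the idle-consumer case, visible:

```python
def assign_round_robin(consumers: list[str], partitions: list[int]) -> dict:
    # Seed every consumer so idle ones appear explicitly.
    assignment = {c: [] for c in consumers}
    for i, p in enumerate(partitions):
        assignment[consumers[i % len(consumers)]].append(p)
    return assignment

# 5 partitions over 2 consumers: balanced split
assert assign_round_robin(["A", "B"], [0, 1, 2, 3, 4]) == \
    {"A": [0, 2, 4], "B": [1, 3]}

# More consumers than partitions: consumer "C" sits idle
assert assign_round_robin(["A", "B", "C"], [0, 1]) == \
    {"A": [0], "B": [1], "C": []}
```

The idle-consumer case is why partition count caps a group's useful parallelism: adding consumers beyond the partition count buys nothing.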
class Consumer:
    def __init__(self, group_id: str, bootstrap_servers: str):
        self.group_id = group_id
        self.offsets: dict[tuple, int] = {}  # (topic, partition) → committed offset

    def poll(self, timeout_ms: int = 1000) -> list:
        """
        Fetch messages from assigned partitions.
        Returns records from the current offset position.
        The consumer tracks its own offset — the broker doesn't push.
        """
        records = []
        for (topic, partition), offset in self.offsets.items():
            new_records = self.fetch_from_broker(topic, partition, offset)
            records.extend(new_records)
            if new_records:
                self.offsets[(topic, partition)] = new_records[-1].offset + 1
        return records

    def commit_offset(self, topic: str, partition: int, offset: int):
        """
        Commit offset = "I've processed up to this offset."
        Auto-commit: enabled by default (enable.auto.commit=true,
        auto.commit.interval.ms=5000).
        Manual commit: commit AFTER processing for the at-least-once guarantee.
        """
        self.offsets[(topic, partition)] = offset
        # Persist to __consumer_offsets (internal Kafka topic)
        self.broker.commit(self.group_id, topic, partition, offset)
# At-least-once processing pattern:
def process_messages(consumer):
    while True:
        records = consumer.poll(timeout_ms=1000)
        for record in records:
            try:
                process(record)  # your business logic
                consumer.commit_offset(record.topic, record.partition,
                                       record.offset + 1)
            except Exception as e:
                # Don't commit → the message will be redelivered.
                # Stop the batch here: continuing would process and commit
                # later offsets, silently skipping the failed record.
                log_error(e)
                break
        # Route records that fail repeatedly to a Dead Letter Queue (DLQ)
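A self-contained simulation of that failure behavior: because offsets are committed only after successful processing, a transient failure causes redelivery rather than loss (all names here are illustrative):

```python
log = [b"m0", b"m1", b"m2"]
committed = 0                 # next offset to process
processed = []
fail_once = {1}               # simulate one transient failure at offset 1

def process(offset: int, msg: bytes):
    if offset in fail_once:
        fail_once.remove(offset)
        raise RuntimeError("transient failure")
    processed.append(msg)

while committed < len(log):   # each iteration plays the role of one poll cycle
    for offset in range(committed, len(log)):
        try:
            process(offset, log[offset])
            committed = offset + 1        # commit AFTER processing
        except RuntimeError:
            break                         # redeliver from last commit

assert processed == [b"m0", b"m1", b"m2"]  # nothing lost
assert committed == 3
```

Had `process` succeeded but the commit been lost, the record would be processed twice on redelivery — that duplicate, not loss, is the cost of at-least-once.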
Replication and Leader Election
class ReplicationManager:
    """
    Each partition has 1 leader + N-1 followers (replication factor N).
    The leader handles all reads and writes.
    Followers replicate from the leader continuously.
    In-Sync Replicas (ISR): followers caught up with the leader.
    Leader failure: the cluster controller (backed by ZooKeeper or KRaft)
    detects it and elects a new leader from the ISR.
    """
    # Replication flow:
    # 1. Producer → Leader (write to log)
    # 2. Followers pull from the leader (FetchRequest)
    # 3. Leader tracks ISR: followers within replica.lag.time.max.ms
    # 4. High Watermark advances when all ISR replicas have the message
    # 5. Consumers only read up to the High Watermark (committed messages)

    def handle_follower_fetch(self, partition: str, fetch_offset: int,
                              follower_id: int) -> list:
        """Returns messages from fetch_offset to the end of the leader's log"""
        leader_partition = self.partitions[partition]
        messages = leader_partition.read(fetch_offset)
        # Update ISR tracking
        self.update_follower_offset(partition, follower_id, fetch_offset)
        self.advance_high_watermark(partition)
        return messages

    def advance_high_watermark(self, partition_id: str):
        """HW = min(log-end offset) across the leader and all ISR followers"""
        partition = self.partitions[partition_id]
        isr_offsets = [
            self.follower_offsets[partition_id][follower]
            for follower in partition.follower_broker_ids
            if self.is_in_sync(partition_id, follower)
        ]
        if isr_offsets:
            partition.hw = min(isr_offsets)
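The watermark rule reduces to a min over log-end offsets (LEO), including the leader's own. A two-line sketch:

```python
def high_watermark(leader_leo: int, isr_follower_leos: list[int]) -> int:
    """HW = min log-end offset across the leader and all in-sync followers."""
    return min([leader_leo] + isr_follower_leos)

assert high_watermark(10, [10, 8]) == 8    # slowest ISR member bounds the HW
assert high_watermark(10, [10, 10]) == 10  # all caught up: HW = leader LEO
```

This is why a slow-but-still-in-sync follower delays acks under acks=all: nothing beyond the HW is visible to consumers, and the HW cannot pass the slowest ISR member.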
Key Kafka Design Decisions
- Log-structured storage: Messages are written sequentially to append-only log segments on disk. Sequential I/O is orders of magnitude faster than random I/O on spinning disks (and still markedly faster on SSDs) — critical for Kafka’s throughput. Old segments are deleted per the retention policy.
- Zero-copy transfer: Kafka uses OS-level sendfile() to transfer data from disk directly to network socket without copying to user space. Eliminates CPU overhead for data transfer — massive throughput improvement.
- Consumer pulls, not push: Consumers control their own pace. No back-pressure needed. Broker doesn’t need per-consumer state (just committed offsets in __consumer_offsets topic). Simple, scalable.
- Batching everywhere: Producers batch messages before sending. Brokers batch messages before writing to disk. Network transfers batched. Batching amortizes per-message overhead — critical for million-message-per-second throughput.
- Message ordering guarantee: Ordering only guaranteed within a partition, not across partitions. Design: use a consistent partition key (order_id, user_id) to ensure related messages go to the same partition.
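A back-of-envelope model of the batching point (the overhead numbers below are assumptions for illustration, not benchmarks): with a fixed per-request cost, throughput scales almost linearly with batch size until the marginal per-message cost dominates.

```python
REQUEST_OVERHEAD_MS = 0.1   # assumed fixed cost per produce request
PER_MESSAGE_MS = 0.001      # assumed marginal cost per message

def msgs_per_sec(batch_size: int) -> float:
    batch_ms = REQUEST_OVERHEAD_MS + batch_size * PER_MESSAGE_MS
    return batch_size / (batch_ms / 1000)

unbatched = msgs_per_sec(1)      # ~9,900 msgs/sec under these assumptions
batched = msgs_per_sec(500)      # ~833,000 msgs/sec under these assumptions
assert batched > 50 * unbatched  # batching amortizes the fixed request cost
```

The same amortization argument applies at every layer: producer batches, broker disk writes, and replica fetches.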
Kafka vs Traditional Message Queues (RabbitMQ)
| Feature | Kafka | RabbitMQ |
|---|---|---|
| Message model | Persistent log (replay) | Queue (delete on consume) |
| Consumer model | Pull, offset-based | Push, acknowledgment-based |
| Ordering | Within partition | Within queue |
| Replayability | Yes (retention period) | No (deleted after ACK) |
| Multiple consumers | Consumer groups (all read) | Competing consumers (one gets) |
| Throughput | Millions/sec | Tens of thousands/sec |
| Use case | Event streaming, analytics, CDC | Task queues, RPC, routing |