System Design Interview: Design a Distributed Messaging System (Kafka)

Understanding distributed messaging systems is essential for senior engineering roles. Questions about Kafka’s design commonly come up at LinkedIn (where Kafka was created), Uber, Airbnb, Databricks, and other companies that run large-scale event streaming. This post covers the architecture that lets Kafka handle millions of messages per second with durability guarantees.

Requirements

Functional Requirements

  • Publish messages to named topics
  • Multiple consumers read messages independently (consumer groups)
  • Durable persistence: messages retained for configurable period
  • Ordered delivery within a partition
  • At-least-once delivery guarantee
  • Horizontal scalability for both producers and consumers

Non-Functional Requirements

  • Throughput: millions of messages per second
  • Latency: single-digit milliseconds end-to-end
  • Durability: messages replicated across multiple brokers
  • Fault tolerance: survive broker failures without data loss
  • Consumer groups: multiple independent consumers reading same topic

Core Architecture

Producers
   ↓ (write to leader)
Kafka Broker Cluster
   [Topic A, Partition 0 — Leader on Broker 1]
   [Topic A, Partition 1 — Leader on Broker 2]
   [Topic A, Partition 2 — Leader on Broker 3]
   ↑ (replicate to followers)
   Broker 1 ←→ Broker 2 ←→ Broker 3
   ↓ (consumers pull)
Consumer Groups
   [Group X: Consumer A (partition 0), Consumer B (partition 1,2)]
   [Group Y: Consumer C (all partitions)]
ZooKeeper / KRaft (metadata, leader election)

Topic, Partition, and Offset

class KafkaMessage:
    def __init__(self, key: bytes, value: bytes,
                 timestamp: int, headers: dict = None):
        self.key = key          # determines partition (hash(key) % num_partitions)
        self.value = value      # payload (bytes — schema-agnostic)
        self.timestamp = timestamp
        self.headers = headers or {}
        self.offset: int = -1  # assigned by broker when written

class Partition:
    """
    Ordered, immutable log of messages.
    New messages appended to end (offset increments).
    Consumers read by specifying offset.
    Retention: messages deleted after retention.ms or retention.bytes exceeded.
    """
    def __init__(self, topic: str, partition_id: int):
        self.topic = topic
        self.partition_id = partition_id
        self.log: list[KafkaMessage] = []  # append-only
        self.leader_broker_id: int = -1
        self.follower_broker_ids: list[int] = []
        self.hw: int = 0  # High Watermark: offset just past the last message replicated to every in-sync replica

    def append(self, message: KafkaMessage) -> int:
        """Returns offset of appended message"""
        message.offset = len(self.log)
        self.log.append(message)
        return message.offset

    def read(self, offset: int, max_count: int = 100) -> list:
        """Read messages starting from offset"""
        return self.log[offset:offset + max_count]
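
The Partition sketch above uses len(self.log) as the next offset, which only holds while nothing is ever deleted. The following is a minimal sketch of how offsets can stay stable once retention removes old messages. The names RetainedLog, base_offset, and enforce_retention are illustrative assumptions, and real Kafka deletes whole segment files rather than individual records.

```python
from dataclasses import dataclass, field


@dataclass
class Record:
    offset: int
    timestamp_ms: int
    value: bytes


@dataclass
class RetainedLog:
    """Append-only log whose offsets survive retention (illustrative only)."""
    retention_ms: int
    base_offset: int = 0                       # offset of the oldest stored record
    records: list = field(default_factory=list)

    @property
    def next_offset(self) -> int:
        return self.base_offset + len(self.records)

    def append(self, value: bytes, now_ms: int) -> int:
        record = Record(self.next_offset, now_ms, value)
        self.records.append(record)
        return record.offset

    def enforce_retention(self, now_ms: int) -> None:
        """Drop records older than retention_ms from the head of the log."""
        cutoff = now_ms - self.retention_ms
        expired = 0
        for record in self.records:
            if record.timestamp_ms >= cutoff:
                break
            expired += 1
        del self.records[:expired]
        self.base_offset += expired            # offsets of survivors are unchanged

    def read(self, offset: int, max_count: int = 100) -> list:
        """Read from an absolute offset; offsets below base_offset are gone."""
        if offset < self.base_offset:
            raise IndexError(f"offset {offset} is below log start {self.base_offset}")
        start = offset - self.base_offset
        return self.records[start:start + max_count]


log = RetainedLog(retention_ms=1_000)
for i in range(5):
    log.append(f"m{i}".encode(), now_ms=i * 500)   # timestamps 0, 500, ..., 2000
log.enforce_retention(now_ms=2_000)                # removes timestamps 0 and 500
print(log.base_offset, [r.offset for r in log.read(3)])
```

A consumer that asks for an offset below the log start gets an error here; in Kafka the equivalent case is an out-of-range offset, which the consumer resolves according to its auto.offset.reset setting.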

Partition Assignment

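class DefaultPartitioner:
    """
    Kafka's default partitioning strategy.
    With key: hash(key) % num_partitions, so the same key always maps to the
    same partition as long as the partition count does not change.
    Without key: round-robin in older clients; since Kafka 2.4 the sticky
    partitioner fills one batch for a partition before moving to the next.
    """

    def __init__(self):
        self._next = 0  # round-robin cursor for keyless messages

    def partition(self, key: bytes, num_partitions: int) -> int:
        if key is None:
            return self._round_robin(num_partitions)
        # Hash-mod (not consistent hashing): adding partitions remaps keys.
        # Mask the sign bit, as the Java client does, before taking the modulo.
        # murmur2_hash is assumed to be provided elsewhere.
        return (murmur2_hash(key) & 0x7FFFFFFF) % num_partitions

    def _round_robin(self, num_partitions: int) -> int:
        partition = self._next % num_partitions
        self._next += 1
        return partition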

# Key insight: all messages with same key go to same partition
# → ordering guaranteed for same key within a topic

# Example: order events for the same order_id
producer.send('order_events',
    key=str(order_id).encode(),    # same order → same partition → ordered
    value=serialize(event)
)
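
To see why the partition count matters for per-key ordering, here is a small runnable sketch of hash-mod partitioning. It uses zlib.crc32 as a stand-in for murmur2, so the partition numbers will not match a real cluster; partition_for is a hypothetical helper name.

```python
import zlib


def partition_for(key: bytes, num_partitions: int) -> int:
    """Map a key to a partition with hash-mod.

    zlib.crc32 is a stand-in for the murmur2 hash used by Kafka's Java
    client; only the hash-mod behaviour is being illustrated.
    """
    return zlib.crc32(key) % num_partitions


keys = [f"order-{i}".encode() for i in range(1000)]

# Same key and same partition count give the same partition on every call.
stable = all(partition_for(k, 6) == partition_for(k, 6) for k in keys)

# Growing the topic from 6 to 8 partitions remaps many keys, so records for
# one key written before and after the change can land in different partitions.
moved = sum(1 for k in keys if partition_for(k, 6) != partition_for(k, 8))
print(f"stable={stable}, keys remapped after 6 -> 8 partitions: {moved} of {len(keys)}")
```

This is why a topic that relies on key ordering is usually created with enough partitions up front rather than resized later.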

Producer: Durability Guarantees

class KafkaProducerConfig:
    """
    acks controls durability vs throughput tradeoff:
    acks=0: fire and forget (fastest, may lose messages)
    acks=1: leader acknowledges (lost if leader fails before replication)
    acks=all: all in-sync replicas acknowledge (safest, slower)
    """

    # High throughput (some data loss acceptable: metrics, logs)
    fast_config = {
        'acks': 1,
        'batch.size': 16384,        # 16KB batch
        'linger.ms': 5,             # wait 5ms to accumulate batch
        'compression.type': 'snappy',
        'buffer.memory': 33554432,  # 32MB producer buffer
    }

    # High durability (financial transactions, audit logs)
    safe_config = {
        'acks': 'all',
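        'min.insync.replicas': 2,   # topic/broker setting, shown for context: with acks=all, writes are rejected if fewer than 2 replicas are in sync
        'retries': 3,
        'enable.idempotence': True,  # broker dedups producer retries (no duplicates per partition; end-to-end exactly-once also needs transactions)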
        'compression.type': 'lz4',
    }
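
The interaction between acks and min.insync.replicas can be sketched as a single decision function. This is a simplified model, not broker code: replicas_required_for_ack is a hypothetical name, and RuntimeError stands in for the NotEnoughReplicas error a real broker returns.

```python
def replicas_required_for_ack(acks, isr_size: int, min_insync_replicas: int) -> int:
    """Return how many replicas must hold a record before the producer is acked.

    Raises RuntimeError where a real broker would reject the write because
    the ISR has shrunk below min.insync.replicas.
    """
    if acks == 0:
        return 0                      # producer does not wait for the broker
    if acks == 1:
        return 1                      # the leader's local append is enough
    if acks in ("all", -1):
        if isr_size < min_insync_replicas:
            raise RuntimeError("NotEnoughReplicas: ISR smaller than min.insync.replicas")
        return isr_size               # every current in-sync replica, leader included
    raise ValueError(f"unsupported acks value: {acks!r}")


print(replicas_required_for_ack("all", isr_size=3, min_insync_replicas=2))
print(replicas_required_for_ack(1, isr_size=3, min_insync_replicas=2))
```

Note that min.insync.replicas only takes effect with acks=all; with acks=0 or acks=1 the write is accepted regardless of how many replicas are in sync.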

class Producer:
    def send(self, topic: str, key: bytes, value: bytes,
             callback=None) -> None:
        """
        Asynchronous send: messages batched in memory buffer.
        Batch sent when: batch.size reached OR linger.ms elapsed.
        On retriable failure: automatic retry, up to `retries` attempts spaced by retry.backoff.ms.
        """
        message = KafkaMessage(key=key, value=value,
                               timestamp=current_time_ms())
        partition = self.partitioner.partition(key, self.num_partitions(topic))
        self.buffer[topic][partition].append(message)

        if self._should_flush(topic, partition):
            self._flush(topic, partition, callback)

Consumer Group: Parallel Processing

class ConsumerGroup:
    """
    Multiple consumers sharing work by splitting partitions.
    Rule: each partition assigned to exactly ONE consumer in group.
    If consumers > partitions: some consumers are idle.
    Rebalancing: triggered when consumer joins/leaves.
    
    Kafka's Group Coordinator (broker) manages assignments.
    """

    def assign_partitions(self, consumers: list[str],
                          partitions: list[int]) -> dict:
        """
        Range assignor (default): assign contiguous partitions.
        Round-robin assignor: distribute evenly across consumers.
        Sticky assignor: minimize partition movements on rebalance.
        """
        assignment = {}
        # Round-robin for balanced distribution
        for i, partition in enumerate(partitions):
            consumer = consumers[i % len(consumers)]
            if consumer not in assignment:
                assignment[consumer] = []
            assignment[consumer].append(partition)
        return assignment
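
The code above implements the round-robin strategy. For comparison, here is a sketch of range assignment for a single topic: each consumer gets a contiguous block of partitions, and the first few consumers absorb any remainder. The function name range_assign is an assumption for illustration.

```python
def range_assign(consumers: list[str], partitions: list[int]) -> dict[str, list[int]]:
    """Give each consumer a contiguous range of partitions."""
    consumers = sorted(consumers)
    partitions = sorted(partitions)
    per_consumer, extra = divmod(len(partitions), len(consumers))
    assignment = {}
    start = 0
    for i, consumer in enumerate(consumers):
        # The first `extra` consumers take one additional partition.
        count = per_consumer + (1 if i < extra else 0)
        assignment[consumer] = partitions[start:start + count]
        start += count
    return assignment


print(range_assign(["c1", "c2"], [0, 1, 2]))
# {'c1': [0, 1], 'c2': [2]}
print(range_assign(["c1", "c2", "c3", "c4"], [0, 1, 2]))
# {'c1': [0], 'c2': [1], 'c3': [2], 'c4': []}
```

The second call shows the idle-consumer case from the docstring: with four consumers and three partitions, c4 receives nothing.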

class Consumer:
    def __init__(self, group_id: str, bootstrap_servers: str):
        self.group_id = group_id
        self.offsets: dict[tuple, int] = {}  # (topic, partition) → next offset to fetch (poll advances it; commit_offset also persists it)

    def poll(self, timeout_ms: int = 1000) -> list:
        """
        Fetch messages from assigned partitions.
        Returns records from current offset position.
        Consumer tracks its own offset — broker doesn't push.
        """
        records = []
        for (topic, partition), offset in self.offsets.items():
            new_records = self.fetch_from_broker(topic, partition, offset)
            records.extend(new_records)
            if new_records:
                self.offsets[(topic, partition)] = new_records[-1].offset + 1
        return records

    def commit_offset(self, topic: str, partition: int, offset: int):
        """
        Commit offset = "I've processed up to this offset."
        Auto-commit: enabled by default (enable.auto.commit=true, auto.commit.interval.ms=5000)
        Manual commit: commit AFTER processing for at-least-once guarantee.
        """
        self.offsets[(topic, partition)] = offset
        # Persist to __consumer_offsets topic (internal Kafka topic)
        self.broker.commit(self.group_id, topic, partition, offset)

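# At-least-once processing pattern:
def process_messages(consumer):
    while True:
        records = consumer.poll(timeout_ms=1000)
        failed = set()  # partitions that hit an error in this batch
        for record in records:
            tp = (record.topic, record.partition)
            if tp in failed:
                continue  # keep per-partition order: skip records behind a failure
            try:
                process(record)  # Your business logic
                consumer.commit_offset(record.topic, record.partition, record.offset + 1)
            except Exception as e:
                # Don't commit, and don't move on in this partition: committing a
                # later offset would silently skip the failed message.
                log_error(e)
                failed.add(tp)
                consumer.offsets[tp] = record.offset  # rewind so the next poll refetches it
                # Implement DLQ (Dead Letter Queue) for repeated failures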
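
The DLQ idea mentioned in the comment can be sketched as a bounded retry followed by parking the record. This is a minimal in-memory illustration: process_or_dead_letter is a hypothetical helper, and the dead_letters list stands in for what would usually be a separate Kafka topic.

```python
def process_or_dead_letter(record, handler, dead_letters: list, max_attempts: int = 3) -> bool:
    """Try handler(record) up to max_attempts times.

    Returns True if the handler succeeded, False if the record was parked in
    dead_letters. Either way the caller may commit the record's offset, so a
    single poison message cannot block the partition forever.
    """
    last_error = None
    for _ in range(max_attempts):
        try:
            handler(record)
            return True
        except Exception as exc:
            last_error = exc
    dead_letters.append({"record": record, "error": repr(last_error)})
    return False


attempts = []


def flaky_handler(record):
    """Example handler that always fails on the record "bad"."""
    attempts.append(record)
    if record == "bad":
        raise ValueError("cannot parse")


dlq = []
results = [process_or_dead_letter(r, flaky_handler, dlq) for r in ["ok-1", "bad", "ok-2"]]
print(results, len(dlq))
```

Retrying immediately is the simplest policy; a production consumer would typically add a delay between attempts and record enough context in the dead-letter entry to replay it later.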

Replication and Leader Election

class ReplicationManager:
    """
    Each partition has 1 leader + N-1 followers (replication factor N).
    Leader handles all writes and, by default, all consumer reads (follower fetching is optional since Kafka 2.4).
    Followers replicate from leader continuously.
    In-Sync Replicas (ISR): followers caught up with leader.
    
    Leader failure: the controller (backed by ZooKeeper or KRaft) detects it and elects a new leader from the ISR.
    """

    # Replication flow:
    # 1. Producer → Leader (write to log)
    # 2. Followers pull from leader (FetchRequest)
    # 3. Leader tracks ISR: followers that are within replica.lag.time.max.ms
    # 4. High Watermark advances when all ISR replicas have the message
    # 5. Consumers only read up to High Watermark (committed messages)

    def handle_follower_fetch(self, partition: str, fetch_offset: int,
                              follower_id: int) -> list:
        """Returns messages from fetch_offset to end of leader log"""
        leader_partition = self.partitions[partition]
        messages = leader_partition.read(fetch_offset)
        # Update ISR tracking
        self.update_follower_offset(partition, follower_id, fetch_offset)
        self.advance_high_watermark(partition)
        return messages

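    def advance_high_watermark(self, partition_id: str):
        """HW = min(log end offset) across the ISR, leader included"""
        partition = self.partitions[partition_id]
        # Start from the leader's own log end offset: if no follower is in
        # sync, the leader alone forms the ISR and HW follows its log.
        isr_offsets = [len(partition.log)]
        isr_offsets += [
            self.follower_offsets[partition_id][follower]
            for follower in partition.follower_broker_ids
            if self.is_in_sync(partition_id, follower)
        ]
        # Never move HW backwards while recomputing
        partition.hw = max(partition.hw, min(isr_offsets))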
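
A small standalone simulation of the high-watermark and election rules described above, with the leader's own log end offset (LEO) included in the minimum. The function names and the lowest-broker-id election rule are simplifying assumptions; Kafka's controller prefers replicas in their assigned order.

```python
def high_watermark(leader_leo: int, follower_leos: dict[int, int], isr: set[int]) -> int:
    """HW = smallest log end offset among in-sync replicas, leader included."""
    in_sync = [leo for broker, leo in follower_leos.items() if broker in isr]
    return min([leader_leo] + in_sync)


def elect_leader(isr: set[int], failed_broker: int):
    """Pick a new leader from the surviving ISR; None means no safe candidate."""
    candidates = sorted(isr - {failed_broker})
    return candidates[0] if candidates else None


# Broker 1 leads with 10 records; broker 2 is caught up, broker 3 lags at 7.
follower_leos = {2: 10, 3: 7}

hw_all_in_sync = high_watermark(10, follower_leos, isr={1, 2, 3})   # limited by broker 3
hw_after_shrink = high_watermark(10, follower_leos, isr={1, 2})     # broker 3 dropped from ISR

new_leader = elect_leader(isr={1, 2}, failed_broker=1)
no_candidate = elect_leader(isr={1}, failed_broker=1)
print(hw_all_in_sync, hw_after_shrink, new_leader, no_candidate)
```

The shrink case shows why the ISR exists: removing a slow follower lets the high watermark advance, at the cost of one fewer replica that is safe to promote.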

Key Kafka Design Decisions

  • Log-structured storage: Messages are written sequentially to append-only log segments on disk. Sequential disk I/O can be orders of magnitude faster than random I/O, especially on spinning disks, which is critical for Kafka’s throughput. Old segments are deleted based on the retention policy.
  • Zero-copy transfer: Kafka uses OS-level sendfile() to transfer data from disk directly to network socket without copying to user space. Eliminates CPU overhead for data transfer — massive throughput improvement.
  • Consumer pulls, not push: Consumers control their own pace, so a slow consumer simply falls behind (consumer lag) instead of being overwhelmed, and no separate back-pressure mechanism is needed. The broker keeps very little per-consumer state (just committed offsets in the __consumer_offsets topic). Simple, scalable.
  • Batching everywhere: Producers batch messages before sending. Brokers append whole batches to the log and rely on the OS page cache rather than flushing per message. Network transfers are batched. Batching amortizes per-message overhead, which is critical for million-message-per-second throughput.
  • Message ordering guarantee: Ordering only guaranteed within a partition, not across partitions. Design: use a consistent partition key (order_id, user_id) to ensure related messages go to the same partition.
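
The zero-copy point can be illustrated in Python with socket.sendfile(), which uses os.sendfile where the platform supports it and falls back to ordinary send() otherwise. This sketch assumes a POSIX system; send_segment and the temporary segment file are hypothetical, and whether the transfer actually avoids user-space copies depends on the OS.

```python
import socket
import tempfile


def send_segment(sock: socket.socket, path: str, offset: int, count: int) -> int:
    """Send `count` bytes of a log segment starting at `offset`; returns bytes sent."""
    with open(path, "rb") as segment:
        # The payload is never read into a Python bytes object here.
        return sock.sendfile(segment, offset=offset, count=count)


with tempfile.NamedTemporaryFile(suffix=".log") as f:
    f.write(b"record-0|record-1|record-2|")   # three 9-byte records
    f.flush()
    broker_side, consumer_side = socket.socketpair()
    with broker_side, consumer_side:
        sent = send_segment(broker_side, f.name, offset=9, count=9)
        received = b""
        while len(received) < sent:
            received += consumer_side.recv(64)

print(sent, received)
```

Because the broker stores records in the same binary format it sends on the wire, a fetch can be served as a byte range of a segment file, which is what makes this kind of transfer possible.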

Kafka vs Traditional Message Queues (RabbitMQ)

| Feature | Kafka | RabbitMQ |
| --- | --- | --- |
| Message model | Persistent log (replay) | Queue (delete on consume) |
| Consumer model | Pull, offset-based | Push, acknowledgment-based |
| Ordering | Within partition | Within queue |
| Replayability | Yes (retention period) | No (deleted after ACK) |
| Multiple consumers | Consumer groups (all read) | Competing consumers (one gets) |
| Throughput | Millions/sec | Tens of thousands/sec |
| Use case | Event streaming, analytics, CDC | Task queues, RPC, routing |