System Design Interview: Design a Distributed Messaging System (Kafka)

Understanding distributed messaging systems is essential for senior engineering roles. Kafka's design comes up frequently in interviews at LinkedIn (where Kafka was created), Uber, Airbnb, Databricks, and any company running large-scale event streaming. This post covers the architecture that lets Kafka handle millions of messages per second while preserving durability guarantees.

Requirements

Functional Requirements

  • Publish messages to named topics
  • Multiple consumers read messages independently (consumer groups)
  • Durable persistence: messages retained for configurable period
  • Ordered delivery within a partition
  • At-least-once delivery guarantee
  • Horizontal scalability for both producers and consumers

Non-Functional Requirements

  • Throughput: millions of messages per second
  • Latency: single-digit milliseconds end-to-end
  • Durability: messages replicated across multiple brokers
  • Fault tolerance: survive broker failures without data loss
  • Consumer groups: multiple independent consumers reading same topic

Core Architecture

Producers
   ↓ (write to leader)
Kafka Broker Cluster
   [Topic A, Partition 0 — Leader on Broker 1]
   [Topic A, Partition 1 — Leader on Broker 2]
   [Topic A, Partition 2 — Leader on Broker 3]
   ↑ (replicate to followers)
   Broker 1 ←→ Broker 2 ←→ Broker 3
   ↓ (consumers pull)
Consumer Groups
   [Group X: Consumer A (partition 0), Consumer B (partition 1,2)]
   [Group Y: Consumer C (all partitions)]
ZooKeeper / KRaft (metadata, leader election)

Topic, Partition, and Offset

class KafkaMessage:
    def __init__(self, key: bytes, value: bytes,
                 timestamp: int, headers: dict | None = None):
        self.key = key          # determines partition (hash(key) % num_partitions)
        self.value = value      # payload (bytes — schema-agnostic)
        self.timestamp = timestamp
        self.headers = headers or {}
        self.offset: int = -1  # assigned by broker when written

class Partition:
    """
    Ordered, immutable log of messages.
    New messages appended to end (offset increments).
    Consumers read by specifying offset.
    Retention: messages deleted after retention.ms or retention.bytes exceeded.
    """
    def __init__(self, topic: str, partition_id: int):
        self.topic = topic
        self.partition_id = partition_id
        self.log: list[KafkaMessage] = []  # append-only
        self.leader_broker_id: int = -1
        self.follower_broker_ids: list[int] = []
        self.hw: int = 0  # High Watermark: highest committed offset (all replicas have)

    def append(self, message: KafkaMessage) -> int:
        """Returns offset of appended message"""
        message.offset = len(self.log)
        self.log.append(message)
        return message.offset

    def read(self, offset: int, max_count: int = 100) -> list:
        """Read messages starting from offset"""
        return self.log[offset:offset + max_count]
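The retention rule mentioned in the docstring can be sketched as a cleanup pass. This is a simplification added for illustration: real Kafka deletes whole log segments rather than individual messages, surviving offsets are preserved (the log start just moves forward), and `Msg` / `retention_ms` are stand-ins for the real record type and `retention.ms`:

```python
from collections import namedtuple

Msg = namedtuple("Msg", ["offset", "timestamp", "value"])

def apply_retention(log, retention_ms, now_ms):
    # Keep only messages inside the retention window.
    # Simplified: real Kafka deletes whole segment files, not single
    # messages, and surviving offsets are unchanged.
    cutoff = now_ms - retention_ms
    return [m for m in log if m.timestamp >= cutoff]

log = [Msg(0, 1_000, b"a"), Msg(1, 5_000, b"b"), Msg(2, 9_000, b"c")]
kept = apply_retention(log, retention_ms=6_000, now_ms=10_000)
# cutoff = 4_000, so the message at timestamp 1_000 is dropped
assert [m.offset for m in kept] == [1, 2]
```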

Partition Assignment

class DefaultPartitioner:
    """
    Kafka's default partitioning strategy.
    With key: consistent hashing to same partition.
    Without key: round-robin across partitions.
    """

    def partition(self, key: bytes, num_partitions: int) -> int:
        if key is None:
            # No key: spread across partitions (modern Kafka uses a
            # "sticky" partitioner that fills one batch before moving on)
            return self._round_robin(num_partitions)
        # Deterministic hash: the same key always maps to the same partition,
        # which guarantees ordering for messages sharing a key.
        # Kafka uses murmur2 and masks the sign bit before the modulo.
        return (murmur2_hash(key) & 0x7FFFFFFF) % num_partitions

# Key insight: all messages with same key go to same partition
# → ordering guaranteed for same key within a topic

# Example: order events for the same order_id
producer.send('order_events',
    key=str(order_id).encode(),    # same order → same partition → ordered
    value=serialize(event)
)
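The determinism can be checked directly. Python's standard library has no murmur2, so `zlib.crc32` stands in for it here; the property being demonstrated (same key, same partition) holds for any deterministic hash:

```python
import zlib

def partition_for(key: bytes, num_partitions: int) -> int:
    # crc32 as a stand-in for Kafka's murmur2; mask the sign bit the
    # way Kafka does so the modulo result is always non-negative.
    return (zlib.crc32(key) & 0x7FFFFFFF) % num_partitions

p1 = partition_for(b"order-42", 6)
p2 = partition_for(b"order-42", 6)
assert p1 == p2          # same key always lands on the same partition
assert 0 <= p1 < 6
```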

Producer: Durability Guarantees

class KafkaProducerConfig:
    """
    acks controls durability vs throughput tradeoff:
    acks=0: fire and forget (fastest, may lose messages)
    acks=1: leader acknowledges (lost if leader fails before replication)
    acks=all: all in-sync replicas acknowledge (safest, slower)
    """

    # High throughput (some data loss acceptable: metrics, logs)
    fast_config = {
        'acks': 1,
        'batch.size': 16384,        # 16KB batch
        'linger.ms': 5,             # wait 5ms to accumulate batch
        'compression.type': 'snappy',
        'buffer.memory': 33554432,  # 32MB producer buffer
    }

    # High durability (financial transactions, audit logs)
    safe_config = {
        'acks': 'all',
        'min.insync.replicas': 2,   # at least 2 replicas must ack
        'retries': 3,
        'enable.idempotence': True,  # broker dedups producer retries (exactly-once writes per partition)
        'compression.type': 'lz4',
    }

class Producer:
    def send(self, topic: str, key: bytes, value: bytes,
             callback=None) -> None:
        """
        Asynchronous send: messages batched in memory buffer.
        Batch sent when: batch.size reached OR linger.ms elapsed.
        On failure: automatic retry with exponential backoff.
        """
        message = KafkaMessage(key=key, value=value,
                               timestamp=current_time_ms())
        partition = self.partitioner.partition(key, self.num_partitions(topic))
        self.buffer[topic][partition].append(message)

        if self._should_flush(topic, partition):
            self._flush(topic, partition, callback)
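The `_should_flush` check above is left undefined; a minimal sketch of its two trigger conditions (batch.size bytes accumulated, or linger.ms elapsed since the batch started) might look like this. `BatchState` and its field names are assumptions for illustration, not Kafka internals:

```python
import time

class BatchState:
    """Per-(topic, partition) accumulation state (illustrative)."""
    def __init__(self):
        self.batch_bytes = 0          # bytes buffered so far
        self.first_append_ms = None   # when the current batch started

def should_flush(batch: BatchState, batch_size: int = 16_384,
                 linger_ms: int = 5) -> bool:
    # Flush when either trigger fires, mirroring batch.size / linger.ms.
    if batch.first_append_ms is None:
        return False                              # nothing buffered
    if batch.batch_bytes >= batch_size:
        return True                               # size trigger
    elapsed = time.time() * 1000 - batch.first_append_ms
    return elapsed >= linger_ms                   # time trigger

full = BatchState()
full.batch_bytes, full.first_append_ms = 20_000, time.time() * 1000
assert should_flush(full)     # over batch.size, flush immediately
```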

Consumer Group: Parallel Processing

class ConsumerGroup:
    """
    Multiple consumers sharing work by splitting partitions.
    Rule: each partition assigned to exactly ONE consumer in group.
    If consumers > partitions: some consumers are idle.
    Rebalancing: triggered when consumer joins/leaves.
    
    Kafka's Group Coordinator (broker) manages assignments.
    """

    def assign_partitions(self, consumers: list[str],
                          partitions: list[int]) -> dict:
        """
        Range assignor (default): assign contiguous partitions.
        Round-robin assignor: distribute evenly across consumers.
        Sticky assignor: minimize partition movements on rebalance.
        """
        assignment = {}
        # Round-robin for balanced distribution
        for i, partition in enumerate(partitions):
            consumer = consumers[i % len(consumers)]
            if consumer not in assignment:
                assignment[consumer] = []
            assignment[consumer].append(partition)
        return assignment
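The round-robin assignor above restated as a standalone function, with the two cases from the docstring worked through (even split, and more consumers than partitions):

```python
def assign_round_robin(consumers, partitions):
    # Same logic as ConsumerGroup.assign_partitions above.
    assignment = {}
    for i, partition in enumerate(partitions):
        consumer = consumers[i % len(consumers)]
        assignment.setdefault(consumer, []).append(partition)
    return assignment

# 4 partitions over 2 consumers: each gets 2, one owner per partition.
assert assign_round_robin(["c1", "c2"], [0, 1, 2, 3]) == \
    {"c1": [0, 2], "c2": [1, 3]}

# More consumers than partitions: the extra consumer stays idle.
assert assign_round_robin(["c1", "c2", "c3"], [0, 1]) == \
    {"c1": [0], "c2": [1]}
```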

class Consumer:
    def __init__(self, group_id: str, bootstrap_servers: str):
        self.group_id = group_id
        self.offsets: dict[tuple, int] = {}  # (topic, partition) → committed offset

    def poll(self, timeout_ms: int = 1000) -> list:
        """
        Fetch messages from assigned partitions.
        Returns records from current offset position.
        Consumer tracks its own offset — broker doesn't push.
        """
        records = []
        for (topic, partition), offset in self.offsets.items():
            new_records = self.fetch_from_broker(topic, partition, offset)
            records.extend(new_records)
            if new_records:
                self.offsets[(topic, partition)] = new_records[-1].offset + 1
        return records

    def commit_offset(self, topic: str, partition: int, offset: int):
        """
        Commit offset = "I've processed up to this offset."
        Auto-commit: enabled by default (enable.auto.commit=true, auto.commit.interval.ms=5000)
        Manual commit: commit AFTER processing for at-least-once guarantee.
        """
        self.offsets[(topic, partition)] = offset
        # Persist to __consumer_offsets topic (internal Kafka topic)
        self.broker.commit(self.group_id, topic, partition, offset)

# At-least-once processing pattern:
def process_messages(consumer):
    while True:
        records = consumer.poll(timeout_ms=1000)
        for record in records:
            try:
                process(record)  # Your business logic
                consumer.commit_offset(record.topic, record.partition, record.offset + 1)
            except Exception as e:
                # Don't commit offset → message will be redelivered
                log_error(e)
                # Implement DLQ (Dead Letter Queue) for repeated failures
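The DLQ idea in the comment can be sketched as a retry counter per message: after N failures, route the record to a dead-letter topic and commit so the partition is not blocked forever. `MAX_ATTEMPTS`, `send_to_dlq`, and the topic name are illustrative assumptions:

```python
MAX_ATTEMPTS = 3

def handle_record(record, process, send_to_dlq, attempts: dict) -> bool:
    """Process a record; after MAX_ATTEMPTS failures, park it in a
    dead-letter topic. Returns True if the offset can be committed."""
    key = (record["topic"], record["partition"], record["offset"])
    try:
        process(record)
        return True                           # processed, safe to commit
    except Exception:
        attempts[key] = attempts.get(key, 0) + 1
        if attempts[key] >= MAX_ATTEMPTS:
            send_to_dlq("orders.dlq", record)  # park it, then commit
            return True
        return False                          # uncommitted, will be redelivered

# Simulate a record that always fails: the third attempt goes to the DLQ.
dlq, attempts = [], {}
rec = {"topic": "orders", "partition": 0, "offset": 7}
def always_fail(r): raise RuntimeError("boom")
results = [handle_record(rec, always_fail, lambda t, r: dlq.append(r), attempts)
           for _ in range(3)]
assert results == [False, False, True] and len(dlq) == 1
```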

Replication and Leader Election

class ReplicationManager:
    """
    Each partition has 1 leader + N-1 followers (replication factor N).
    Leader handles all reads and writes.
    Followers replicate from leader continuously.
    In-Sync Replicas (ISR): followers caught up with leader.
    
    Leader failure: the controller (via ZooKeeper in legacy clusters, or the KRaft quorum) elects a new leader from the ISR.
    """

    # Replication flow:
    # 1. Producer → Leader (write to log)
    # 2. Followers pull from leader (FetchRequest)
    # 3. Leader tracks ISR: followers that are within replica.lag.time.max.ms
    # 4. High Watermark advances when all ISR replicas have the message
    # 5. Consumers only read up to High Watermark (committed messages)

    def handle_follower_fetch(self, partition: str, fetch_offset: int,
                              follower_id: int) -> list:
        """Returns messages from fetch_offset to end of leader log"""
        leader_partition = self.partitions[partition]
        messages = leader_partition.read(fetch_offset)
        # Update ISR tracking
        self.update_follower_offset(partition, follower_id, fetch_offset)
        self.advance_high_watermark(partition)
        return messages

    def advance_high_watermark(self, partition_id: str):
        """HW = min log-end position across the ISR (leader included)"""
        partition = self.partitions[partition_id]
        isr_offsets = [
            self.follower_offsets[partition_id][follower]
            for follower in partition.follower_broker_ids
            if self.is_in_sync(partition_id, follower)
        ]
        # The leader is always in sync with itself; if the ISR shrinks to
        # just the leader, the HW advances to the leader's log end.
        partition.hw = min(isr_offsets + [len(partition.log)])
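A worked example of the watermark rule (the numbers are illustrative): with a leader log end offset of 10 and ISR followers at offsets 8 and 10, the HW is 8, so consumers can read offsets 0 through 7 only:

```python
def high_watermark(leader_leo: int, isr_follower_offsets: list) -> int:
    # HW = minimum log position across the ISR, leader included.
    # Consumers may only read below this offset (committed messages).
    return min([leader_leo] + isr_follower_offsets)

assert high_watermark(10, [8, 10]) == 8   # slowest ISR follower caps the HW
assert high_watermark(10, []) == 10       # ISR shrank to the leader alone
```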

Key Kafka Design Decisions

  • Log-structured storage: Messages are written sequentially to append-only log segments on disk. Sequential I/O is orders of magnitude faster than random I/O on spinning disks, and still meaningfully faster on SSDs, which is central to Kafka's throughput. Old segments are deleted according to the retention policy.
  • Zero-copy transfer: Kafka uses OS-level sendfile() to transfer data from disk directly to network socket without copying to user space. Eliminates CPU overhead for data transfer — massive throughput improvement.
  • Consumer pulls, not push: Consumers control their own pace. No back-pressure needed. Broker doesn’t need per-consumer state (just committed offsets in __consumer_offsets topic). Simple, scalable.
  • Batching everywhere: Producers batch messages before sending. Brokers batch messages before writing to disk. Network transfers batched. Batching amortizes per-message overhead — critical for million-message-per-second throughput.
  • Message ordering guarantee: Ordering only guaranteed within a partition, not across partitions. Design: use a consistent partition key (order_id, user_id) to ensure related messages go to the same partition.
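The batching point can be made concrete with a toy cost model (the per-request and per-message costs are made-up units, not Kafka measurements): each network round trip pays a fixed cost, and batching spreads that cost across many messages:

```python
def total_cost(n_messages: int, batch_size: int,
               per_request_cost: float = 1.0,
               per_message_cost: float = 0.01) -> float:
    # Each request pays a fixed cost regardless of how many messages
    # it carries; larger batches mean fewer requests. (Toy units.)
    n_requests = -(-n_messages // batch_size)   # ceiling division
    return n_requests * per_request_cost + n_messages * per_message_cost

unbatched = total_cost(10_000, batch_size=1)    # 10,000 requests
batched = total_cost(10_000, batch_size=500)    # 20 requests
assert batched < unbatched / 50                 # order-of-magnitude win
```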

Kafka vs Traditional Message Queues (RabbitMQ)

Feature            | Kafka                            | RabbitMQ
-------------------|----------------------------------|-------------------------------
Message model      | Persistent log (replayable)      | Queue (deleted on consume)
Consumer model     | Pull, offset-based               | Push, acknowledgment-based
Ordering           | Within a partition               | Within a queue
Replayability      | Yes (retention period)           | No (deleted after ACK)
Multiple consumers | Consumer groups (all read)       | Competing consumers (one gets)
Throughput         | Millions/sec                     | Tens of thousands/sec
Use case           | Event streaming, analytics, CDC  | Task queues, RPC, routing


