Low-Level Design: Pub/Sub Event System (OOP Interview)

The Publish-Subscribe pattern is one of the most important OOP design patterns. It decouples event producers from event consumers: publishers don’t know who’s listening, and subscribers don’t know who’s sending. This pattern underlies event-driven architectures, UI frameworks, and distributed systems. Designing a clean in-process pub/sub system is a common OOP interview question at companies like Atlassian, Uber, and Meta.

Core Classes

Event

import uuid
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any

@dataclass
class Event:
    event_type: str
    payload: Any
    source: str
    # default_factory runs once per instance, so every event gets its own timestamp and ID
    timestamp: datetime = field(default_factory=datetime.now)
    event_id: str = field(default_factory=lambda: uuid.uuid4().hex)

    def __str__(self) -> str:
        return f"Event({self.event_type}, source={self.source}, id={self.event_id[:8]})"

Subscriber (Abstract)

from abc import ABC, abstractmethod

class Subscriber(ABC):
    @abstractmethod
    def on_event(self, event: Event) -> None:
        """Handle an incoming event."""
        pass

    @property
    def subscriber_id(self) -> str:
        return f"{self.__class__.__name__}_{id(self)}"

EventBus

from collections import defaultdict
import threading

class EventBus:
    """
    Central pub/sub coordinator.
    Thread-safe: multiple threads can publish and subscribe concurrently.
    """
    def __init__(self):
        self._subscribers: dict[str, list[Subscriber]] = defaultdict(list)
        self._lock = threading.RLock()   # RLock allows same thread to re-acquire

    def subscribe(self, event_type: str, subscriber: Subscriber) -> None:
        with self._lock:
            # Prevent duplicate subscriptions
            existing_ids = {s.subscriber_id for s in self._subscribers[event_type]}
            if subscriber.subscriber_id not in existing_ids:
                self._subscribers[event_type].append(subscriber)
                print(f"Subscribed: {subscriber.subscriber_id} -> {event_type}")

    def unsubscribe(self, event_type: str, subscriber: Subscriber) -> None:
        with self._lock:
            self._subscribers[event_type] = [
                s for s in self._subscribers[event_type]
                if s.subscriber_id != subscriber.subscriber_id
            ]

    def publish(self, event: Event) -> int:
        """
        Publish event to all subscribers.
        Returns number of subscribers notified.
        """
        with self._lock:
            subscribers = list(self._subscribers.get(event.event_type, []))

        notified = 0
        for subscriber in subscribers:
            try:
                subscriber.on_event(event)
                notified += 1
            except Exception as e:
                print(f"Error in {subscriber.subscriber_id}: {e}")
                # Don't let one failing subscriber block others
        return notified

    def publish_all(self, event: Event) -> int:
        """Also publish to wildcard (*) subscribers."""
        count = self.publish(event)
        if event.event_type != "*":
            # Snapshot the wildcard list under the lock, then dispatch outside it
            with self._lock:
                wildcard_subs = list(self._subscribers.get("*", []))
            for subscriber in wildcard_subs:
                try:
                    subscriber.on_event(event)
                    count += 1
                except Exception as e:
                    print(f"Error in wildcard subscriber {subscriber.subscriber_id}: {e}")
        return count

    def subscriber_count(self, event_type: str) -> int:
        with self._lock:
            # .get() avoids creating an empty defaultdict entry for unknown event types
            return len(self._subscribers.get(event_type, []))
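
The snapshot step in publish() is what keeps handlers out of the critical section. As a minimal, self-contained check of that idea, the sketch below uses a hypothetical MiniBus with plain callables and string messages (stand-ins, not the EventBus above) and a deliberately non-reentrant threading.Lock. It suggests that with the copy-then-dispatch shape, a handler can call subscribe() in the middle of a publish without blocking:

```python
import threading
from collections import defaultdict
from typing import Callable

class MiniBus:
    """Hypothetical stripped-down bus: handlers are plain callables."""
    def __init__(self) -> None:
        self._handlers: dict[str, list[Callable[[str], None]]] = defaultdict(list)
        self._lock = threading.Lock()  # deliberately non-reentrant

    def subscribe(self, topic: str, handler: Callable[[str], None]) -> None:
        with self._lock:
            self._handlers[topic].append(handler)

    def publish(self, topic: str, message: str) -> int:
        with self._lock:
            snapshot = list(self._handlers.get(topic, []))  # copy under the lock
        for handler in snapshot:  # the lock is already released here
            handler(message)
        return len(snapshot)

bus = MiniBus()
received: list[str] = []

def late_handler(message: str) -> None:
    received.append(f"late:{message}")

def first_handler(message: str) -> None:
    received.append(f"first:{message}")
    bus.subscribe("greeting", late_handler)  # re-enters the bus from a handler

bus.subscribe("greeting", first_handler)
first_count = bus.publish("greeting", "hello")   # late_handler is not in this snapshot
second_count = bus.publish("greeting", "again")  # now both handlers are in the snapshot
```

A subscriber added during dispatch only sees later events, because each publish works from the list as it was when the snapshot was taken.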

Concrete Subscribers

class EmailNotificationService(Subscriber):
    def on_event(self, event: Event) -> None:
        if event.event_type == "user.registered":
            user = event.payload
            print(f"[Email] Sending welcome email to {user['email']}")
        elif event.event_type == "order.placed":
            order = event.payload
            print(f"[Email] Order confirmation sent for order {order['order_id']}")

class AuditLogger(Subscriber):
    def __init__(self):
        self.log: list[Event] = []

    def on_event(self, event: Event) -> None:
        self.log.append(event)
        print(f"[Audit] {event.timestamp.strftime('%H:%M:%S')} | {event}")

class AnalyticsService(Subscriber):
    def on_event(self, event: Event) -> None:
        print(f"[Analytics] Tracking: {event.event_type} from {event.source}")
        # In a real system: send to data pipeline (Kafka, Segment, etc.)

Publisher Helper

class Publisher:
    """Convenience class for publishing events with a consistent source name."""
    def __init__(self, source: str, event_bus: EventBus):
        self.source = source
        self.event_bus = event_bus

    def publish(self, event_type: str, payload: Any) -> None:
        event = Event(event_type=event_type, payload=payload, source=self.source)
        count = self.event_bus.publish_all(event)
        print(f"Published {event_type}: notified {count} subscriber(s)")

Usage Example

bus = EventBus()

# Subscribers
email_svc = EmailNotificationService()
audit_log = AuditLogger()
analytics = AnalyticsService()

# Subscriptions
bus.subscribe("user.registered", email_svc)
bus.subscribe("user.registered", analytics)
bus.subscribe("order.placed", email_svc)
bus.subscribe("*", audit_log)   # audit ALL events

# Publishers
user_svc = Publisher("UserService", bus)
order_svc = Publisher("OrderService", bus)

# Simulate events
user_svc.publish("user.registered", {"user_id": "U001", "email": "alice@example.com"})
order_svc.publish("order.placed", {"order_id": "O42", "user_id": "U001", "total": 99.99})

print(f"Audit log entries: {len(audit_log.log)}")

Interview Follow-ups

  • Async delivery: Modify publish() to dispatch events on a thread pool executor rather than blocking the publisher. Use concurrent.futures.ThreadPoolExecutor. Subscribers process in parallel.
  • Event filtering: Allow subscribers to register with a filter predicate: subscribe("order.*", subscriber, filter=lambda e: e.payload["total"] > 100). The EventBus checks the predicate before dispatching.
  • Dead letter queue: When a subscriber raises an exception, enqueue the event in a dead_letter list for later replay rather than only printing the error and dropping the delivery.
  • Topic wildcards: Support glob patterns like “order.*” matching “order.placed”, “order.shipped”, etc. Use fnmatch.fnmatch() for matching.
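  • Why use RLock? publish() copies the subscriber list and releases the lock before it calls subscriber.on_event(), so a handler that calls bus.subscribe() (a subscriber that subscribes others in response to events) does not deadlock in this design, even with a regular Lock. RLock is a defensive choice: it becomes necessary once a method that holds the lock calls another locking method on the same thread, for example if handlers were dispatched while the lock was still held. A regular Lock would deadlock in that case; RLock allows the same thread to re-enter.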
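
Three of the follow-ups above (topic wildcards, filter predicates, and a dead letter list) can be combined in one sketch. All names here are hypothetical and chosen for illustration: PatternBus, the predicate parameter (used in place of filter to avoid shadowing the builtin), dead_letters, and plain dict events instead of the Event class. It uses fnmatch.fnmatchcase rather than fnmatch.fnmatch so that matching is case-sensitive on every platform, and it is single-threaded for brevity; a fuller version would presumably reuse the lock-and-snapshot approach from EventBus.

```python
from fnmatch import fnmatchcase
from typing import Any, Callable, Optional

EventDict = dict[str, Any]  # assumed shape: {"type": ..., "payload": ...}
Handler = Callable[[EventDict], None]
Predicate = Callable[[EventDict], bool]

class PatternBus:
    """Hypothetical bus with glob topics, filter predicates, and dead letters."""
    def __init__(self) -> None:
        self._subs: list[tuple[str, Handler, Optional[Predicate]]] = []
        self.dead_letters: list[tuple[EventDict, Exception]] = []

    def subscribe(self, pattern: str, handler: Handler,
                  predicate: Optional[Predicate] = None) -> None:
        self._subs.append((pattern, handler, predicate))

    def publish(self, event: EventDict) -> int:
        delivered = 0
        for pattern, handler, predicate in list(self._subs):
            # Glob match on the topic, e.g. "order.*" matches "order.placed"
            if not fnmatchcase(event["type"], pattern):
                continue
            try:
                # Check the subscriber's predicate before dispatching
                if predicate is not None and not predicate(event):
                    continue
                handler(event)
                delivered += 1
            except Exception as exc:
                # Keep the failed delivery for later replay instead of dropping it
                self.dead_letters.append((event, exc))
        return delivered

bus = PatternBus()
big_orders: list[str] = []

def record_big_order(event: EventDict) -> None:
    big_orders.append(event["payload"]["order_id"])

def broken_handler(event: EventDict) -> None:
    raise RuntimeError("downstream unavailable")

bus.subscribe("order.*", record_big_order,
              predicate=lambda e: e["payload"]["total"] > 100)
bus.subscribe("order.shipped", broken_handler)

bus.publish({"type": "order.placed", "payload": {"order_id": "O1", "total": 250}})
bus.publish({"type": "order.placed", "payload": {"order_id": "O2", "total": 20}})
bus.publish({"type": "order.shipped", "payload": {"order_id": "O3", "total": 500}})
bus.publish({"type": "user.registered", "payload": {"user_id": "U1"}})
```

Note that a glob `*` also matches dots, so "order.*" would match a nested topic such as "order.placed.retry"; a stricter segment-by-segment matcher is a design choice this sketch does not make.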

Frequently Asked Questions

What is the Observer pattern and how does it differ from Pub/Sub?

The Observer pattern (Gang of Four) has direct coupling: the Subject (observable) maintains a list of Observer objects and calls their update() method directly. Observers know about the Subject and register with it explicitly. Pub/Sub adds an event bus (message broker) between publishers and subscribers, so neither side knows about the other. Publishers emit events to the bus; subscribers listen to the bus for the event types they care about. Key differences:

  • Coupling: Observer is tightly coupled (the Subject knows the Observer interface); Pub/Sub is decoupled (neither side knows the other).
  • Filtering: Pub/Sub supports topic-based filtering; basic Observer delivers all events.
  • Distribution: Pub/Sub extends naturally to distributed systems (Kafka, Redis pub/sub); Observer is in-process.

Use Observer for UI component updates, Model-View connections, and simple event callbacks. Use Pub/Sub for complex event routing, multiple independent consumers, and anything that may need to go distributed.

How do you make a pub/sub event bus thread-safe?

The EventBus has two critical sections: subscribe/unsubscribe, which modify the subscriber list, and publish, which reads it. In publish(), acquire the lock to take a shallow copy of the subscriber list, release the lock, then iterate over the copy and call handlers without the lock. This avoids holding the lock during potentially long-running handler code and allows concurrent subscriptions during event dispatch. If a handler throws an exception, catch it individually so one failing subscriber doesn’t block others from receiving the event.

Because handlers run outside the lock, a handler that calls bus.subscribe() (e.g., a subscriber that dynamically adds other subscribers based on events) does not deadlock in this design, even with a plain threading.Lock. Using threading.RLock (reentrant lock) is a defensive choice. It matters when a method that already holds the lock calls another locking method on the same thread, for example if handlers were dispatched while the lock was held: a regular Lock would deadlock there, while RLock allows the same thread to re-enter.

How do you extend a pub/sub system to support asynchronous event delivery?

Replace synchronous subscriber.on_event() calls with thread pool submission. Use concurrent.futures.ThreadPoolExecutor: pool = ThreadPoolExecutor(max_workers=N). In publish(), instead of calling subscriber.on_event(event) directly, call pool.submit(subscriber.on_event, event). This returns a Future immediately, so the publisher is not blocked by subscriber processing. Tradeoffs:

  • Ordering: async dispatch means a subscriber may process events out of order if earlier events take longer. Fix: use a single-threaded executor per subscriber (each subscriber gets its own thread).
  • Error handling: exceptions in submitted tasks are captured in the Future; use future.add_done_callback() to log failures.
  • Backpressure: if subscribers are slow, the thread pool's work queue grows without bound. Add a bounded queue and a reject-or-block policy.
  • Shutdown: call pool.shutdown(wait=True) on application exit to wait for in-flight events to complete before terminating.
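
The asynchronous delivery described above could look roughly like the following sketch. AsyncBus, the failures list, and the worker count are hypothetical choices, and handlers are plain callables rather than Subscriber objects to keep the block self-contained. It shows pool.submit() for dispatch, add_done_callback() for collecting handler exceptions, and shutdown(wait=True) for draining in-flight deliveries.

```python
import threading
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Callable

Handler = Callable[[Any], None]

class AsyncBus:
    """Hypothetical bus that hands each delivery to a thread pool."""
    def __init__(self, max_workers: int = 4) -> None:
        self._handlers: dict[str, list[Handler]] = {}
        self._lock = threading.Lock()
        self._pool = ThreadPoolExecutor(max_workers=max_workers)
        self.failures: list[BaseException] = []

    def subscribe(self, topic: str, handler: Handler) -> None:
        with self._lock:
            self._handlers.setdefault(topic, []).append(handler)

    def publish(self, topic: str, payload: Any) -> list[Future]:
        with self._lock:
            snapshot = list(self._handlers.get(topic, []))
        futures = []
        for handler in snapshot:
            future = self._pool.submit(handler, payload)  # returns immediately
            future.add_done_callback(self._record_failure)
            futures.append(future)
        return futures

    def _record_failure(self, future: Future) -> None:
        exc = future.exception()  # None when the handler succeeded
        if exc is not None:
            with self._lock:
                self.failures.append(exc)

    def shutdown(self) -> None:
        self._pool.shutdown(wait=True)  # block until in-flight deliveries finish

bus = AsyncBus(max_workers=2)
seen: list[str] = []
seen_lock = threading.Lock()

def record(payload: Any) -> None:
    with seen_lock:
        seen.append(payload)

def explode(payload: Any) -> None:
    raise ValueError(f"cannot handle {payload}")

bus.subscribe("order.placed", record)
bus.subscribe("order.placed", explode)
futures = bus.publish("order.placed", "O42")
bus.shutdown()
```

This sketch does not address ordering or backpressure: deliveries for the same subscriber may run concurrently on different workers, and the executor's internal work queue is unbounded.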
