Low-Level Design: Pub/Sub Event System (OOP Interview)

Low-Level Design: Pub/Sub Event System

The Publish-Subscribe pattern is one of the most important OOP design patterns. It decouples event producers from event consumers — publishers don’t know who’s listening, and subscribers don’t know who’s sending. This pattern underlies event-driven architectures, UI frameworks, and distributed systems. Designing a clean in-process pub/sub is a common OOP interview at companies like Atlassian, Uber, and Meta.

Core Classes

Event

from dataclasses import dataclass, field
from datetime import datetime
from typing import Any

@dataclass
class Event:
    event_type: str
    payload: Any
    source: str
    timestamp: datetime = field(default_factory=datetime.now)
    event_id: str = field(default_factory=lambda: __import__('uuid').uuid4().hex)

    def __str__(self) -> str:
        return f"Event({self.event_type}, source={self.source}, id={self.event_id[:8]})"

Subscriber (Abstract)

from abc import ABC, abstractmethod

class Subscriber(ABC):
    @abstractmethod
    def on_event(self, event: Event) -> None:
        """Handle an incoming event."""
        pass

    @property
    def subscriber_id(self) -> str:
        return f"{self.__class__.__name__}_{id(self)}"

EventBus

from collections import defaultdict
import threading

class EventBus:
    """
    Central pub/sub coordinator.
    Thread-safe: multiple threads can publish and subscribe concurrently.
    """
    def __init__(self):
        self._subscribers: dict[str, list[Subscriber]] = defaultdict(list)
        self._lock = threading.RLock()   # RLock allows same thread to re-acquire

    def subscribe(self, event_type: str, subscriber: Subscriber) -> None:
        with self._lock:
            # Prevent duplicate subscriptions
            existing_ids = {s.subscriber_id for s in self._subscribers[event_type]}
            if subscriber.subscriber_id not in existing_ids:
                self._subscribers[event_type].append(subscriber)
                print(f"Subscribed: {subscriber.subscriber_id} -> {event_type}")

    def unsubscribe(self, event_type: str, subscriber: Subscriber) -> None:
        with self._lock:
            self._subscribers[event_type] = [
                s for s in self._subscribers[event_type]
                if s.subscriber_id != subscriber.subscriber_id
            ]

    def publish(self, event: Event) -> int:
        """
        Publish event to all subscribers.
        Returns number of subscribers notified.
        """
        with self._lock:
            subscribers = list(self._subscribers.get(event.event_type, []))

        notified = 0
        for subscriber in subscribers:
            try:
                subscriber.on_event(event)
                notified += 1
            except Exception as e:
                print(f"Error in {subscriber.subscriber_id}: {e}")
                # Don't let one failing subscriber block others
        return notified

    def publish_all(self, event: Event) -> int:
        """Also publish to wildcard (*) subscribers."""
        count = self.publish(event)
        if event.event_type != "*":
            # Shallow copy to avoid lock issues
            with self._lock:
                wildcard_subs = list(self._subscribers.get("*", []))
            for subscriber in wildcard_subs:
                try:
                    subscriber.on_event(event)
                    count += 1
                except Exception as e:
                    print(f"Error in wildcard subscriber {subscriber.subscriber_id}: {e}")
        return count

    def subscriber_count(self, event_type: str) -> int:
        with self._lock:
            return len(self._subscribers[event_type])

Concrete Subscribers

class EmailNotificationService(Subscriber):
    def on_event(self, event: Event) -> None:
        if event.event_type == "user.registered":
            user = event.payload
            print(f"[Email] Sending welcome email to {user['email']}")
        elif event.event_type == "order.placed":
            order = event.payload
            print(f"[Email] Order confirmation sent for order {order['order_id']}")

class AuditLogger(Subscriber):
    def __init__(self):
        self.log: list[Event] = []

    def on_event(self, event: Event) -> None:
        self.log.append(event)
        print(f"[Audit] {event.timestamp.strftime('%H:%M:%S')} | {event}")

class AnalyticsService(Subscriber):
    def on_event(self, event: Event) -> None:
        print(f"[Analytics] Tracking: {event.event_type} from {event.source}")
        # In a real system: send to data pipeline (Kafka, Segment, etc.)

Publisher Helper

class Publisher:
    """Convenience class for publishing events with a consistent source name."""
    def __init__(self, source: str, event_bus: EventBus):
        self.source = source
        self.event_bus = event_bus

    def publish(self, event_type: str, payload: Any) -> None:
        event = Event(event_type=event_type, payload=payload, source=self.source)
        count = self.event_bus.publish_all(event)
        print(f"Published {event_type}: notified {count} subscriber(s)")

Usage Example

bus = EventBus()

# Subscribers
email_svc = EmailNotificationService()
audit_log = AuditLogger()
analytics = AnalyticsService()

# Subscriptions
bus.subscribe("user.registered", email_svc)
bus.subscribe("user.registered", analytics)
bus.subscribe("order.placed", email_svc)
bus.subscribe("*", audit_log)   # audit ALL events

# Publishers
user_svc = Publisher("UserService", bus)
order_svc = Publisher("OrderService", bus)

# Simulate events
user_svc.publish("user.registered", {"user_id": "U001", "email": "alice@example.com"})
order_svc.publish("order.placed", {"order_id": "O42", "user_id": "U001", "total": 99.99})

print(f"Audit log entries: {len(audit_log.log)}")

Interview Follow-ups

  • Async delivery: Modify publish() to dispatch events on a thread pool executor rather than blocking the publisher. Use concurrent.futures.ThreadPoolExecutor. Subscribers process in parallel.
  • Event filtering: Allow subscribers to register with a filter predicate: subscribe("order.*", subscriber, filter=lambda e: e.payload["total"] > 100). The EventBus checks the predicate before dispatching.
  • Dead letter queue: When a subscriber raises an exception, enqueue the event in a dead_letter list for later replay rather than discarding it silently.
  • Topic wildcards: Support glob patterns like “order.*” matching “order.placed”, “order.shipped”, etc. Use fnmatch.fnmatch() for matching.
  • Why use RLock? If publish() calls subscriber.on_event(), which itself calls bus.subscribe() (a subscriber that subscribes others in response to events), a regular Lock would deadlock. RLock allows the same thread to re-enter.

🏢 Asked at: Uber Interview Guide 2026: Dispatch Systems, Geospatial Algorithms, and Marketplace Engineering

🏢 Asked at: Atlassian Interview Guide

🏢 Asked at: Meta Interview Guide 2026: Facebook, Instagram, WhatsApp Engineering

🏢 Asked at: LinkedIn Interview Guide 2026: Social Graph Engineering, Feed Ranking, and Professional Network Scale

🏢 Asked at: Snap Interview Guide

Scroll to Top