Low-Level Design: Pub/Sub Event System
The Publish-Subscribe pattern is one of the most important OOP design patterns. It decouples event producers from event consumers — publishers don’t know who’s listening, and subscribers don’t know who’s sending. This pattern underlies event-driven architectures, UI frameworks, and distributed systems. Designing a clean in-process pub/sub is a common OOP interview at companies like Atlassian, Uber, and Meta.
Core Classes
Event
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any
@dataclass
class Event:
event_type: str
payload: Any
source: str
timestamp: datetime = field(default_factory=datetime.now)
event_id: str = field(default_factory=lambda: __import__('uuid').uuid4().hex)
def __str__(self) -> str:
return f"Event({self.event_type}, source={self.source}, id={self.event_id[:8]})"
Subscriber (Abstract)
from abc import ABC, abstractmethod
class Subscriber(ABC):
@abstractmethod
def on_event(self, event: Event) -> None:
"""Handle an incoming event."""
pass
@property
def subscriber_id(self) -> str:
return f"{self.__class__.__name__}_{id(self)}"
EventBus
from collections import defaultdict
import threading
class EventBus:
"""
Central pub/sub coordinator.
Thread-safe: multiple threads can publish and subscribe concurrently.
"""
def __init__(self):
self._subscribers: dict[str, list[Subscriber]] = defaultdict(list)
self._lock = threading.RLock() # RLock allows same thread to re-acquire
def subscribe(self, event_type: str, subscriber: Subscriber) -> None:
with self._lock:
# Prevent duplicate subscriptions
existing_ids = {s.subscriber_id for s in self._subscribers[event_type]}
if subscriber.subscriber_id not in existing_ids:
self._subscribers[event_type].append(subscriber)
print(f"Subscribed: {subscriber.subscriber_id} -> {event_type}")
def unsubscribe(self, event_type: str, subscriber: Subscriber) -> None:
with self._lock:
self._subscribers[event_type] = [
s for s in self._subscribers[event_type]
if s.subscriber_id != subscriber.subscriber_id
]
def publish(self, event: Event) -> int:
"""
Publish event to all subscribers.
Returns number of subscribers notified.
"""
with self._lock:
subscribers = list(self._subscribers.get(event.event_type, []))
notified = 0
for subscriber in subscribers:
try:
subscriber.on_event(event)
notified += 1
except Exception as e:
print(f"Error in {subscriber.subscriber_id}: {e}")
# Don't let one failing subscriber block others
return notified
def publish_all(self, event: Event) -> int:
"""Also publish to wildcard (*) subscribers."""
count = self.publish(event)
if event.event_type != "*":
# Shallow copy to avoid lock issues
with self._lock:
wildcard_subs = list(self._subscribers.get("*", []))
for subscriber in wildcard_subs:
try:
subscriber.on_event(event)
count += 1
except Exception as e:
print(f"Error in wildcard subscriber {subscriber.subscriber_id}: {e}")
return count
def subscriber_count(self, event_type: str) -> int:
with self._lock:
return len(self._subscribers[event_type])
Concrete Subscribers
class EmailNotificationService(Subscriber):
def on_event(self, event: Event) -> None:
if event.event_type == "user.registered":
user = event.payload
print(f"[Email] Sending welcome email to {user['email']}")
elif event.event_type == "order.placed":
order = event.payload
print(f"[Email] Order confirmation sent for order {order['order_id']}")
class AuditLogger(Subscriber):
def __init__(self):
self.log: list[Event] = []
def on_event(self, event: Event) -> None:
self.log.append(event)
print(f"[Audit] {event.timestamp.strftime('%H:%M:%S')} | {event}")
class AnalyticsService(Subscriber):
def on_event(self, event: Event) -> None:
print(f"[Analytics] Tracking: {event.event_type} from {event.source}")
# In a real system: send to data pipeline (Kafka, Segment, etc.)
Publisher Helper
class Publisher:
"""Convenience class for publishing events with a consistent source name."""
def __init__(self, source: str, event_bus: EventBus):
self.source = source
self.event_bus = event_bus
def publish(self, event_type: str, payload: Any) -> None:
event = Event(event_type=event_type, payload=payload, source=self.source)
count = self.event_bus.publish_all(event)
print(f"Published {event_type}: notified {count} subscriber(s)")
Usage Example
bus = EventBus()
# Subscribers
email_svc = EmailNotificationService()
audit_log = AuditLogger()
analytics = AnalyticsService()
# Subscriptions
bus.subscribe("user.registered", email_svc)
bus.subscribe("user.registered", analytics)
bus.subscribe("order.placed", email_svc)
bus.subscribe("*", audit_log) # audit ALL events
# Publishers
user_svc = Publisher("UserService", bus)
order_svc = Publisher("OrderService", bus)
# Simulate events
user_svc.publish("user.registered", {"user_id": "U001", "email": "alice@example.com"})
order_svc.publish("order.placed", {"order_id": "O42", "user_id": "U001", "total": 99.99})
print(f"Audit log entries: {len(audit_log.log)}")
Interview Follow-ups
- Async delivery: Modify publish() to dispatch events on a thread pool executor rather than blocking the publisher. Use concurrent.futures.ThreadPoolExecutor. Subscribers process in parallel.
- Event filtering: Allow subscribers to register with a filter predicate:
subscribe("order.*", subscriber, filter=lambda e: e.payload["total"] > 100). The EventBus checks the predicate before dispatching. - Dead letter queue: When a subscriber raises an exception, enqueue the event in a dead_letter list for later replay rather than discarding it silently.
- Topic wildcards: Support glob patterns like “order.*” matching “order.placed”, “order.shipped”, etc. Use fnmatch.fnmatch() for matching.
- Why use RLock? If publish() calls subscriber.on_event(), which itself calls bus.subscribe() (a subscriber that subscribes others in response to events), a regular Lock would deadlock. RLock allows the same thread to re-enter.
🏢 Asked at: Uber Interview Guide 2026: Dispatch Systems, Geospatial Algorithms, and Marketplace Engineering
🏢 Asked at: Atlassian Interview Guide
🏢 Asked at: Meta Interview Guide 2026: Facebook, Instagram, WhatsApp Engineering
🏢 Asked at: LinkedIn Interview Guide 2026: Social Graph Engineering, Feed Ranking, and Professional Network Scale
🏢 Asked at: Snap Interview Guide