Low-Level Design: Notification System
A notification system delivers messages to users via multiple channels (push, email, SMS, in-app). It must handle high volume (millions of notifications per day), support user preferences, throttle noisy senders, and track delivery status. This is a common OOP interview question at Meta, Uber, LinkedIn, and Airbnb.
Core Classes
Enums
from enum import Enum
class NotificationChannel(Enum):
    """Delivery channels a notification can be routed through.

    Each member's value is its own name as a string, so a channel can be
    looked up from its serialized form with ``NotificationChannel("PUSH")``.
    """

    PUSH = "PUSH"
    EMAIL = "EMAIL"
    SMS = "SMS"
    IN_APP = "IN_APP"
class NotificationStatus(Enum):
    """Lifecycle states recorded on a notification.

    Each member's value is its own name as a string.
    """

    PENDING = "PENDING"
    SENT = "SENT"
    DELIVERED = "DELIVERED"
    FAILED = "FAILED"
    SKIPPED = "SKIPPED"  # user preference or throttle
Notification and Template
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any
import uuid
@dataclass
class NotificationTemplate:
    """A reusable message template with a default delivery channel.

    ``subject_template`` and ``body_template`` are ``str.format`` strings
    whose ``{placeholders}`` are filled from the context passed to
    :meth:`render`.
    """

    template_id: str
    channel: NotificationChannel
    subject_template: str  # "{actor} liked your post"
    body_template: str  # "{actor} liked your post: '{post_title}'"

    def render(self, context: dict) -> tuple[str, str]:
        """Render subject and body with context variables.

        Args:
            context: Mapping of placeholder name to value. Extra keys are
                ignored by ``str.format``.

        Returns:
            A ``(subject, body)`` tuple of rendered strings.

        Raises:
            KeyError: If a placeholder used by either template is missing
                from ``context``.
        """
        subject = self.subject_template.format(**context)
        body = self.body_template.format(**context)
        return subject, body
@dataclass
class Notification:
    """One rendered message addressed to a single user on a single channel.

    Every field has a default, so instances can be built with keyword
    arguments only. ``notification_id`` is a fresh UUID4 string and
    ``created_at`` is the local time at construction.
    """

    notification_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    user_id: str = ""
    channel: NotificationChannel = NotificationChannel.IN_APP
    subject: str = ""
    body: str = ""
    status: NotificationStatus = NotificationStatus.PENDING
    created_at: datetime = field(default_factory=datetime.now)
    # Stays None until the notification has been sent. The annotation is
    # quoted so the union is not evaluated when the class is created.
    sent_at: "datetime | None" = None
    # Per-recipient delivery details read by the channel handlers
    # (e.g. "device_token", "email", "phone").
    metadata: dict = field(default_factory=dict)
User Preferences
@dataclass
class UserNotificationPreferences:
    """Per-user delivery settings: enabled channels, quiet hours, hourly caps.

    By default PUSH, EMAIL and IN_APP are enabled (SMS is not), and quiet
    hours run from 22:00 up to but not including 08:00.
    """

    user_id: str
    enabled_channels: set[NotificationChannel] = field(default_factory=lambda: {
        NotificationChannel.PUSH,
        NotificationChannel.EMAIL,
        NotificationChannel.IN_APP,
    })
    quiet_hours_start: int = 22  # 10 PM
    quiet_hours_end: int = 8  # 8 AM
    # Maximum notifications per channel within any one-hour window.
    max_per_hour: dict[NotificationChannel, int] = field(default_factory=lambda: {
        NotificationChannel.PUSH: 5,
        NotificationChannel.EMAIL: 10,
        NotificationChannel.SMS: 2,
        NotificationChannel.IN_APP: 50,
    })

    def is_channel_enabled(self, channel: NotificationChannel) -> bool:
        """Return True if the user has opted in to ``channel``."""
        return channel in self.enabled_channels

    def is_quiet_hour(self, hour: int) -> bool:
        """Return True if ``hour`` falls inside the quiet window.

        The window is half-open: ``quiet_hours_start`` is included and
        ``quiet_hours_end`` is excluded. When start equals end the window
        is empty and this always returns False.
        """
        if self.quiet_hours_start > self.quiet_hours_end:
            # Wraps midnight: start=22, end=8 → quiet 22,23,0,1,...,7
            return hour >= self.quiet_hours_start or hour < self.quiet_hours_end
        return self.quiet_hours_start <= hour < self.quiet_hours_end
Channel Handlers (Strategy Pattern)
from abc import ABC, abstractmethod
class ChannelHandler(ABC):
    """Strategy interface implemented by each delivery channel.

    Subclasses must override :meth:`send`; the base class cannot be
    instantiated directly.
    """

    @abstractmethod
    def send(self, notification: Notification) -> bool:
        """Send notification. Returns True on success."""
class PushNotificationHandler(ChannelHandler):
    """Push channel stub: prints the message instead of calling a provider."""

    def send(self, notification: Notification) -> bool:
        """Send a push notification to the device named in the metadata.

        Reads ``notification.metadata["device_token"]``. Returns False,
        after logging, when the token is missing or empty; True otherwise.
        """
        device_token = notification.metadata.get("device_token")
        if not device_token:
            print(f"[Push] No device token for user {notification.user_id}")
            return False
        # In real system: call FCM/APNs API
        print(f"[Push] Sent to {notification.user_id}: {notification.subject}")
        return True
class EmailHandler(ChannelHandler):
    """Email channel stub: prints the message instead of calling a provider."""

    def send(self, notification: Notification) -> bool:
        """Send an email to the address named in the metadata.

        Reads ``notification.metadata["email"]``. Returns False, after
        logging, when the address is missing or empty; True otherwise.
        """
        email = notification.metadata.get("email")
        if not email:
            # Log the reason, matching the push handler, so a FAILED
            # status can be traced to its cause.
            print(f"[Email] No email address for user {notification.user_id}")
            return False
        # In real system: call SendGrid/SES API
        print(f"[Email] Sent to {email}: {notification.subject}")
        return True
class SMSHandler(ChannelHandler):
    """SMS channel stub: prints the message instead of calling a provider."""

    def send(self, notification: Notification) -> bool:
        """Send a text message to the phone number named in the metadata.

        Reads ``notification.metadata["phone"]``. Returns False, after
        logging, when the number is missing or empty; True otherwise.
        Only the first 160 characters of the body are emitted.
        """
        phone = notification.metadata.get("phone")
        if not phone:
            # Log the reason, matching the push handler, so a FAILED
            # status can be traced to its cause.
            print(f"[SMS] No phone number for user {notification.user_id}")
            return False
        # In real system: call Twilio API
        # NOTE(review): 160 is presumably the single-segment SMS limit — confirm.
        print(f"[SMS] Sent to {phone}: {notification.body[:160]}")
        return True
class InAppHandler(ChannelHandler):
    """In-app channel backed by an in-memory inbox keyed by user id."""

    def __init__(self):
        # user_id -> notifications in the order they were stored
        self._inbox: dict[str, list[Notification]] = {}

    def send(self, notification: Notification) -> bool:
        """Append the notification to the recipient's inbox; always True."""
        self._inbox.setdefault(notification.user_id, []).append(notification)
        print(f"[InApp] Stored for {notification.user_id}: {notification.subject}")
        return True

    def get_inbox(self, user_id: str) -> list[Notification]:
        """Return the user's stored notifications, oldest first.

        A new list is returned on every call (empty for unknown users), so
        callers cannot alter the handler's internal inbox by mutating it.
        """
        return list(self._inbox.get(user_id, ()))
NotificationService (Orchestrator)
from collections import defaultdict
from datetime import datetime
class NotificationService:
    """Orchestrates template rendering, preference checks and dispatch.

    Holds one handler per channel, the registered user preferences and
    templates, and an in-memory log of successful send times used for
    per-user, per-channel rate limiting.
    """

    def __init__(self):
        self._handlers: dict[NotificationChannel, ChannelHandler] = {
            NotificationChannel.PUSH: PushNotificationHandler(),
            NotificationChannel.EMAIL: EmailHandler(),
            NotificationChannel.SMS: SMSHandler(),
            NotificationChannel.IN_APP: InAppHandler(),
        }
        self._preferences: dict[str, UserNotificationPreferences] = {}
        self._templates: dict[str, NotificationTemplate] = {}
        # Rate limiting: user_id -> channel -> list of timestamps
        self._sent_times: dict[str, dict] = defaultdict(lambda: defaultdict(list))

    def register_preferences(self, prefs: UserNotificationPreferences) -> None:
        """Store (or replace) the preferences for ``prefs.user_id``."""
        self._preferences[prefs.user_id] = prefs

    def register_template(self, template: NotificationTemplate) -> None:
        """Store (or replace) a template under its ``template_id``."""
        self._templates[template.template_id] = template

    def _is_rate_limited(self, user_id: str, channel: NotificationChannel) -> bool:
        """Return True if the user already hit the hourly cap on ``channel``.

        Users without registered preferences are never rate limited, and a
        channel missing from ``max_per_hour`` has no cap. As a side effect,
        timestamps older than one hour are dropped from the send log.
        """
        prefs = self._preferences.get(user_id)
        if not prefs:
            return False
        max_per_hour = prefs.max_per_hour.get(channel, float("inf"))
        now = datetime.now()
        # Keep only timestamps from the last hour
        recent = [
            t for t in self._sent_times[user_id][channel]
            if (now - t).total_seconds() < 3600
        ]
        # Write the pruned list back so the log does not grow without bound.
        self._sent_times[user_id][channel] = recent
        return len(recent) >= max_per_hour

    def send(
        self,
        user_id: str,
        template_id: str,
        context: dict,
        user_metadata: dict = None,
        channels: list = None,
    ) -> list[Notification]:
        """Render a template and dispatch it to the user on each channel.

        A channel is skipped, with a log line and no entry in the result,
        when the user disabled it, when it is PUSH or SMS during the user's
        quiet hours, or when the user hit the hourly cap for it. Preference
        checks only apply to users with registered preferences.

        Args:
            user_id: Recipient identifier.
            template_id: Id of a previously registered template.
            context: Values for the template placeholders.
            user_metadata: Delivery details handed to the handlers
                (e.g. "device_token", "email", "phone"); ``None`` means
                an empty dict.
            channels: Channels to target; ``None`` or an empty list falls
                back to the template's own channel.

        Returns:
            One ``Notification`` per channel that was actually attempted,
            with status SENT or FAILED.

        Raises:
            ValueError: If ``template_id`` is not registered.
            KeyError: If ``context`` lacks a placeholder and at least one
                channel gets as far as rendering.
        """
        prefs = self._preferences.get(user_id)
        template = self._templates.get(template_id)
        if not template:
            raise ValueError(f"Template {template_id} not found")
        target_channels = channels or [template.channel]
        now = datetime.now()
        results = []
        for channel in target_channels:
            # Check user preferences
            if prefs and not prefs.is_channel_enabled(channel):
                print(f"Skipped {channel.value}: disabled by user {user_id}")
                continue
            # Check quiet hours (skip push/SMS during quiet hours)
            if prefs and channel in (NotificationChannel.PUSH, NotificationChannel.SMS):
                if prefs.is_quiet_hour(now.hour):
                    print(f"Skipped {channel.value}: quiet hours for {user_id}")
                    continue
            # Check rate limit
            if self._is_rate_limited(user_id, channel):
                print(f"Skipped {channel.value}: rate limit for {user_id}")
                continue
            subject, body = template.render(context)
            notification = Notification(
                user_id=user_id,
                channel=channel,
                subject=subject,
                body=body,
                metadata=user_metadata or {},
            )
            handler = self._handlers[channel]
            success = handler.send(notification)
            if success:
                notification.status = NotificationStatus.SENT
                notification.sent_at = datetime.now()
                # Only successful sends count toward the hourly cap.
                self._sent_times[user_id][channel].append(now)
            else:
                notification.status = NotificationStatus.FAILED
            results.append(notification)
        return results
Usage Example
# Describe the recipient: U001 accepts push and email only, with quiet
# hours from 22:00 up to 08:00.
prefs = UserNotificationPreferences(
    user_id="U001",
    enabled_channels={NotificationChannel.PUSH, NotificationChannel.EMAIL},
    quiet_hours_start=22,
    quiet_hours_end=8,
)

# Describe the message: a "like" event whose default channel is push.
template = NotificationTemplate(
    template_id="like_notification",
    channel=NotificationChannel.PUSH,
    subject_template="{actor} liked your post",
    body_template="{actor} liked your post: '{post_title}'",
)

# Wire both into a fresh service.
service = NotificationService()
service.register_preferences(prefs)
service.register_template(template)

# Fan the event out over push and email; the result holds one entry per
# channel that was actually attempted.
notifications = service.send(
    user_id="U001",
    template_id="like_notification",
    context={"actor": "Bob", "post_title": "System Design Tips"},
    user_metadata={"device_token": "abc123", "email": "alice@example.com"},
    channels=[NotificationChannel.PUSH, NotificationChannel.EMAIL],
)
Interview Follow-ups
- Digest mode: Instead of sending each like individually, batch likes into a “Bob and 5 others liked your post” digest every 30 minutes. Store pending notifications in a digest queue; flush on timer.
- Priority: Add priority levels (URGENT overrides quiet hours and rate limits). Used for security alerts, password resets.
- Delivery tracking: Webhooks from email providers (SendGrid) and push services (FCM) call back with DELIVERED or BOUNCED status — update notification records accordingly.
- Async dispatch: In production, service.send() enqueues to Kafka/SQS and returns immediately. Worker pools consume and dispatch to channel handlers.
🏢 Asked at: Meta Interview Guide 2026: Facebook, Instagram, WhatsApp Engineering
🏢 Asked at: Uber Interview Guide 2026: Dispatch Systems, Geospatial Algorithms, and Marketplace Engineering
🏢 Asked at: LinkedIn Interview Guide 2026: Social Graph Engineering, Feed Ranking, and Professional Network Scale
🏢 Asked at: Airbnb Interview Guide 2026: Search Systems, Trust and Safety, and Full-Stack Engineering
🏢 Asked at: Snap Interview Guide