Low-Level Design: Notification System (OOP Interview)

Low-Level Design: Notification System

A notification system delivers messages to users via multiple channels (push, email, SMS, in-app). It must handle high volume (millions of notifications per day), support user preferences, throttle noisy senders, and track delivery status. This is a common OOP interview at Meta, Uber, LinkedIn, and Airbnb.

Core Classes

Enums

from enum import Enum

class NotificationChannel(Enum):
    PUSH  = "PUSH"
    EMAIL = "EMAIL"
    SMS   = "SMS"
    IN_APP = "IN_APP"

class NotificationStatus(Enum):
    PENDING   = "PENDING"
    SENT      = "SENT"
    DELIVERED = "DELIVERED"
    FAILED    = "FAILED"
    SKIPPED   = "SKIPPED"   # user preference or throttle

Notification and Template

from dataclasses import dataclass, field
from datetime import datetime
from typing import Any
import uuid

@dataclass
class NotificationTemplate:
    template_id: str
    channel: NotificationChannel
    subject_template: str    # "{actor} liked your post"
    body_template: str       # "{actor} liked your post: '{post_title}'"

    def render(self, context: dict) -> tuple[str, str]:
        """Render subject and body with context variables."""
        subject = self.subject_template.format(**context)
        body = self.body_template.format(**context)
        return subject, body

@dataclass
class Notification:
    notification_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    user_id: str = ""
    channel: NotificationChannel = NotificationChannel.IN_APP
    subject: str = ""
    body: str = ""
    status: NotificationStatus = NotificationStatus.PENDING
    created_at: datetime = field(default_factory=datetime.now)
    sent_at: datetime = None
    metadata: dict = field(default_factory=dict)

User Preferences

@dataclass
class UserNotificationPreferences:
    user_id: str
    enabled_channels: set = field(default_factory=lambda: {
        NotificationChannel.PUSH,
        NotificationChannel.EMAIL,
        NotificationChannel.IN_APP,
    })
    quiet_hours_start: int = 22   # 10 PM
    quiet_hours_end: int = 8      # 8 AM
    max_per_hour: dict = field(default_factory=lambda: {
        NotificationChannel.PUSH: 5,
        NotificationChannel.EMAIL: 10,
        NotificationChannel.SMS: 2,
        NotificationChannel.IN_APP: 50,
    })

    def is_channel_enabled(self, channel: NotificationChannel) -> bool:
        return channel in self.enabled_channels

    def is_quiet_hour(self, hour: int) -> bool:
        if self.quiet_hours_start > self.quiet_hours_end:
            # Wraps midnight: start=22, end=8 → quiet 22,23,0,1,...,7
            return hour >= self.quiet_hours_start or hour < self.quiet_hours_end
        return self.quiet_hours_start <= hour < self.quiet_hours_end

Channel Handlers (Strategy Pattern)

from abc import ABC, abstractmethod

class ChannelHandler(ABC):
    @abstractmethod
    def send(self, notification: Notification) -> bool:
        """Send notification. Returns True on success."""
        pass

class PushNotificationHandler(ChannelHandler):
    def send(self, notification: Notification) -> bool:
        device_token = notification.metadata.get("device_token")
        if not device_token:
            print(f"[Push] No device token for user {notification.user_id}")
            return False
        # In real system: call FCM/APNs API
        print(f"[Push] Sent to {notification.user_id}: {notification.subject}")
        return True

class EmailHandler(ChannelHandler):
    def send(self, notification: Notification) -> bool:
        email = notification.metadata.get("email")
        if not email:
            return False
        # In real system: call SendGrid/SES API
        print(f"[Email] Sent to {email}: {notification.subject}")
        return True

class SMSHandler(ChannelHandler):
    def send(self, notification: Notification) -> bool:
        phone = notification.metadata.get("phone")
        if not phone:
            return False
        # In real system: call Twilio API
        print(f"[SMS] Sent to {phone}: {notification.body[:160]}")
        return True

class InAppHandler(ChannelHandler):
    def __init__(self):
        self._inbox: dict[str, list[Notification]] = {}

    def send(self, notification: Notification) -> bool:
        self._inbox.setdefault(notification.user_id, []).append(notification)
        print(f"[InApp] Stored for {notification.user_id}: {notification.subject}")
        return True

    def get_inbox(self, user_id: str) -> list[Notification]:
        return self._inbox.get(user_id, [])

NotificationService (Orchestrator)

from collections import defaultdict
from datetime import datetime

class NotificationService:
    def __init__(self):
        self._handlers: dict[NotificationChannel, ChannelHandler] = {
            NotificationChannel.PUSH:   PushNotificationHandler(),
            NotificationChannel.EMAIL:  EmailHandler(),
            NotificationChannel.SMS:    SMSHandler(),
            NotificationChannel.IN_APP: InAppHandler(),
        }
        self._preferences: dict[str, UserNotificationPreferences] = {}
        self._templates: dict[str, NotificationTemplate] = {}
        # Rate limiting: user_id -> channel -> list of timestamps
        self._sent_times: dict[str, dict] = defaultdict(lambda: defaultdict(list))

    def register_preferences(self, prefs: UserNotificationPreferences) -> None:
        self._preferences[prefs.user_id] = prefs

    def register_template(self, template: NotificationTemplate) -> None:
        self._templates[template.template_id] = template

    def _is_rate_limited(self, user_id: str, channel: NotificationChannel) -> bool:
        prefs = self._preferences.get(user_id)
        if not prefs:
            return False
        max_per_hour = prefs.max_per_hour.get(channel, float('inf'))
        now = datetime.now()
        # Keep only timestamps from the last hour
        recent = [
            t for t in self._sent_times[user_id][channel]
            if (now - t).total_seconds() = max_per_hour

    def send(
        self,
        user_id: str,
        template_id: str,
        context: dict,
        user_metadata: dict = None,
        channels: list = None,
    ) -> list[Notification]:
        prefs = self._preferences.get(user_id)
        template = self._templates.get(template_id)
        if not template:
            raise ValueError(f"Template {template_id} not found")

        target_channels = channels or [template.channel]
        now = datetime.now()
        results = []

        for channel in target_channels:
            # Check user preferences
            if prefs and not prefs.is_channel_enabled(channel):
                print(f"Skipped {channel.value}: disabled by user {user_id}")
                continue

            # Check quiet hours (skip push/SMS during quiet hours)
            if prefs and channel in (NotificationChannel.PUSH, NotificationChannel.SMS):
                if prefs.is_quiet_hour(now.hour):
                    print(f"Skipped {channel.value}: quiet hours for {user_id}")
                    continue

            # Check rate limit
            if self._is_rate_limited(user_id, channel):
                print(f"Skipped {channel.value}: rate limit for {user_id}")
                continue

            subject, body = template.render(context)
            notification = Notification(
                user_id=user_id,
                channel=channel,
                subject=subject,
                body=body,
                metadata=user_metadata or {},
            )

            handler = self._handlers[channel]
            success = handler.send(notification)
            if success:
                notification.status = NotificationStatus.SENT
                notification.sent_at = datetime.now()
                self._sent_times[user_id][channel].append(now)
            else:
                notification.status = NotificationStatus.FAILED

            results.append(notification)

        return results

Usage Example

service = NotificationService()

# Setup preferences
prefs = UserNotificationPreferences(
    user_id="U001",
    enabled_channels={NotificationChannel.PUSH, NotificationChannel.EMAIL},
    quiet_hours_start=22,
    quiet_hours_end=8,
)
service.register_preferences(prefs)

# Register template
template = NotificationTemplate(
    template_id="like_notification",
    channel=NotificationChannel.PUSH,
    subject_template="{actor} liked your post",
    body_template="{actor} liked your post: '{post_title}'",
)
service.register_template(template)

# Send notification
notifications = service.send(
    user_id="U001",
    template_id="like_notification",
    context={"actor": "Bob", "post_title": "System Design Tips"},
    user_metadata={"device_token": "abc123", "email": "alice@example.com"},
    channels=[NotificationChannel.PUSH, NotificationChannel.EMAIL],
)

Interview Follow-ups

  • Digest mode: Instead of sending each like individually, batch likes into a “Bob and 5 others liked your post” digest every 30 minutes. Store pending notifications in a digest queue; flush on timer.
  • Priority: Add priority levels (URGENT overrides quiet hours and rate limits). Used for security alerts, password resets.
  • Delivery tracking: Webhooks from email providers (SendGrid) and push services (FCM) call back with DELIVERED or BOUNCED status — update notification records accordingly.
  • Async dispatch: In production, service.send() enqueues to Kafka/SQS and returns immediately. Worker pools consume and dispatch to channel handlers.

🏢 Asked at: Meta Interview Guide 2026: Facebook, Instagram, WhatsApp Engineering

🏢 Asked at: Uber Interview Guide 2026: Dispatch Systems, Geospatial Algorithms, and Marketplace Engineering

🏢 Asked at: LinkedIn Interview Guide 2026: Social Graph Engineering, Feed Ranking, and Professional Network Scale

🏢 Asked at: Airbnb Interview Guide 2026: Search Systems, Trust and Safety, and Full-Stack Engineering

🏢 Asked at: Snap Interview Guide

Scroll to Top