Low-Level Design: Notification Service — Push, Email, SMS, Templates, and Deduplication

Requirements

Functional: send notifications via multiple channels (push, email, SMS, in-app), support notification templates with variable substitution, allow users to set preferences (opt-in/opt-out per channel per category), batch notifications (digest), deduplicate (don’t send the same alert twice), track delivery status.

Non-functional: at-least-once delivery, async (never block the triggering action), channel fallback (if push fails, try email), scalable to millions of notifications/day.

Core Entities

from enum import Enum
from dataclasses import dataclass, field
from typing import Optional, List, Dict
from datetime import datetime

class Channel(Enum):
    """Delivery channels a notification can be sent over.

    Each member's value is the same string as its name.
    """
    PUSH  = "PUSH"
    EMAIL = "EMAIL"
    SMS   = "SMS"
    INAPP = "INAPP"

class NotificationType(Enum):
    """Categories of notification a caller can request.

    Each member's value is the same string as its name.
    """
    ORDER_CONFIRMED   = "ORDER_CONFIRMED"
    PAYMENT_FAILED    = "PAYMENT_FAILED"
    MESSAGE_RECEIVED  = "MESSAGE_RECEIVED"
    PRICE_DROP        = "PRICE_DROP"
    ACCOUNT_SECURITY  = "ACCOUNT_SECURITY"

@dataclass
class NotificationTemplate:
    """Message template for one notification type on one channel."""
    template_id: str
    notification_type: NotificationType
    channel: Channel
    subject: str           # subject line; used for email
    body: str              # message text; supports {{variable}} placeholders
    priority: int          # 1=critical, 2=high, 3=normal, 4=low

@dataclass
class UserPreferences:
    """Per-user notification settings: channel subscriptions and a quiet window."""
    user_id: str
    # channel -> list of types the user wants on that channel
    # (a channel missing from the dict means no subscriptions on that channel)
    subscriptions: Dict[Channel, List[NotificationType]] = field(default_factory=dict)
    # Do-not-disturb window, expressed as hours of the day (0-23).
    # None means the bound is not set.
    # NOTE(review): time zone of these hours is not specified here — confirm.
    dnd_start: Optional[int] = None   # hour 0-23: do not disturb window start
    dnd_end:   Optional[int] = None   # hour 0-23: do not disturb window end

@dataclass
class NotificationRequest:
    """A caller's request to notify one user about one event."""
    request_id: str
    user_id: str
    notification_type: NotificationType
    variables: Dict[str, str]          # template variable values, keyed by placeholder name
    idempotency_key: str               # prevent duplicate sends of the same request
    priority: int = 3                  # defaults to 3; presumably the same 1-4 scale as templates — confirm
    scheduled_at: Optional[datetime] = None   # optional send time; None when not scheduled

@dataclass
class NotificationRecord:
    """Delivery-tracking record for one request on one channel."""
    record_id: str
    request_id: str                     # id of the NotificationRequest this record belongs to
    user_id: str
    channel: Channel
    notification_type: NotificationType
    status: str      # 'PENDING' | 'SENT' | 'DELIVERED' | 'FAILED' | 'SKIPPED'
    sent_at: Optional[datetime] = None
    error: Optional[str] = None         # failure or skip reason, when there is one
    external_id: Optional[str] = None   # FCM message ID, SendGrid ID, etc.

Notification Service Architecture

class NotificationService:
    """Fan a notification request out to the channels selected for the user.

    For every selected channel that has a template, the service renders the
    body, persists a PENDING ``NotificationRecord`` and hands the message to
    ``_enqueue`` for dispatch.
    """

    def send(self, request: NotificationRequest) -> List[NotificationRecord]:
        """Create and enqueue one record per eligible channel.

        Args:
            request: The notification to send.

        Returns:
            The records created for this request. If the idempotency key was
            already seen, the previously stored records are returned and
            nothing new is enqueued. Channels without a template are skipped.
        """
        # Idempotency check
        # NOTE(review): this is check-then-act; two concurrent calls with the
        # same key could both pass unless the store enforces uniqueness.
        if db.notification_exists(request.idempotency_key):
            return db.get_records_by_idempotency(request.idempotency_key)

        prefs = db.get_user_preferences(request.user_id)
        channels = self._select_channels(request, prefs)
        records = []
        for channel in channels:
            template = db.get_template(request.notification_type, channel)
            if not template:
                continue
            body = self._render(template.body, request.variables)
            record = NotificationRecord(
                record_id=generate_id(),
                request_id=request.request_id,
                user_id=request.user_id,
                channel=channel,
                notification_type=request.notification_type,
                status='PENDING',
            )
            # Persist before enqueueing so the record exists when a worker
            # picks the message up.
            db.save(record)
            # Dispatch to channel-specific queue
            # (`_enqueue` is not defined in this part of the file.)
            self._enqueue(channel, record, body, template.subject)
            records.append(record)
        return records

    def _select_channels(self, request, prefs: UserPreferences) -> List[Channel]:
        """Pick the channels to use for ``request``.

        Args:
            request: The notification request; only ``notification_type`` is read.
            prefs: The user's preferences. ``None`` (no stored preferences) is
                treated as "no subscriptions".

        Returns:
            ``[PUSH, EMAIL]`` for ACCOUNT_SECURITY regardless of preferences;
            otherwise the subscribed channels among PUSH, EMAIL and INAPP, or
            ``[INAPP]`` when none match.
        """
        # ACCOUNT_SECURITY is always sent (cannot opt out)
        if request.notification_type == NotificationType.ACCOUNT_SECURITY:
            return [Channel.PUSH, Channel.EMAIL]

        # NOTE(review): Channel.SMS is not a candidate here, so SMS
        # subscriptions are never honoured — confirm this is intentional.
        all_channels = [Channel.PUSH, Channel.EMAIL, Channel.INAPP]

        # A user without a preferences object must not crash the send path.
        subscriptions = (prefs.subscriptions or {}) if prefs is not None else {}

        # Filter by user preferences
        selected = [
            ch for ch in all_channels
            if request.notification_type in subscriptions.get(ch, [])
        ]
        return selected or [Channel.INAPP]   # always at least in-app

    def _render(self, template: str, variables: dict) -> str:
        """Substitute ``{{name}}`` placeholders in ``template``.

        The template is scanned once, so text inserted for one variable is
        never re-scanned for further placeholders. Values are converted with
        ``str()``. Placeholders with no matching variable are left untouched.

        Args:
            template: Text containing ``{{name}}`` placeholders.
            variables: Mapping of placeholder name to replacement value.

        Returns:
            The rendered text.
        """
        import re  # local import keeps this block self-contained

        if not variables:
            return template

        # Longest tokens first so a longer placeholder wins over a shorter
        # one that could match at the same position.
        tokens = sorted(
            ('{{' + key + '}}' for key in variables), key=len, reverse=True
        )
        pattern = re.compile('|'.join(re.escape(token) for token in tokens))
        # A function replacement avoids backslash/group interpretation in values.
        return pattern.sub(
            lambda match: str(variables[match.group(0)[2:-2]]), template
        )

Channel Handlers

class PushHandler:
    """Deliver a notification to all of a user's registered devices via FCM."""

    def send(self, user_id: str, body: str, record: NotificationRecord):
        """Send ``body`` to every device token of ``user_id`` and update ``record``.

        The record ends as:
          * 'SKIPPED' when the user has no device tokens;
          * 'SENT' when at least one device accepted the message
            (``external_id`` holds the last accepted message id);
          * 'FAILED' when every device failed (``error`` holds the last failure).

        Args:
            user_id: User whose devices should receive the push.
            body: Rendered notification text.
            record: Tracking record; mutated and saved.
        """
        device_tokens = db.get_device_tokens(user_id)
        if not device_tokens:
            record.status = 'SKIPPED'
            record.error = 'No device tokens'
            db.save(record)
            return

        any_sent = False
        last_error = None
        for token in device_tokens:
            try:
                response = fcm_client.send({
                    'token': token,
                    'notification': {'title': 'Notification', 'body': body},
                })
                message_id = response.message_id
            except Exception as e:
                # Broad catch is deliberate: one bad device must not stop
                # delivery to the remaining devices.
                if 'UNREGISTERED' in str(e):
                    db.remove_device_token(token)   # token expired
                last_error = str(e)
            else:
                any_sent = True
                record.external_id = message_id

        # Decide the final status once, from the aggregate outcome, so it no
        # longer depends on which token was processed last.
        if any_sent:
            record.status = 'SENT'
            record.error = None
        else:
            record.status = 'FAILED'
            record.error = last_error
        db.save(record)

class EmailHandler:
    """Deliver a notification by email through the SendGrid client."""

    def send(self, user_id: str, subject: str, body: str, record: NotificationRecord):
        """Email ``body`` to ``user_id`` and update ``record``.

        The record ends as:
          * 'SKIPPED' when the user is missing or has no verified email;
          * 'SENT' when the provider accepted the message;
          * 'FAILED' when the provider call raised.

        Args:
            user_id: Recipient user.
            subject: Email subject line.
            body: Rendered message, passed to the provider as the HTML body.
            record: Tracking record; mutated and saved.

        Raises:
            Exception: Whatever the provider client raised, re-raised after
                the record has been marked 'FAILED' and saved.
        """
        user = db.get_user(user_id)
        if user is None or not user.email_verified:
            record.status = 'SKIPPED'
            record.error = 'No verified email address'
            db.save(record)
            return

        # NOTE(review): body is sent as HTML; confirm template variables are
        # escaped upstream if they can contain user-controlled text.
        try:
            response = sendgrid_client.send(
                to=user.email, subject=subject, html_body=body
            )
        except Exception as e:
            # Record the failure instead of leaving the record in 'PENDING',
            # then re-raise so the caller still sees the error.
            record.status = 'FAILED'
            record.error = str(e)
            db.save(record)
            raise

        record.status = 'SENT'
        record.external_id = response.headers.get('X-Message-Id')
        db.save(record)

Digest / Batching

class DigestService:
    """Collect low-priority notifications and send them as one daily email."""

    # Notification types handled through the digest.
    DIGEST_TYPES = {NotificationType.PRICE_DROP}

    def queue_for_digest(self, request: NotificationRequest):
        """Add ``request`` to the digest queue of its user."""
        db.add_to_digest_queue(request.user_id, request)

    def send_daily_digest(self, user_id: str):
        """Email everything queued for ``user_id``, then clear the queue.

        Does nothing when the queue is empty. Queued requests are grouped by
        notification type value before rendering.
        """
        queued = db.get_digest_queue(user_id)
        if queued:
            by_type = {}
            for item in queued:
                type_name = item.notification_type.value
                if type_name not in by_type:
                    by_type[type_name] = []
                by_type[type_name].append(item)

            digest_body = self._render_digest(by_type)
            self._send_email(user_id, 'Your Daily Digest', digest_body)
            # Clear only after the send call has returned.
            db.clear_digest_queue(user_id)

Deduplication

# Length of the deduplication window: 1 hour.
DEDUP_WINDOW_SECONDS = 3600


def is_duplicate(user_id: str, notification_type: str, dedup_key: str) -> bool:
    """Report whether this notification was already seen within the window.

    Tries to set a per-(user, type, dedup_key) marker that expires after
    ``DEDUP_WINDOW_SECONDS``, only if the marker does not exist yet. The
    call that creates the marker gets ``False``.

    Assumes ``r`` is a redis-py style client whose ``set(..., nx=True)``
    returns ``None`` when the key already exists — confirm against the client.
    """
    marker = "notif_dedup:{}:{}:{}".format(user_id, notification_type, dedup_key)
    was_set = r.set(marker, 1, nx=True, ex=DEDUP_WINDOW_SECONDS)
    if was_set is None:
        # The marker already existed, so this is a duplicate.
        return True
    return False

Interview Questions

Q: How do you handle a user who has push notifications but is currently offline?

FCM (Firebase Cloud Messaging) queues push notifications while a device is offline and delivers them when the device reconnects (messages are kept for up to 4 weeks). The notification service therefore sends to FCM regardless of online status: FCM accepts the message and returns success even if the device is offline, and the device receives it when it comes back online. For time-sensitive notifications (e.g., a flash sale that ends in 1 hour), set a TTL on the FCM message so that, if it is undelivered within the TTL, it is discarded rather than shown as stale content.

Q: How would you scale this to 100 million notifications per day?

Use Kafka as the message bus — one topic per channel (push-notifications, email-notifications, sms-notifications). Producers (the notification service) publish to Kafka. Consumers (channel handlers) are horizontally scaled worker pools. Each channel has different throughput: push = millions/minute (FCM handles batching), email = rate-limited by the ESP (e.g., SendGrid = 100K/hour on basic plan → use multiple API keys or a premium tier), SMS = expensive and rate-limited (use only for critical notifications). Priority queues: Kafka has no built-in message priority, so critical notifications (ACCOUNT_SECURITY) get their own dedicated topic (or partition) with dedicated consumers, so they are never stuck behind lower-priority traffic.

Asked at: Snap Interview Guide

Asked at: DoorDash Interview Guide

Asked at: Shopify Interview Guide

Asked at: Stripe Interview Guide

See also: Meta Interview Guide 2026: Facebook, Instagram, WhatsApp Engineering

See also: LinkedIn Interview Guide 2026: Social Graph Engineering, Feed Ranking, and Professional Network Scale

Scroll to Top