Low-Level Design: Logging Framework (OOP Interview)

A logging framework is a fundamental software engineering tool and a frequently asked OOP design question at companies like Uber, Google, and Atlassian. It tests the Chain of Responsibility pattern, the Strategy pattern for formatters, log-level filtering, and thread safety. A clean, extensible logger design demonstrates solid OOP principles.

Core Classes

Log Level

from enum import IntEnum

class LogLevel(IntEnum):
    DEBUG   = 10
    INFO    = 20
    WARNING = 30
    ERROR   = 40
    CRITICAL = 50

Using IntEnum allows comparison: LogLevel.ERROR > LogLevel.INFO is True. A handler set to WARNING level will drop DEBUG and INFO messages.
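
To make the threshold check concrete, here is a minimal sketch of level filtering. The `should_emit` helper is a hypothetical name introduced for illustration; it is not part of the framework above.

```python
from enum import IntEnum

class LogLevel(IntEnum):
    DEBUG = 10
    INFO = 20
    WARNING = 30
    ERROR = 40
    CRITICAL = 50

def should_emit(record_level: LogLevel, threshold: LogLevel) -> bool:
    """Hypothetical helper: True when the record is at or above the threshold."""
    return record_level >= threshold

threshold = LogLevel.WARNING
# IntEnum members iterate in definition order, so the result is sorted by severity.
passed = [lvl.name for lvl in LogLevel if should_emit(lvl, threshold)]
print(passed)  # ['WARNING', 'ERROR', 'CRITICAL']
```

The numeric gaps (10, 20, 30) leave room for custom levels between the built-in ones.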

Log Record

from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional
import traceback

@dataclass
class LogRecord:
    level: LogLevel
    message: str
    logger_name: str
    timestamp: datetime = field(default_factory=datetime.now)
    exc_info: Optional[str] = None   # formatted exception traceback

    @classmethod
    def from_exception(cls, level: LogLevel, message: str, logger_name: str) -> 'LogRecord':
        return cls(
            level=level,
            message=message,
            logger_name=logger_name,
            exc_info=traceback.format_exc()
        )
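
One detail worth checking in `from_exception`: `traceback.format_exc()` only has something useful to report while an exception is being handled, so the factory should be called from inside an `except` block. The short sketch below illustrates this; `capture_current_exception` is a hypothetical helper used only for the demonstration.

```python
import traceback

def capture_current_exception() -> str:
    """Return the formatted traceback of the exception currently being handled."""
    return traceback.format_exc()

try:
    raise ValueError("Invalid card number")
except ValueError:
    # Inside the except block: the full traceback is available.
    inside = capture_current_exception()

# Outside any except block: there is no active exception, so CPython
# returns a placeholder string rather than a real traceback.
outside = capture_current_exception()

print("ValueError" in inside)   # True
print("ValueError" in outside)  # False
```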

Formatter (Strategy Pattern)

from abc import ABC, abstractmethod

class Formatter(ABC):
    @abstractmethod
    def format(self, record: LogRecord) -> str:
        pass

class SimpleFormatter(Formatter):
    def format(self, record: LogRecord) -> str:
        ts = record.timestamp.strftime('%Y-%m-%d %H:%M:%S')
        return f"[{ts}] [{record.level.name}] [{record.logger_name}] {record.message}"

class JSONFormatter(Formatter):
    def format(self, record: LogRecord) -> str:
        import json
        data = {
            "timestamp": record.timestamp.isoformat(),
            "level": record.level.name,
            "logger": record.logger_name,
            "message": record.message,
        }
        if record.exc_info:
            data["exception"] = record.exc_info
        return json.dumps(data)

class ColorFormatter(Formatter):
    COLORS = {
        LogLevel.DEBUG:    "\033[36m",   # Cyan
        LogLevel.INFO:     "\033[32m",   # Green
        LogLevel.WARNING:  "\033[33m",   # Yellow
        LogLevel.ERROR:    "\033[31m",   # Red
        LogLevel.CRITICAL: "\033[35m",   # Magenta
    }
    RESET = "\033[0m"

    def format(self, record: LogRecord) -> str:
        color = self.COLORS.get(record.level, "")
        ts = record.timestamp.strftime('%H:%M:%S')
        return f"{color}[{ts}] {record.level.name}: {record.message}{self.RESET}"

Handler (Chain of Responsibility)

import sys

class Handler(ABC):
    def __init__(
        self,
        level: LogLevel = LogLevel.DEBUG,
        formatter: Formatter = None,
        next_handler: 'Handler' = None
    ):
        self.level = level
        self.formatter = formatter or SimpleFormatter()
        self.next_handler = next_handler   # chain of responsibility

    def handle(self, record: LogRecord) -> None:
        """Filter by level, format, emit. Then pass to next handler."""
        if record.level >= self.level:
            self.emit(self.formatter.format(record))
        if self.next_handler:
            self.next_handler.handle(record)

    @abstractmethod
    def emit(self, message: str) -> None:
        pass

class ConsoleHandler(Handler):
    def emit(self, message: str) -> None:
        print(message, file=sys.stdout, flush=True)

class FileHandler(Handler):
    def __init__(self, filepath: str, **kwargs):
        super().__init__(**kwargs)
        self._file = open(filepath, 'a', encoding='utf-8')

    def emit(self, message: str) -> None:
        self._file.write(message + '\n')
        self._file.flush()

    def close(self) -> None:
        self._file.close()

from collections import deque
from datetime import datetime

class RateLimitedHandler(Handler):
    """Emits at most max_per_minute messages per rolling 60-second window; drops the rest."""
    def __init__(self, max_per_minute: int = 100, **kwargs):
        super().__init__(**kwargs)
        self._timestamps = deque()   # emit times inside the current window
        self._max = max_per_minute

    def emit(self, message: str) -> None:
        now = datetime.now()
        # Remove timestamps older than 1 minute
        while self._timestamps and (now - self._timestamps[0]).total_seconds() > 60:
            self._timestamps.popleft()
        if len(self._timestamps) < self._max:
            self._timestamps.append(now)
            print(message)
        # else: drop the message

Logger

import threading

class Logger:
    def __init__(self, name: str, level: LogLevel = LogLevel.DEBUG):
        self.name = name
        self.level = level
        self._handlers: list[Handler] = []
        self._lock = threading.Lock()

    def add_handler(self, handler: Handler) -> None:
        self._handlers.append(handler)

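    def _log(self, level: LogLevel, message: str, exc: bool = False) -> None:
        if level < self.level:
            return   # below the logger's threshold: skip record creation
        if exc:
            record = LogRecord.from_exception(level, message, self.name)
        else:
            record = LogRecord(level=level, message=message, logger_name=self.name)
        with self._lock:   # stop records from different threads interleaving
            for handler in self._handlers:
                handler.handle(record)

    def debug(self, msg: str)    -> None: self._log(LogLevel.DEBUG, msg)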
    def info(self, msg: str)     -> None: self._log(LogLevel.INFO, msg)
    def warning(self, msg: str)  -> None: self._log(LogLevel.WARNING, msg)
    def error(self, msg: str)    -> None: self._log(LogLevel.ERROR, msg)
    def critical(self, msg: str) -> None: self._log(LogLevel.CRITICAL, msg)
    def exception(self, msg: str) -> None: self._log(LogLevel.ERROR, msg, exc=True)

class LoggerFactory:
    """Singleton registry of named loggers."""
    _loggers: dict[str, Logger] = {}
    _lock = threading.Lock()

    @classmethod
    def get_logger(cls, name: str, level: LogLevel = LogLevel.DEBUG) -> Logger:
        with cls._lock:
            if name not in cls._loggers:
                cls._loggers[name] = Logger(name, level)
            return cls._loggers[name]

Usage Example

# Setup
logger = LoggerFactory.get_logger("payment_service", LogLevel.INFO)

console_handler = ConsoleHandler(level=LogLevel.DEBUG, formatter=ColorFormatter())
file_handler = FileHandler(
    "/var/log/app.log",
    level=LogLevel.WARNING,
    formatter=JSONFormatter()
)

logger.add_handler(console_handler)
logger.add_handler(file_handler)

# Usage
logger.info("Payment service started")
logger.warning("High latency detected: 450ms")
try:
    raise ValueError("Invalid card number")
except Exception:
    logger.exception("Payment processing failed")

Design Patterns Used

  • Strategy: Formatters are interchangeable — SimpleFormatter, JSONFormatter, ColorFormatter are swapped without modifying Handler.
  • Chain of Responsibility: Handlers form a chain via next_handler. A log record flows down the chain, and each handler decides independently whether to emit it.
  • Factory / Registry: LoggerFactory maintains a registry of named loggers — same name always returns the same instance (similar to Python’s logging.getLogger()).
  • Template Method: Handler.handle() defines the algorithm (filter → format → emit → chain); subclasses override only emit().
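
The Chain of Responsibility and Template Method points can be seen together in a condensed, self-contained sketch. It uses plain integers for levels and a hypothetical in-memory `ListHandler` so the flow is easy to inspect; it is an illustration under those assumptions, not a replacement for the classes above.

```python
from abc import ABC, abstractmethod
from typing import Optional

DEBUG, INFO, WARNING = 10, 20, 30

class Handler(ABC):
    def __init__(self, level: int, next_handler: Optional["Handler"] = None):
        self.level = level
        self.next_handler = next_handler

    def handle(self, level: int, message: str) -> None:
        # Template Method: the filter -> emit -> forward steps are fixed here.
        if level >= self.level:
            self.emit(message)
        # Chain of Responsibility: always pass the record on, emitted or not.
        if self.next_handler:
            self.next_handler.handle(level, message)

    @abstractmethod
    def emit(self, message: str) -> None:
        """Subclasses decide only where the message goes."""

class ListHandler(Handler):
    """Hypothetical in-memory handler used only for this demo."""
    def __init__(self, level: int, next_handler: Optional[Handler] = None):
        super().__init__(level, next_handler)
        self.messages = []

    def emit(self, message: str) -> None:
        self.messages.append(message)

alerts = ListHandler(level=WARNING)
console = ListHandler(level=DEBUG, next_handler=alerts)

console.handle(INFO, "Payment service started")
console.handle(WARNING, "High latency detected: 450ms")

print(len(console.messages))  # 2
print(alerts.messages)        # ['High latency detected: 450ms']
```

Only the head of the chain needs to be registered with a logger; the second handler still sees every record and applies its own threshold.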

Interview Follow-ups

  • Async logging: Replace synchronous emit() calls with queue-based dispatch. Logger._log() enqueues the LogRecord; a background thread dequeues it and calls the handlers. This prevents logging from blocking the main thread.
  • Log rotation: RotatingFileHandler tracks file size; when it exceeds maxBytes, rename current log to app.log.1 and open a new app.log. Keep backupCount rotated files.
  • Structured logging: Log key-value pairs instead of strings, which are easier to query in log aggregation systems (ELK, Splunk). JSONFormatter already emits JSON, so adding a dictionary of extra fields to LogRecord would extend it to this pattern.
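
The log rotation follow-up can be sketched as below. The class name `SizeRotatingFileHandler` and its parameter names are assumptions made for this example (it is not the standard library's `RotatingFileHandler`), it assumes `backup_count >= 1`, and it rotates just before a write that would push the file past `max_bytes`.

```python
import os
import tempfile

class SizeRotatingFileHandler:
    """Hypothetical size-based rotating writer (assumes backup_count >= 1)."""
    def __init__(self, filepath: str, max_bytes: int, backup_count: int):
        self.filepath = filepath
        self.max_bytes = max_bytes
        self.backup_count = backup_count
        self._file = open(filepath, "a", encoding="utf-8")
        self._size = os.path.getsize(filepath)   # bytes already in the file

    def emit(self, message: str) -> None:
        data = message + "\n"
        n = len(data.encode("utf-8"))
        # Rotate first if this write would exceed the limit (never rotate an empty file).
        if self._size > 0 and self._size + n > self.max_bytes:
            self._rotate()
        self._file.write(data)
        self._file.flush()
        self._size += n

    def _rotate(self) -> None:
        self._file.close()
        # Shift app.log.1 -> app.log.2, etc.; the oldest backup is overwritten.
        for i in range(self.backup_count - 1, 0, -1):
            src = f"{self.filepath}.{i}"
            if os.path.exists(src):
                os.replace(src, f"{self.filepath}.{i + 1}")
        os.replace(self.filepath, f"{self.filepath}.1")
        self._file = open(self.filepath, "a", encoding="utf-8")
        self._size = 0

    def close(self) -> None:
        self._file.close()

# Demo in a temporary directory: each line is 16 bytes and the limit is 20,
# so every write after the first triggers a rotation.
log_dir = tempfile.mkdtemp()
path = os.path.join(log_dir, "app.log")
handler = SizeRotatingFileHandler(path, max_bytes=20, backup_count=2)
for ch in "abcd":
    handler.emit(ch * 15)
handler.close()
print(sorted(os.listdir(log_dir)))  # ['app.log', 'app.log.1', 'app.log.2']
```

With `backup_count=2`, the first line written ("aaa…") has been rotated out by the end of the demo; only the three most recent files remain.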

Frequently Asked Questions

What design patterns are used in a logging framework?

A logging framework typically uses three main patterns. (1) Strategy pattern for Formatters: SimpleFormatter, JSONFormatter, and ColorFormatter all implement the same Formatter interface with a format(record) method. Handlers are configured with a formatter at construction time; swapping formatters changes the output format without modifying Handler code. (2) Chain of Responsibility for Handlers: each Handler has a next_handler reference. handle() filters by level, emits if the record is at or above the threshold, then passes the record to next_handler. This allows a single log record to flow through multiple handlers (console + file + alerting) without the Logger knowing how many handlers exist. (3) Factory/Registry pattern for Logger instances: LoggerFactory maintains a dict of name → Logger. Multiple calls to get_logger("payment_service") return the same instance, ensuring consistent configuration across modules.

How do you implement thread-safe logging?

Use a threading.Lock in the Logger._log() method: acquire it before iterating handlers and release it afterwards. This prevents two threads from interleaving their log records, which would corrupt multi-line log entries (e.g., a stack trace interleaved with another log line). Use threading.Lock (not RLock) because Logger._log() doesn't re-enter the lock recursively. In high-throughput systems, synchronous locking can become a bottleneck. The solution is an async logging queue: Logger._log() appends the LogRecord to a thread-safe queue.Queue instead of calling handlers directly, and a single background thread dequeues records and calls handlers, so there is no lock contention in the critical path. The background thread is the sole consumer, so handlers don't need to be thread-safe themselves. Trade-off: async logging may lose the last few log records if the process crashes before the queue is drained, which is acceptable for most applications.

How do log levels work in a logging framework?

Log levels are a hierarchy (DEBUG < INFO < WARNING < ERROR < CRITICAL). Filtering happens in two stages: the logger drops records below its own level, then each handler applies its own threshold. The check record.level >= handler.level returns True only if the record's level is at or above the handler's threshold. Python's logging module works the same way: getLogger().setLevel() is the logger filter, and handler.setLevel() is the handler filter. Always set the logger level to the minimum of all handler levels to avoid unnecessary record creation.
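
The queue-based async dispatch described above can be sketched in a few lines. `AsyncDispatcher`, `submit`, and the `sink` callable are hypothetical names chosen for this example; in the framework above, the sink would be a function that passes the record to each handler.

```python
import queue
import threading

class AsyncDispatcher:
    """Hypothetical helper: hands records to a single background consumer thread."""
    _STOP = object()   # sentinel that tells the worker to exit

    def __init__(self, sink):
        self._sink = sink                 # any callable that takes one record
        self._queue = queue.Queue()       # thread-safe FIFO
        self._worker = threading.Thread(target=self._drain, daemon=True)
        self._worker.start()

    def submit(self, record) -> None:
        # Called from application threads: only enqueues, never does I/O.
        self._queue.put(record)

    def _drain(self) -> None:
        # Sole consumer: the sink is only ever called from this thread.
        while True:
            record = self._queue.get()
            if record is self._STOP:
                break
            self._sink(record)

    def close(self) -> None:
        # Everything queued before the sentinel is processed before the worker exits.
        self._queue.put(self._STOP)
        self._worker.join()

received = []
dispatcher = AsyncDispatcher(sink=received.append)
for i in range(3):
    dispatcher.submit(f"record {i}")
dispatcher.close()
print(received)  # ['record 0', 'record 1', 'record 2']
```

Calling `close()` at shutdown drains the queue, which narrows (but does not eliminate) the window in which a crash could lose queued records.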
