Low-Level Design: Task Scheduler / Job Queue (OOP Interview)

Low-Level Design: Task Scheduler / Job Queue

A task scheduler executes jobs at specified times or with specified delays, tracks their status, and handles failures with retry logic. This design appears at companies like Airbnb (Airflow), Uber (Cadence), and Databricks. It tests: priority queuing, state machines, concurrency, and the Worker pattern.

Core Classes

Enums

from enum import Enum

class TaskStatus(Enum):
    """Lifecycle states of a Task.

    SUCCEEDED, FAILED, and CANCELLED are terminal; RETRYING tasks go back
    into the queue and become RUNNING again when picked up.
    """
    PENDING   = "PENDING"    # waiting to be picked up
    RUNNING   = "RUNNING"    # currently executing
    SUCCEEDED = "SUCCEEDED"  # completed successfully
    FAILED    = "FAILED"     # final failure (retries exhausted)
    RETRYING  = "RETRYING"   # scheduled for retry
    CANCELLED = "CANCELLED"  # cancelled while still PENDING (never ran)

class TaskPriority(Enum):
    """Relative urgency; a higher numeric value is dequeued first (Task.__lt__)."""
    LOW    = 1
    NORMAL = 5
    HIGH   = 10
    URGENT = 20

Task

import uuid
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Any, Callable, Optional

@dataclass
class Task:
    """A schedulable unit of work with priority, status, and retry tracking.

    Instances are orderable (``__lt__``) so they can sit directly in a
    heapq-based min-heap: higher priority sorts first; ties are broken by
    the earlier ``scheduled_at``.
    """
    task_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    name: str = ""
    func: Optional[Callable] = None    # the function to execute
    args: tuple = field(default_factory=tuple)
    kwargs: dict = field(default_factory=dict)
    priority: TaskPriority = TaskPriority.NORMAL
    status: TaskStatus = TaskStatus.PENDING
    max_retries: int = 3
    retry_count: int = 0               # failed attempts so far
    retry_delay_seconds: int = 60      # base delay, doubles each retry
    scheduled_at: datetime = field(default_factory=datetime.now)
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None
    result: Any = None                 # return value of func on success
    error: Optional[str] = None        # str(exception) of the last failure

    def __lt__(self, other: 'Task') -> bool:
        """For priority queue: higher priority = earlier execution."""
        if self.priority.value != other.priority.value:
            return self.priority.value > other.priority.value  # higher priority first
        return self.scheduled_at < other.scheduled_at          # then earlier schedule first

    def can_retry(self) -> bool:
        """True while at least one retry attempt remains."""
        return self.retry_count < self.max_retries

    def next_retry_time(self) -> datetime:
        """Exponential backoff: 60s, 120s, 240s, ..."""
        delay = self.retry_delay_seconds * (2 ** self.retry_count)
        return datetime.now() + timedelta(seconds=delay)

TaskScheduler

import heapq
import threading
from typing import Optional

class TaskScheduler:
    """Thread-pool scheduler: one priority heap drained by N worker threads.

    Tasks are ordered by ``Task.__lt__`` (priority descending, then
    ``scheduled_at`` ascending). A single Lock guards both the heap and the
    task registry; a Condition on that lock lets idle workers sleep until a
    task is submitted or the head of the heap becomes due.
    """

    def __init__(self, num_workers: int = 4):
        self._queue: list[Task] = []       # min-heap (uses Task.__lt__)
        self._lock = threading.Lock()
        self._not_empty = threading.Condition(self._lock)
        self._tasks: dict[str, Task] = {}  # task_id -> Task (registry)
        self._running = False
        self._workers: list[threading.Thread] = []
        self._num_workers = num_workers

    def submit(self, task: Task) -> str:
        """Register and enqueue *task*; returns its task_id immediately."""
        with self._not_empty:
            self._tasks[task.task_id] = task
            heapq.heappush(self._queue, task)
            self._not_empty.notify()       # wake a waiting worker
        print(f"Submitted task {task.task_id[:8]} [{task.name}] priority={task.priority.name}")
        return task.task_id

    def schedule_at(self, task: Task, run_at: datetime) -> str:
        """Enqueue *task* to run no earlier than *run_at*."""
        task.scheduled_at = run_at
        return self.submit(task)

    def cancel(self, task_id: str) -> bool:
        """Cancel a not-yet-started task; returns True on success.

        Only PENDING tasks are cancellable; the worker silently discards a
        CANCELLED task when it reaches the head of the queue.
        NOTE(review): tasks in RETRYING state cannot be cancelled here —
        possible extension.
        """
        with self._lock:
            task = self._tasks.get(task_id)
            if not task or task.status != TaskStatus.PENDING:
                return False
            task.status = TaskStatus.CANCELLED
            return True

    def get_status(self, task_id: str) -> Optional[TaskStatus]:
        """Current status of the task, or None for an unknown id."""
        task = self._tasks.get(task_id)
        return task.status if task else None

    def _get_next_task(self) -> Optional[Task]:
        """Block until a due task is available; returns None on shutdown.

        Pops the highest-priority task whose scheduled_at has passed,
        skipping tasks cancelled while queued. If the head task is not yet
        due, waits only until its scheduled time (a new submission will
        notify and force an earlier re-check).
        """
        with self._not_empty:
            while self._running:
                now = datetime.now()
                if self._queue and self._queue[0].scheduled_at <= now:
                    task = heapq.heappop(self._queue)
                    if task.status == TaskStatus.CANCELLED:
                        continue           # dropped without executing
                    return task
                if self._queue:
                    # Head not due yet: sleep until its scheduled time.
                    delay = (self._queue[0].scheduled_at - now).total_seconds()
                    self._not_empty.wait(timeout=max(delay, 0.0))
                else:
                    self._not_empty.wait(timeout=1.0)  # periodic shutdown check
            return None

    def _worker_loop(self) -> None:
        """Worker thread body: pull and execute due tasks until shutdown."""
        while self._running:
            task = self._get_next_task()
            if task is None:
                break                      # scheduler stopping
            self._execute(task)

    def _execute(self, task: Task) -> None:
        """Run *task*, recording the result or scheduling a retry on failure."""
        task.status = TaskStatus.RUNNING
        task.started_at = datetime.now()
        print(f"Executing: {task.task_id[:8]} [{task.name}]")
        try:
            task.result = task.func(*task.args, **task.kwargs)
            task.status = TaskStatus.SUCCEEDED
            task.completed_at = datetime.now()
            print(f"Succeeded: {task.task_id[:8]} [{task.name}]")
        except Exception as e:
            task.error = str(e)
            print(f"Failed: {task.task_id[:8]} [{task.name}] error={e}")
            if task.can_retry():
                # Compute backoff BEFORE incrementing retry_count so the
                # first retry waits the base delay (60s, 120s, 240s, ...).
                task.scheduled_at = task.next_retry_time()
                task.retry_count += 1
                task.status = TaskStatus.RETRYING
                # Re-enqueue for retry
                with self._not_empty:
                    heapq.heappush(self._queue, task)
                    self._not_empty.notify()
                print(f"Retry {task.retry_count}/{task.max_retries} scheduled for {task.name}")
            else:
                task.status = TaskStatus.FAILED
                task.completed_at = datetime.now()

    def start(self) -> None:
        """Spin up the worker threads (daemon, so they never block interpreter exit)."""
        self._running = True
        for _ in range(self._num_workers):
            t = threading.Thread(target=self._worker_loop, daemon=True)
            t.start()
            self._workers.append(t)
        print(f"TaskScheduler started with {self._num_workers} workers")

    def stop(self, wait: bool = True) -> None:
        """Signal shutdown; if *wait*, join each worker with a 5s timeout."""
        self._running = False
        with self._not_empty:
            self._not_empty.notify_all()   # wake all waiting workers
        if wait:
            for t in self._workers:
                t.join(timeout=5)
        print("TaskScheduler stopped")

Usage Example

import time
from datetime import datetime, timedelta

def send_email(to: str, subject: str) -> str:
    """Pretend to deliver an email and return a human-readable receipt."""
    receipt = f"Email sent to {to}"
    time.sleep(0.1)  # stand-in for the SMTP round trip
    return receipt

def generate_report(report_type: str) -> str:
    """Pretend to build a report and return a completion message."""
    message = f"Report {report_type} generated"
    time.sleep(0.5)  # stand-in for the number crunching
    return message

# Demo: two workers, one immediate high-priority task, one delayed task.
scheduler = TaskScheduler(num_workers=2)
scheduler.start()

# High-priority email, runs as soon as a worker is free.
welcome_email = Task(
    name="send_welcome_email",
    func=send_email,
    args=("user@example.com", "Welcome!"),
    priority=TaskPriority.HIGH,
)
tid1 = scheduler.submit(welcome_email)

# Report scheduled 5 seconds out, with up to 2 retries on failure.
daily_report = Task(
    name="daily_report",
    func=generate_report,
    args=("revenue",),
    priority=TaskPriority.NORMAL,
    max_retries=2,
)
tid2 = scheduler.schedule_at(daily_report, datetime.now() + timedelta(seconds=5))

time.sleep(2)
print(f"Email status: {scheduler.get_status(tid1)}")
time.sleep(10)
print(f"Report status: {scheduler.get_status(tid2)}")
scheduler.stop()

Interview Follow-ups

  • Distributed scheduler: Replace the in-memory priority queue with a database table. Workers poll for PENDING tasks older than scheduled_at using SELECT FOR UPDATE SKIP LOCKED (PostgreSQL) — prevents two workers from claiming the same task. This scales across multiple servers.
  • Dead letter queue: Tasks that exhaust retries are moved to a dead_letter_tasks table for manual inspection and reprocessing.
  • Cron-style scheduling: Store a cron expression with each recurring task. A separate scheduler thread wakes up every minute, generates next-run instances for all cron tasks due in the next interval, and enqueues them.
  • Heartbeat / task lease: Workers acquire a lease (DB row lock with TTL). If a worker crashes, the lease expires and another worker reclaims the task. Prevents orphaned RUNNING tasks.

🏢 Asked at: Uber Interview Guide 2026: Dispatch Systems, Geospatial Algorithms, and Marketplace Engineering

🏢 Asked at: Airbnb Interview Guide 2026: Search Systems, Trust and Safety, and Full-Stack Engineering

🏢 Asked at: Databricks Interview Guide 2026: Spark Internals, Delta Lake, and Lakehouse Architecture

🏢 Asked at: Atlassian Interview Guide

🏢 Asked at: LinkedIn Interview Guide 2026: Social Graph Engineering, Feed Ranking, and Professional Network Scale

Scroll to Top