Low-Level Design: Task Scheduler (Priority Queue, Thread Pool, Retries)

Low-Level Design: Task Scheduler

The Task Scheduler is a recurring LLD interview question at companies like Google, Meta, Airbnb, and Stripe. It tests your ability to model recurring jobs, priority queues, thread pools, and failure handling in clean OOP code.

Requirements

  • Schedule tasks to run once at a specific time, or repeatedly on a cron-like interval.
  • Priority: higher-priority tasks preempt lower-priority ones when the thread pool is saturated.
  • Cancel a scheduled task before it executes.
  • Retry: failed tasks retry up to N times with exponential backoff.
  • Concurrency: multiple tasks run in parallel via a fixed thread pool.

Core Design

Data Model

from __future__ import annotations
import heapq
import threading
import time
import uuid
from dataclasses import dataclass, field
from enum import Enum
from typing import Callable

class TaskStatus(Enum):
    PENDING   = "pending"
    RUNNING   = "running"
    COMPLETED = "completed"
    FAILED    = "failed"
    CANCELLED = "cancelled"

@dataclass
class Task:
    func:        Callable
    run_at:      float            # Unix timestamp
    priority:    int   = 0        # higher = more urgent
    interval:    float | None = None  # seconds, None for one-shot
    max_retries: int   = 3
    retry_count: int   = field(default=0, init=False)
    task_id:     str   = field(default_factory=lambda: str(uuid.uuid4()))
    status:      TaskStatus = field(default=TaskStatus.PENDING, init=False)

    # heapq ordering: (run_at, -priority, task_id)
    def __lt__(self, other: Task) -> bool:
        if self.run_at != other.run_at:
            return self.run_at  other.priority   # higher priority first

Scheduler Core

class TaskScheduler:
    def __init__(self, workers: int = 4):
        self._heap:      list[Task]       = []
        self._lock:      threading.Lock   = threading.Lock()
        self._wakeup:    threading.Event  = threading.Event()
        self._cancelled: set[str]         = set()
        self._semaphore: threading.Semaphore = threading.Semaphore(workers)
        self._running    = False
        self._dispatcher: threading.Thread | None = None

    # ── Public API ────────────────────────────────────────────────────────

    def schedule_once(self, func: Callable, delay: float = 0,
                      priority: int = 0, max_retries: int = 3) -> str:
        task = Task(func=func, run_at=time.monotonic() + delay,
                    priority=priority, max_retries=max_retries)
        self._enqueue(task)
        return task.task_id

    def schedule_recurring(self, func: Callable, interval: float,
                           priority: int = 0) -> str:
        task = Task(func=func, run_at=time.monotonic(),
                    priority=priority, interval=interval)
        self._enqueue(task)
        return task.task_id

    def cancel(self, task_id: str) -> bool:
        with self._lock:
            self._cancelled.add(task_id)
        return True   # lazy deletion — removed when dequeued

    def start(self) -> None:
        self._running   = True
        self._dispatcher = threading.Thread(target=self._dispatch_loop,
                                            daemon=True)
        self._dispatcher.start()

    def stop(self) -> None:
        self._running = False
        self._wakeup.set()
        if self._dispatcher:
            self._dispatcher.join()

    # ── Internal ──────────────────────────────────────────────────────────

    def _enqueue(self, task: Task) -> None:
        with self._lock:
            heapq.heappush(self._heap, task)
        self._wakeup.set()

    def _dispatch_loop(self) -> None:
        while self._running:
            with self._lock:
                now   = time.monotonic()
                sleep = None
                while self._heap:
                    task = self._heap[0]
                    if task.task_id in self._cancelled:
                        heapq.heappop(self._heap)
                        task.status = TaskStatus.CANCELLED
                        continue
                    if task.run_at  None:
        self._semaphore.acquire()
        task.status = TaskStatus.RUNNING
        try:
            task.func()
            task.status = TaskStatus.COMPLETED
            if task.interval is not None:
                task.run_at     = time.monotonic() + task.interval
                task.retry_count = 0
                task.status      = TaskStatus.PENDING
                self._enqueue(task)
        except Exception:
            task.retry_count += 1
            if task.retry_count <= task.max_retries:
                backoff     = 2 ** task.retry_count   # 2, 4, 8 … seconds
                task.run_at = time.monotonic() + backoff
                task.status = TaskStatus.PENDING
                self._enqueue(task)
            else:
                task.status = TaskStatus.FAILED
        finally:
            self._semaphore.release()

Usage Example

scheduler = TaskScheduler(workers=4)
scheduler.start()

# One-shot task after 5 seconds
job1 = scheduler.schedule_once(lambda: print("Hello!"), delay=5)

# Recurring every 60 seconds, high priority
job2 = scheduler.schedule_recurring(
    lambda: print("Heartbeat"),
    interval=60,
    priority=10,
)

# Cancel before it fires
scheduler.cancel(job1)

time.sleep(120)
scheduler.stop()

Design Decisions and Trade-offs

Decision Choice Alternative
Priority queue heapq (binary heap) — O(log n) push/pop Skip list — O(log n) but supports O(log n) delete
Cancellation Lazy (mark + ignore at dequeue) Eager (O(n) heap scan + rebuild)
Worker pool Semaphore + daemon threads concurrent.futures.ThreadPoolExecutor
Wake signal threading.Event with timeout Condition variable, select() on a pipe
Retry backoff Exponential 2^n seconds Exponential with jitter to avoid thundering herd
Recurring schedule Fixed interval (next = now + interval) Fixed rate (next = scheduled + interval — no drift)

Distributed Task Scheduler Extension

For a distributed version (like Airflow, Celery, or cloud schedulers), the design changes:

  • Storage: Tasks stored in a distributed queue (Redis Sorted Set by score=run_at) or database.
  • Leader election: One scheduler node is the leader (via ZooKeeper/etcd lease) and dispatches tasks; others are hot standby.
  • Workers: Separate worker pool processes that dequeue tasks via a message broker (Kafka, SQS, RabbitMQ).
  • Idempotency: Tasks include an idempotency key. Workers use atomic check-and-set to claim a task (Redis SET NX), preventing double execution.
  • Observability: Each task has a state machine (PENDING → CLAIMED → RUNNING → COMPLETED/FAILED) stored in DB for audit and retry dashboards.

Common Interview Follow-ups

How would you handle a task that runs longer than its interval?

Two strategies: (a) fixed interval — next run is scheduled interval seconds after start, so if a run takes longer than the interval, the next run starts immediately after completion. (b) fixed rate — next run is always scheduled_time + interval, so short runs accumulate lag to catch up. Most job schedulers (cron) use fixed rate. Celery uses fixed interval by default.

How do you prevent a task from running twice if the scheduler crashes mid-execution?

Use transactional outbox pattern: the task row in the DB has a status column. The worker atomically transitions PENDING → RUNNING using a conditional update (WHERE status=’PENDING’). On restart, tasks stuck in RUNNING for > timeout are reset to PENDING. Combined with idempotent task logic, this guarantees at-least-once execution with no duplicates.

How do you implement cron-style scheduling (e.g., “every Mon at 9am”)?

Parse the cron expression into a next-fire-time calculator using a library like croniter. The scheduler calls croniter(expr).get_next() at task completion to compute the next run_at. Store the expression in the task record so it survives restarts.

{
“@context”: “https://schema.org”,
“@type”: “FAQPage”,
“mainEntity”: [
{
“@type”: “Question”,
“name”: “What data structure should a task scheduler use internally?”,
“acceptedAnswer”: {
“@type”: “Answer”,
“text”: “A min-heap (priority queue) ordered by (run_at, -priority). This gives O(log n) insertion and O(log n) extraction of the next-due task. Python’s heapq is a binary min-heap. For distributed schedulers, a Redis Sorted Set with score=run_at achieves the same ordering with atomic cross-process access.”
}
},
{
“@type”: “Question”,
“name”: “How does a task scheduler implement cancellation efficiently?”,
“acceptedAnswer”: {
“@type”: “Answer”,
“text”: “Lazy deletion: mark the task ID in a cancelled set. When dequeuing tasks, skip any whose ID is in the cancelled set. This is O(1) for cancellation vs O(n) for eager heap removal + rebuild. The tradeoff: cancelled tasks still occupy heap memory until dequeued, which is acceptable unless cancellation is extremely frequent.”
}
},
{
“@type”: “Question”,
“name”: “How do you implement retry with exponential backoff in a task scheduler?”,
“acceptedAnswer”: {
“@type”: “Answer”,
“text”: “In the exception handler: increment retry_count. If retry_count <= max_retries, set run_at = now + 2^retry_count seconds and re-enqueue the task. This doubles the wait time: 2s, 4s, 8s, 16s. Add jitter (random fraction of backoff) in distributed systems to prevent retry storms: backoff = 2^retry_count * (0.5 + random() * 0.5)."
}
},
{
"@type": "Question",
"name": "How do you prevent a task from running twice in a distributed scheduler?",
"acceptedAnswer": {
"@type": "Answer",
"text": "Use an atomic compare-and-swap to transition the task from PENDING to RUNNING: UPDATE tasks SET status='RUNNING', worker_id=X WHERE id=Y AND status='PENDING'. Only one worker wins. Add a heartbeat mechanism: the running worker updates claimed_at every 30s. A watchdog resets tasks stuck in RUNNING with stale claimed_at back to PENDING after a timeout."
}
},
{
"@type": "Question",
"name": "What is the difference between fixed-interval and fixed-rate scheduling?",
"acceptedAnswer": {
"@type": "Answer",
"text": "Fixed-interval: next run = completion_time + interval. If a task takes 45s in a 60s interval, next run is 60s after it finishes u2014 no catch-up. Fixed-rate: next run = scheduled_time + interval. If a task takes 45s in a 60s interval, next run is 15s after it finishes u2014 catches up to the intended cadence. Cron uses fixed-rate; most retry schedulers use fixed-interval to avoid overload cascades."
}
}
]
}

Asked at: Airbnb Interview Guide

Asked at: Stripe Interview Guide

Asked at: DoorDash Interview Guide

Asked at: Lyft Interview Guide

Asked at: Uber Interview Guide

Scroll to Top