Low-Level Design: Task Scheduler
The Task Scheduler is a recurring LLD interview question at companies like Google, Meta, Airbnb, and Stripe. It tests your ability to model recurring jobs, priority queues, thread pools, and failure handling in clean OOP code.
Requirements
- Schedule tasks to run once at a specific time, or repeatedly on a cron-like interval.
- Priority: when multiple tasks are due, higher-priority tasks are dispatched first (the design below does not preempt already-running tasks).
- Cancel a scheduled task before it executes.
- Retry: failed tasks retry up to N times with exponential backoff.
- Concurrency: multiple tasks run in parallel via a fixed thread pool.
Core Design
Data Model
from __future__ import annotations
import heapq
import threading
import time
import uuid
from dataclasses import dataclass, field
from enum import Enum
from typing import Callable
class TaskStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"
@dataclass
class Task:
    func: Callable
    run_at: float                  # monotonic timestamp (time.monotonic())
    priority: int = 0              # higher = more urgent
    interval: float | None = None  # seconds, None for one-shot
    max_retries: int = 3
    retry_count: int = field(default=0, init=False)
    task_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    status: TaskStatus = field(default=TaskStatus.PENDING, init=False)

    # heapq ordering: earliest run_at first, then higher priority
    def __lt__(self, other: Task) -> bool:
        if self.run_at != other.run_at:
            return self.run_at < other.run_at
        return self.priority > other.priority  # higher priority first
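Since `heapq` compares `Task` objects via `__lt__`, the pop order is equivalent to sorting tuples of `(run_at, -priority)`. A quick self-contained sketch of that ordering:

```python
import heapq

# Equivalent tuple ordering to Task.__lt__: earliest run_at first,
# higher priority breaking ties (negated so the min-heap pops it first).
heap: list[tuple[float, int, str]] = []
for run_at, priority, task_id in [(10.0, 0, "a"), (5.0, 0, "b"), (5.0, 9, "c")]:
    heapq.heappush(heap, (run_at, -priority, task_id))

order = [heapq.heappop(heap)[2] for _ in range(len(heap))]
print(order)  # ['c', 'b', 'a'] — b and c tie on run_at, c wins on priority
```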
Scheduler Core
class TaskScheduler:
    def __init__(self, workers: int = 4):
        self._heap: list[Task] = []
        self._lock: threading.Lock = threading.Lock()
        self._wakeup: threading.Event = threading.Event()
        self._cancelled: set[str] = set()
        self._semaphore: threading.Semaphore = threading.Semaphore(workers)
        self._running = False
        self._dispatcher: threading.Thread | None = None
    # ── Public API ────────────────────────────────────────────────────────
    def schedule_once(self, func: Callable, delay: float = 0,
                      priority: int = 0, max_retries: int = 3) -> str:
        task = Task(func=func, run_at=time.monotonic() + delay,
                    priority=priority, max_retries=max_retries)
        self._enqueue(task)
        return task.task_id

    def schedule_recurring(self, func: Callable, interval: float,
                           priority: int = 0) -> str:
        task = Task(func=func, run_at=time.monotonic(),
                    priority=priority, interval=interval)
        self._enqueue(task)
        return task.task_id

    def cancel(self, task_id: str) -> bool:
        with self._lock:
            self._cancelled.add(task_id)
            return True  # lazy deletion — removed when dequeued

    def start(self) -> None:
        self._running = True
        self._dispatcher = threading.Thread(target=self._dispatch_loop,
                                            daemon=True)
        self._dispatcher.start()

    def stop(self) -> None:
        self._running = False
        self._wakeup.set()
        if self._dispatcher:
            self._dispatcher.join()

    # ── Internal ──────────────────────────────────────────────────────────
    def _enqueue(self, task: Task) -> None:
        with self._lock:
            heapq.heappush(self._heap, task)
        self._wakeup.set()
    def _dispatch_loop(self) -> None:
        while self._running:
            sleep = None
            with self._lock:
                now = time.monotonic()
                while self._heap:
                    task = self._heap[0]
                    if task.task_id in self._cancelled:
                        heapq.heappop(self._heap)
                        task.status = TaskStatus.CANCELLED
                        continue
                    if task.run_at > now:
                        sleep = task.run_at - now  # wake when the head is due
                        break
                    heapq.heappop(self._heap)
                    threading.Thread(target=self._run_task, args=(task,),
                                     daemon=True).start()
            self._wakeup.wait(timeout=sleep)  # new task, cancel, or stop
            self._wakeup.clear()

    def _run_task(self, task: Task) -> None:
        self._semaphore.acquire()  # caps concurrent tasks at `workers`
        task.status = TaskStatus.RUNNING
        try:
            task.func()
            task.status = TaskStatus.COMPLETED
            if task.interval is not None:  # recurring: schedule next run
                task.run_at = time.monotonic() + task.interval
                task.retry_count = 0
                task.status = TaskStatus.PENDING
                self._enqueue(task)
        except Exception:
            task.retry_count += 1
            if task.retry_count <= task.max_retries:
                backoff = 2 ** task.retry_count  # 2, 4, 8 … seconds
                task.run_at = time.monotonic() + backoff
                task.status = TaskStatus.PENDING
                self._enqueue(task)
            else:
                task.status = TaskStatus.FAILED
        finally:
            self._semaphore.release()
Usage Example
scheduler = TaskScheduler(workers=4)
scheduler.start()
# One-shot task after 5 seconds
job1 = scheduler.schedule_once(lambda: print("Hello!"), delay=5)
# Recurring every 60 seconds, high priority
job2 = scheduler.schedule_recurring(
lambda: print("Heartbeat"),
interval=60,
priority=10,
)
# Cancel before it fires
scheduler.cancel(job1)
time.sleep(120)  # let the heartbeat fire roughly twice before shutdown
scheduler.stop()
Design Decisions and Trade-offs
| Decision | Choice | Alternative |
|---|---|---|
| Priority queue | heapq (binary heap) — O(log n) push/pop | Skip list — O(log n) but supports O(log n) delete |
| Cancellation | Lazy (mark + ignore at dequeue) | Eager (O(n) heap scan + rebuild) |
| Worker pool | Semaphore + daemon threads | concurrent.futures.ThreadPoolExecutor |
| Wake signal | threading.Event with timeout | Condition variable, select() on a pipe |
| Retry backoff | Exponential 2^n seconds | Exponential with jitter to avoid thundering herd |
| Recurring schedule | Fixed interval (next = now + interval) | Fixed rate (next = scheduled + interval — no drift) |
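The jitter alternative in the retry-backoff row is worth sketching. Instead of a deterministic 2^n delay, "full jitter" draws uniformly from [0, min(cap, base * 2^n)], so tasks that failed together do not all retry in lockstep. The function name and parameters below are illustrative, not part of the scheduler above:

```python
import random

def backoff_with_jitter(attempt: int, base: float = 2.0, cap: float = 60.0) -> float:
    """'Full jitter' backoff: uniform in [0, min(cap, base * 2**attempt)]."""
    return random.uniform(0.0, min(cap, base * 2 ** attempt))

delays = [backoff_with_jitter(a) for a in range(6)]
# Every delay is bounded by the cap; retries spread out instead of synchronizing.
assert all(0.0 <= d <= 60.0 for d in delays)
```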
Distributed Task Scheduler Extension
For a distributed version (like Airflow, Celery, or cloud schedulers), the design changes:
- Storage: Tasks stored in a distributed queue (Redis Sorted Set by score=run_at) or database.
- Leader election: One scheduler node is the leader (via ZooKeeper/etcd lease) and dispatches tasks; others are hot standby.
- Workers: Separate worker pool processes that dequeue tasks via a message broker (Kafka, SQS, RabbitMQ).
- Idempotency: Tasks include an idempotency key. Workers use atomic check-and-set to claim a task (Redis SET NX), preventing double execution.
- Observability: Each task has a state machine (PENDING → CLAIMED → RUNNING → COMPLETED/FAILED) stored in DB for audit and retry dashboards.
Common Interview Follow-ups
How would you handle a task that runs longer than its interval?
Two strategies: (a) fixed interval (what the scheduler above implements) — the next run is scheduled interval seconds after the previous run completes, so runs never overlap but the schedule drifts by each run's duration. (b) fixed rate — the next run is always scheduled_time + interval, so if a run overruns, subsequent runs fire back-to-back until the schedule catches up. Most job schedulers (cron) use fixed rate: they fire at each matching wall-clock time regardless of whether the previous run finished. Celery uses fixed interval by default.
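A quick arithmetic sketch of the difference, with made-up timestamps:

```python
interval = 60.0
started = 1000.0    # when this run was scheduled to start
finished = 1090.0   # the run overran: 90s of work against a 60s interval

next_fixed_interval = finished + interval  # 1150.0 — schedule drifts by runtime
next_fixed_rate = started + interval       # 1060.0 — already in the past at
                                           # t=1090, so it fires immediately
```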
How do you prevent a task from running twice if the scheduler crashes mid-execution?
Use the transactional outbox pattern: the task row in the DB has a status column. The worker atomically transitions PENDING → RUNNING with a conditional update (WHERE status='PENDING'). On restart, tasks stuck in RUNNING for longer than a timeout are reset to PENDING. This gives at-least-once execution; making the task logic idempotent ensures the occasional duplicate run is harmless.
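The claim step can be sketched with SQLite standing in for the real database (table and column names are illustrative):

```python
import sqlite3

db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE tasks (id TEXT PRIMARY KEY, status TEXT)")
db.execute("INSERT INTO tasks VALUES ('t1', 'PENDING')")
db.commit()

def claim(task_id: str) -> bool:
    # Conditional update: of two racing claimants, only one sees rowcount == 1.
    cur = db.execute(
        "UPDATE tasks SET status = 'RUNNING' "
        "WHERE id = ? AND status = 'PENDING'", (task_id,))
    db.commit()
    return cur.rowcount == 1

first, second = claim("t1"), claim("t1")
print(first, second)  # True False — the second claim finds no PENDING row
```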
How do you implement cron-style scheduling (e.g., “every Mon at 9am”)?
Parse the cron expression into a next-fire-time calculator using a library like croniter. The scheduler calls croniter(expr).get_next() at task completion to compute the next run_at. Store the expression in the task record so it survives restarts.
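croniter does the general computation; for intuition, here is a hand-rolled next-fire calculator for the single rule "every day at 09:00" (a deliberate simplification, not a substitute for a cron parser):

```python
from datetime import datetime, timedelta

def next_daily_at(now: datetime, hour: int = 9) -> datetime:
    """Next occurrence of `hour`:00 strictly after `now`."""
    candidate = now.replace(hour=hour, minute=0, second=0, microsecond=0)
    if candidate <= now:
        candidate += timedelta(days=1)  # today's slot already passed
    return candidate

print(next_daily_at(datetime(2024, 3, 4, 10, 30)))  # 2024-03-05 09:00:00
print(next_daily_at(datetime(2024, 3, 4, 8, 0)))    # 2024-03-04 09:00:00
```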