Low-Level Design: Task Scheduler / Job Queue
A task scheduler executes jobs at specified times or with specified delays, tracks their status, and handles failures with retry logic. This design appears at companies like Airbnb (Airflow), Uber (Cadence), and Databricks. It tests: priority queuing, state machines, concurrency, and the Worker pattern.
Core Classes
Enums
from enum import Enum


class TaskStatus(Enum):
    """Lifecycle states for a Task."""
    PENDING = "PENDING"        # waiting to be picked up
    RUNNING = "RUNNING"        # currently executing
    SUCCEEDED = "SUCCEEDED"    # completed successfully
    FAILED = "FAILED"          # final failure (retries exhausted)
    RETRYING = "RETRYING"      # scheduled for retry
    CANCELLED = "CANCELLED"    # cancelled before it started running


class TaskPriority(Enum):
    """Relative urgency; a higher value is dequeued first."""
    LOW = 1
    NORMAL = 5
    HIGH = 10
    URGENT = 20


from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Callable, Any, Optional
import uuid


@dataclass
class Task:
    """A schedulable unit of work with priority, retry policy, and status tracking."""
    task_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    name: str = ""
    func: Optional[Callable] = None  # the function to execute
    args: tuple = field(default_factory=tuple)
    kwargs: dict = field(default_factory=dict)
    priority: TaskPriority = TaskPriority.NORMAL
    status: TaskStatus = TaskStatus.PENDING
    max_retries: int = 3
    retry_count: int = 0
    retry_delay_seconds: int = 60  # base delay, doubles each retry
    scheduled_at: datetime = field(default_factory=datetime.now)
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None
    result: Any = None
    error: Optional[str] = None

    def __lt__(self, other: 'Task') -> bool:
        """For priority queue: higher priority = earlier execution."""
        if self.priority.value != other.priority.value:
            return self.priority.value > other.priority.value  # higher priority first
        # Tie on priority: earlier scheduled time wins (FIFO by due time).
        return self.scheduled_at < other.scheduled_at

    def can_retry(self) -> bool:
        """True while retry attempts remain."""
        return self.retry_count < self.max_retries

    def next_retry_time(self) -> datetime:
        """Exponential backoff: 60s, 120s, 240s, ..."""
        delay = self.retry_delay_seconds * (2 ** self.retry_count)
        return datetime.now() + timedelta(seconds=delay)
TaskScheduler
import heapq
import threading
from typing import Optional
class TaskScheduler:
    """Thread-pool task scheduler over a priority heap.

    Workers block on a Condition until a task is both present and due
    (scheduled_at <= now). Failed tasks are re-enqueued with exponential
    backoff until max_retries is exhausted. Cancellation is lazy: a
    cancelled task stays in the heap and is discarded when popped.
    """

    def __init__(self, num_workers: int = 4):
        self._queue: list[Task] = []  # min-heap (uses Task.__lt__)
        self._lock = threading.Lock()
        self._not_empty = threading.Condition(self._lock)
        self._tasks: dict[str, Task] = {}  # task_id -> Task (registry)
        self._running = False
        self._workers: list[threading.Thread] = []
        self._num_workers = num_workers

    def submit(self, task: Task) -> str:
        """Enqueue a task for execution as soon as it is due; returns its id."""
        with self._not_empty:
            self._tasks[task.task_id] = task
            heapq.heappush(self._queue, task)
            self._not_empty.notify()  # wake a waiting worker
        print(f"Submitted task {task.task_id[:8]} [{task.name}] priority={task.priority.name}")
        return task.task_id

    def schedule_at(self, task: Task, run_at: datetime) -> str:
        """Enqueue a task to run no earlier than run_at."""
        task.scheduled_at = run_at
        return self.submit(task)

    def cancel(self, task_id: str) -> bool:
        """Cancel a PENDING task; returns False if unknown or already started.

        The task is not removed from the heap here (heapq has no cheap
        arbitrary delete); workers discard CANCELLED tasks when popping.
        """
        with self._lock:
            task = self._tasks.get(task_id)
            if not task or task.status != TaskStatus.PENDING:
                return False
            task.status = TaskStatus.CANCELLED
            return True

    def get_status(self, task_id: str) -> Optional[TaskStatus]:
        """Current status of a task, or None if the id is unknown."""
        task = self._tasks.get(task_id)
        return task.status if task else None

    def _get_next_task(self) -> Optional[Task]:
        """Pop the highest-priority task that is due.

        Blocks until a due task is available or the scheduler stops.
        Returns None only on shutdown.
        """
        with self._not_empty:
            while self._running:
                now = datetime.now()
                if self._queue and self._queue[0].scheduled_at <= now:
                    task = heapq.heappop(self._queue)
                    if task.status == TaskStatus.CANCELLED:
                        continue  # lazily drop cancelled tasks (see cancel())
                    return task
                if self._queue:
                    # Head task is not due yet: sleep until its due time,
                    # or until a new submission wakes us earlier.
                    timeout = (self._queue[0].scheduled_at - now).total_seconds()
                    self._not_empty.wait(timeout=timeout)
                else:
                    self._not_empty.wait()  # stop() calls notify_all()
            return None

    def _worker_loop(self) -> None:
        """Worker thread body: pull due tasks and execute until shutdown."""
        while self._running:
            task = self._get_next_task()
            if task is None:
                break  # scheduler stopped
            self._execute(task)

    def _execute(self, task: Task) -> None:
        """Run one task, recording result/error and scheduling retries on failure."""
        task.status = TaskStatus.RUNNING
        task.started_at = datetime.now()
        print(f"Executing: {task.task_id[:8]} [{task.name}]")
        try:
            task.result = task.func(*task.args, **task.kwargs)
            task.status = TaskStatus.SUCCEEDED
            task.completed_at = datetime.now()
            print(f"Succeeded: {task.task_id[:8]} [{task.name}]")
        except Exception as e:
            task.error = str(e)
            print(f"Failed: {task.task_id[:8]} [{task.name}] error={e}")
            if task.can_retry():
                task.retry_count += 1
                task.status = TaskStatus.RETRYING
                task.scheduled_at = task.next_retry_time()
                # Re-enqueue for retry at the backed-off time
                with self._not_empty:
                    heapq.heappush(self._queue, task)
                    self._not_empty.notify()
                print(f"Retry {task.retry_count}/{task.max_retries} scheduled for {task.name}")
            else:
                task.status = TaskStatus.FAILED
                task.completed_at = datetime.now()

    def start(self) -> None:
        """Spin up the worker threads (daemon, so they never block interpreter exit)."""
        self._running = True
        for _ in range(self._num_workers):
            t = threading.Thread(target=self._worker_loop, daemon=True)
            t.start()
            self._workers.append(t)
        print(f"TaskScheduler started with {self._num_workers} workers")

    def stop(self, wait: bool = True) -> None:
        """Signal shutdown and optionally join workers (5s timeout each)."""
        self._running = False
        with self._not_empty:
            self._not_empty.notify_all()  # wake all waiting workers
        if wait:
            for t in self._workers:
                t.join(timeout=5)
        print("TaskScheduler stopped")
Usage Example
import time
from datetime import datetime, timedelta
def send_email(to: str, subject: str) -> str:
    """Simulate delivering an email and return a confirmation message."""
    time.sleep(0.1)  # stand-in for an SMTP round-trip
    confirmation = f"Email sent to {to}"
    return confirmation
def generate_report(report_type: str) -> str:
    """Simulate building a report and return a completion message."""
    time.sleep(0.5)  # stand-in for the heavier report computation
    return f"Report {report_type} generated"
# --- Demo: drive the scheduler with two representative jobs ---
scheduler = TaskScheduler(num_workers=2)
scheduler.start()

# Immediate work: a welcome email at HIGH priority.
email_task = Task(
    name="send_welcome_email",
    func=send_email,
    args=("user@example.com", "Welcome!"),
    priority=TaskPriority.HIGH,
)
tid1 = scheduler.submit(email_task)

# Deferred work: a revenue report due 5 seconds from now, with 2 retries.
report_task = Task(
    name="daily_report",
    func=generate_report,
    args=("revenue",),
    priority=TaskPriority.NORMAL,
    max_retries=2,
)
tid2 = scheduler.schedule_at(report_task, datetime.now() + timedelta(seconds=5))

time.sleep(2)  # give a worker time to finish the email task
print(f"Email status: {scheduler.get_status(tid1)}")
time.sleep(10)  # wait past the report's scheduled time plus its runtime
print(f"Report status: {scheduler.get_status(tid2)}")
scheduler.stop()
Interview Follow-ups
- Distributed scheduler: Replace the in-memory priority queue with a database table. Workers poll for PENDING tasks older than scheduled_at using SELECT FOR UPDATE SKIP LOCKED (PostgreSQL) — prevents two workers from claiming the same task. This scales across multiple servers.
- Dead letter queue: Tasks that exhaust retries are moved to a dead_letter_tasks table for manual inspection and reprocessing.
- Cron-style scheduling: Store a cron expression with each recurring task. A separate scheduler thread wakes up every minute, generates next-run instances for all cron tasks due in the next interval, and enqueues them.
- Heartbeat / task lease: Workers acquire a lease (DB row lock with TTL). If a worker crashes, the lease expires and another worker reclaims the task. Prevents orphaned RUNNING tasks.
🏢 Asked at: Uber Interview Guide 2026: Dispatch Systems, Geospatial Algorithms, and Marketplace Engineering
🏢 Asked at: Airbnb Interview Guide 2026: Search Systems, Trust and Safety, and Full-Stack Engineering
🏢 Asked at: Databricks Interview Guide 2026: Spark Internals, Delta Lake, and Lakehouse Architecture
🏢 Asked at: Atlassian Interview Guide
🏢 Asked at: LinkedIn Interview Guide 2026: Social Graph Engineering, Feed Ranking, and Professional Network Scale