System Design: Distributed Task Queue and Job Scheduler (Celery, SQS, Redis)

What Is a Distributed Task Queue?

A distributed task queue decouples work producers from work executors. Producers enqueue jobs; workers poll the queue and execute them asynchronously. Examples: sending emails after user signup, resizing images, processing payments, running ML inference. Systems like Celery (Python), Sidekiq (Ruby), Bull (Node.js), and cloud-native options (AWS SQS + Lambda, Google Cloud Tasks) implement this pattern.

Requirements

Functional: enqueue tasks with a payload, process tasks by worker pool, support priority queues, delayed/scheduled tasks, retry with exponential backoff on failure, dead letter queue for permanently failed tasks, task status tracking.

Non-functional: at-least-once delivery (no task loss), exactly-once processing for critical tasks (idempotent workers), horizontal worker scaling, monitoring (queue depth, worker utilization, error rate).

Core Design

Task Model

from dataclasses import dataclass
from datetime import datetime
from typing import Optional

@dataclass
class Task:
    task_id: str
    queue: str           # 'email', 'image_resize', 'payment'
    payload: dict        # serialized job arguments
    priority: int        # higher = more urgent (0-9)
    status: str          # PENDING | RUNNING | DONE | FAILED | DEAD
    created_at: datetime
    scheduled_at: datetime               # earliest time the task may run (for delayed tasks)
    run_at: Optional[datetime] = None    # when a worker actually picked it up
    completed_at: Optional[datetime] = None
    retry_count: int = 0
    max_retries: int = 3
    worker_id: Optional[str] = None
    error: Optional[str] = None

Queue Backend Options

Backend             | Strengths                                               | Weaknesses
Redis (LPUSH/BRPOP) | Sub-millisecond latency; sorted sets for priority/delay | In-memory; durability risk
Redis Streams       | Persistent; consumer groups; at-least-once semantics    | More complex API
PostgreSQL          | ACID; reuses existing infra; advisory locks             | Not built for queuing; high polling load
Kafka               | High throughput; replay; partitioned parallelism        | Overkill at low volume; no native delay
SQS                 | Managed; near-unlimited scale; built-in DLQ             | At-least-once only; ordering only via FIFO queues

Priority Queue with Redis Sorted Sets

import json
import time
import uuid
from typing import Optional

import redis

r = redis.Redis()
QUEUE = 'tasks:priority'    # ready tasks, scored by priority
DELAYED = 'tasks:delayed'   # future tasks, scored by due time
RUNNING = 'tasks:running'   # in-flight tasks, scored by visibility deadline

def enqueue(payload: dict, priority: int = 5, delay_seconds: int = 0):
    task = {
        'task_id': str(uuid.uuid4()),
        'payload': payload,
        'priority': priority,
        'retry_count': 0,
        'created_at': time.time(),
    }
    if delay_seconds > 0:
        # Delayed tasks are scored by their due time; promote_delayed
        # moves them to the priority queue once that time arrives.
        r.zadd(DELAYED, {json.dumps(task): time.time() + delay_seconds})
    else:
        # Score: lower = higher priority. Negative priority dominates the
        # timestamp, which breaks ties FIFO within a priority level.
        score = -priority * 1e12 + time.time()
        r.zadd(QUEUE, {json.dumps(task): score})

def promote_delayed():
    """Move due delayed tasks to the priority queue. Run every second."""
    now = time.time()
    for raw in r.zrangebyscore(DELAYED, '-inf', now):
        # ZREM is atomic and returns 1 only for the caller that actually
        # removed the member, so concurrent promoters can't enqueue the
        # same task twice.
        if r.zrem(DELAYED, raw):
            task = json.loads(raw)
            score = -task['priority'] * 1e12 + time.time()
            r.zadd(QUEUE, {raw: score})

def dequeue(worker_id: str) -> Optional[dict]:
    """Atomically pop the highest-priority task and record a visibility deadline."""
    # ZPOPMIN (Redis 5+) removes the lowest-score member atomically, so two
    # workers can never claim the same task. A peek-then-ZREM sequence would
    # race: both workers could see the same head of the queue.
    popped = r.zpopmin(QUEUE)
    if not popped:
        return None
    raw, _score = popped[0]
    task = json.loads(raw)
    task['worker_id'] = worker_id
    task['started_at'] = time.time()
    visibility_deadline = time.time() + 300  # 5 min visibility timeout
    # A crash between ZPOPMIN and ZADD can still lose the task; a Lua script
    # combining both steps closes that window.
    r.zadd(RUNNING, {json.dumps(task): visibility_deadline})
    return task

Worker Loop with Retry

BACKOFF_BASE = 2  # seconds
MAX_RETRIES = 3

def run_worker(worker_id: str):
    while True:
        task = dequeue(worker_id)
        if not task:
            time.sleep(1)
            continue
        try:
            handler = HANDLERS[task['payload']['type']]
            handler(task['payload'])
            acknowledge(task)   # remove from RUNNING
        except Exception as e:
            handle_failure(task, str(e))

def handle_failure(task: dict, error: str):
    task['retry_count'] += 1
    task['error'] = error
    if task['retry_count'] <= MAX_RETRIES:
        delay = BACKOFF_BASE ** task['retry_count']  # 2, 4, 8 seconds
        # Re-schedule the mutated task dict directly so retry_count survives;
        # calling enqueue() would mint a fresh task with retry_count reset to 0.
        r.zadd(DELAYED, {json.dumps(task): time.time() + delay})
    else:
        # Move to dead letter queue for inspection and manual replay
        r.lpush('tasks:dead', json.dumps(task))
    remove_from_running(task)

Visibility Timeout and At-Least-Once Delivery

When a worker dequeues a task, it moves to a RUNNING set with a deadline. If the worker crashes before acknowledging, a watchdog job re-enqueues the task after the deadline expires. This ensures no task is silently dropped at the cost of potential duplicate processing.

def recover_stuck_tasks():
    """Watchdog: re-enqueue tasks whose workers timed out."""
    now = time.time()
    for raw in r.zrangebyscore(RUNNING, '-inf', now):
        if not r.zrem(RUNNING, raw):
            continue  # another watchdog instance already claimed it
        task = json.loads(raw)
        # Count the timeout as a failed attempt so a poison task eventually
        # lands in the DLQ instead of looping between RUNNING and the queue.
        handle_failure(task, 'visibility timeout')

Exactly-Once Processing

At-least-once delivery means a task may be processed twice (e.g., the worker crashes after processing but before the ACK). For exactly-once semantics, make workers idempotent: use the task_id as a deduplication key in Redis. Claim the key atomically with SET NX before processing (a separate check-then-add leaves a race where two workers both pass the check), with a TTL at least as long as the visibility timeout. If the claim fails, the task was already handled: skip processing and ACK.
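
A minimal sketch of that dedup check. The client is passed in, and `process_once` and the `tasks:processed:` key prefix are illustrative names, not part of any library; note this variant claims the key before running the handler, trading a possible lost task after a crash-after-claim for zero duplicates.

```python
def process_once(r, task: dict, handler, ttl_seconds: int = 300) -> bool:
    """Run handler at most once per task_id; returns False for duplicates."""
    key = f"tasks:processed:{task['task_id']}"
    # SET ... NX EX ttl succeeds only for the first delivery of this task_id.
    if not r.set(key, '1', nx=True, ex=ttl_seconds):
        return False  # duplicate delivery: skip the work, still ACK the task
    handler(task['payload'])
    return True
```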

Cron / Scheduled Tasks

For recurring jobs (daily reports, hourly cleanup), store cron schedules in a separate table. A scheduler process runs every minute, queries cron jobs due in the current window, and enqueues them. Use a database lock or Redis SET NX to prevent multiple scheduler instances from enqueuing the same cron job twice.
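
The SET NX guard might look like this; `cron:lock:*` and both function names are illustrative. Each scheduler instance tries to claim the current minute, and only the winner enqueues.

```python
import time

def try_claim_minute(r, minute: int, ttl_seconds: int = 55) -> bool:
    # SET NX: exactly one scheduler instance gets True for a given minute.
    # The TTL keeps stale locks from blocking the next window after a crash.
    return bool(r.set(f'cron:lock:{minute}', '1', nx=True, ex=ttl_seconds))

def scheduler_tick(r, due_jobs, enqueue) -> None:
    """Run once a minute: enqueue cron jobs due in the current window."""
    minute = int(time.time() // 60)
    if not try_claim_minute(r, minute):
        return  # another instance already handled this minute
    for job in due_jobs(minute):
        enqueue(job)
```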

Monitoring

  • Queue depth: ZCARD on each queue. Alert if depth exceeds threshold (backlog building).
  • Worker utilization: active workers / total workers. Alert if below 20% (overprovisioned) or above 90% (underprovisioned).
  • Error rate: DLQ growth rate. Alert if tasks are dying faster than being processed.
  • End-to-end latency: time from enqueue to completion. Alert if P99 exceeds SLA.
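
The queue-depth check, for instance, is one ZCARD per queue; a sketch with a hypothetical helper name and the client passed in:

```python
def queues_over_threshold(r, queues, threshold: int) -> dict:
    """Return {queue_name: depth} for queues whose backlog exceeds threshold."""
    # ZCARD is O(1), so polling every scrape interval is cheap.
    return {q: depth for q in queues if (depth := r.zcard(q)) > threshold}
```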

Interview Questions

Q: How do you scale a task queue to handle 100K tasks/second?

Partition by queue name (email, image_resize) — each queue is an independent Redis sorted set. Scale workers horizontally — each worker pulls from its assigned queues. Use Redis Cluster to distribute queue data across multiple nodes (partition by queue name hash). For burst capacity, use auto-scaling worker pools (e.g., AWS ECS with target tracking on queue depth). SQS handles this natively at any scale.

Q: How do you prevent a slow task from blocking the queue?

Use separate queues for fast (under 1 second) and slow (over 10 seconds) tasks, and assign more workers to the fast queues. Each worker processes one task at a time per thread, so a slow task never blocks other workers. Add a task-level timeout: if a task exceeds max_runtime, the watchdog marks it as failed and re-enqueues it or sends it to the DLQ.
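
One way to enforce a task-level timeout inside the worker itself is SIGALRM; `run_with_timeout` is an illustrative helper, and this only works in the main thread of a Unix process.

```python
import signal

def run_with_timeout(handler, payload, max_runtime: int):
    """Raise TimeoutError if handler exceeds max_runtime seconds (Unix, main thread only)."""
    def _on_timeout(signum, frame):
        raise TimeoutError(f'task exceeded {max_runtime}s')
    old = signal.signal(signal.SIGALRM, _on_timeout)
    signal.alarm(max_runtime)          # kernel delivers SIGALRM after max_runtime
    try:
        return handler(payload)
    finally:
        signal.alarm(0)                # cancel any pending alarm
        signal.signal(signal.SIGALRM, old)
```

The TimeoutError propagates into the worker loop's except clause, so the usual handle_failure retry/DLQ path applies.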

Asked at: Uber, Netflix, Databricks, Stripe, Atlassian
