What Is a Distributed Task Queue?
A distributed task queue decouples work producers from work executors. Producers enqueue jobs; workers poll the queue and execute them asynchronously. Examples: sending emails after user signup, resizing images, processing payments, running ML inference. Systems like Celery (Python), Sidekiq (Ruby), Bull (Node.js), and cloud-native options (AWS SQS + Lambda, Google Cloud Tasks) implement this pattern.
Requirements
Functional: enqueue tasks with a payload, process them via a worker pool, support priority queues, delayed/scheduled tasks, retry with exponential backoff on failure, a dead letter queue for permanently failed tasks, and task status tracking.
Non-functional: at-least-once delivery (no task loss), exactly-once processing for critical tasks (idempotent workers), horizontal worker scaling, monitoring (queue depth, worker utilization, error rate).
Core Design
Task Model
```python
from dataclasses import dataclass
from datetime import datetime
from typing import Optional

@dataclass
class Task:
    task_id: str
    queue: str                # 'email', 'image_resize', 'payment'
    payload: dict             # serialized job arguments
    priority: int             # higher = more urgent (0-9)
    status: str               # PENDING | RUNNING | DONE | FAILED | DEAD
    created_at: datetime
    scheduled_at: datetime    # for delayed tasks
    run_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None
    retry_count: int = 0
    max_retries: int = 3
    worker_id: Optional[str] = None
    error: Optional[str] = None
```
Queue Backend Options
| Backend | Strengths | Weaknesses |
|---|---|---|
| Redis (LPUSH/BRPOP) | Sub-millisecond latency, sorted sets for priority/delay | In-memory, durability risk |
| Redis Streams | Persistent, consumer groups, at-least-once semantics | More complex |
| PostgreSQL | ACID, existing infra, advisory locks | Not built for queuing, high polling load |
| Kafka | High throughput, replay, partitioned parallelism | Overkill for low-volume, no native delay |
| SQS | Managed, infinite scale, DLQ built-in | At-least-once only, no ordering |
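The PostgreSQL row is more viable than it looks: `FOR UPDATE SKIP LOCKED` (Postgres 9.5+) lets many workers claim from one table without lock contention. A sketch of the claim query, with an assumed illustrative schema (table and column names are hypothetical):

```python
# Hypothetical schema: tasks(task_id, queue, payload, priority, status,
# run_at, created_at, worker_id). Executed via a driver such as psycopg2.
CLAIM_SQL = """
UPDATE tasks
   SET status = 'RUNNING', worker_id = %(worker_id)s
 WHERE task_id = (
        SELECT task_id
          FROM tasks
         WHERE status = 'PENDING' AND run_at <= now()
         ORDER BY priority DESC, created_at
         LIMIT 1
         FOR UPDATE SKIP LOCKED  -- concurrent workers skip already-claimed rows
       )
RETURNING task_id, payload;
"""
```

Because each worker locks a different row, the "high polling load" weakness remains, but the lock-contention problem largely disappears.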
Priority Queue with Redis Sorted Sets
```python
import json
import time
import uuid
from typing import Optional

import redis

r = redis.Redis()

QUEUE = 'tasks:priority'    # ready tasks, ordered by priority then FIFO
DELAYED = 'tasks:delayed'   # scheduled tasks, ordered by due time
RUNNING = 'tasks:running'   # in-flight tasks, ordered by visibility deadline

def enqueue(payload: dict, priority: int = 5, delay_seconds: int = 0):
    task = {
        'task_id': str(uuid.uuid4()),
        'payload': payload,
        'priority': priority,
        'retry_count': 0,
        'created_at': time.time(),
    }
    if delay_seconds > 0:
        # The delayed set is scored by wall-clock due time.
        r.zadd(DELAYED, {json.dumps(task): time.time() + delay_seconds})
    else:
        # Lower score pops first: the negative priority term dominates,
        # and the timestamp breaks ties FIFO within a priority level.
        score = -priority * 1e12 + time.time()
        r.zadd(QUEUE, {json.dumps(task): score})
```
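The score encoding can be sanity-checked without Redis: a higher priority always sorts first, and within one priority level earlier timestamps win.

```python
def score(priority: int, enqueued_at: float) -> float:
    # Same formula as enqueue(): lower score is dequeued first.
    return -priority * 1e12 + enqueued_at

# A priority-9 task enqueued later still beats a priority-5 task:
assert score(9, 2_000_000) < score(5, 1_000_000)
# Within one priority level, the earlier enqueue wins (FIFO):
assert score(5, 1_000_000) < score(5, 1_000_001)
```

The `1e12` multiplier keeps priority and timestamp terms from colliding: Unix timestamps stay below 1e12 for centuries, so the priority term always dominates.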
```python
def promote_delayed():
    """Move due delayed tasks to the priority queue. Run every second."""
    now = time.time()
    due = r.zrangebyscore(DELAYED, '-inf', now)
    for raw in due:
        task = json.loads(raw)
        score = -task['priority'] * 1e12 + time.time()
        pipe = r.pipeline()
        pipe.zrem(DELAYED, raw)
        pipe.zadd(QUEUE, {raw: score})
        pipe.execute()
```
```python
def dequeue(worker_id: str) -> Optional[dict]:
    """Pop the highest-priority task and park it in RUNNING under a
    visibility timeout."""
    items = r.zrange(QUEUE, 0, 0)  # peek at highest priority
    if not items:
        return None
    raw = items[0]
    # ZREM returns 1 for exactly one caller, so two workers that peeked
    # at the same task cannot both claim it.
    if r.zrem(QUEUE, raw) == 0:
        return None  # lost the race; caller polls again
    task = json.loads(raw)
    task['worker_id'] = worker_id
    task['started_at'] = time.time()
    visibility_deadline = time.time() + 300  # 5 min visibility timeout
    # A crash between ZREM and ZADD could drop the task; a small Lua
    # script executing both atomically closes that gap.
    r.zadd(RUNNING, {json.dumps(task): visibility_deadline})
    return task
```
Worker Loop with Retry
```python
BACKOFF_BASE = 2  # seconds
MAX_RETRIES = 3

def run_worker(worker_id: str):
    while True:
        task = dequeue(worker_id)
        if not task:
            time.sleep(1)
            continue
        try:
            handler = HANDLERS[task['payload']['type']]  # type -> callable registry
            handler(task['payload'])
            acknowledge(task)  # remove from RUNNING on success
        except Exception as e:
            handle_failure(task, str(e))

def handle_failure(task: dict, error: str):
    task['retry_count'] += 1
    task['error'] = error
    remove_from_running(task)  # else the watchdog would re-deliver it too
    if task['retry_count'] <= MAX_RETRIES:
        delay = BACKOFF_BASE ** task['retry_count']  # 2, 4, 8 seconds
        # Re-add the full task dict so retry_count survives the retry;
        # re-enqueuing only the payload would reset it to 0 and retry forever.
        r.zadd(DELAYED, {json.dumps(task): time.time() + delay})
    else:
        # Move to the dead letter queue for inspection / manual replay
        r.lpush('tasks:dead', json.dumps(task))
```
Visibility Timeout and At-Least-Once Delivery
When a worker dequeues a task, it moves to a RUNNING set with a deadline. If the worker crashes before acknowledging, a watchdog job re-enqueues the task after the deadline expires. This ensures no task is silently dropped at the cost of potential duplicate processing.
```python
def recover_stuck_tasks():
    """Watchdog: re-enqueue tasks whose visibility deadline has passed."""
    now = time.time()
    stuck = r.zrangebyscore(RUNNING, '-inf', now)
    for raw in stuck:
        task = json.loads(raw)
        # ZREM succeeds for exactly one watchdog instance, so the task
        # is re-enqueued once even with concurrent watchdogs.
        if r.zrem(RUNNING, raw):
            # Re-add the full task dict so retry_count survives recovery.
            score = -task['priority'] * 1e12 + time.time()
            r.zadd(QUEUE, {raw: score})
```
Exactly-Once Processing
At-least-once delivery means a task may be processed twice (worker crash after processing but before ACK). For exactly-once effects, make workers idempotent: use the task_id as a deduplication key. Before processing, check whether task_id exists in a “processed” Redis set. After processing, add task_id to the set with a TTL at least as long as the visibility timeout, so any redelivered duplicate still hits the dedup check. If the key already exists, skip processing and ACK.
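A minimal sketch of that dedup flow, using an in-memory dict with expiry times as a stand-in for the Redis set (in production this would be `SET task_id 1 NX EX ttl`; the function names are illustrative):

```python
import time

processed: dict[str, float] = {}  # task_id -> expiry time; stand-in for Redis
DEDUP_TTL = 300                   # seconds; at least the visibility timeout

def process_idempotent(task: dict, handler) -> bool:
    """Run handler once per task_id within the TTL window.
    Returns False (but still lets the caller ACK) on a duplicate."""
    tid = task['task_id']
    now = time.time()
    if processed.get(tid, 0.0) > now:   # seen recently: duplicate delivery
        return False
    handler(task['payload'])
    processed[tid] = now + DEDUP_TTL    # record only after success
    return True
```

Note the remaining gap: a crash between `handler(...)` and the record write still yields one duplicate execution, which is why the handlers themselves must be idempotent.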
Cron / Scheduled Tasks
For recurring jobs (daily reports, hourly cleanup), store cron schedules in a separate table. A scheduler process runs every minute, queries cron jobs due in the current window, and enqueues them. Use a database lock or Redis SET NX to prevent multiple scheduler instances from enqueuing the same cron job twice.
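The scheduler tick can be sketched as follows; to stay dependency-free this uses fixed intervals in place of real cron expressions, and the names (`CronJob`, `scheduler_tick`) are illustrative:

```python
from dataclasses import dataclass

@dataclass
class CronJob:
    name: str
    interval: int             # seconds between runs (stand-in for a cron expr)
    last_enqueued: float = 0.0

def scheduler_tick(jobs: list[CronJob], now: float) -> list[str]:
    """Enqueue every job that is due at `now`; returns the job names."""
    enqueued = []
    for job in jobs:
        if now - job.last_enqueued >= job.interval:
            # In production, acquire `SET lock:{job.name} NX EX 60` first
            # so only one scheduler instance enqueues this job.
            job.last_enqueued = now
            enqueued.append(job.name)
    return enqueued
```

Persisting `last_enqueued` (in the cron table, not in memory) is what makes the tick safe to restart.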
Monitoring
- Queue depth: ZCARD on each queue. Alert if depth exceeds threshold (backlog building).
- Worker utilization: active workers / total workers. Alert if below 20% (overprovisioned) or above 90% (underprovisioned).
- Error rate: DLQ growth rate. Alert if tasks are dying faster than being processed.
- End-to-end latency: time from enqueue to completion. Alert if P99 exceeds SLA.
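The four signals above reduce to a small alert evaluator; the thresholds here are the illustrative ones from the bullets, not recommendations:

```python
def check_alerts(queue_depth: int, active_workers: int, total_workers: int,
                 p99_latency_s: float, *, depth_threshold: int = 10_000,
                 sla_s: float = 60.0) -> list[str]:
    """Evaluate queue-health signals and return the alerts that fired."""
    alerts = []
    if queue_depth > depth_threshold:
        alerts.append('backlog building')
    utilization = active_workers / total_workers if total_workers else 0.0
    if utilization < 0.2:
        alerts.append('overprovisioned')
    elif utilization > 0.9:
        alerts.append('underprovisioned')
    if p99_latency_s > sla_s:
        alerts.append('latency SLA breach')
    return alerts
```

DLQ growth rate is deliberately omitted here since it needs two samples over time; it would be a simple delta check on `LLEN tasks:dead`.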
Interview Questions
Q: How do you scale a task queue to handle 100K tasks/second?
Partition by queue name (email, image_resize) — each queue is an independent Redis sorted set. Scale workers horizontally — each worker pulls from its assigned queues. Use Redis Cluster to distribute queue data across multiple nodes (partition by queue name hash). For burst capacity, use auto-scaling worker pools (e.g., AWS ECS with target tracking on queue depth). SQS handles this natively at any scale.
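The queue-name-to-shard routing needs a hash that is stable across processes; a sketch (shard count and function names are hypothetical):

```python
import hashlib

NUM_SHARDS = 4  # hypothetical: one logical shard per Redis node

def shard_for(queue_name: str, num_shards: int = NUM_SHARDS) -> int:
    """Map a queue name to a shard. Uses SHA-1 rather than Python's
    built-in hash(), which is salted per process and so would send
    producers and workers to different shards."""
    digest = hashlib.sha1(queue_name.encode('utf-8')).digest()
    return int.from_bytes(digest[:8], 'big') % num_shards
```

Every producer and worker that agrees on `NUM_SHARDS` then agrees on placement; resharding (changing `NUM_SHARDS`) requires draining or migrating queues, which is where consistent hashing or Redis Cluster's built-in slot mapping earns its keep.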
Q: How do you prevent a slow task from blocking the queue?
Use separate queues for fast tasks and slow (>10 second) tasks. Assign more workers to fast queues. Each worker dequeues one task at a time per thread; slow tasks don’t block other workers. Add a task-level timeout: if a task exceeds max_runtime, the watchdog marks it as failed and re-enqueues it or sends it to the DLQ.
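A task-level timeout can be sketched with `concurrent.futures` (the `run_with_timeout` name is illustrative):

```python
import time
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FuturesTimeout

def run_with_timeout(handler, payload, max_runtime: float):
    """Run handler(payload), giving up after max_runtime seconds.
    Caveat: Python cannot forcibly kill a thread, so a runaway handler
    keeps running in the background; workers that need a hard kill run
    handlers in a subprocess instead."""
    pool = ThreadPoolExecutor(max_workers=1)
    try:
        future = pool.submit(handler, payload)
        return future.result(timeout=max_runtime)
    except FuturesTimeout:
        raise TimeoutError(f'task exceeded {max_runtime}s max_runtime')
    finally:
        pool.shutdown(wait=False)  # don't block the worker on the runaway thread
```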
Asked at: Uber, Netflix, Databricks, Stripe, Atlassian