What Is a Distributed Task Queue?
A distributed task queue decouples work producers from work executors. Producers enqueue jobs; workers poll the queue and execute them asynchronously. Examples: sending emails after user signup, resizing images, processing payments, running ML inference. Systems like Celery (Python), Sidekiq (Ruby), Bull (Node.js), and cloud-native options (AWS SQS + Lambda, Google Cloud Tasks) implement this pattern.
Requirements
Functional: enqueue tasks with a payload, process tasks by worker pool, support priority queues, delayed/scheduled tasks, retry with exponential backoff on failure, dead letter queue for permanently failed tasks, task status tracking.
Non-functional: at-least-once delivery (no task loss), exactly-once processing for critical tasks (idempotent workers), horizontal worker scaling, monitoring (queue depth, worker utilization, error rate).
Core Design
Task Model
from dataclasses import dataclass
from datetime import datetime
from typing import Optional

@dataclass
class Task:
    task_id: str
    queue: str                              # 'email', 'image_resize', 'payment'
    payload: dict                           # serialized job arguments
    priority: int                           # higher = more urgent (0-9)
    status: str                             # PENDING | RUNNING | DONE | FAILED | DEAD
    created_at: datetime
    scheduled_at: datetime                  # for delayed tasks
    run_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None
    retry_count: int = 0
    max_retries: int = 3
    worker_id: Optional[str] = None
    error: Optional[str] = None
Queue Backend Options
| Backend | Strengths | Weaknesses |
|---|---|---|
| Redis (LPUSH/BRPOP) | Sub-millisecond latency, sorted sets for priority/delay | In-memory, durability risk |
| Redis Streams | Persistent, consumer groups, at-least-once semantics | More complex |
| PostgreSQL | ACID, existing infra, advisory locks | Not built for queuing, high polling load |
| Kafka | High throughput, replay, partitioned parallelism | Overkill for low-volume, no native delay |
| SQS | Managed, elastic scale, DLQ built-in | At-least-once (standard queues); ordering only via lower-throughput FIFO queues |
Priority Queue with Redis Sorted Sets
import redis
import json, time, uuid
from typing import Optional

r = redis.Redis()
QUEUE = 'tasks:priority'
DELAYED = 'tasks:delayed'
RUNNING = 'tasks:running'

def enqueue(payload: dict, priority: int = 5, delay_seconds: int = 0):
    task = {
        'task_id': str(uuid.uuid4()),
        'payload': payload,
        'priority': priority,
        'retry_count': 0,
        'created_at': time.time(),
    }
    if delay_seconds > 0:
        # Score = the timestamp at which the task becomes runnable
        r.zadd(DELAYED, {json.dumps(task): time.time() + delay_seconds})
    else:
        # Score: lower = higher priority; negative priority dominates, timestamp breaks ties (FIFO)
        score = -priority * 1e12 + time.time()
        r.zadd(QUEUE, {json.dumps(task): score})
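The score formula deserves a sanity check: one priority step (1e12) dwarfs any realistic Unix timestamp (~1.7e9), so priority always dominates and the timestamp only breaks ties. A minimal sketch (`priority_score` is a hypothetical helper, not part of the API above):

```python
def priority_score(priority: int, now: float) -> float:
    """Lower score dequeues first: priority dominates, enqueue time breaks ties."""
    return -priority * 1e12 + now

# A priority-9 task enqueued later still beats a priority-5 task enqueued earlier.
assert priority_score(9, now=2_000_000_000) < priority_score(5, now=1_000_000_000)
# Equal priorities are served FIFO.
assert priority_score(5, now=100.0) < priority_score(5, now=200.0)
```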
def promote_delayed():
    """Move due delayed tasks to the priority queue. Run every second."""
    now = time.time()
    due = r.zrangebyscore(DELAYED, '-inf', now)
    for raw in due:
        task = json.loads(raw)
        score = -task['priority'] * 1e12 + time.time()
        # ZREM returns 1 only for the client that actually removed the member,
        # so concurrent promoter instances can't promote the same task twice.
        if r.zrem(DELAYED, raw):
            r.zadd(QUEUE, {raw: score})
def dequeue(worker_id: str) -> Optional[dict]:
    """Atomically pop the highest-priority task and start its visibility timeout."""
    # ZPOPMIN removes and returns the lowest-score (= highest-priority) member
    # in a single atomic step, so two workers can never grab the same task.
    popped = r.zpopmin(QUEUE)
    if not popped:
        return None
    raw, _score = popped[0]
    task = json.loads(raw)
    task['worker_id'] = worker_id
    task['started_at'] = time.time()
    visibility_deadline = time.time() + 300  # 5 min visibility timeout
    # Small crash window between the pop and this ZADD; a Lua script
    # combining both steps would close it.
    r.zadd(RUNNING, {json.dumps(task): visibility_deadline})
    return task
Worker Loop with Retry
BACKOFF_BASE = 2  # seconds
MAX_RETRIES = 3

def run_worker(worker_id: str):
    while True:
        task = dequeue(worker_id)
        if not task:
            time.sleep(1)  # idle backoff when the queue is empty
            continue
        try:
            handler = HANDLERS[task['payload']['type']]
            handler(task['payload'])
            acknowledge(task)  # remove from RUNNING
        except Exception as e:
            handle_failure(task, str(e))

def handle_failure(task: dict, error: str):
    # Remove from RUNNING in both branches, otherwise the watchdog will
    # re-enqueue the task again once the visibility deadline expires.
    remove_from_running(task)
    task['retry_count'] += 1
    task['error'] = error
    if task['retry_count'] <= MAX_RETRIES:
        delay = BACKOFF_BASE ** task['retry_count']  # 2, 4, 8 seconds
        # Re-schedule the updated task dict directly: calling enqueue() would
        # build a fresh task with retry_count reset to 0 and retry forever.
        r.zadd(DELAYED, {json.dumps(task): time.time() + delay})
    else:
        # Move to dead letter queue for manual inspection
        r.lpush('tasks:dead', json.dumps(task))
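Fixed backoff (2, 4, 8 seconds) makes every task that failed at the same moment retry at the same moment. Adding jitter spreads the retries out; a sketch using "full jitter" (the 300-second cap is an assumption, not from the original):

```python
import random

def backoff_delay(retry_count: int, base: float = 2.0, cap: float = 300.0) -> float:
    """Full jitter: pick uniformly from [0, min(cap, base ** retry_count)].
    Avoids synchronized retry storms when many tasks fail at once."""
    return random.uniform(0.0, min(cap, base ** retry_count))
```

`handle_failure` would call `backoff_delay(task['retry_count'])` in place of the fixed exponential.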
Visibility Timeout and At-Least-Once Delivery
When a worker dequeues a task, it moves to a RUNNING set with a deadline. If the worker crashes before acknowledging, a watchdog job re-enqueues the task after the deadline expires. This ensures no task is silently dropped at the cost of potential duplicate processing.
def recover_stuck_tasks():
    """Watchdog: re-enqueue tasks whose workers timed out. Run periodically."""
    now = time.time()
    stuck = r.zrangebyscore(RUNNING, '-inf', now)
    for raw in stuck:
        task = json.loads(raw)
        # Re-add the original serialized task so retry_count is preserved;
        # the ZREM check keeps concurrent watchdogs from double-enqueuing.
        if r.zrem(RUNNING, raw):
            score = -task['priority'] * 1e12 + time.time()
            r.zadd(QUEUE, {raw: score})
Exactly-Once Processing
At-least-once delivery means a task may be processed twice (worker crash after processing but before ACK). For exactly-once, make workers idempotent: use the task_id as a deduplication key. Before processing, check whether task_id exists in a "processed" Redis set. After processing, add task_id to the set with a TTL longer than the visibility timeout, so a re-delivered duplicate still finds the key. If the key already exists, skip processing and ACK.
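The dedup check can be sketched with the Redis call replaced by an in-memory stand-in, so the logic is self-contained; in production `acquire` would be a single atomic `SET key 1 NX EX ttl`:

```python
import time

class Deduplicator:
    """Idempotency guard sketch. The dict mimics Redis SET NX EX semantics:
    set-if-absent with an expiry, checked lazily on read."""
    def __init__(self):
        self._store = {}  # task_id -> expiry timestamp

    def acquire(self, task_id: str, ttl: float = 300.0) -> bool:
        """True if this worker should process the task; False means a
        duplicate delivery: skip the handler and just ACK."""
        now = time.time()
        expiry = self._store.get(task_id)
        if expiry is not None and expiry > now:
            return False
        self._store[task_id] = now + ttl
        return True
```

Usage: a worker calls `acquire(task['task_id'])` before invoking the handler; a `False` return means another delivery already won.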
Cron / Scheduled Tasks
For recurring jobs (daily reports, hourly cleanup), store cron schedules in a separate table. A scheduler process runs every minute, queries cron jobs due in the current window, and enqueues them. Use a database lock or Redis SET NX to prevent multiple scheduler instances from enqueuing the same cron job twice.
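The SET NX guard can be sketched as follows; `try_lock` is injected so the logic stands alone (in production it would wrap `r.set(key, 1, nx=True, ex=120)`):

```python
from datetime import datetime
from typing import Callable, Iterable

def run_due_jobs(jobs: Iterable[str], now: datetime,
                 try_lock: Callable[[str], bool],
                 enqueue_job: Callable[[str], None]) -> int:
    """Enqueue each due job at most once per minute window, across all
    scheduler instances: only the instance that wins the lock enqueues."""
    window = now.strftime('%Y-%m-%dT%H:%M')  # one lock key per job per minute
    fired = 0
    for job in jobs:
        if try_lock(f'cron:{job}:{window}'):
            enqueue_job(job)
            fired += 1
    return fired
```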
Monitoring
- Queue depth: ZCARD on each queue. Alert if depth exceeds threshold (backlog building).
- Worker utilization: active workers / total workers. Alert if below 20% (overprovisioned) or above 90% (underprovisioned).
- Error rate: DLQ growth rate. Alert if tasks are dying faster than being processed.
- End-to-end latency: time from enqueue to completion. Alert if P99 exceeds SLA.
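The worker-utilization thresholds above translate directly into an alert rule; a small sketch (`worker_alert` is a hypothetical helper):

```python
from typing import Optional

def worker_alert(active: int, total: int) -> Optional[str]:
    """Map utilization to the alert bands above: <20% overprovisioned,
    >90% underprovisioned, otherwise healthy (None)."""
    if total == 0:
        return 'no_workers'
    utilization = active / total
    if utilization < 0.2:
        return 'overprovisioned'
    if utilization > 0.9:
        return 'underprovisioned'
    return None
```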
Interview Questions
Q: How do you scale a task queue to handle 100K tasks/second?
Partition by queue name (email, image_resize) — each queue is an independent Redis sorted set. Scale workers horizontally — each worker pulls from its assigned queues. Use Redis Cluster to distribute queue data across multiple nodes (partition by queue name hash). For burst capacity, use auto-scaling worker pools (e.g., AWS ECS with target tracking on queue depth). SQS handles this natively at any scale.
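The "partition by queue name hash" idea can be sketched as a routing helper. This is a hypothetical illustration: real Redis Cluster assigns keys to one of 16384 hash slots via CRC16 on its own, so in practice the queue key's name alone determines its shard.

```python
import hashlib

def shard_for_queue(queue_name: str, num_shards: int) -> int:
    """Deterministically route a queue to a shard by hashing its name,
    so every worker and producer agrees on where each queue lives."""
    digest = hashlib.md5(queue_name.encode()).hexdigest()
    return int(digest, 16) % num_shards
```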
Q: How do you prevent a slow task from blocking the queue?
Use separate queues for fast (e.g., under 1 second) and slow (e.g., over 10 seconds) tasks. Assign more workers to fast queues. Each worker dequeues one task at a time per goroutine/thread, so slow tasks don't block other workers. Add a task-level timeout: if a task exceeds max_runtime, the watchdog marks it as failed and re-enqueues it or sends it to the DLQ.
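The task-level timeout can be sketched with a single-thread executor (a sketch only: the timed-out thread keeps running in the background, so process pools or cooperative cancellation are the robust production option; `max_runtime` is an assumed config field):

```python
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FuturesTimeout

def run_with_timeout(fn, payload, max_runtime: float):
    """Run fn(payload), raising TimeoutError if it exceeds max_runtime seconds."""
    pool = ThreadPoolExecutor(max_workers=1)
    future = pool.submit(fn, payload)
    try:
        return future.result(timeout=max_runtime)
    except FuturesTimeout:
        raise TimeoutError(f'task exceeded {max_runtime}s')
    finally:
        pool.shutdown(wait=False)  # don't block on the possibly still-running thread
```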
FAQ
Q: What is the difference between a task queue and a message queue?
A message queue (Kafka, RabbitMQ, SQS) is a generic pub/sub or point-to-point system for transmitting data between services; the producer sends a message and doesn't care what happens next. A task queue (Celery, Sidekiq, Bull) is specifically designed for executing units of work (tasks/jobs): it adds worker management, retry logic, scheduling, progress tracking, and result storage on top of a message queue backend. Task queues typically use a message queue (Redis, RabbitMQ, SQS) as their broker. Think of a task queue as a message queue with built-in worker orchestration.
Q: How does a visibility timeout prevent task loss in SQS or Redis-based queues?
When a worker dequeues a task, the task is not deleted; it becomes invisible to other workers for a visibility timeout period (e.g., 5 minutes). If the worker processes and acknowledges the task within the timeout, the task is deleted. If the worker crashes or times out without acknowledging, the task becomes visible again and another worker picks it up. This ensures at-least-once delivery: tasks are never permanently lost due to worker failure. The trade-off: if a task takes longer than the visibility timeout, it may be processed twice.
Q: How do you implement retry with exponential backoff in a task queue?
When a task fails, instead of immediately re-enqueuing it, schedule it with an increasing delay: delay = base^retry_count seconds (e.g., 2^1 = 2s, 2^2 = 4s, 2^3 = 8s). Store the retry_count with the task. Use a delayed queue (Redis sorted set with score = execute_at timestamp) to hold tasks until their retry time. A scheduler job periodically promotes due tasks to the active queue. After max_retries (e.g., 3-5), move the task to a dead letter queue (DLQ) for manual inspection. Add jitter (a random offset) to avoid synchronized retry storms.
Q: How do you implement exactly-once processing in a task queue?
At-least-once delivery (the default) means tasks may be processed twice. For exactly-once, make workers idempotent using the task_id as a deduplication key. Before processing: SET task:{id}:processing 1 NX EX 300 in Redis (atomic set-if-not-exists). If the key already exists, skip and ACK. After processing: set task:{id}:done 1 with a TTL longer than the visibility timeout. On retry, the done check short-circuits. This works when the processing action itself is idempotent (database upsert with a unique constraint, Stripe idempotency key). True exactly-once also requires idempotent downstream systems.
Q: How do you scale a task queue to handle 100K tasks per second?
Partition by queue name: each queue (email, image_resize, payments) is an independent sorted set in Redis, processed by its own worker pool. Scale workers horizontally: each stateless worker polls its assigned queues. Use Redis Cluster to shard queue data across multiple nodes. For burst traffic, use auto-scaling worker pools triggered by queue depth metrics (CloudWatch + ECS target tracking, or Kubernetes HPA on custom metrics). For very high throughput, replace Redis with Kafka: each task type becomes a Kafka topic with multiple partitions, and consumer groups provide parallelism without polling overhead.