Celery Task Queues — Background Jobs in Python
Some things shouldn't block your API response: sending emails, processing images, generating reports, syncing data. That's where Celery comes in — a distributed task queue that lets you push work to background workers. This guide covers everything from setup to production deployment.
When You Need a Task Queue
- Slow operations — image/video processing, PDF generation, ML inference
- External API calls — sending emails, SMS, webhooks (unreliable, slow)
- Scheduled jobs — daily reports, cleanup tasks, data syncs
- Rate-limited work — process 1,000 items when the API allows only 10/sec (see the sketch after this list)
- Fan-out — one event triggers many independent actions
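The rate-limited case is built into Celery: tasks accept a rate_limit option that throttles execution. A minimal sketch (the task name and body are placeholders):
# Hypothetical task throttled to ~10 executions/sec per worker
from celery import shared_task

@shared_task(name="call_partner_api", rate_limit="10/s")
def call_partner_api(item_id: int):
    ...  # call the rate-limited external API here
Note that rate_limit is enforced per worker instance, not globally across the cluster.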
Setup
# Install
pip install "celery[redis]"  # the [redis] extra pulls in the redis client
# Project structure
myapp/
├── app/
│ ├── __init__.py
│ ├── celery_app.py # Celery configuration
│ ├── tasks.py # Task definitions
│ └── main.py # FastAPI app
├── celery_worker.sh
├── docker-compose.yml
└── requirements.txt
Configure Celery
# app/celery_app.py
import os

from celery import Celery

app = Celery(
    "myapp",
    # Read from the environment so the same code works locally and in Docker
    broker=os.environ.get("CELERY_BROKER_URL", "redis://localhost:6379/0"),  # Message broker
    backend=os.environ.get("CELERY_RESULT_BACKEND", "redis://localhost:6379/1"),  # Result backend
)
# Configuration
app.conf.update(
# Serialization
task_serializer="json",
accept_content=["json"],
result_serializer="json",
# Timezone
timezone="UTC",
enable_utc=True,
# Reliability
task_acks_late=True, # Ack after task completes (not before)
worker_prefetch_multiplier=1, # Don't grab extra tasks
task_reject_on_worker_lost=True, # Re-queue if worker crashes
# Limits
task_time_limit=300, # Hard kill after 5 min
task_soft_time_limit=240, # Raise SoftTimeLimitExceeded at 4 min
# Results
result_expires=3600, # Results expire after 1 hour
)
# Auto-discover tasks (looks for a tasks module inside each listed package)
app.autodiscover_tasks(["app"])
Defining Tasks
# app/tasks.py
import time
import httpx
from celery import shared_task
from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)
@shared_task(bind=True, name="send_email")
def send_email(self, to: str, subject: str, body: str):
"""Send an email — retries on failure."""
logger.info(f"Sending email to {to}: {subject}")
try:
# Simulate sending (replace with real SMTP/API)
response = httpx.post(
"https://api.mailservice.com/send",
json={"to": to, "subject": subject, "body": body},
timeout=10.0,
)
response.raise_for_status()
return {"status": "sent", "to": to}
except httpx.HTTPError as exc:
logger.warning(f"Email failed: {exc}. Retrying...")
raise self.retry(exc=exc, countdown=60, max_retries=3)
@shared_task(name="process_image")
def process_image(image_path: str, operations: list[str]) -> dict:
"""Heavy image processing — runs in background."""
logger.info(f"Processing {image_path}: {operations}")
# Simulate work
time.sleep(5)
    # Insert "_processed" before the extension (replace(".", ...) broke on paths with multiple dots)
    stem, dot, ext = image_path.rpartition(".")
    result_path = f"{stem}_processed.{ext}" if dot else image_path + "_processed"
return {
"input": image_path,
"output": result_path,
"operations": operations,
}
@shared_task(
bind=True,
name="sync_data",
    autoretry_for=(httpx.HTTPError,),  # httpx errors don't subclass the builtin ConnectionError/TimeoutError
retry_backoff=True, # Exponential backoff
retry_backoff_max=600, # Max 10 min between retries
retry_jitter=True, # Add randomness
max_retries=5,
)
def sync_data(self, source_url: str):
"""Sync data from external API with automatic retry."""
logger.info(f"Syncing from {source_url} (attempt {self.request.retries + 1})")
with httpx.Client(timeout=30.0) as client:
resp = client.get(source_url)
resp.raise_for_status()
data = resp.json()
# Process data...
return {"synced": len(data), "source": source_url}
Calling Tasks
# Basic call — fire and forget
send_email.delay("user@example.com", "Welcome!", "Thanks for signing up.")
# With options
send_email.apply_async(
args=["user@example.com", "Welcome!", "Thanks!"],
countdown=300, # Delay 5 minutes
expires=3600, # Discard if not executed within 1 hour
queue="emails", # Route to specific queue
)
# Get result
result = process_image.delay("/uploads/photo.jpg", ["resize", "compress"])
print(result.id) # Task ID (UUID)
print(result.status) # PENDING → STARTED → SUCCESS/FAILURE
print(result.get(timeout=30)) # Blocks the caller — never call .get() inside another task
# Check without blocking
if result.successful():  # ready() is True for success AND failure, so branch explicitly
    print(result.result)
elif result.failed():
    print(result.traceback)
Task Chains and Groups
from celery import chain, group, chord
# Chain: A → B → C (each gets result of previous)
workflow = chain(
fetch_data.s("https://api.example.com/users"),
transform_data.s(),
save_to_db.s(),
)
result = workflow.apply_async()
# Group: run A, B, C in parallel
batch = group(
process_image.s(f"/uploads/{i}.jpg", ["resize"])
for i in range(100)
)
results = batch.apply_async()
# results.get() # Wait for all 100
# Chord: run group in parallel, then callback when all done (requires a result backend)
workflow = chord(
[process_image.s(f"/uploads/{i}.jpg", ["resize"]) for i in range(10)],
body=generate_report.s(), # Called with list of results
)
workflow.apply_async()
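When a later step should not receive the previous step's return value, make its signature immutable with .si(). A sketch (notify_admin is a hypothetical task):
# Immutable signature: .si() ignores the parent's result
workflow = chain(
    fetch_data.s("https://api.example.com/users"),
    transform_data.s(),
    notify_admin.si("sync finished"),  # called with only its own args
)
workflow.apply_async()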
Periodic Tasks (Celery Beat)
Celery Beat is a scheduler that kicks off tasks at intervals — like cron, but integrated with your task queue. See also our task scheduling guide for alternatives.
# app/celery_app.py — add to configuration
from celery.schedules import crontab
app.conf.beat_schedule = {
# Every 5 minutes
"check-api-health": {
"task": "check_health",
"schedule": 300.0, # seconds
},
# Every day at 2 AM UTC
"daily-cleanup": {
"task": "cleanup_old_data",
"schedule": crontab(hour=2, minute=0),
},
# Every Monday at 9 AM
"weekly-report": {
"task": "generate_weekly_report",
"schedule": crontab(hour=9, minute=0, day_of_week=1),
"args": ["admin@company.com"],
},
# Every hour during business hours (Mon-Fri, 9-17)
"sync-crm": {
"task": "sync_data",
"schedule": crontab(hour="9-17", minute=0, day_of_week="1-5"),
"args": ["https://api.crm.com/contacts"],
},
}
# Start Beat scheduler (separate process)
celery -A app.celery_app beat --loglevel=info
# Or combined worker + beat (dev only — run exactly one Beat instance in production, or scheduled tasks fire twice)
celery -A app.celery_app worker --beat --loglevel=info
FastAPI Integration
# app/main.py
from fastapi import FastAPI, HTTPException
from celery.result import AsyncResult
from .celery_app import app as celery_app
from . import tasks
api = FastAPI(title="Task Queue Demo")
@api.post("/tasks/email")
def queue_email(to: str, subject: str, body: str):
"""Queue an email for background sending."""
result = tasks.send_email.delay(to, subject, body)
return {"task_id": result.id, "status": "queued"}
@api.post("/tasks/process-image")
def queue_image_processing(path: str, operations: list[str]):
result = tasks.process_image.delay(path, operations)
return {"task_id": result.id, "status": "queued"}
@api.get("/tasks/{task_id}")
def get_task_status(task_id: str):
"""Check task status by ID."""
result = AsyncResult(task_id, app=celery_app)
response = {
"task_id": task_id,
"status": result.status,
}
if result.ready():
if result.successful():
response["result"] = result.result
else:
response["error"] = str(result.result)
response["traceback"] = result.traceback
return response
@api.post("/tasks/{task_id}/revoke")
def revoke_task(task_id: str, terminate: bool = False):
"""Cancel a pending or running task."""
celery_app.control.revoke(task_id, terminate=terminate)
return {"task_id": task_id, "action": "revoked"}
Error Handling & Dead Letter Queue
# app/tasks.py — robust error handling
from celery import shared_task
from celery.exceptions import SoftTimeLimitExceeded
import json
from datetime import datetime, timezone
from pathlib import Path
DLQ_PATH = Path("dead_letter_queue/")
DLQ_PATH.mkdir(exist_ok=True)
class TaskFailure(Exception):
"""Non-retryable task failure."""
pass
@shared_task(
bind=True,
name="process_payment",
max_retries=3,
default_retry_delay=60,
)
def process_payment(self, order_id: str, amount: float):
try:
# Business logic...
if amount <= 0:
raise TaskFailure(f"Invalid amount: {amount}")
        confirmation = call_payment_api(order_id, amount)  # hypothetical payment helper
        return {"order_id": order_id, "status": "paid", "confirmation": confirmation}
except TaskFailure:
# Non-retryable — send to DLQ
_send_to_dlq(self, order_id=order_id, amount=amount)
raise
except SoftTimeLimitExceeded:
# Task took too long — log and retry
logger.error(f"Payment timeout for order {order_id}")
raise self.retry(countdown=120)
except Exception as exc:
if self.request.retries >= self.max_retries:
# All retries exhausted — DLQ
_send_to_dlq(self, order_id=order_id, amount=amount, error=str(exc))
raise
raise self.retry(exc=exc)
def _send_to_dlq(task, **context):
"""Save failed task to dead letter queue for manual review."""
dlq_entry = {
"task_id": task.request.id,
"task_name": task.name,
"retries": task.request.retries,
"context": context,
"timestamp": __import__("datetime").datetime.now().isoformat(),
}
dlq_file = DLQ_PATH / f"{task.request.id}.json"
dlq_file.write_text(json.dumps(dlq_entry, indent=2))
logger.error(f"Task {task.request.id} sent to DLQ: {dlq_file}")
Queue Routing
Route different tasks to different queues for priority and resource isolation:
# app/celery_app.py
app.conf.task_routes = {
"send_email": {"queue": "emails"},
"process_image": {"queue": "heavy"},
"process_payment": {"queue": "payments"},
"sync_data": {"queue": "default"},
}
app.conf.task_default_queue = "default"
# Start workers per queue
# Email worker — lightweight, many concurrent
celery -A app.celery_app worker -Q emails -c 20 --loglevel=info
# Heavy processing — fewer concurrent, more memory
celery -A app.celery_app worker -Q heavy -c 2 --loglevel=info
# Payment worker — single concurrency for ordering guarantees
celery -A app.celery_app worker -Q payments -c 1 --loglevel=info
# Default — handles everything else
celery -A app.celery_app worker -Q default -c 10 --loglevel=info
Monitoring with Flower
# Install
pip install flower
# Start Flower web UI
celery -A app.celery_app flower --port=5555
# Open http://localhost:5555
# See: active workers, task rates, success/failure, queue lengths
Flower gives you real-time visibility into your task queue:
- Dashboard — worker status, active/processed/failed task counts
- Tasks — search, filter, inspect individual task results and tracebacks
- Workers — pool size, memory usage, prefetch count
- Queues — message count per queue (spot bottlenecks)
- API — /api/tasks for programmatic monitoring
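The REST API is handy for custom alerting. A sketch that polls for failures (assumes Flower on localhost:5555 and its state filter parameter):
# flower_check.py: count failed tasks via Flower's REST API
import httpx

resp = httpx.get("http://localhost:5555/api/tasks", params={"state": "FAILURE"})
failed = resp.json()  # dict keyed by task ID
if failed:
    print(f"ALERT: {len(failed)} failed tasks")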
Docker Deployment
See our Docker guide for Dockerfile fundamentals.
# docker-compose.yml
services:
redis:
image: redis:7-alpine
ports: ["6379:6379"]
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 5s
api:
build: .
command: uvicorn app.main:api --host 0.0.0.0 --port 8000
ports: ["8000:8000"]
environment:
CELERY_BROKER_URL: redis://redis:6379/0
CELERY_RESULT_BACKEND: redis://redis:6379/1
depends_on:
redis: { condition: service_healthy }
worker-default:
build: .
command: celery -A app.celery_app worker -Q default -c 10 -l info
environment:
CELERY_BROKER_URL: redis://redis:6379/0
CELERY_RESULT_BACKEND: redis://redis:6379/1
depends_on:
redis: { condition: service_healthy }
worker-heavy:
build: .
command: celery -A app.celery_app worker -Q heavy -c 2 -l info
environment:
CELERY_BROKER_URL: redis://redis:6379/0
CELERY_RESULT_BACKEND: redis://redis:6379/1
depends_on:
redis: { condition: service_healthy }
deploy:
resources:
limits:
memory: 2G
beat:
build: .
command: celery -A app.celery_app beat -l info
environment:
CELERY_BROKER_URL: redis://redis:6379/0
depends_on:
redis: { condition: service_healthy }
flower:
build: .
command: celery -A app.celery_app flower --port=5555
ports: ["5555:5555"]
environment:
CELERY_BROKER_URL: redis://redis:6379/0
depends_on:
redis: { condition: service_healthy }
# Start everything
docker compose up -d
# Scale workers dynamically
docker compose up -d --scale worker-default=3
# API: http://localhost:8000/docs
# Flower: http://localhost:5555
Production Checklist
- acks_late = True — don't lose tasks when workers crash
- Time limits — both soft (graceful) and hard (kill) limits on every task
- Retry with backoff — exponential backoff + jitter for external calls
- Dead letter queue — failed tasks go somewhere reviewable, not into the void
- Idempotent tasks — tasks can run twice safely (dedup by task ID or business key; see the sketch after this list)
- Queue routing — separate heavy/fast/critical tasks into different queues
- Monitoring — Flower + alerting on queue depth and failure rate
- Result expiry — set result_expires to prevent Redis from filling up
- Serialization — use JSON (not pickle) for security and debuggability
- Graceful shutdown — stop workers with SIGTERM (warm shutdown lets in-flight tasks finish); --without-mingle --without-gossip speeds up startup, not shutdown
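With acks_late and retries enabled, the same task can execute more than once, so the side effect must be deduplicated. A minimal guard using a Redis SET NX lock keyed on a business identifier (the task, key scheme, and Redis database are assumptions):
# Idempotency guard: perform the charge at most once per order
import redis
from celery import shared_task

r = redis.Redis.from_url("redis://localhost:6379/2")

@shared_task(bind=True, name="charge_order")
def charge_order(self, order_id: str):
    # set(nx=True) claims the key only if it doesn't exist; expires after 24h
    if not r.set(f"charged:{order_id}", self.request.id, nx=True, ex=86400):
        return {"order_id": order_id, "status": "duplicate_skipped"}
    ...  # perform the charge exactly once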
Related Articles
- Build a REST API with FastAPI — integrate Celery with your API
- Task Scheduling & Cron Jobs in Python — alternatives to Celery Beat
- Python Redis Guide — the broker behind Celery
- Python Microservices — async communication between services
- Dockerize Python Apps — deploy workers in containers
- Python Async Programming — asyncio vs Celery for background work
Need help building a task queue system or background processing pipeline? I build Python APIs, workers, and automation tools. Reach out on Telegram →