Celery Task Queues — Background Jobs in Python

March 2026 · 22 min read · Python, Celery, Redis, Task Queues

Some things shouldn't block your API response: sending emails, processing images, generating reports, syncing data. That's where Celery comes in — a distributed task queue that lets you push work to background workers. This guide covers everything from setup to production deployment.

When You Need a Task Queue

💡 Don't reach for Celery too early. For simple cases, FastAPI's BackgroundTasks (see our FastAPI guide) or Python's threading.Thread might be enough. Celery shines when you need reliability, retries, scheduling, and horizontal scaling.

Setup

# Install
pip install "celery[redis]"   # the [redis] extra pulls in the redis client; quotes protect the brackets in zsh

# Project structure
myapp/
├── app/
│   ├── __init__.py
│   ├── celery_app.py    # Celery configuration
│   ├── tasks.py         # Task definitions
│   └── main.py          # FastAPI app
├── celery_worker.sh
├── docker-compose.yml
└── requirements.txt

Configure Celery

# app/celery_app.py
from celery import Celery

app = Celery(
    "myapp",
    broker="redis://localhost:6379/0",       # Message broker
    backend="redis://localhost:6379/1",       # Result backend
)

# Configuration
app.conf.update(
    # Serialization
    task_serializer="json",
    accept_content=["json"],
    result_serializer="json",

    # Timezone
    timezone="UTC",
    enable_utc=True,

    # Reliability
    task_acks_late=True,              # Ack after task completes (not before)
    worker_prefetch_multiplier=1,     # Don't grab extra tasks
    task_reject_on_worker_lost=True,  # Re-queue if worker crashes

    # Limits
    task_time_limit=300,              # Hard kill after 5 min
    task_soft_time_limit=240,         # Raise SoftTimeLimitExceeded at 4 min

    # Results
    result_expires=3600,              # Results expire after 1 hour
)

# Auto-discover tasks in all app modules
app.autodiscover_tasks(["app"])

Defining Tasks

# app/tasks.py
import time
import httpx
from celery import shared_task
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)


@shared_task(bind=True, name="send_email")
def send_email(self, to: str, subject: str, body: str):
    """Send an email — retries on failure."""
    logger.info(f"Sending email to {to}: {subject}")

    try:
        # Simulate sending (replace with real SMTP/API)
        response = httpx.post(
            "https://api.mailservice.com/send",
            json={"to": to, "subject": subject, "body": body},
            timeout=10.0,
        )
        response.raise_for_status()
        return {"status": "sent", "to": to}

    except httpx.HTTPError as exc:
        logger.warning(f"Email failed: {exc}. Retrying...")
        raise self.retry(exc=exc, countdown=60, max_retries=3)


@shared_task(name="process_image")
def process_image(image_path: str, operations: list[str]) -> dict:
    """Heavy image processing — runs in background."""
    logger.info(f"Processing {image_path}: {operations}")

    # Simulate work
    time.sleep(5)

    # str.replace would hit every dot in the path; split off the extension instead
    root, dot, ext = image_path.rpartition(".")
    result_path = f"{root}_processed{dot}{ext}"
    return {
        "input": image_path,
        "output": result_path,
        "operations": operations,
    }


@shared_task(
    bind=True,
    name="sync_data",
    autoretry_for=(httpx.HTTPError,),  # httpx exceptions don't subclass the builtin ConnectionError/TimeoutError
    retry_backoff=True,        # Exponential backoff
    retry_backoff_max=600,     # Max 10 min between retries
    retry_jitter=True,         # Add randomness
    max_retries=5,
)
def sync_data(self, source_url: str):
    """Sync data from external API with automatic retry."""
    logger.info(f"Syncing from {source_url} (attempt {self.request.retries + 1})")

    with httpx.Client(timeout=30.0) as client:
        resp = client.get(source_url)
        resp.raise_for_status()
        data = resp.json()

    # Process data...
    return {"synced": len(data), "source": source_url}

Calling Tasks

# Basic call — fire and forget
send_email.delay("user@example.com", "Welcome!", "Thanks for signing up.")

# With options
send_email.apply_async(
    args=["user@example.com", "Welcome!", "Thanks!"],
    countdown=300,        # Delay 5 minutes
    expires=3600,         # Discard if not executed within 1 hour
    queue="emails",       # Route to specific queue
)

# Get result
result = process_image.delay("/uploads/photo.jpg", ["resize", "compress"])
print(result.id)          # Task ID (UUID)
print(result.status)      # PENDING → SUCCESS/FAILURE (STARTED only if task_track_started=True)
print(result.get(timeout=30))  # Block until done (use sparingly!)

# Check without blocking
if result.successful():
    print(result.result)
elif result.failed():
    print(result.traceback)

Task Chains, Groups, and Chords

from celery import chain, group, chord


# Chain: A → B → C (each gets result of previous)
workflow = chain(
    fetch_data.s("https://api.example.com/users"),
    transform_data.s(),
    save_to_db.s(),
)
result = workflow.apply_async()


# Group: run A, B, C in parallel
batch = group(
    process_image.s(f"/uploads/{i}.jpg", ["resize"])
    for i in range(100)
)
results = batch.apply_async()
# results.get()  # Wait for all 100


# Chord: run group in parallel, then callback when all done
workflow = chord(
    [process_image.s(f"/uploads/{i}.jpg", ["resize"]) for i in range(10)],
    body=generate_report.s(),  # Called with list of results
)
workflow.apply_async()

Periodic Tasks (Celery Beat)

Celery Beat is a scheduler that kicks off tasks at intervals — like cron, but integrated with your task queue. See also our task scheduling guide for alternatives.

# app/celery_app.py — add to configuration
from celery.schedules import crontab

app.conf.beat_schedule = {
    # Every 5 minutes
    "check-api-health": {
        "task": "check_health",
        "schedule": 300.0,  # seconds
    },

    # Every day at 2 AM UTC
    "daily-cleanup": {
        "task": "cleanup_old_data",
        "schedule": crontab(hour=2, minute=0),
    },

    # Every Monday at 9 AM
    "weekly-report": {
        "task": "generate_weekly_report",
        "schedule": crontab(hour=9, minute=0, day_of_week=1),
        "args": ["admin@company.com"],
    },

    # Every hour during business hours (Mon-Fri, 9-17)
    "sync-crm": {
        "task": "sync_data",
        "schedule": crontab(hour="9-17", minute=0, day_of_week="1-5"),
        "args": ["https://api.crm.com/contacts"],
    },
}

# Start Beat scheduler (separate process)
celery -A app.celery_app beat --loglevel=info

# Or combined worker + beat (dev only)
celery -A app.celery_app worker --beat --loglevel=info

FastAPI Integration

# app/main.py
from fastapi import FastAPI, HTTPException
from celery.result import AsyncResult
from .celery_app import app as celery_app
from . import tasks

api = FastAPI(title="Task Queue Demo")


@api.post("/tasks/email")
def queue_email(to: str, subject: str, body: str):
    """Queue an email for background sending."""
    result = tasks.send_email.delay(to, subject, body)
    return {"task_id": result.id, "status": "queued"}


@api.post("/tasks/process-image")
def queue_image_processing(path: str, operations: list[str]):
    result = tasks.process_image.delay(path, operations)
    return {"task_id": result.id, "status": "queued"}


@api.get("/tasks/{task_id}")
def get_task_status(task_id: str):
    """Check task status by ID."""
    result = AsyncResult(task_id, app=celery_app)

    response = {
        "task_id": task_id,
        "status": result.status,
    }

    if result.ready():
        if result.successful():
            response["result"] = result.result
        else:
            response["error"] = str(result.result)
            response["traceback"] = result.traceback

    return response


@api.post("/tasks/{task_id}/revoke")
def revoke_task(task_id: str, terminate: bool = False):
    """Cancel a pending or running task."""
    celery_app.control.revoke(task_id, terminate=terminate)
    return {"task_id": task_id, "action": "revoked"}

Error Handling & Dead Letter Queue

# app/tasks.py — robust error handling
from celery import shared_task
from celery.exceptions import SoftTimeLimitExceeded
import json
from pathlib import Path


DLQ_PATH = Path("dead_letter_queue/")
DLQ_PATH.mkdir(exist_ok=True)


class TaskFailure(Exception):
    """Non-retryable task failure."""
    pass


@shared_task(
    bind=True,
    name="process_payment",
    max_retries=3,
    default_retry_delay=60,
)
def process_payment(self, order_id: str, amount: float):
    try:
        # Business logic...
        if amount <= 0:
            raise TaskFailure(f"Invalid amount: {amount}")

        # call_payment_api is a stand-in for your payment provider's client
        result = call_payment_api(order_id, amount)
        return {"order_id": order_id, "status": "paid", "provider_result": result}

    except TaskFailure:
        # Non-retryable — send to DLQ
        _send_to_dlq(self, order_id=order_id, amount=amount)
        raise

    except SoftTimeLimitExceeded:
        # Task took too long — log and retry
        logger.error(f"Payment timeout for order {order_id}")
        raise self.retry(countdown=120)

    except Exception as exc:
        if self.request.retries >= self.max_retries:
            # All retries exhausted — DLQ
            _send_to_dlq(self, order_id=order_id, amount=amount, error=str(exc))
            raise
        raise self.retry(exc=exc)


def _send_to_dlq(task, **context):
    """Save failed task to dead letter queue for manual review."""
    from datetime import datetime, timezone  # stdlib; kept local so the snippet is self-contained

    dlq_entry = {
        "task_id": task.request.id,
        "task_name": task.name,
        "retries": task.request.retries,
        "context": context,
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }
    dlq_file = DLQ_PATH / f"{task.request.id}.json"
    dlq_file.write_text(json.dumps(dlq_entry, indent=2))
    logger.error(f"Task {task.request.id} sent to DLQ: {dlq_file}")

Queue Routing

Route different tasks to different queues for priority and resource isolation:

# app/celery_app.py
app.conf.task_routes = {
    "send_email": {"queue": "emails"},
    "process_image": {"queue": "heavy"},
    "process_payment": {"queue": "payments"},
    "sync_data": {"queue": "default"},
}

app.conf.task_default_queue = "default"

# Start workers per queue
# Email worker — lightweight, many concurrent
celery -A app.celery_app worker -Q emails -c 20 --loglevel=info

# Heavy processing — fewer concurrent, more memory
celery -A app.celery_app worker -Q heavy -c 2 --loglevel=info

# Payment worker — single concurrency for ordering guarantees
celery -A app.celery_app worker -Q payments -c 1 --loglevel=info

# Default — handles everything else
celery -A app.celery_app worker -Q default -c 10 --loglevel=info

Monitoring with Flower

# Install
pip install flower

# Start Flower web UI
celery -A app.celery_app flower --port=5555

# Open http://localhost:5555
# See: active workers, task rates, success/failure, queue lengths

Flower gives you real-time visibility into your task queue.
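One production note: Flower ships with no authentication, so never expose it publicly as-is. The simplest protection is its built-in HTTP basic auth flag (a reverse proxy or OAuth are sturdier options):

```shell
# Protect the Flower UI with HTTP basic auth
celery -A app.celery_app flower --port=5555 --basic_auth=admin:change-me
```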

Docker Deployment

See our Docker guide for Dockerfile fundamentals.

# docker-compose.yml
services:
  redis:
    image: redis:7-alpine
    ports: ["6379:6379"]
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 5s

  api:
    build: .
    command: uvicorn app.main:api --host 0.0.0.0 --port 8000
    ports: ["8000:8000"]
    environment:
      CELERY_BROKER_URL: redis://redis:6379/0
      CELERY_RESULT_BACKEND: redis://redis:6379/1
    depends_on:
      redis: { condition: service_healthy }

  worker-default:
    build: .
    command: celery -A app.celery_app worker -Q default -c 10 -l info
    environment:
      CELERY_BROKER_URL: redis://redis:6379/0
      CELERY_RESULT_BACKEND: redis://redis:6379/1
    depends_on:
      redis: { condition: service_healthy }

  worker-heavy:
    build: .
    command: celery -A app.celery_app worker -Q heavy -c 2 -l info
    environment:
      CELERY_BROKER_URL: redis://redis:6379/0
      CELERY_RESULT_BACKEND: redis://redis:6379/1
    depends_on:
      redis: { condition: service_healthy }
    deploy:
      resources:
        limits:
          memory: 2G

  beat:
    build: .
    command: celery -A app.celery_app beat -l info
    environment:
      CELERY_BROKER_URL: redis://redis:6379/0
    depends_on:
      redis: { condition: service_healthy }

  flower:
    build: .
    command: celery -A app.celery_app flower --port=5555
    ports: ["5555:5555"]
    environment:
      CELERY_BROKER_URL: redis://redis:6379/0
    depends_on:
      redis: { condition: service_healthy }

# Start everything
docker compose up -d

# Scale workers dynamically
docker compose up -d --scale worker-default=3

# API: http://localhost:8000/docs
# Flower: http://localhost:5555

Production Checklist

🔑 The #1 mistake: Passing large objects (files, DataFrames) as task arguments. Instead, save to storage (S3, disk) and pass the path/key. Celery arguments go through Redis — keep them small.
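The fix is mechanical: persist the payload first, enqueue only a reference. A sketch using local disk (swap in S3 or any object store in production; UPLOAD_DIR and save_upload are illustrative names, not Celery API):

```python
# Pass references, not payloads: persist first, enqueue a small key
import shutil
import uuid
from pathlib import Path

UPLOAD_DIR = Path("uploads_store")  # illustrative; use S3/object storage in production


def save_upload(src: Path, upload_dir: Path = UPLOAD_DIR) -> str:
    """Persist the file and return a short key; that key is the task argument."""
    upload_dir.mkdir(parents=True, exist_ok=True)
    key = f"{uuid.uuid4()}{src.suffix}"
    shutil.copy(src, upload_dir / key)
    return key

# Bad:  process_image.delay(file_bytes, ["resize"])         # megabytes through Redis
# Good: process_image.delay(save_upload(path), ["resize"])  # a short string through Redis
```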

🚀 Want production-ready task queue templates, API tools, and automation scripts?

Get the AI Agent Toolkit →

Need help building a task queue system or background processing pipeline? I build Python APIs, workers, and automation tools. Reach out on Telegram →