Python & Redis — Caching, Queues & Real-Time Data

March 2026 · 22 min read · Python, Redis, Caching, Queues

Redis is the Swiss Army knife of backend development — an in-memory data store that handles caching, message queues, real-time pub/sub, rate limiting, sessions, and leaderboards. If you're building anything beyond a simple script, you'll eventually need Redis. This guide covers every practical pattern with Python code you can ship today.

Setup

# Install
pip install redis  # redis-py (sync + async)

# Run Redis (Docker)
docker run -d --name redis -p 6379:6379 redis:7-alpine

# Or install locally
# macOS: brew install redis && brew services start redis
# Ubuntu: sudo apt install redis-server
# Basic connection
import redis

# Sync client
r = redis.Redis(host="localhost", port=6379, db=0, decode_responses=True)
r.ping()  # True

# Async client
import redis.asyncio as aioredis

async def main():
    """Open an async Redis connection, verify it with PING, and release it.

    The connection is closed in a ``finally`` block so it is not leaked when
    ``ping()`` raises (e.g. the server is unreachable).
    """
    r = aioredis.from_url("redis://localhost:6379", decode_responses=True)
    try:
        await r.ping()  # True
    finally:
        # NOTE(review): newer redis-py releases prefer ``aclose()`` over
        # ``close()`` on the asyncio client — confirm against the pinned version.
        await r.close()
💡 Use decode_responses=True whenever you store text — otherwise redis-py returns bytes, not strings, and you'll spend hours debugging b'value' vs 'value'. (Leave it off only for connections that handle binary payloads.)

1. Caching — The #1 Use Case

Simple key-value cache

import json
import hashlib
from datetime import timedelta


class RedisCache:
    """Production-ready caching layer."""

    def __init__(self, redis_client, prefix: str = "cache", default_ttl: int = 300):
        self.r = redis_client
        self.prefix = prefix
        self.default_ttl = default_ttl

    def _key(self, name: str) -> str:
        return f"{self.prefix}:{name}"

    def get(self, key: str):
        """Get cached value, returns None if miss."""
        data = self.r.get(self._key(key))
        if data is None:
            return None
        return json.loads(data)

    def set(self, key: str, value, ttl: int = None):
        """Cache a value with TTL (seconds)."""
        self.r.setex(
            self._key(key),
            ttl or self.default_ttl,
            json.dumps(value, default=str),
        )

    def delete(self, key: str):
        """Invalidate a cached entry."""
        self.r.delete(self._key(key))

    def get_or_set(self, key: str, factory, ttl: int = None):
        """Cache-aside pattern: get from cache, or compute and cache."""
        value = self.get(key)
        if value is not None:
            return value

        value = factory()
        self.set(key, value, ttl)
        return value


# --- Usage ---
cache = RedisCache(r, prefix="app", default_ttl=600)

# Cache API responses
def fetch_user_profile(user_id: int):
    """Return the profile for ``user_id``, cached for 300 seconds."""
    def load_profile():
        # Only runs on a cache miss.
        return expensive_api_call(user_id)

    cache_key = f"user:{user_id}"
    return cache.get_or_set(cache_key, load_profile, ttl=300)

# Invalidate on update
def update_user(user_id: int, data: dict):
    """Persist the change, then drop the cached copy so the next read reloads it."""
    save_to_db(user_id, data)
    stale_key = f"user:{user_id}"
    cache.delete(stale_key)

Decorator-based caching

import functools


def cached(ttl: int = 300, prefix: str = "fn"):
    """Decorator: cache function results in Redis.

    Results are stored as JSON under ``{prefix}:{function name}:{md5 of args}``
    using the module-level client ``r``. Arguments and results that ``json``
    cannot encode are stringified (``default=str``), so cache hits return the
    JSON-decoded form of the result.

    Args:
        ttl: Lifetime of each cached result in seconds.
        prefix: Key namespace.

    The wrapped function gains ``invalidate()``, which removes every cached
    result of that function and returns the number of keys deleted.
    """
    def decorator(fn):
        namespace = f"{prefix}:{fn.__name__}"

        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            # Build cache key from function name + arguments
            key_data = json.dumps({"args": args, "kwargs": kwargs}, sort_keys=True, default=str)
            # md5 is used only to shorten the key, not for security.
            key_hash = hashlib.md5(key_data.encode()).hexdigest()
            cache_key = f"{namespace}:{key_hash}"

            # Check cache
            cached_result = r.get(cache_key)
            if cached_result is not None:
                return json.loads(cached_result)

            # Compute and cache
            result = fn(*args, **kwargs)
            r.setex(cache_key, ttl, json.dumps(result, default=str))
            return result

        def invalidate(batch_size: int = 500) -> int:
            """Delete all cached results of this function; return how many were removed."""
            # DEL does not expand glob patterns, so the matching keys have to be
            # discovered with SCAN and deleted explicitly, in bounded batches.
            removed = 0
            batch = []
            for key in r.scan_iter(match=f"{namespace}:*", count=batch_size):
                batch.append(key)
                if len(batch) >= batch_size:
                    removed += r.delete(*batch)
                    batch = []
            if batch:
                removed += r.delete(*batch)
            return removed

        wrapper.invalidate = invalidate
        return wrapper
    return decorator


@cached(ttl=600)
def get_exchange_rate(from_currency: str, to_currency: str) -> float:
    """Expensive API call — cached for 10 minutes.

    Raises:
        requests.HTTPError: The API answered with a 4xx/5xx status.
        requests.Timeout: No response within 10 seconds.
    """
    resp = requests.get(
        "https://api.exchangerate.host/convert",
        # Let requests URL-encode the values instead of interpolating them.
        params={"from": from_currency, "to": to_currency},
        # Without a timeout a stalled connection would block the caller forever.
        timeout=10,
    )
    # Fail loudly so an error body is never parsed (and then cached) as a rate.
    resp.raise_for_status()
    return resp.json()["result"]

2. Task Queues

Redis lists make excellent lightweight task queues. For simple needs, you don't need Celery — a Redis list + worker loop is enough. See also our task scheduling guide.

import json
import time
import signal
import traceback
from datetime import datetime, timezone


class SimpleQueue:
    """Redis-backed task queue with retry, backoff and dead-letter.

    Keys derived from ``name``:
      * ``queue:{name}``            - list of ready tasks, consumed from the tail
      * ``queue:{name}:processing`` - list of tasks currently handed to a worker
      * ``queue:{name}:delayed``    - sorted set of retries, scored by due time
      * ``queue:{name}:dead``       - list of tasks that exhausted their retries

    Tasks are JSON objects with ``id``, ``type``, ``payload``, ``retries`` and
    ``created_at`` (plus ``last_error`` / ``retry_after`` after a failure).
    """

    def __init__(self, redis_client, name: str = "tasks"):
        self.r = redis_client
        self.queue = f"queue:{name}"
        self.processing = f"queue:{name}:processing"
        self.delayed = f"queue:{name}:delayed"
        self.dead_letter = f"queue:{name}:dead"
        self.max_retries = 3
        # task id -> exact string stored in the processing list. LREM matches by
        # value, so the original bytes are kept instead of re-serialising a dict
        # that a handler may have mutated in the meantime.
        self._inflight = {}

    def enqueue(self, task_type: str, payload: dict, priority: int = 0):
        """Add a task to the queue and return its id.

        Args:
            task_type: Name used by the worker to pick a handler.
            payload: JSON-serialisable task arguments.
            priority: Values > 0 jump ahead of all waiting tasks.
        """
        import uuid  # local import: keeps the snippet self-contained

        # The random suffix keeps ids unique when several tasks are enqueued
        # within the same millisecond.
        task_id = f"{task_type}:{int(time.time()*1000)}:{uuid.uuid4().hex[:8]}"
        task = json.dumps({
            "id": task_id,
            "type": task_type,
            "payload": payload,
            "retries": 0,
            "created_at": datetime.now(timezone.utc).isoformat(),
        })
        # dequeue() pops from the tail (right) of the list, therefore:
        if priority > 0:
            self.r.rpush(self.queue, task)  # high priority → tail, popped next
        else:
            self.r.lpush(self.queue, task)  # normal → head, served FIFO
        return task_id

    def _promote_due(self):
        """Move retries whose backoff has elapsed back onto the ready list."""
        now = time.time()
        for raw in self.r.zrangebyscore(self.delayed, 0, now):
            # Only the caller whose ZREM actually removed the member re-queues
            # it, so concurrent workers cannot duplicate the task.
            if self.r.zrem(self.delayed, raw):
                self.r.lpush(self.queue, raw)

    def dequeue(self, timeout: int = 5):
        """Blocking dequeue with reliability (BRPOPLPUSH).

        The task is atomically moved to the processing list and stays there
        until ``complete`` or ``retry_or_dead_letter`` is called. Delayed
        retries are promoted only at the start of each call, so they can be
        picked up at most ``timeout`` seconds late.

        Returns:
            The decoded task dict, or ``None`` if nothing arrived in time.
        """
        self._promote_due()
        raw = self.r.brpoplpush(self.queue, self.processing, timeout)
        if raw is None:
            return None
        task = json.loads(raw)
        if isinstance(task, dict) and "id" in task:
            self._inflight[task["id"]] = raw
        return task

    def _remove_from_processing(self, task: dict):
        """Remove ``task`` from the processing list using its stored raw form."""
        raw = self._inflight.pop(task.get("id"), None)
        if raw is None:
            # Task was not dequeued through this instance; fall back to
            # re-serialising, which only matches an unmodified task.
            raw = json.dumps(task, default=str)
        self.r.lrem(self.processing, 1, raw)

    def complete(self, task: dict):
        """Mark task as done — remove from processing."""
        self._remove_from_processing(task)

    def retry_or_dead_letter(self, task: dict, error: str):
        """Retry failed task or move to dead letter queue.

        Increments ``task["retries"]`` and records ``error``. Up to
        ``max_retries`` the task is parked in the delayed set for
        ``2 ** retries`` seconds; after that it goes to the dead-letter list.
        """
        self._remove_from_processing(task)

        task["retries"] += 1
        task["last_error"] = error

        if task["retries"] <= self.max_retries:
            # Exponential backoff: schedule instead of re-queueing immediately.
            delay = 2 ** task["retries"]
            task["retry_after"] = (
                datetime.now(timezone.utc).timestamp() + delay
            )
            self.r.zadd(
                self.delayed,
                {json.dumps(task, default=str): task["retry_after"]},
            )
        else:
            # Move to dead letter queue for manual inspection
            self.r.rpush(self.dead_letter, json.dumps(task, default=str))

    def stats(self) -> dict:
        """Return the current length of each queue structure."""
        return {
            "pending": self.r.llen(self.queue),
            "processing": self.r.llen(self.processing),
            "delayed": self.r.zcard(self.delayed),
            "dead_letter": self.r.llen(self.dead_letter),
        }


# --- Worker ---
class Worker:
    """Pulls tasks from a queue and dispatches each one to its registered handler."""

    def __init__(self, queue: SimpleQueue):
        self.queue = queue
        self.handlers = {}
        self.running = True

        # Both termination signals only flip the running flag, so the loop
        # stops after the task currently in progress.
        for signum in (signal.SIGTERM, signal.SIGINT):
            signal.signal(signum, self._shutdown)

    def register(self, task_type: str):
        """Return a decorator that binds the decorated function to ``task_type``."""
        def bind(handler_fn):
            self.handlers[task_type] = handler_fn
            return handler_fn
        return bind

    def _shutdown(self, *_):
        """Signal handler: request a stop at the next loop iteration."""
        print("Shutting down gracefully...")
        self.running = False

    def _dispatch(self, task):
        """Run one task through its handler and settle it on the queue."""
        task_handler = self.handlers.get(task["type"])
        if not task_handler:
            # Unknown task types are acknowledged so they do not pile up.
            print(f"No handler for task type: {task['type']}")
            self.queue.complete(task)
            return

        try:
            task_handler(task["payload"])
            self.queue.complete(task)
            print(f"✅ {task['id']} completed")
        except Exception as e:
            print(f"❌ {task['id']} failed: {e}")
            self.queue.retry_or_dead_letter(task, str(e))

    def run(self):
        """Poll the queue until a shutdown is requested."""
        print(f"Worker started. Handlers: {list(self.handlers.keys())}")

        while self.running:
            next_task = self.queue.dequeue(timeout=2)
            if next_task is not None:
                self._dispatch(next_task)


# --- Usage ---
queue = SimpleQueue(r, "emails")
worker = Worker(queue)

@worker.register("send_email")
def handle_send_email(payload):
    """Send the e-mail described by the payload's ``to``/``subject``/``body`` fields."""
    recipient, subject, body = (payload[field] for field in ("to", "subject", "body"))
    send_email(recipient, subject, body)

@worker.register("generate_report")
def handle_report(payload):
    """Build the report named by ``payload["report_type"]`` and store it."""
    report_type = payload["report_type"]
    save_report(generate_report(report_type))

# Producer (your API):
queue.enqueue("send_email", {
    "to": "alice@example.com",
    "subject": "Welcome!",
    "body": "Thanks for signing up.",
})

# Consumer (separate process):
# worker.run()

3. Pub/Sub — Real-Time Messaging

Redis Pub/Sub is ideal for real-time notifications between services. We used this pattern in our microservices guide.

import threading


# --- Publisher ---
def publish_event(channel: str, data: dict):
    """Serialise ``data`` to JSON and publish it on ``channel``."""
    # default=str stringifies anything json cannot encode natively.
    message = json.dumps(data, default=str)
    r.publish(channel, message)


# --- Subscriber ---
def subscribe_events(*channels: str, handlers: dict | None = None):
    """Subscribe to Redis channels and process messages.

    Blocks forever, so run it in its own thread or process.

    Args:
        *channels: Channel names to subscribe to.
        handlers: Maps channel name to a callable receiving the decoded JSON
            payload. Channels without a handler are ignored.
            NOTE(review): lookup is by the channel value as delivered, which
            is ``str`` only when the client uses decode_responses=True.
    """
    handlers = handlers or {}
    pubsub = r.pubsub()
    pubsub.subscribe(*channels)

    print(f"Subscribed to: {channels}")

    try:
        for message in pubsub.listen():
            # Skip subscribe/unsubscribe confirmations.
            if message["type"] != "message":
                continue

            channel = message["channel"]
            try:
                data = json.loads(message["data"])
            except (TypeError, ValueError) as e:
                # One malformed payload must not take the subscriber down.
                print(f"Bad payload on {channel}: {e}")
                continue

            handler = handlers.get(channel)
            if not handler:
                continue
            try:
                handler(data)
            except Exception as e:
                print(f"Handler error on {channel}: {e}")
    finally:
        # Release the dedicated pub/sub connection however the loop ends.
        pubsub.close()


# --- Usage ---
def on_user_signup(data):
    """Log the e-mail address of a newly signed-up user."""
    email = data["email"]
    print(f"New user: {email}")

def on_order(data):
    """Log the total of a newly created order."""
    total = data["total"]
    print(f"New order: ${total}")

# In a separate thread or process:
# subscribe_events loops over pubsub.listen(), so it is run off the main thread here.
thread = threading.Thread(
    target=subscribe_events,
    args=("user.signup", "order.created"),
    kwargs={"handlers": {
        "user.signup": on_user_signup,
        "order.created": on_order,
    }},
    daemon=True,  # daemon thread: does not keep the process alive on exit
)
thread.start()

# Publish from anywhere:
# NOTE(review): the subscription is set up inside the thread, so a publish issued
# immediately after start() may precede it and be dropped — confirm if ordering matters.
publish_event("user.signup", {"email": "bob@example.com", "plan": "pro"})
⚠️ Important: Redis Pub/Sub is fire-and-forget — if no subscriber is listening, messages are lost. For guaranteed delivery, use Redis Streams or a proper message broker (RabbitMQ, Kafka).

4. Rate Limiting

Protect your APIs from abuse with a sliding window rate limiter:

import time


class RateLimiter:
    """Sliding window rate limiter using Redis sorted sets.

    Each accepted request is one member of the sorted set ``ratelimit:{key}``,
    scored by its timestamp; members older than the window are trimmed on
    every check.
    """

    def __init__(self, redis_client, max_requests: int = 100, window_seconds: int = 60):
        """
        Args:
            redis_client: Client exposing ``pipeline()`` and ``zrem``.
            max_requests: Requests allowed per window.
            window_seconds: Length of the sliding window in seconds.
        """
        self.r = redis_client
        self.max_requests = max_requests
        self.window = window_seconds

    def is_allowed(self, key: str) -> tuple[bool, dict]:
        """Check if request is allowed. Returns (allowed, info).

        ``info`` contains ``allowed``, ``current`` (requests already in the
        window before this one), ``limit``, ``remaining`` and ``reset_at``
        (``now + window`` as an integer timestamp).
        """
        import uuid  # local import: keeps the snippet self-contained

        now = time.time()
        window_start = now - self.window
        pipe_key = f"ratelimit:{key}"
        # Sorted-set members are unique: using the bare timestamp would merge
        # two requests that share a clock reading into a single entry and
        # under-count them. The random suffix keeps every request distinct.
        member = f"{now}:{uuid.uuid4().hex}"

        pipe = self.r.pipeline()
        # Remove expired entries
        pipe.zremrangebyscore(pipe_key, 0, window_start)
        # Count current window
        pipe.zcard(pipe_key)
        # Add current request
        pipe.zadd(pipe_key, {member: now})
        # Set expiry on the key
        pipe.expire(pipe_key, self.window)
        results = pipe.execute()

        current_count = results[1]
        allowed = current_count < self.max_requests

        info = {
            "allowed": allowed,
            "current": current_count,
            "limit": self.max_requests,
            "remaining": max(0, self.max_requests - current_count - 1),
            "reset_at": int(now + self.window),
        }

        if not allowed:
            # Remove the request we just added
            self.r.zrem(pipe_key, member)

        return allowed, info


# --- FastAPI middleware ---
from fastapi import Request, HTTPException

limiter = RateLimiter(r, max_requests=60, window_seconds=60)

@app.middleware("http")
async def rate_limit_middleware(request: Request, call_next):
    """Reject clients that exceed ``limiter``'s budget with a 429 response.

    The 429 is returned as a response object rather than raised: an
    HTTPException raised inside HTTP middleware is not routed through the
    application's exception handlers and would surface as a 500.
    """
    from fastapi.responses import JSONResponse  # local import: only needed here

    # request.client can be None (e.g. some test clients / proxies).
    client_ip = request.client.host if request.client else "unknown"
    # NOTE(review): limiter.is_allowed is a synchronous Redis call inside an
    # async function — consider the asyncio client or a threadpool under load.
    allowed, info = limiter.is_allowed(client_ip)

    if not allowed:
        return JSONResponse(
            status_code=429,
            content={"detail": "Rate limit exceeded"},
            headers={
                "X-RateLimit-Limit": str(info["limit"]),
                "X-RateLimit-Remaining": "0",
                "X-RateLimit-Reset": str(info["reset_at"]),
                "Retry-After": str(max(0, info["reset_at"] - int(time.time()))),
            },
        )

    response = await call_next(request)
    response.headers["X-RateLimit-Limit"] = str(info["limit"])
    response.headers["X-RateLimit-Remaining"] = str(info["remaining"])
    response.headers["X-RateLimit-Reset"] = str(info["reset_at"])
    return response

5. Session Storage

import secrets


class RedisSessionStore:
    """Server-side session storage with Redis."""

    def __init__(self, redis_client, ttl: int = 3600):
        self.r = redis_client
        self.ttl = ttl
        self.prefix = "session"

    def create(self, user_id: str, data: dict = None) -> str:
        """Create a new session, return session ID."""
        session_id = secrets.token_urlsafe(32)
        key = f"{self.prefix}:{session_id}"

        session_data = {
            "user_id": user_id,
            "created_at": datetime.now(timezone.utc).isoformat(),
            **(data or {}),
        }

        self.r.hset(key, mapping={
            k: json.dumps(v) if isinstance(v, (dict, list)) else str(v)
            for k, v in session_data.items()
        })
        self.r.expire(key, self.ttl)
        return session_id

    def get(self, session_id: str) -> dict | None:
        """Get session data. Returns None if expired/missing."""
        key = f"{self.prefix}:{session_id}"
        data = self.r.hgetall(key)
        if not data:
            return None
        # Refresh TTL on access (sliding expiration)
        self.r.expire(key, self.ttl)
        return data

    def update(self, session_id: str, **kwargs):
        """Update session fields."""
        key = f"{self.prefix}:{session_id}"
        if not self.r.exists(key):
            raise ValueError("Session not found")
        self.r.hset(key, mapping={k: str(v) for k, v in kwargs.items()})

    def destroy(self, session_id: str):
        """Delete session (logout)."""
        self.r.delete(f"{self.prefix}:{session_id}")

6. Leaderboards & Sorted Data

class Leaderboard:
    """Real-time leaderboard using Redis sorted sets."""

    def __init__(self, redis_client, name: str = "leaderboard"):
        self.r = redis_client
        self.key = f"lb:{name}"

    def add_score(self, player: str, score: float):
        """Add or update a player's score."""
        self.r.zadd(self.key, {player: score})

    def increment(self, player: str, amount: float = 1):
        """Increment a player's score."""
        return self.r.zincrby(self.key, amount, player)

    def top(self, n: int = 10) -> list[tuple[str, float]]:
        """Get top N players (highest scores first)."""
        return self.r.zrevrange(self.key, 0, n - 1, withscores=True)

    def rank(self, player: str) -> int | None:
        """Get player's rank (0-indexed, 0 = first)."""
        rank = self.r.zrevrank(self.key, player)
        return rank

    def score(self, player: str) -> float | None:
        """Get player's score."""
        return self.r.zscore(self.key, player)

    def around(self, player: str, n: int = 2) -> list[tuple[str, float]]:
        """Get players around a specific player."""
        rank = self.rank(player)
        if rank is None:
            return []
        start = max(0, rank - n)
        end = rank + n
        return self.r.zrevrange(self.key, start, end, withscores=True)


# --- Usage ---
# Scores for this board are kept in the sorted set "lb:weekly_scores".
lb = Leaderboard(r, "weekly_scores")
lb.add_score("alice", 1500)
lb.add_score("bob", 2300)
lb.increment("alice", 200)  # alice: 1700

print(lb.top(5))
# [('bob', 2300.0), ('alice', 1700.0)]

# rank() is 0-indexed, hence the +1 for display. It is annotated as possibly
# returning None, in which case the addition would fail — alice exists here.
print(f"Alice rank: #{lb.rank('alice') + 1}")  # #2

7. Distributed Locks

import uuid
import time


class RedisLock:
    """Distributed lock on a single Redis instance (SET NX EX + token check).

    The lock is the key ``lock:{name}`` holding a random per-instance token.
    It expires on its own after ``ttl`` seconds, so the protected work must
    finish within that time. This is not the multi-node Redlock algorithm.
    """

    def __init__(self, redis_client, name: str, ttl: int = 10):
        """
        Args:
            redis_client: Client exposing ``set`` and ``eval``.
            name: Lock name; the Redis key is ``lock:{name}``.
            ttl: Seconds after which the lock expires automatically.
        """
        self.r = redis_client
        self.key = f"lock:{name}"
        self.ttl = ttl
        # Unique token so only the holder can release the lock.
        self.token = str(uuid.uuid4())

    def acquire(self, timeout: float = 5.0) -> bool:
        """Try to acquire lock within timeout.

        Always makes at least one attempt, so ``timeout=0`` is a non-blocking
        try-lock.

        Returns:
            True if the lock was obtained, False if the timeout elapsed.
        """
        # Monotonic clock: the deadline is unaffected by wall-clock adjustments.
        deadline = time.monotonic() + timeout

        while True:
            if self.r.set(self.key, self.token, nx=True, ex=self.ttl):
                return True
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                return False
            # Never sleep past the deadline.
            time.sleep(min(0.1, remaining))

    def release(self) -> bool:
        """Release lock (only if we own it).

        Returns:
            True if this instance's lock was deleted, False if the key was
            missing or held by someone else.
        """
        # Lua script for atomic check-and-delete
        script = """
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("del", KEYS[1])
        else
            return 0
        end
        """
        return bool(self.r.eval(script, 1, self.key, self.token))

    def __enter__(self):
        """Acquire with the default timeout; raise TimeoutError on failure."""
        if not self.acquire():
            raise TimeoutError(f"Could not acquire lock: {self.key}")
        return self

    def __exit__(self, *_):
        """Release the lock; exceptions from the block propagate."""
        self.release()


# --- Usage ---
# Entering the block acquires the lock (TimeoutError if it cannot be obtained);
# leaving it releases the lock. ttl=30 is passed to Redis as the key's expiry,
# so the key disappears after 30 s even if the block is still running.
with RedisLock(r, "payment-processing", ttl=30):
    # Only one process can execute this block at a time
    process_payment(order_id)

8. Redis Streams (Persistent Events)

Unlike Pub/Sub, Streams persist messages and support consumer groups — messages aren't lost if no one is listening.

# Producer
r.xadd("orders", {
    "type": "order.created",
    "order_id": "abc-123",
    "total": "99.99",
})

# Consumer group — creating a group that already exists raises a BUSYGROUP
# error, so tolerate exactly that case to keep the script re-runnable.
try:
    r.xgroup_create("orders", "notification-service", id="0", mkstream=True)
except redis.ResponseError as exc:
    if "BUSYGROUP" not in str(exc):
        raise

# Consumer (reads messages never delivered to this group before)
# NOTE(review): entries that were delivered but not acknowledged are not
# returned by ">"; re-read them with an explicit ID or claim them separately.
while True:
    messages = r.xreadgroup(
        "notification-service",  # group
        "worker-1",              # consumer name
        {"orders": ">"},         # ">" = new messages only
        count=10,
        block=5000,              # block 5 seconds
    )

    # An empty/None reply means the block timeout elapsed with nothing to read.
    for stream, entries in messages or []:
        for msg_id, data in entries:
            print(f"Processing: {data}")

            try:
                # Process the message...
                handle_order_event(data)
            except Exception as exc:
                # Leave the entry unacknowledged instead of crashing the loop.
                print(f"Failed to process {msg_id}: {exc}")
                continue

            # Acknowledge — won't be redelivered
            r.xack("orders", "notification-service", msg_id)

Production Tips

🚀 Want production-ready caching, queue, and automation scripts?

Get the AI Agent Toolkit →

Related Articles

Need help setting up Redis caching or queues for your project? I build Python backends, APIs, and automation tools. Reach out on Telegram →