Python & Redis — Caching, Queues & Real-Time Data
Redis is the Swiss Army knife of backend development — an in-memory data store that handles caching, message queues, real-time pub/sub, rate limiting, sessions, and leaderboards. If you're building anything beyond a simple script, you'll eventually reach for it. This guide walks through the most common patterns, with Python code you can ship today.
Setup
# Install
pip install redis # redis-py (sync + async)
# Run Redis (Docker)
docker run -d --name redis -p 6379:6379 redis:7-alpine
# Or install locally
# macOS: brew install redis && brew services start redis
# Ubuntu: sudo apt install redis-server
# Basic connection
import redis
# Sync client
r = redis.Redis(host="localhost", port=6379, db=0, decode_responses=True)
r.ping() # True
# Async client
import redis.asyncio as aioredis
async def main():
r = aioredis.from_url("redis://localhost:6379", decode_responses=True)
await r.ping() # True
await r.aclose()  # aclose() in redis-py 5+; older versions use close()
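For anything beyond a one-off script, share a single connection pool across your app instead of creating clients ad hoc. A minimal sketch (the pool size is an illustrative value, not a recommendation):
# Shared pool; redis.from_url() also creates one under the hood
pool = redis.ConnectionPool(
    host="localhost", port=6379, db=0,
    max_connections=20,  # illustrative; tune to your workload
    decode_responses=True,
)
r = redis.Redis(connection_pool=pool)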
1. Caching — The #1 Use Case
Simple key-value cache
import json
import hashlib
from datetime import timedelta
class RedisCache:
"""Production-ready caching layer."""
def __init__(self, redis_client, prefix: str = "cache", default_ttl: int = 300):
self.r = redis_client
self.prefix = prefix
self.default_ttl = default_ttl
def _key(self, name: str) -> str:
return f"{self.prefix}:{name}"
def get(self, key: str):
"""Get cached value, returns None if miss."""
data = self.r.get(self._key(key))
if data is None:
return None
return json.loads(data)
def set(self, key: str, value, ttl: int | None = None):
"""Cache a value with TTL (seconds)."""
self.r.setex(
self._key(key),
ttl or self.default_ttl,
json.dumps(value, default=str),
)
def delete(self, key: str):
"""Invalidate a cached entry."""
self.r.delete(self._key(key))
def get_or_set(self, key: str, factory, ttl: int | None = None):
"""Cache-aside pattern: get from cache, or compute and cache."""
value = self.get(key)
if value is not None:
return value
value = factory()
self.set(key, value, ttl)
return value
# --- Usage ---
cache = RedisCache(r, prefix="app", default_ttl=600)
# Cache API responses
def fetch_user_profile(user_id: int):
return cache.get_or_set(
f"user:{user_id}",
lambda: expensive_api_call(user_id),
ttl=300,
)
# Invalidate on update
def update_user(user_id: int, data: dict):
save_to_db(user_id, data)
cache.delete(f"user:{user_id}") # next read will refresh
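One caveat with cache-aside: when a hot key expires, every concurrent request calls the factory at once (a cache stampede). Below is a minimal mitigation sketch using SET NX as a short recompute lock; the lock key naming and timings are illustrative:
import time
def get_or_set_locked(cache: RedisCache, key: str, factory, ttl: int = 300, lock_ttl: int = 10):
    """Cache-aside with a recompute lock to avoid stampedes."""
    lock_key = f"{cache.prefix}:lock:{key}"
    while True:
        value = cache.get(key)
        if value is not None:
            return value
        # Only one caller wins the lock and recomputes; the rest poll briefly
        if cache.r.set(lock_key, "1", nx=True, ex=lock_ttl):
            try:
                value = factory()
                cache.set(key, value, ttl)
                return value
            finally:
                cache.r.delete(lock_key)
        time.sleep(0.1)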
Decorator-based caching
import functools
import requests
def cached(ttl: int = 300, prefix: str = "fn"):
"""Decorator: cache function results in Redis."""
def decorator(fn):
@functools.wraps(fn)
def wrapper(*args, **kwargs):
# Build cache key from function name + arguments
key_data = json.dumps({"args": args, "kwargs": kwargs}, sort_keys=True, default=str)
key_hash = hashlib.md5(key_data.encode()).hexdigest()
cache_key = f"{prefix}:{fn.__name__}:{key_hash}"
# Check cache
cached_result = r.get(cache_key)
if cached_result is not None:
return json.loads(cached_result)
# Compute and cache
result = fn(*args, **kwargs)
r.setex(cache_key, ttl, json.dumps(result, default=str))
return result
# DEL doesn't expand glob patterns, so scan for matching keys instead
def invalidate():
    for k in r.scan_iter(f"{prefix}:{fn.__name__}:*"):
        r.delete(k)
wrapper.invalidate = invalidate
return wrapper
return decorator
@cached(ttl=600)
def get_exchange_rate(from_currency: str, to_currency: str) -> float:
"""Expensive API call — cached for 10 minutes."""
resp = requests.get(
    "https://api.exchangerate.host/convert",
    params={"from": from_currency, "to": to_currency},
    timeout=10,
)
return resp.json()["result"]
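Because invalidate() scans by key pattern, it clears every cached argument combination for the function:
get_exchange_rate("USD", "EUR")   # first call hits the API
get_exchange_rate("USD", "EUR")   # served from Redis
get_exchange_rate.invalidate()    # drops all cached rates for this function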
2. Task Queues
Redis lists make excellent lightweight task queues. For simple needs, you don't need Celery — a Redis list + worker loop is enough. See also our task scheduling guide.
import json
import time
import signal
import traceback
from datetime import datetime, timezone
class SimpleQueue:
"""Redis-backed task queue with retry and dead-letter."""
def __init__(self, redis_client, name: str = "tasks"):
self.r = redis_client
self.queue = f"queue:{name}"
self.processing = f"queue:{name}:processing"
self.dead_letter = f"queue:{name}:dead"
self.max_retries = 3
def enqueue(self, task_type: str, payload: dict, priority: int = 0):
"""Add a task to the queue."""
task = json.dumps({
"id": f"{task_type}:{int(time.time()*1000)}",
"type": task_type,
"payload": payload,
"retries": 0,
"created_at": datetime.now(timezone.utc).isoformat(),
})
if priority > 0:
self.r.lpush(self.queue, task) # high priority → front
else:
self.r.rpush(self.queue, task) # normal → back
def dequeue(self, timeout: int = 5):
"""Blocking dequeue with reliability (BRPOPLPUSH)."""
result = self.r.brpoplpush(self.queue, self.processing, timeout)
if result is None:
return None
return json.loads(result)
def complete(self, task: dict):
"""Mark task as done — remove from processing."""
# LREM matches by exact string, so the task dict must not be mutated
# between dequeue() and complete()
self.r.lrem(self.processing, 1, json.dumps(task, default=str))
def retry_or_dead_letter(self, task: dict, error: str):
"""Retry failed task or move to dead letter queue."""
self.r.lrem(self.processing, 1, json.dumps(task, default=str))
task["retries"] += 1
task["last_error"] = error
if task["retries"] <= self.max_retries:
# Exponential backoff: tag the task with a retry-after timestamp.
# Note the task is re-enqueued immediately; workers should skip tasks
# whose retry_after is still in the future (or use the delayed-queue
# sketch after the usage example below)
delay = 2 ** task["retries"]
task["retry_after"] = (
    datetime.now(timezone.utc).timestamp() + delay
)
self.r.rpush(self.queue, json.dumps(task, default=str))
else:
# Move to dead letter queue for manual inspection
self.r.rpush(self.dead_letter, json.dumps(task, default=str))
def stats(self) -> dict:
return {
"pending": self.r.llen(self.queue),
"processing": self.r.llen(self.processing),
"dead_letter": self.r.llen(self.dead_letter),
}
# --- Worker ---
class Worker:
"""Process tasks from the queue."""
def __init__(self, queue: SimpleQueue):
self.queue = queue
self.handlers = {}
self.running = True
signal.signal(signal.SIGTERM, self._shutdown)
signal.signal(signal.SIGINT, self._shutdown)
def register(self, task_type: str):
"""Decorator to register task handlers."""
def decorator(fn):
self.handlers[task_type] = fn
return fn
return decorator
def _shutdown(self, *_):
print("Shutting down gracefully...")
self.running = False
def run(self):
"""Main worker loop."""
print(f"Worker started. Handlers: {list(self.handlers.keys())}")
while self.running:
task = self.queue.dequeue(timeout=2)
if task is None:
continue
handler = self.handlers.get(task["type"])
if not handler:
print(f"No handler for task type: {task['type']}")
self.queue.complete(task)
continue
try:
handler(task["payload"])
self.queue.complete(task)
print(f"✅ {task['id']} completed")
except Exception as e:
print(f"❌ {task['id']} failed: {e}")
self.queue.retry_or_dead_letter(task, str(e))
# --- Usage ---
queue = SimpleQueue(r, "emails")
worker = Worker(queue)
@worker.register("send_email")
def handle_send_email(payload):
send_email(payload["to"], payload["subject"], payload["body"])
@worker.register("generate_report")
def handle_report(payload):
report = generate_report(payload["report_type"])
save_report(report)
# Producer (your API):
queue.enqueue("send_email", {
"to": "alice@example.com",
"subject": "Welcome!",
"body": "Thanks for signing up.",
})
# Consumer (separate process):
# worker.run()
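Note that retry_after above is only advisory: failed tasks go straight back onto the list. For true delayed delivery, a common pattern is a sorted set scored by ready time; a minimal sketch (the key names are illustrative):
def enqueue_delayed(task: dict, delay_seconds: float, key: str = "queue:emails:delayed"):
    """Park a task in a sorted set until its ready time."""
    r.zadd(key, {json.dumps(task, default=str): time.time() + delay_seconds})

def promote_ready_tasks(key: str = "queue:emails:delayed", dest: str = "queue:emails"):
    """Move tasks whose ready time has passed onto the live queue."""
    for raw in r.zrangebyscore(key, 0, time.time()):
        # ZREM returns 1 only for the worker that removed the entry,
        # so concurrent workers won't double-promote the same task
        if r.zrem(key, raw):
            r.rpush(dest, raw)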
3. Pub/Sub — Real-Time Messaging
Redis Pub/Sub is ideal for real-time notifications between services. We used this pattern in our microservices guide.
import threading
# --- Publisher ---
def publish_event(channel: str, data: dict):
"""Publish an event to a Redis channel."""
r.publish(channel, json.dumps(data, default=str))
# --- Subscriber ---
def subscribe_events(*channels: str, handlers: dict | None = None):
"""Subscribe to Redis channels and process messages."""
pubsub = r.pubsub()
pubsub.subscribe(*channels)
print(f"Subscribed to: {channels}")
for message in pubsub.listen():
if message["type"] != "message":
continue
channel = message["channel"]
data = json.loads(message["data"])
handler = (handlers or {}).get(channel)
if handler:
try:
handler(data)
except Exception as e:
print(f"Handler error on {channel}: {e}")
# --- Usage ---
def on_user_signup(data):
print(f"New user: {data['email']}")
def on_order(data):
print(f"New order: ${data['total']}")
# In a separate thread or process:
thread = threading.Thread(
target=subscribe_events,
args=("user.signup", "order.created"),
kwargs={"handlers": {
"user.signup": on_user_signup,
"order.created": on_order,
}},
daemon=True,
)
thread.start()
# Publish from anywhere:
publish_event("user.signup", {"email": "bob@example.com", "plan": "pro"})
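To handle a whole family of channels with one subscriber, psubscribe accepts glob patterns; matches arrive with type "pmessage" and carry both the pattern and the concrete channel:
pubsub = r.pubsub()
pubsub.psubscribe("user.*", "order.*")  # glob patterns, not exact names
for message in pubsub.listen():
    if message["type"] != "pmessage":
        continue
    print(message["pattern"], message["channel"], json.loads(message["data"]))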
4. Rate Limiting
Protect your APIs from abuse with a sliding window rate limiter:
import time
class RateLimiter:
"""Sliding window rate limiter using Redis sorted sets."""
def __init__(self, redis_client, max_requests: int = 100, window_seconds: int = 60):
self.r = redis_client
self.max_requests = max_requests
self.window = window_seconds
def is_allowed(self, key: str) -> tuple[bool, dict]:
"""Check if request is allowed. Returns (allowed, info)."""
now = time.time()
window_start = now - self.window
pipe_key = f"ratelimit:{key}"
pipe = self.r.pipeline()
# Remove expired entries
pipe.zremrangebyscore(pipe_key, 0, window_start)
# Count current window
pipe.zcard(pipe_key)
# Add current request
pipe.zadd(pipe_key, {str(now): now})
# Set expiry on the key
pipe.expire(pipe_key, self.window)
results = pipe.execute()
current_count = results[1]
allowed = current_count < self.max_requests
info = {
"allowed": allowed,
"current": current_count,
"limit": self.max_requests,
"remaining": max(0, self.max_requests - current_count - 1),
"reset_at": int(now + self.window),
}
if not allowed:
# Remove the request we just added
self.r.zrem(pipe_key, str(now))
return allowed, info
# --- FastAPI middleware ---
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
app = FastAPI()
limiter = RateLimiter(r, max_requests=60, window_seconds=60)
@app.middleware("http")
async def rate_limit_middleware(request: Request, call_next):
client_ip = request.client.host
allowed, info = limiter.is_allowed(client_ip)
if not allowed:
    # Raising HTTPException in middleware bypasses FastAPI's exception
    # handlers, so return the 429 response directly
    return JSONResponse(
        status_code=429,
        content={"detail": "Rate limit exceeded"},
        headers={
            "X-RateLimit-Limit": str(info["limit"]),
            "X-RateLimit-Remaining": "0",
            "X-RateLimit-Reset": str(info["reset_at"]),
            "Retry-After": str(info["reset_at"] - int(time.time())),
        },
    )
response = await call_next(request)
response.headers["X-RateLimit-Limit"] = str(info["limit"])
response.headers["X-RateLimit-Remaining"] = str(info["remaining"])
return response
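If a sorted set per client is more bookkeeping than you need, a fixed-window counter built on INCR and EXPIRE is cheaper, at the cost of allowing bursts at window boundaries. A minimal sketch:
def fixed_window_allowed(key: str, max_requests: int = 60, window: int = 60) -> bool:
    """One counter per client per window; coarser, but only two commands."""
    bucket = f"ratelimit:fixed:{key}:{int(time.time() // window)}"
    count = r.incr(bucket)
    if count == 1:
        r.expire(bucket, window)  # first hit of the window sets the TTL
    return count <= max_requests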
5. Session Storage
import secrets
class RedisSessionStore:
"""Server-side session storage with Redis."""
def __init__(self, redis_client, ttl: int = 3600):
self.r = redis_client
self.ttl = ttl
self.prefix = "session"
def create(self, user_id: str, data: dict | None = None) -> str:
"""Create a new session, return session ID."""
session_id = secrets.token_urlsafe(32)
key = f"{self.prefix}:{session_id}"
session_data = {
"user_id": user_id,
"created_at": datetime.now(timezone.utc).isoformat(),
**(data or {}),
}
self.r.hset(key, mapping={
k: json.dumps(v) if isinstance(v, (dict, list)) else str(v)
for k, v in session_data.items()
})
self.r.expire(key, self.ttl)
return session_id
def get(self, session_id: str) -> dict | None:
"""Get session data. Returns None if expired/missing."""
key = f"{self.prefix}:{session_id}"
data = self.r.hgetall(key)
if not data:
return None
# Refresh TTL on access (sliding expiration)
self.r.expire(key, self.ttl)
# Note: hgetall returns strings; JSON-encoded fields need json.loads
return data
def update(self, session_id: str, **kwargs):
"""Update session fields."""
key = f"{self.prefix}:{session_id}"
if not self.r.exists(key):
raise ValueError("Session not found")
self.r.hset(key, mapping={k: str(v) for k, v in kwargs.items()})
def destroy(self, session_id: str):
"""Delete session (logout)."""
self.r.delete(f"{self.prefix}:{session_id}")
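A quick usage sketch (in a real app the session ID would travel in an HttpOnly cookie):
sessions = RedisSessionStore(r, ttl=1800)
sid = sessions.create("user-42", {"theme": "dark"})
print(sessions.get(sid))  # {'user_id': 'user-42', 'created_at': '...', 'theme': 'dark'}
sessions.update(sid, theme="light")
sessions.destroy(sid)  # logout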
6. Leaderboards & Sorted Data
class Leaderboard:
"""Real-time leaderboard using Redis sorted sets."""
def __init__(self, redis_client, name: str = "leaderboard"):
self.r = redis_client
self.key = f"lb:{name}"
def add_score(self, player: str, score: float):
"""Add or update a player's score."""
self.r.zadd(self.key, {player: score})
def increment(self, player: str, amount: float = 1):
"""Increment a player's score."""
return self.r.zincrby(self.key, amount, player)
def top(self, n: int = 10) -> list[tuple[str, float]]:
"""Get top N players (highest scores first)."""
return self.r.zrevrange(self.key, 0, n - 1, withscores=True)
def rank(self, player: str) -> int | None:
"""Get player's rank (0-indexed, 0 = first)."""
rank = self.r.zrevrank(self.key, player)
return rank
def score(self, player: str) -> float | None:
"""Get player's score."""
return self.r.zscore(self.key, player)
def around(self, player: str, n: int = 2) -> list[tuple[str, float]]:
"""Get players around a specific player."""
rank = self.rank(player)
if rank is None:
return []
start = max(0, rank - n)
end = rank + n
return self.r.zrevrange(self.key, start, end, withscores=True)
# --- Usage ---
lb = Leaderboard(r, "weekly_scores")
lb.add_score("alice", 1500)
lb.add_score("bob", 2300)
lb.increment("alice", 200) # alice: 1700
print(lb.top(5))
# [('bob', 2300.0), ('alice', 1700.0)]
print(f"Alice rank: #{lb.rank('alice') + 1}") # #2
7. Distributed Locks
import uuid
import time
class RedisLock:
"""Distributed lock using Redis (simplified Redlock)."""
def __init__(self, redis_client, name: str, ttl: int = 10):
self.r = redis_client
self.key = f"lock:{name}"
self.ttl = ttl
self.token = str(uuid.uuid4())
def acquire(self, timeout: float = 5.0) -> bool:
"""Try to acquire lock within timeout."""
deadline = time.time() + timeout
while time.time() < deadline:
if self.r.set(self.key, self.token, nx=True, ex=self.ttl):
return True
time.sleep(0.1)
return False
def release(self):
"""Release lock (only if we own it)."""
# Lua script for atomic check-and-delete
script = """
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
"""
self.r.eval(script, 1, self.key, self.token)
def __enter__(self):
if not self.acquire():
raise TimeoutError(f"Could not acquire lock: {self.key}")
return self
def __exit__(self, *_):
self.release()
# --- Usage ---
with RedisLock(r, "payment-processing", ttl=30):
# Only one process can execute this block at a time
process_payment(order_id)
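redis-py also ships a built-in Lock with the same token-checked release, so you often don't need to hand-roll one:
# timeout is the lock TTL; blocking_timeout bounds how long we wait to acquire
with r.lock("payment-processing", timeout=30, blocking_timeout=5):
    process_payment(order_id)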
8. Redis Streams (Persistent Events)
Unlike Pub/Sub, Streams persist messages and support consumer groups — messages aren't lost if no one is listening.
# Producer
r.xadd("orders", {
"type": "order.created",
"order_id": "abc-123",
"total": "99.99",
})
# Consumer group (idempotent: XGROUP CREATE raises BUSYGROUP if it already exists)
try:
    r.xgroup_create("orders", "notification-service", id="0", mkstream=True)
except redis.exceptions.ResponseError as e:
    if "BUSYGROUP" not in str(e):
        raise
# Consumer (reads unacknowledged messages)
while True:
messages = r.xreadgroup(
"notification-service", # group
"worker-1", # consumer name
{"orders": ">"}, # ">" = new messages only
count=10,
block=5000, # block 5 seconds
)
for stream, entries in messages:
for msg_id, data in entries:
print(f"Processing: {data}")
# Process the message...
handle_order_event(data)
# Acknowledge — won't be redelivered
r.xack("orders", "notification-service", msg_id)
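If a consumer crashes after reading but before XACK, its messages sit in the group's pending list. XAUTOCLAIM lets another worker adopt entries that have been idle too long; a minimal sketch (the idle threshold is an example):
# Claim messages pending for over 60s and reprocess them as "worker-2"
result = r.xautoclaim(
    "orders", "notification-service", "worker-2",
    min_idle_time=60_000,  # milliseconds
    start_id="0-0",
)
next_id, claimed = result[0], result[1]
for msg_id, data in claimed:
    handle_order_event(data)
    r.xack("orders", "notification-service", msg_id)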
Production Tips
- Connection pooling — use redis.ConnectionPool or redis.from_url() which pools automatically
- Pipelines — batch multiple commands: pipe = r.pipeline(); pipe.get("a"); pipe.get("b"); results = pipe.execute()
- Key expiry — always set TTL on cache keys to prevent memory leaks
- Key naming — use colons as separators: app:user:123:profile
- Memory limits — configure maxmemory and maxmemory-policy allkeys-lru
- Persistence — enable RDB snapshots or AOF for data you can't lose
- Monitoring — redis-cli INFO for stats, MONITOR for live commands (dev only)
- Sentinel/Cluster — for high availability in production
Related Articles
- Python Microservices — Redis as the communication backbone
- Build a REST API with FastAPI — add Redis caching to your API
- Python Async Programming — async Redis with aioredis
- Task Scheduling & Cron Jobs — Redis queues vs cron
- Dockerize Python Apps — Redis in Docker Compose
- Python Performance Optimization — caching as optimization
Need help setting up Redis caching or queues for your project? I build Python backends, APIs, and automation tools. Reach out on Telegram →