WebSockets in Python — Build Real-Time Apps
HTTP is request-response: the client asks, the server answers, and the exchange is over — the server cannot push anything until the client asks again. But what about live chat, stock tickers, multiplayer games, or IoT dashboards? That's where WebSockets come in — persistent, bidirectional connections between client and server. In this guide, you'll build real-time Python apps from scratch.
WebSockets vs HTTP — When to Use What
| Feature | HTTP | WebSocket |
|---|---|---|
| Connection | Short-lived (per request) | Persistent (stays open) |
| Direction | Client → Server (request/response) | Bidirectional (both push) |
| Overhead | Headers on every request | Minimal after handshake |
| Use case | CRUD APIs, page loads | Chat, live feeds, gaming |
| Protocol | http:// / https:// | ws:// / wss:// |
Use WebSockets when: you need sub-second updates pushed from the server, or the client sends frequent small messages. For everything else, a regular REST API is simpler and more scalable.
Option 1: The websockets Library
The websockets library is pure Python, async-native, and perfect for standalone WebSocket servers.
pip install websockets
Basic echo server
# server.py
import asyncio
import websockets
async def echo(websocket):
    """Answer each incoming message with the same payload prefixed by "Echo: ".

    Every payload is also logged to stdout. The coroutine returns once the
    connection's message stream ends.
    """
    async for incoming in websocket:
        print(f"Received: {incoming}")
        reply = f"Echo: {incoming}"
        await websocket.send(reply)
async def main():
    """Serve the echo handler on localhost:8765 until the process is stopped."""
    server = websockets.serve(echo, "localhost", 8765)
    async with server:
        print("WebSocket server running on ws://localhost:8765")
        # A future that is never resolved keeps this coroutine, and with it
        # the server context, alive indefinitely.
        never_done = asyncio.Future()
        await never_done


asyncio.run(main())
Python client
# client.py
import asyncio
import websockets
async def main():
    """Send one greeting to the local echo server and print the reply."""
    connection = websockets.connect("ws://localhost:8765")
    async with connection as ws:
        await ws.send("Hello, WebSocket!")
        print(f"Server said: {await ws.recv()}")


asyncio.run(main())
Chat room (broadcast to all clients)
# chat_server.py
import asyncio
import websockets
import json
from datetime import datetime, timezone
# Every currently connected client; handler() adds a connection on entry and
# discards it on exit.
connected_clients: set[websockets.WebSocketServerProtocol] = set()
async def broadcast(message: str, sender=None):
    """Send ``message`` to every connected client except ``sender``.

    Clients whose connection turns out to be closed are dropped from
    ``connected_clients`` once the pass is complete.

    Args:
        message: Text to deliver.
        sender: Connection to skip; ``None`` delivers to everyone.
    """
    disconnected = set()
    # Iterate over a snapshot: every ``await`` below lets other handlers add
    # or remove clients, and a set that changes size mid-iteration raises
    # RuntimeError.
    for client in tuple(connected_clients):
        if client == sender:
            continue
        try:
            await client.send(message)
        except websockets.ConnectionClosed:
            disconnected.add(client)
    # Mutate in place. ``connected_clients -= disconnected`` is an assignment,
    # which would make the name local to this function and turn the loop
    # above into an UnboundLocalError.
    connected_clients.difference_update(disconnected)
async def handler(websocket):
    """Serve one chat client for the lifetime of its connection.

    Registers the connection, announces the join, relays every incoming
    message to the room, and on exit unregisters the connection and
    announces the departure.
    """
    # Register client
    connected_clients.add(websocket)
    # Short display id derived from the connection object's identity.
    client_id = id(websocket) % 10000
    print(f"Client {client_id} connected ({len(connected_clients)} total)")
    try:
        # Announce join
        await broadcast(json.dumps({
            "type": "system",
            "message": f"User {client_id} joined",
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }))
        # Handle messages
        async for raw in websocket:
            # A bytes payload (presumably a binary frame) cannot be embedded
            # in the JSON envelope below, so normalise it to text first.
            if isinstance(raw, bytes):
                raw = raw.decode("utf-8", errors="replace")
            try:
                data = json.loads(raw)
            except json.JSONDecodeError:
                data = None
            # Valid JSON that is not an object (``"hi"``, ``42``, ``[1]``) has
            # no ``.get``; treat it as plain text, exactly like non-JSON input.
            if not isinstance(data, dict):
                data = {"text": raw}
            message = json.dumps({
                "type": "message",
                "user": client_id,
                "text": data.get("text", ""),
                "timestamp": datetime.now(timezone.utc).isoformat(),
            })
            await broadcast(message)
    except websockets.ConnectionClosed:
        # The peer went away; cleanup happens in ``finally``.
        pass
    finally:
        # Unregister before announcing so the departed client is not a target.
        connected_clients.discard(websocket)
        await broadcast(json.dumps({
            "type": "system",
            "message": f"User {client_id} left",
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }))
        print(f"Client {client_id} disconnected ({len(connected_clients)} total)")
async def main():
    """Serve the chat handler on all interfaces, port 8765, until stopped."""
    server = websockets.serve(handler, "0.0.0.0", 8765)
    async with server:
        print("Chat server running on ws://0.0.0.0:8765")
        # Awaiting a future nobody resolves blocks here forever.
        never_done = asyncio.Future()
        await never_done


asyncio.run(main())
Option 2: FastAPI WebSockets
If you already have a FastAPI REST API, adding WebSocket endpoints is trivial. You get both HTTP and WS on the same server.
pip install "fastapi[standard]"
Basic WebSocket endpoint
# app.py
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
app = FastAPI()


@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """Accept the connection and echo each text frame back as "You said: ..."."""
    await websocket.accept()
    try:
        while True:
            incoming = await websocket.receive_text()
            reply = f"You said: {incoming}"
            await websocket.send_text(reply)
    except WebSocketDisconnect:
        print("Client disconnected")
Connection manager (multi-room chat)
# manager.py
from fastapi import WebSocket
import json
class ConnectionManager:
    """Tracks the WebSocket connections that belong to each named room."""

    def __init__(self):
        # Room name -> connections currently in that room. A room exists only
        # while it has at least one member.
        self.rooms: dict[str, list[WebSocket]] = {}

    async def connect(self, websocket: WebSocket, room: str):
        """Accept ``websocket`` and add it to ``room``, creating the room if needed."""
        await websocket.accept()
        self.rooms.setdefault(room, []).append(websocket)

    def disconnect(self, websocket: WebSocket, room: str):
        """Remove ``websocket`` from ``room`` and delete the room once it is empty.

        Calling this for a connection or room that is already gone is a
        no-op: ``broadcast`` may prune a dead connection before the endpoint's
        own disconnect handling runs, so the same socket can arrive twice.
        """
        members = self.rooms.get(room)
        if members is None:
            return
        if websocket in members:
            members.remove(websocket)
        if not members:
            del self.rooms[room]

    async def broadcast(self, room: str, message: dict, exclude: WebSocket | None = None):
        """Send ``message`` as JSON to every member of ``room`` except ``exclude``.

        Connections whose send fails are removed from the room afterwards.
        Broadcasting to an unknown room does nothing.
        """
        members = self.rooms.get(room)
        if not members:
            return
        dead = []
        # Snapshot the membership: each ``await`` lets other coroutines join
        # or leave, and mutating a list mid-iteration skips elements.
        for ws in list(members):
            if ws == exclude:
                continue
            try:
                await ws.send_json(message)
            except Exception:
                # Deliberate best effort: any send failure marks the
                # connection as dead instead of aborting the broadcast.
                dead.append(ws)
        for ws in dead:
            self.disconnect(ws, room)

    def get_room_count(self, room: str) -> int:
        """Return how many connections are in ``room`` (0 if it does not exist)."""
        return len(self.rooms.get(room, []))


manager = ConnectionManager()
# app.py (continued)
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Query
from manager import manager
app = FastAPI()


@app.websocket("/ws/chat/{room}")
async def chat(websocket: WebSocket, room: str, username: str = Query("anonymous")):
    """Join ``room`` as ``username`` and relay this client's messages to it.

    Every room member, including the sender, receives the join notice and
    each relayed message. When the handler exits for any reason the
    connection is unregistered and a leave notice is broadcast.
    """
    await manager.connect(websocket, room)
    try:
        # Announce join
        await manager.broadcast(room, {
            "type": "system",
            "message": f"{username} joined (room: {room}, online: {manager.get_room_count(room)})"
        })
        while True:
            try:
                data = await websocket.receive_json()
            except ValueError:
                # Malformed JSON surfaces as json.JSONDecodeError, a
                # ValueError subclass. Skip the frame rather than drop the
                # whole connection.
                continue
            if not isinstance(data, dict):
                # Only JSON objects carry a "text" field; ignore anything else.
                continue
            await manager.broadcast(room, {
                "type": "message",
                "user": username,
                "text": data.get("text", ""),
            })
    except WebSocketDisconnect:
        # Normal client departure; cleanup happens in ``finally``.
        pass
    finally:
        # Run cleanup on every exit path so an unexpected error cannot leave
        # a stale connection registered in the room.
        manager.disconnect(websocket, room)
        await manager.broadcast(room, {
            "type": "system",
            "message": f"{username} left"
        })
Authentication for WebSockets
from fastapi import WebSocket, WebSocketDisconnect, status
from jose import JWTError, jwt
# NOTE: placeholder value only. Load the real signing key from configuration
# or a secret store; never commit it to source control.
SECRET_KEY = "your-secret-key"
ALGORITHM = "HS256"


async def get_ws_user(websocket: WebSocket) -> str | None:
    """Return the JWT subject for this connection, or ``None`` if unauthenticated.

    The token is taken from the ``token`` query parameter
    (``ws://host/ws?token=xxx``) or, when that is absent, from the
    ``Sec-WebSocket-Protocol`` request header.

    Returns:
        The ``sub`` claim of the first token that verifies against
        ``SECRET_KEY``; ``None`` when no token is supplied, none verifies, or
        the verified token has no ``sub`` claim.
    """
    candidates = []
    # Method 1: Token in query string — ws://host/ws?token=xxx
    query_token = websocket.query_params.get("token")
    if query_token:
        candidates.append(query_token)
    else:
        # Method 2: Token in subprotocol header. The header may list several
        # comma-separated subprotocols (e.g. "bearer, <jwt>"), so try each
        # entry instead of decoding the raw header value.
        offered = websocket.headers.get("sec-websocket-protocol", "")
        candidates.extend(part.strip() for part in offered.split(",") if part.strip())

    for token in candidates:
        try:
            payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        except JWTError:
            continue
        return payload.get("sub")
    return None
@app.websocket("/ws/secure")
async def secure_ws(websocket: WebSocket):
    """Authenticated echo endpoint.

    Clients for which ``get_ws_user`` yields no user are closed with a
    policy-violation code without being accepted. Everyone else gets an
    ``auth`` acknowledgement and then has each JSON message echoed back.
    """
    user = await get_ws_user(websocket)
    if user:
        await websocket.accept()
        greeting = {"type": "auth", "user": user}
        await websocket.send_json(greeting)
        try:
            while True:
                payload = await websocket.receive_json()
                await websocket.send_json({"echo": payload, "from": user})
        except WebSocketDisconnect:
            pass
        return
    # No authenticated user: refuse the connection.
    await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
Live Dashboard (Server Push)
A common pattern: the server pushes real-time metrics to connected clients. No client messages needed — pure server-to-client streaming.
# dashboard.py
import asyncio
import json
import random
from datetime import datetime, timezone
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
# Application object; the startup hook and the /ws/dashboard endpoint below
# are registered on it.
app = FastAPI()
# Connections currently subscribed to the metrics stream.
dashboard_clients: list[WebSocket] = []


async def generate_metrics(interval: float = 1.0):
    """Push a simulated metrics sample to every dashboard client, forever.

    Args:
        interval: Seconds to wait between samples.
    """
    while True:
        metric = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "cpu": round(random.uniform(10, 95), 1),
            "memory": round(random.uniform(40, 85), 1),
            "requests_per_sec": random.randint(50, 500),
            "error_rate": round(random.uniform(0, 5), 2),
        }
        # Iterate over a snapshot: while a send is awaited, other coroutines
        # may add or remove clients, and mutating the live list mid-iteration
        # would skip entries.
        for ws in list(dashboard_clients):
            try:
                await ws.send_json(metric)
            except Exception:
                # Deliberate best effort: any send failure means the client is
                # gone. It may already have been removed elsewhere, so check
                # first — an unguarded remove() would raise ValueError and
                # end this loop for every client.
                if ws in dashboard_clients:
                    dashboard_clients.remove(ws)
        await asyncio.sleep(interval)
@app.on_event("startup")
async def start_metrics():
    """Start the metrics generator as a background task when the app starts."""
    # The event loop holds only a weak reference to tasks. Keep a strong one
    # on the application state so the generator cannot be garbage-collected
    # while it is still running.
    app.state.metrics_task = asyncio.create_task(generate_metrics())
@app.websocket("/ws/dashboard")
async def dashboard(websocket: WebSocket):
    """Subscribe a client to the metrics stream until it disconnects.

    The handler sends nothing itself; it registers the socket in
    ``dashboard_clients`` and waits on the receive side until the
    connection ends.
    """
    await websocket.accept()
    dashboard_clients.append(websocket)
    try:
        # Keep connection alive — listen for pings/close
        while True:
            await websocket.receive_text()
    except WebSocketDisconnect:
        pass
    finally:
        # Unregister on every exit path. The metrics loop may already have
        # dropped this socket after a failed send, so guard the removal
        # instead of letting remove() raise ValueError.
        if websocket in dashboard_clients:
            dashboard_clients.remove(websocket)
HTML client for the dashboard
<!-- dashboard.html -->
<!-- Container that the script below fills with the latest metrics sample. -->
<div id="metrics"></div>
<script>
// Open the metrics stream exposed at /ws/dashboard.
const ws = new WebSocket("ws://localhost:8000/ws/dashboard");
const el = document.getElementById("metrics");
// Each message is parsed as one JSON sample and the whole panel is re-rendered from it.
ws.onmessage = (event) => {
const m = JSON.parse(event.data);
el.innerHTML = `
<p>CPU: ${m.cpu}% | Memory: ${m.memory}%</p>
<p>Requests/s: ${m.requests_per_sec} | Errors: ${m.error_rate}%</p>
<p>Updated: ${m.timestamp}</p>
`;
};
// No automatic reconnect here: on close the panel only shows a notice.
ws.onclose = () => {
el.innerHTML = "<p>Disconnected. Refresh to reconnect.</p>";
};
</script>
Reconnection and Heartbeats
Connections drop. Networks fail. A robust WebSocket client needs automatic reconnection and keep-alive pings.
# robust_client.py
import asyncio
import websockets
import json
async def connect_with_retry(url: str, max_retries: int = 10):
    """Stay connected to ``url``, reconnecting with exponential backoff.

    Incoming messages are parsed as JSON and printed. After any disconnect,
    clean or not, the client waits ``min(2 ** attempt, 60)`` seconds before
    reconnecting. The attempt counter resets after each successful
    connection.

    Args:
        url: WebSocket URL to connect to.
        max_retries: Consecutive failed attempts allowed before giving up.
    """
    retry = 0
    while retry < max_retries:
        try:
            async with websockets.connect(url, ping_interval=20, ping_timeout=10) as ws:
                print(f"Connected to {url}")
                retry = 0  # Reset on successful connection
                async for message in ws:
                    try:
                        data = json.loads(message)
                    except json.JSONDecodeError:
                        # One malformed frame should not end the client.
                        print(f"Ignoring non-JSON message: {message!r}")
                        continue
                    print(f"Received: {data}")
            # Reaching this point means the message stream ended without an
            # exception. Fall through to the backoff below so a server that
            # accepts and then immediately closes cannot cause a tight
            # reconnect loop.
            reason = "connection closed"
        except (websockets.ConnectionClosed, ConnectionRefusedError, OSError) as e:
            reason = e
        retry += 1
        if retry >= max_retries:
            # Out of attempts: do not sleep through one more backoff period.
            print(f"Disconnected ({reason}).")
            break
        wait = min(2 ** retry, 60)  # Exponential backoff, max 60s
        print(f"Disconnected ({reason}). Retry {retry}/{max_retries} in {wait}s...")
        await asyncio.sleep(wait)
    print("Max retries reached. Giving up.")


asyncio.run(connect_with_retry("ws://localhost:8765"))
Server-side ping/pong
# The websockets library handles ping/pong automatically.
# Configure intervals when creating the server:
async def main():
    """Run the server with explicit keep-alive settings until cancelled.

    ``async with`` and ``await`` are only valid inside a coroutine, so the
    configuration lives in ``main()``; start it with ``asyncio.run(main())``.
    """
    async with websockets.serve(
        handler,
        "0.0.0.0",
        8765,
        ping_interval=20,  # Send ping every 20s
        ping_timeout=10,  # Close if no pong within 10s
        close_timeout=5,  # Wait 5s for clean close
    ):
        await asyncio.Future()
Scaling WebSockets
A single Python process can typically handle on the order of 10,000 concurrent WebSocket connections, depending on message volume and how much work each message triggers. Beyond that, you need horizontal scaling.
Redis Pub/Sub for multi-process broadcast
# When you run multiple server instances behind a load balancer,
# clients on different servers can't see each other's messages.
# Solution: Redis Pub/Sub as a message bus.
pip install redis
# redis_pubsub.py
import asyncio
import redis.asyncio as redis
import json
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
app = FastAPI()
redis_client = redis.from_url("redis://localhost:6379")
# WebSocket connections attached to this server instance only.
local_clients: list[WebSocket] = []


async def redis_listener():
    """Subscribe to the "chat" channel and forward each message to local clients."""
    pubsub = redis_client.pubsub()
    await pubsub.subscribe("chat")
    async for message in pubsub.listen():
        if message["type"] != "message":
            # Skip everything that is not a published payload.
            continue
        data = message["data"]
        if isinstance(data, bytes):
            data = data.decode()
        # Iterate over a snapshot: clients may connect or disconnect while a
        # send is awaited.
        for ws in list(local_clients):
            try:
                await ws.send_text(data)
            except Exception:
                # Deliberate best effort: a failed send means the client is
                # gone. It may already have been removed elsewhere; an
                # unguarded remove() would raise ValueError and end this
                # listener.
                if ws in local_clients:
                    local_clients.remove(ws)
@app.on_event("startup")
async def start_redis():
    """Start the Redis listener as a background task when the app starts."""
    # The event loop holds only a weak reference to tasks. Keep a strong one
    # on the application state so the listener cannot be garbage-collected
    # while it is still running.
    app.state.redis_listener_task = asyncio.create_task(redis_listener())
@app.websocket("/ws/chat")
async def chat(websocket: WebSocket):
    """Publish each text frame from this client to the shared "chat" channel.

    Delivery back to clients happens in ``redis_listener``; this handler only
    registers the socket locally and publishes what the client sends.
    """
    await websocket.accept()
    local_clients.append(websocket)
    try:
        while True:
            text = await websocket.receive_text()
            # Publish to Redis — all server instances receive it
            await redis_client.publish("chat", json.dumps({
                "user": id(websocket) % 10000,
                "text": text,
            }))
    except WebSocketDisconnect:
        pass
    finally:
        # Unregister on every exit path. The listener may already have
        # dropped this socket after a failed send, so guard the removal
        # instead of letting remove() raise ValueError.
        if websocket in local_clients:
            local_clients.remove(websocket)
Testing WebSockets
# test_ws.py
import pytest
from fastapi.testclient import TestClient
from app import app
def test_websocket_echo():
    """The /ws endpoint echoes a text frame with a "You said: " prefix."""
    http = TestClient(app)
    with http.websocket_connect("/ws") as session:
        session.send_text("hello")
        assert session.receive_text() == "You said: hello"
def test_websocket_json():
    """A chat client sees a system join notice first, then its own message."""
    http = TestClient(app)
    with http.websocket_connect("/ws/chat/test?username=bot") as session:
        # The first frame is the system announcement for the join.
        announcement = session.receive_json()
        assert announcement["type"] == "system"
        # A sent message comes back through the room broadcast.
        session.send_json({"text": "test message"})
        relayed = session.receive_json()
        assert relayed["text"] == "test message"
def test_websocket_auth_rejected():
    """Connecting to /ws/secure without a token is refused by the server."""
    # Imported locally so this test does not depend on the module's import list.
    from fastapi import WebSocketDisconnect

    client = TestClient(app)
    # Expect the disconnect specifically: ``pytest.raises(Exception)`` would
    # also pass on unrelated failures such as a missing route.
    with pytest.raises(WebSocketDisconnect):
        with client.websocket_connect("/ws/secure") as ws:
            ws.receive_json()  # Should be closed by server
Production Checklist
- Use wss:// (TLS) — always encrypt WebSocket traffic in production
- Implement heartbeats — detect dead connections before they pile up
- Set connection limits — prevent a single client from opening thousands of connections
- Handle backpressure — if a client can't keep up, buffer or drop messages
- Rate limit messages — prevent spam/abuse on chat-like endpoints
- Use Redis Pub/Sub — for multi-process or multi-server deployments
- Monitor connections — track active count, message rates, error rates
- Graceful shutdown — close all connections cleanly on SIGTERM
- Message size limits — set max_size to prevent memory exhaustion
- Reconnection logic — clients should reconnect with exponential backoff
🚀 Want production-ready Python tools, WebSocket templates, and automation scripts?
Related Articles
- Build a REST API with FastAPI — Complete Python Guide — pair REST endpoints with WebSocket endpoints
- Automate API Integrations with Python — HTTP-based integrations and retry logic
- How to Build a Telegram Bot in Python — another real-time messaging pattern
- Dockerize Python Apps — From Development to Production
- Python Testing Guide — pytest, Mocking & CI
Need a real-time Python app built for your project? I build WebSocket servers, APIs, bots, and automation tools. Reach out on Telegram →