WebSockets in Python — Build Real-Time Apps

March 2026 · 18 min read · Python, WebSockets, FastAPI, Real-Time

HTTP is request-response: the client asks, the server answers, the connection closes. But what about live chat, stock tickers, multiplayer games, or IoT dashboards? That's where WebSockets come in — persistent, bidirectional connections between client and server. In this guide, you'll build real-time Python apps from scratch.

WebSockets vs HTTP — When to Use What

| Feature | HTTP | WebSocket |
| --- | --- | --- |
| Connection | Short-lived (per request) | Persistent (stays open) |
| Direction | Client → Server (request/response) | Bidirectional (both push) |
| Overhead | Headers on every request | Minimal after handshake |
| Use case | CRUD APIs, page loads | Chat, live feeds, gaming |
| Protocol | http:// / https:// | ws:// / wss:// |

Use WebSockets when: you need sub-second updates pushed from the server, or the client sends frequent small messages. For everything else, a regular REST API is simpler and more scalable.

Option 1: The websockets Library

The websockets library is pure Python, async-native, and perfect for standalone WebSocket servers.

pip install websockets

Basic echo server

# server.py
import asyncio
import websockets


async def echo(websocket):
    """Answer each incoming message with the same payload prefixed by ``Echo: ``.

    Every payload is also logged to stdout. The coroutine returns once the
    connection's message stream is exhausted.
    """
    async for payload in websocket:
        reply = f"Echo: {payload}"
        print(f"Received: {payload}")
        await websocket.send(reply)


async def main():
    """Serve ``echo`` on ws://localhost:8765 until the process is stopped."""
    async with websockets.serve(echo, "localhost", 8765):
        print("WebSocket server running on ws://localhost:8765")
        # A Future that is never resolved keeps the server context open.
        await asyncio.Future()


if __name__ == "__main__":
    # Start the server only when run as a script, so the module can be
    # imported (e.g. by tests) without blocking on a running server.
    asyncio.run(main())

Python client

# client.py
import asyncio
import websockets


async def main():
    """Connect to the local echo server, send one greeting, print the reply."""
    async with websockets.connect("ws://localhost:8765") as ws:
        await ws.send("Hello, WebSocket!")
        response = await ws.recv()
        print(f"Server said: {response}")


if __name__ == "__main__":
    # Run the client only when executed as a script, not on import.
    asyncio.run(main())

Chat room (broadcast to all clients)

# chat_server.py
import asyncio
import websockets
import json
from datetime import datetime, timezone

connected_clients: set[websockets.WebSocketServerProtocol] = set()


async def broadcast(message: str, sender=None):
    """Send ``message`` to every connected client except ``sender``.

    Args:
        message: Text to deliver to each client.
        sender: Connection to skip. ``None`` (the default) delivers to everyone.

    Clients whose ``send`` raises ``websockets.ConnectionClosed`` are removed
    from ``connected_clients``.
    """
    disconnected = set()
    # Iterate over a snapshot: other handlers can add or remove clients while
    # this coroutine is suspended in ``send``, and mutating a set during
    # iteration raises RuntimeError.
    for client in list(connected_clients):
        if client is sender:
            continue
        try:
            await client.send(message)
        except websockets.ConnectionClosed:
            disconnected.add(client)
    # Mutate the shared set in place. ``connected_clients -= ...`` would rebind
    # the name, making it local to this function and raising UnboundLocalError
    # on the very first call.
    connected_clients.difference_update(disconnected)


def _system_event(text: str) -> str:
    """Serialize a ``system`` chat event carrying ``text`` and a UTC timestamp."""
    return json.dumps({
        "type": "system",
        "message": text,
        "timestamp": datetime.now(timezone.utc).isoformat(),
    })


async def handler(websocket):
    """Run one chat session: register the client, relay its messages, clean up.

    Each incoming frame is parsed as JSON and its ``text`` field is relayed.
    Frames that are not valid JSON, or whose JSON value is not an object, are
    relayed as plain chat text instead. Messages are broadcast without a
    ``sender`` argument, so the originating client receives its own message.
    """
    # Register client
    connected_clients.add(websocket)
    # Short label derived from the object id; not guaranteed to be unique.
    client_id = id(websocket) % 10000
    print(f"Client {client_id} connected ({len(connected_clients)} total)")

    try:
        # Announce join
        await broadcast(_system_event(f"User {client_id} joined"))

        # Handle messages
        async for raw in websocket:
            try:
                data = json.loads(raw)
            except ValueError:
                # JSONDecodeError and UnicodeDecodeError (undecodable binary
                # frame) are both ValueError subclasses.
                data = None

            # Valid JSON need not be an object ("hi", 42 and [1, 2] all parse);
            # calling .get() on those would raise AttributeError.
            if not isinstance(data, dict):
                text = raw if isinstance(raw, str) else raw.decode("utf-8", errors="replace")
                data = {"text": text}

            message = json.dumps({
                "type": "message",
                "user": client_id,
                "text": data.get("text", ""),
                "timestamp": datetime.now(timezone.utc).isoformat(),
            })
            await broadcast(message)

    except websockets.ConnectionClosed:
        # The peer dropped the connection; fall through to the cleanup below.
        pass
    finally:
        # Unregister first so the "left" announcement is not sent to this client.
        connected_clients.discard(websocket)
        await broadcast(_system_event(f"User {client_id} left"))
        print(f"Client {client_id} disconnected ({len(connected_clients)} total)")


async def main():
    """Serve ``handler`` on all interfaces, port 8765, until the process is stopped."""
    async with websockets.serve(handler, "0.0.0.0", 8765):
        print("Chat server running on ws://0.0.0.0:8765")
        # A Future that is never resolved keeps the server context open.
        await asyncio.Future()


if __name__ == "__main__":
    # Start the server only when run as a script, so the module can be
    # imported without blocking on a running server.
    asyncio.run(main())
💡 Tip: Always handle ConnectionClosed exceptions. Clients disconnect unexpectedly — your server shouldn't crash when they do.

Option 2: FastAPI WebSockets

If you already have a FastAPI REST API, adding WebSocket endpoints is trivial. You get both HTTP and WS on the same server.

pip install "fastapi[standard]"

Basic WebSocket endpoint

# app.py
from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()


@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """Accept the connection and answer each text frame with ``You said: <text>``."""
    await websocket.accept()
    try:
        while True:
            incoming = await websocket.receive_text()
            reply = f"You said: {incoming}"
            await websocket.send_text(reply)
    except WebSocketDisconnect:
        # The peer closed the connection; this endpoint holds no other state.
        print("Client disconnected")

Connection manager (multi-room chat)

# manager.py
from fastapi import WebSocket
import json


class ConnectionManager:
    """Track accepted WebSocket connections grouped by room name."""

    def __init__(self):
        # Room name -> connections currently joined to that room.
        self.rooms: dict[str, list[WebSocket]] = {}

    async def connect(self, websocket: WebSocket, room: str):
        """Accept ``websocket`` and add it to ``room``, creating the room if needed."""
        await websocket.accept()
        self.rooms.setdefault(room, []).append(websocket)

    def disconnect(self, websocket: WebSocket, room: str):
        """Remove ``websocket`` from ``room`` and drop the room once it is empty.

        Calling this for a room or connection that is already gone is a no-op.
        That matters because ``broadcast`` prunes connections whose send failed,
        and the endpoint that owns such a connection calls ``disconnect`` again
        when it later observes the disconnect.
        """
        members = self.rooms.get(room)
        if members is None:
            return
        if websocket in members:
            members.remove(websocket)
        if not members:
            del self.rooms[room]

    async def broadcast(self, room: str, message: dict, exclude: WebSocket | None = None):
        """Send ``message`` as JSON to every connection in ``room``.

        Args:
            room: Name of the room to address; unknown rooms are ignored.
            message: JSON-serializable payload.
            exclude: Connection to skip, if any.

        Connections whose send raises are removed from the room.
        """
        members = self.rooms.get(room)
        if not members:
            return
        dead = []
        # Iterate over a snapshot: connect()/disconnect() may change the room
        # while this coroutine is suspended in send_json.
        for ws in list(members):
            if ws is exclude:
                continue
            try:
                await ws.send_json(message)
            except Exception:
                # Best effort: any send failure marks the connection as dead.
                dead.append(ws)
        for ws in dead:
            self.disconnect(ws, room)

    def get_room_count(self, room: str) -> int:
        """Return how many connections are in ``room`` (0 for unknown rooms)."""
        return len(self.rooms.get(room, []))


manager = ConnectionManager()
# app.py (continued)
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Query
from manager import manager

app = FastAPI()


@app.websocket("/ws/chat/{room}")
async def chat(websocket: WebSocket, room: str, username: str = Query("anonymous")):
    """Join ``room`` as ``username`` and relay this client's messages to the room.

    Each received JSON object has its ``text`` field broadcast to the room; a
    JSON value that is not an object is broadcast as the text itself. The
    connection is always removed from the room and a "left" notice is sent
    when the session ends. Exceptions other than ``WebSocketDisconnect`` (for
    example a frame that is not valid JSON) propagate after that cleanup.
    """
    await manager.connect(websocket, room)

    try:
        # Announce join
        await manager.broadcast(room, {
            "type": "system",
            "message": f"{username} joined (room: {room}, online: {manager.get_room_count(room)})"
        })

        while True:
            data = await websocket.receive_json()
            # Valid JSON need not be an object; .get() on a str/int/list would
            # raise AttributeError and abort the session.
            text = data.get("text", "") if isinstance(data, dict) else data
            await manager.broadcast(room, {
                "type": "message",
                "user": username,
                "text": text,
            })
    except WebSocketDisconnect:
        # Normal end of session; cleanup happens in ``finally``.
        pass
    finally:
        # Run on every exit path so a failed session never stays registered.
        manager.disconnect(websocket, room)
        await manager.broadcast(room, {
            "type": "system",
            "message": f"{username} left"
        })

Authentication for WebSockets

from fastapi import WebSocket, WebSocketDisconnect, status
from jose import JWTError, jwt

# NOTE(review): placeholder value for the example. Load the real signing key
# from the environment or a secrets manager instead of committing it to source.
SECRET_KEY = "your-secret-key"
ALGORITHM = "HS256"  # Algorithm passed to jwt.decode in get_ws_user.


async def get_ws_user(websocket: WebSocket) -> str | None:
    """Return the JWT ``sub`` claim for this handshake, or ``None``.

    The token is looked up in this order:

    1. ``token`` query parameter — ``ws://host/ws?token=xxx``
    2. ``Sec-WebSocket-Protocol`` header. The header may list several
       comma-separated values, so each one is tried as a token.

    ``None`` is returned when no token is present, when no candidate decodes
    successfully, or when the decoded payload has no ``sub`` claim.
    """
    # Method 1: Token in query string
    token = websocket.query_params.get("token")

    if token:
        candidates = [token]
    else:
        # Method 2: Token in subprotocol header
        header = websocket.headers.get("sec-websocket-protocol", "")
        candidates = [part.strip() for part in header.split(",") if part.strip()]

    for candidate in candidates:
        try:
            payload = jwt.decode(candidate, SECRET_KEY, algorithms=[ALGORITHM])
        except JWTError:
            # Not a valid token for our key; try the next candidate, if any.
            continue
        return payload.get("sub")

    return None


@app.websocket("/ws/secure")
async def secure_ws(websocket: WebSocket):
    """Echo JSON messages back to clients that present a valid token.

    The handshake is closed with a policy-violation code when
    ``get_ws_user`` yields no user. Otherwise the connection is accepted, an
    ``auth`` message naming the user is sent, and every received JSON value is
    echoed back tagged with that user.
    """
    identity = await get_ws_user(websocket)
    if not identity:
        # Reject before accept(): the connection is never established.
        await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
        return

    await websocket.accept()
    greeting = {"type": "auth", "user": identity}
    await websocket.send_json(greeting)

    try:
        while True:
            payload = await websocket.receive_json()
            await websocket.send_json({"echo": payload, "from": identity})
    except WebSocketDisconnect:
        # Client went away; there is no per-connection state to release.
        pass
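🔒 Security: The browser WebSocket API can't attach custom HTTP headers to the handshake, so pass auth tokens via a query param (?token=xxx), the Sec-WebSocket-Protocol header, or the first message. Query strings often end up in server logs, so prefer short-lived tokens there. Always use wss:// in production.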

Live Dashboard (Server Push)

A common pattern: the server pushes real-time metrics to connected clients. No client messages needed — pure server-to-client streaming.

# dashboard.py
import asyncio
import json
import random
from datetime import datetime, timezone
from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()
dashboard_clients: list[WebSocket] = []


async def generate_metrics():
    """Push one simulated metrics sample per second to every dashboard client.

    Runs until cancelled. Clients whose send fails are dropped from
    ``dashboard_clients``.
    """
    while True:
        metric = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "cpu": round(random.uniform(10, 95), 1),
            "memory": round(random.uniform(40, 85), 1),
            "requests_per_sec": random.randint(50, 500),
            "error_rate": round(random.uniform(0, 5), 2),
        }

        # Iterate over a snapshot: the endpoint coroutines append to and remove
        # from the shared list while this loop is suspended in send_json.
        for ws in list(dashboard_clients):
            try:
                await ws.send_json(metric)
            except Exception:
                # Best effort: any send failure marks the client as dead. Its
                # endpoint may already have removed it while we were awaiting;
                # an unguarded list.remove() would raise ValueError and stop
                # this task for every remaining client.
                if ws in dashboard_clients:
                    dashboard_clients.remove(ws)

        await asyncio.sleep(1)


@app.on_event("startup")
async def start_metrics():
    """Launch the metrics generator as a background task when the app starts."""
    # Keep a strong reference on the app: the event loop only holds weak
    # references to tasks, so an unreferenced task can be garbage-collected
    # before it finishes.
    app.state.metrics_task = asyncio.create_task(generate_metrics())


@app.websocket("/ws/dashboard")
async def dashboard(websocket: WebSocket):
    """Register the client for metric pushes until it disconnects.

    Incoming text frames are read and discarded; the receive loop exists to
    notice when the client goes away.
    """
    await websocket.accept()
    dashboard_clients.append(websocket)
    try:
        # Keep connection alive — listen for pings/close
        while True:
            await websocket.receive_text()
    except WebSocketDisconnect:
        pass
    finally:
        # Runs on every exit path. generate_metrics may already have pruned
        # this client after a failed send, so guard against ValueError.
        if websocket in dashboard_clients:
            dashboard_clients.remove(websocket)

HTML client for the dashboard

<!-- dashboard.html -->
<div id="metrics"></div>

<script>
// Render the most recent metrics sample pushed by the server.
const panel = document.getElementById("metrics");
const socket = new WebSocket("ws://localhost:8000/ws/dashboard");

// Each message is one JSON-encoded sample; replace the panel contents with it.
socket.onmessage = ({ data }) => {
    const { cpu, memory, requests_per_sec, error_rate, timestamp } = JSON.parse(data);
    panel.innerHTML = `
        <p>CPU: ${cpu}% | Memory: ${memory}%</p>
        <p>Requests/s: ${requests_per_sec} | Errors: ${error_rate}%</p>
        <p>Updated: ${timestamp}</p>
    `;
};

// No automatic reconnect: tell the user how to recover.
socket.onclose = () => {
    panel.innerHTML = "<p>Disconnected. Refresh to reconnect.</p>";
};
</script>

Reconnection and Heartbeats

Connections drop. Networks fail. A robust WebSocket client needs automatic reconnection and keep-alive pings.

# robust_client.py
import asyncio
import websockets
import json


async def connect_with_retry(url: str, max_retries: int = 10):
    """Stay connected to ``url``, reconnecting with exponential backoff.

    Args:
        url: WebSocket URL to connect to.
        max_retries: Number of consecutive failed or dropped attempts after
            which the client gives up. The counter resets every time a
            connection is established.

    Received messages are parsed as JSON and printed; messages that are not
    valid JSON are reported and skipped.
    """
    retry = 0
    while retry < max_retries:
        # Used when the server closes the connection cleanly: iteration then
        # simply ends without raising.
        reason = "connection closed"
        try:
            async with websockets.connect(url, ping_interval=20, ping_timeout=10) as ws:
                print(f"Connected to {url}")
                retry = 0  # Reset on successful connection

                async for message in ws:
                    try:
                        data = json.loads(message)
                    except ValueError:
                        # Covers JSONDecodeError and undecodable binary frames;
                        # one bad message must not take the client down.
                        print(f"Ignoring non-JSON message: {message!r}")
                        continue
                    print(f"Received: {data}")

        # ConnectionRefusedError is a subclass of OSError.
        except (websockets.ConnectionClosed, OSError) as e:
            reason = e

        # Back off after every lost connection, including clean closes;
        # otherwise a server that accepts and immediately closes would be
        # hammered in a tight reconnect loop.
        retry += 1
        wait = min(2 ** retry, 60)  # Exponential backoff, max 60s
        print(f"Disconnected ({reason}). Retry {retry}/{max_retries} in {wait}s...")
        await asyncio.sleep(wait)

    print("Max retries reached. Giving up.")


if __name__ == "__main__":
    # Run the client only when executed as a script, not on import.
    asyncio.run(connect_with_retry("ws://localhost:8765"))

Server-side ping/pong

# The websockets library handles ping/pong automatically.
# Configure intervals when creating the server:

async def main():
    """Serve ``handler`` with explicit keepalive and close timeouts."""
    # ``async with`` and ``await`` are only valid inside a coroutine, so the
    # server setup lives in ``main`` rather than at module level.
    async with websockets.serve(
        handler,
        "0.0.0.0",
        8765,
        ping_interval=20,    # Send ping every 20s
        ping_timeout=10,     # Close if no pong within 10s
        close_timeout=5,     # Wait 5s for clean close
    ):
        await asyncio.Future()  # Never resolved: keeps the server running
💡 Tip: The websockets library handles ping/pong at the protocol level automatically. FastAPI requires you to implement application-level heartbeats if you need them.

Scaling WebSockets

A single Python process can handle ~10,000 concurrent WebSocket connections. Beyond that, you need horizontal scaling.

Redis Pub/Sub for multi-process broadcast

# When you run multiple server instances behind a load balancer,
# clients on different servers can't see each other's messages.
# Solution: Redis Pub/Sub as a message bus.

pip install redis
# redis_pubsub.py
import asyncio
import redis.asyncio as redis
import json
from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()
redis_client = redis.from_url("redis://localhost:6379")
local_clients: list[WebSocket] = []


async def redis_listener():
    """Subscribe to the ``chat`` channel and forward messages to local clients.

    Runs until cancelled or until the subscription ends. Clients whose send
    fails are dropped from ``local_clients``.
    """
    # The context manager releases the pub/sub connection when the listener
    # exits, including on cancellation or error.
    async with redis_client.pubsub() as pubsub:
        await pubsub.subscribe("chat")

        async for message in pubsub.listen():
            if message["type"] != "message":
                # Skip non-payload events such as the subscribe confirmation.
                continue

            data = message["data"]
            if isinstance(data, bytes):
                data = data.decode()

            # Iterate over a snapshot: endpoint coroutines mutate the shared
            # list while this loop is suspended in send_text.
            for ws in list(local_clients):
                try:
                    await ws.send_text(data)
                except Exception:
                    # Best effort: any send failure marks the client as dead.
                    # Its endpoint may already have removed it; an unguarded
                    # list.remove() would raise ValueError and end the listener.
                    if ws in local_clients:
                        local_clients.remove(ws)


@app.on_event("startup")
async def start_redis():
    """Launch the Redis listener as a background task when the app starts."""
    # Keep a strong reference on the app: the event loop only holds weak
    # references to tasks, so an unreferenced task can be garbage-collected
    # before it finishes.
    app.state.redis_listener_task = asyncio.create_task(redis_listener())


@app.websocket("/ws/chat")
async def chat(websocket: WebSocket):
    """Publish each text frame from this client to the ``chat`` Redis channel.

    The client is registered in ``local_clients`` so that ``redis_listener``
    can deliver channel messages to it, and unregistered when the session ends.
    """
    await websocket.accept()
    local_clients.append(websocket)
    try:
        while True:
            text = await websocket.receive_text()
            # Publish to Redis — all server instances receive it
            await redis_client.publish("chat", json.dumps({
                "user": id(websocket) % 10000,
                "text": text,
            }))
    except WebSocketDisconnect:
        pass
    finally:
        # Runs on every exit path (including Redis errors). The listener may
        # already have pruned this client after a failed send, so guard
        # against ValueError.
        if websocket in local_clients:
            local_clients.remove(websocket)

Testing WebSockets

# test_ws.py
import pytest
from fastapi.testclient import TestClient
from app import app


def test_websocket_echo():
    """The /ws endpoint replies to a text frame with a ``You said: `` prefix."""
    http = TestClient(app)
    with http.websocket_connect("/ws") as session:
        session.send_text("hello")
        reply = session.receive_text()
        assert reply == "You said: hello"


def test_websocket_json():
    """A room client first gets the join notice, then its own relayed message."""
    http = TestClient(app)
    with http.websocket_connect("/ws/chat/test?username=bot") as session:
        # The first frame is the system announcement for this client's join.
        announcement = session.receive_json()
        assert announcement["type"] == "system"

        # A sent message is broadcast back to the room, including the sender.
        session.send_json({"text": "test message"})
        relayed = session.receive_json()
        assert relayed["text"] == "test message"


def test_websocket_auth_rejected():
    """Connecting to /ws/secure without a token must raise."""
    http = TestClient(app)
    with pytest.raises(Exception):
        with http.websocket_connect("/ws/secure") as session:
            # Should be closed by server
            session.receive_json()

Production Checklist

🚀 Want production-ready Python tools, WebSocket templates, and automation scripts?

Get the AI Agent Toolkit →

Related Articles

Need a real-time Python app built for your project? I build WebSocket servers, APIs, bots, and automation tools. Reach out on Telegram →