Python Authentication Guide — OAuth2, JWT, Sessions & API Keys

March 2026 · 24 min read · Python, Security, Authentication, FastAPI

Authentication is the part of your app that attackers hit first. Get it wrong and nothing else matters — not your beautiful UI, not your clever algorithms, not your scale. This guide covers every auth pattern you'll need in Python, with production-ready code and the security pitfalls to avoid.

Choosing the Right Auth Strategy

Strategy | Best For | Tradeoffs
--- | --- | ---
JWT (stateless) | APIs, microservices, mobile | Can't revoke easily, larger tokens
Sessions (stateful) | Web apps, SSR | Requires server-side storage
OAuth2 | "Login with Google/GitHub" | Complex flow, dependency on provider
API Keys | Service-to-service, public APIs | No user context, hard to rotate
mTLS | Internal services, zero-trust | Certificate management overhead

Most apps use a combination. Web frontend → sessions. Mobile app → JWT. Third-party integrations → API keys. Internal services → mTLS or JWT.

1. Password Hashing — The Foundation

Never store passwords in plaintext. Never use MD5 or SHA-256 for passwords. Use a purpose-built password hashing algorithm.

# pip install passlib[bcrypt] argon2-cffi
from passlib.context import CryptContext

# Argon2id is the current best practice (winner of Password Hashing Competition)
# bcrypt is still solid and widely supported
pwd_context = CryptContext(
    # Hashes in either format can be verified; new hashes use `default`.
    schemes=["argon2", "bcrypt"],
    default="argon2",
    # "auto" treats every scheme other than the default as deprecated, which
    # is what lets needs_update() flag existing bcrypt hashes for upgrade.
    deprecated="auto",
    # Argon2 tuning (adjust based on your server)
    argon2__memory_cost=65536,   # in KiB, i.e. 64 MiB per hash
    argon2__time_cost=3,         # 3 iterations
    argon2__parallelism=4,       # 4 lanes (threads)
)


def hash_password(password: str) -> str:
    """Return the storable hash of *password* produced by ``pwd_context``."""
    digest = pwd_context.hash(password)
    return digest


def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Return whether *plain_password* matches the stored *hashed_password*."""
    matches = pwd_context.verify(plain_password, hashed_password)
    return matches


def needs_rehash(hashed_password: str) -> bool:
    """Report whether ``pwd_context`` considers this stored hash outdated."""
    outdated = pwd_context.needs_update(hashed_password)
    return outdated


# --- Usage ---
# Demo only: hashes a sample password, prints the encoded hash, and checks
# that the right password verifies while a wrong one does not.
hashed = hash_password("my-secure-password-123")
print(hashed)
# $argon2id$v=19$m=65536,t=3,p=4$...

assert verify_password("my-secure-password-123", hashed)
assert not verify_password("wrong-password", hashed)

# On login: if needs_rehash(stored_hash) → re-hash and update DB
# (re-hashing needs the plaintext, so it can only be done at login time,
# right after verify_password() succeeded on the submitted password)
if needs_rehash(hashed):
    new_hash = hash_password("my-secure-password-123")
    # update_user_hash(user_id, new_hash)
🔒 Why Argon2id over bcrypt? Argon2id is memory-hard — it requires significant RAM to compute, making GPU/ASIC attacks much more expensive. Bcrypt is still fine for most apps, but Argon2id is the modern choice. Both are supported by passlib.

2. JWT Authentication

JSON Web Tokens are the standard for stateless API auth. The token contains the user's identity and is signed (not encrypted) by the server. See our FastAPI guide for the basic setup.

Robust JWT implementation

# pip install python-jose[cryptography]
from datetime import datetime, timedelta, timezone
from jose import JWTError, jwt
from pydantic import BaseModel
import secrets
import os


# --- Configuration ---
class AuthConfig:
    """Static settings used to sign and validate the JWTs in this module."""

    # `or` instead of a getenv default: a variable that is set but empty
    # would otherwise be used as-is and tokens would be signed with "".
    # NOTE: the random fallback differs per process and per restart, so any
    # deployment with more than one worker must set JWT_SECRET_KEY.
    SECRET_KEY: str = os.getenv("JWT_SECRET_KEY") or secrets.token_hex(32)
    ALGORITHM: str = "HS256"
    ACCESS_TOKEN_EXPIRE_MINUTES: int = 15    # Short-lived
    REFRESH_TOKEN_EXPIRE_DAYS: int = 30       # Long-lived


class TokenPayload(BaseModel):
    """Typed view of the claims that verify_token() returns to callers."""

    sub: str           # Subject (user ID or email)
    exp: datetime      # Expiration
    iat: datetime      # Issued at
    jti: str           # Unique token ID (for revocation)
    # create_refresh_token() writes no "scopes" claim, so refresh tokens
    # fall back to this empty default.
    scopes: list[str] = []  # Permissions


# --- Token creation ---
def create_access_token(
    subject: str,
    scopes: list[str] | None = None,
    expires_delta: timedelta | None = None,
) -> str:
    """Issue a signed, short-lived access token for *subject*.

    Args:
        subject: Value stored in the ``sub`` claim (user ID or email).
        scopes: Permissions to embed; ``None`` means no scopes.
        expires_delta: Token lifetime; ``None`` selects
            ``AuthConfig.ACCESS_TOKEN_EXPIRE_MINUTES``.

    Returns:
        The encoded JWT string.
    """
    now = datetime.now(timezone.utc)
    # Compare against None explicitly: timedelta(0) is falsy, so an `or`
    # fallback would silently replace an explicit zero lifetime.
    if expires_delta is None:
        expires_delta = timedelta(minutes=AuthConfig.ACCESS_TOKEN_EXPIRE_MINUTES)

    payload = {
        "sub": subject,
        "exp": now + expires_delta,
        "iat": now,
        "jti": secrets.token_hex(16),  # Unique ID, usable as a revocation key
        "scopes": list(scopes or []),
        "type": "access",  # Lets verification tell access and refresh tokens apart
    }
    return jwt.encode(payload, AuthConfig.SECRET_KEY, algorithm=AuthConfig.ALGORITHM)


def create_refresh_token(subject: str) -> str:
    """Issue a signed refresh token for *subject*.

    The token lives for ``AuthConfig.REFRESH_TOKEN_EXPIRE_DAYS`` days and is
    tagged ``type="refresh"``; it carries no scopes.
    """
    issued_at = datetime.now(timezone.utc)
    lifetime = timedelta(days=AuthConfig.REFRESH_TOKEN_EXPIRE_DAYS)
    claims = dict(
        sub=subject,
        exp=issued_at + lifetime,
        iat=issued_at,
        jti=secrets.token_hex(16),
        type="refresh",
    )
    return jwt.encode(claims, AuthConfig.SECRET_KEY, algorithm=AuthConfig.ALGORITHM)


# --- Token verification ---
def verify_token(token: str, expected_type: str = "access") -> TokenPayload:
    """Verify a JWT's signature and claims and return them as a TokenPayload.

    Args:
        token: The encoded JWT.
        expected_type: Required value of the ``type`` claim.

    Raises:
        AuthError: If decoding fails, the ``type`` claim differs from
            *expected_type*, or required claims are missing or malformed.
    """
    # pydantic is already a dependency of this module (BaseModel).
    from pydantic import ValidationError

    try:
        payload = jwt.decode(
            token,
            AuthConfig.SECRET_KEY,
            algorithms=[AuthConfig.ALGORITHM],
        )
    except JWTError as e:
        raise AuthError(f"Invalid token: {e}") from e

    actual_type = payload.get("type")
    if actual_type != expected_type:
        raise AuthError(f"Expected {expected_type} token, got {actual_type}")

    # A correctly signed token can still lack claims such as "jti"; report
    # that as an auth failure rather than leaking a validation error.
    try:
        return TokenPayload(**payload)
    except ValidationError as e:
        raise AuthError(f"Invalid token claims: {e}") from e


class AuthError(Exception):
    """Raised when a token cannot be verified or has an unexpected type."""


# --- Token pair (access + refresh) ---
def create_token_pair(user_id: str, scopes: list[str] = None) -> dict:
    """Issue an access token (with *scopes*) and a refresh token for *user_id*."""
    access = create_access_token(user_id, scopes)
    refresh = create_refresh_token(user_id)
    return {"access_token": access, "refresh_token": refresh, "token_type": "bearer"}


# --- Refresh flow ---
def refresh_access_token(refresh_token: str, scopes: list[str] | None = None) -> dict:
    """Exchange a valid refresh token for a new access token.

    Args:
        refresh_token: Encoded refresh JWT.
        scopes: Scopes to grant on the new access token, e.g. reloaded from
            the user record. When omitted, whatever scopes the refresh token
            itself carries are reused (none, for tokens that have no
            ``scopes`` claim).

    Returns:
        ``{"access_token": ..., "token_type": "bearer"}``.

    Raises:
        AuthError: If the refresh token does not verify.
    """
    payload = verify_token(refresh_token, expected_type="refresh")

    # Optional: check if refresh token is revoked (stored in Redis/DB)
    # if is_token_revoked(payload.jti): raise AuthError("Token revoked")

    # Without this the refreshed token would always be issued scope-less and
    # every scope-protected route would start answering 403 after a refresh.
    granted = payload.scopes if scopes is None else scopes

    return {
        "access_token": create_access_token(payload.sub, granted),
        "token_type": "bearer",
    }

FastAPI integration

from fastapi import Depends, HTTPException, status, Security
from fastapi.security import OAuth2PasswordBearer, SecurityScopes

# Bearer-token dependency shared by the JWT-protected routes. The scope names
# declared here are the ones routes request via Security(..., scopes=[...]).
oauth2_scheme = OAuth2PasswordBearer(
    tokenUrl="/auth/login",
    scopes={
        "tasks:read": "Read tasks",
        "tasks:write": "Create and modify tasks",
        "admin": "Admin access",
    },
)


async def get_current_user(
    security_scopes: SecurityScopes,
    token: str = Depends(oauth2_scheme),
    db: Session = Depends(get_db),
):
    """Dependency: validate the bearer JWT, enforce scopes, load the user.

    Raises:
        HTTPException: 401 if the token fails verification or the user is
            missing or inactive; 403 if a required scope is not granted.
    """
    # Every 401 for a bearer-protected resource should carry this challenge.
    bearer_challenge = {"WWW-Authenticate": "Bearer"}

    try:
        payload = verify_token(token, expected_type="access")
    except AuthError as exc:
        # The detail stays generic so token internals are not leaked.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Could not validate credentials",
            headers=bearer_challenge,
        ) from exc

    # Check required scopes
    granted = set(payload.scopes)
    for scope in security_scopes.scopes:
        if scope not in granted:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail=f"Missing required scope: {scope}",
            )

    user = db.query(User).filter(User.id == payload.sub).first()
    if not user or not user.is_active:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="User not found or inactive",
            headers=bearer_challenge,
        )

    return user


# --- Route with scope requirements ---
@app.get("/admin/users")
async def list_users(
    user: User = Security(get_current_user, scopes=["admin"]),
    db: Session = Depends(get_db),
):
    """Admin-only endpoint: return every user row.

    The session is injected as a dependency; the handler previously used a
    ``db`` name that was not defined in its scope.
    """
    # NOTE(review): this returns full User rows; confirm a response_model
    # filters out sensitive columns before exposing it.
    return db.query(User).all()


@app.get("/tasks")
async def list_tasks(
    user: User = Security(get_current_user, scopes=["tasks:read"]),
    db: Session = Depends(get_db),
):
    """Return the tasks owned by the caller; requires the tasks:read scope.

    The session is injected as a dependency; the handler previously used a
    ``db`` name that was not defined in its scope.
    """
    return db.query(Task).filter(Task.owner_id == user.id).all()

3. OAuth2 — "Login with Google/GitHub"

OAuth2 lets users authenticate with existing accounts. Here's the Authorization Code flow with PKCE (the secure one).

# pip install httpx authlib
from authlib.integrations.starlette_client import OAuth
from starlette.config import Config

config = Config(".env")

oauth = OAuth(config)

# Both providers are registered for the Authorization Code flow with PKCE:
# setting code_challenge_method makes Authlib send a code challenge on the
# authorize redirect and the matching verifier on the token exchange.
oauth.register(
    name="google",
    client_id=config("GOOGLE_CLIENT_ID"),
    client_secret=config("GOOGLE_CLIENT_SECRET"),
    server_metadata_url="https://accounts.google.com/.well-known/openid-configuration",
    client_kwargs={
        "scope": "openid email profile",
        "code_challenge_method": "S256",
    },
)

oauth.register(
    name="github",
    client_id=config("GITHUB_CLIENT_ID"),
    client_secret=config("GITHUB_CLIENT_SECRET"),
    authorize_url="https://github.com/login/oauth/authorize",
    access_token_url="https://github.com/login/oauth/access_token",
    api_base_url="https://api.github.com/",
    client_kwargs={
        "scope": "user:email",
        "code_challenge_method": "S256",
    },
)
# FastAPI OAuth2 routes
from fastapi import Request
from fastapi.responses import RedirectResponse


@app.get("/auth/google/login")
async def google_login(request: Request):
    """Redirect user to Google's consent page."""
    # url_for() can return a URL object rather than a str depending on the
    # Starlette version; pass a plain string so it is safe to store and send.
    redirect_uri = str(request.url_for("google_callback"))
    return await oauth.google.authorize_redirect(request, redirect_uri)


@app.get("/auth/google/callback")
async def google_callback(request: Request, db: Session = Depends(get_db)):
    """Handle Google's callback after user consents.

    Exchanges the authorization code, finds or creates the local user by
    email, and sets our own access token as an HTTP-only cookie.

    Raises:
        HTTPException: 400 if the OAuth exchange fails, Google returns no
            user info, or the account has no verified email.
    """
    # Authlib is already a dependency of this module.
    from authlib.integrations.starlette_client import OAuthError

    try:
        token = await oauth.google.authorize_access_token(request)
    except OAuthError as exc:
        # Denied consent or a state mismatch is a client error, not a 500.
        raise HTTPException(400, f"Google sign-in failed: {exc.error}") from exc
    user_info = token.get("userinfo")

    if not user_info:
        raise HTTPException(400, "Failed to get user info from Google")

    # Accounts are matched by email below, so an unverified address must not
    # be trusted: it would let someone sign in as an existing local user.
    email = user_info.get("email")
    if not email or not user_info.get("email_verified"):
        raise HTTPException(400, "Google account has no verified email")

    # Find or create user
    user = db.query(User).filter(User.email == email).first()
    if not user:
        user = User(
            email=email,
            name=user_info.get("name", ""),
            oauth_provider="google",
            oauth_id=user_info["sub"],
            is_active=True,
        )
        db.add(user)
        db.commit()
        db.refresh(user)

    # Issue our own JWT tokens
    tokens = create_token_pair(str(user.id), scopes=["tasks:read", "tasks:write"])

    # In production: set HTTP-only cookie or redirect with token
    response = RedirectResponse(url="/dashboard")
    response.set_cookie(
        key="access_token",
        value=tokens["access_token"],
        httponly=True,
        secure=True,
        samesite="lax",
        max_age=AuthConfig.ACCESS_TOKEN_EXPIRE_MINUTES * 60,
    )
    return response


@app.get("/auth/github/login")
async def github_login(request: Request):
    """Redirect user to GitHub's authorization page."""
    # url_for() can return a URL object rather than a str depending on the
    # Starlette version; pass a plain string so it is safe to store and send.
    redirect_uri = str(request.url_for("github_callback"))
    return await oauth.github.authorize_redirect(request, redirect_uri)


@app.get("/auth/github/callback")
async def github_callback(request: Request, db: Session = Depends(get_db)):
    """Handle GitHub's callback: resolve the user and set the access cookie.

    Raises:
        HTTPException: 400 if the OAuth exchange fails or the account has no
            primary email that GitHub reports as verified.
    """
    # Authlib is already a dependency of this module.
    from authlib.integrations.starlette_client import OAuthError

    try:
        token = await oauth.github.authorize_access_token(request)
    except OAuthError as exc:
        raise HTTPException(400, f"GitHub sign-in failed: {exc.error}") from exc

    # GitHub doesn't include email in token — fetch from API
    resp = await oauth.github.get("user", token=token)
    github_user = resp.json()

    # Get primary email. An error response is a JSON object rather than a
    # list, so anything that is not a list is treated as "no emails".
    emails_resp = await oauth.github.get("user/emails", token=token)
    emails = emails_resp.json()
    if not isinstance(emails, list):
        emails = []
    # Accounts are matched by email, so only a verified address is accepted;
    # the unverifiable profile email is deliberately not used as a fallback.
    primary_email = next(
        (e["email"] for e in emails if e.get("primary") and e.get("verified")),
        None,
    )
    if not primary_email:
        raise HTTPException(400, "GitHub account has no verified primary email")

    # Find or create user (same pattern as Google)
    user = find_or_create_oauth_user(db, primary_email, "github", str(github_user["id"]))
    # Same scopes as the Google flow; a scope-less token would be rejected by
    # every route that requires tasks:read or tasks:write.
    tokens = create_token_pair(str(user.id), scopes=["tasks:read", "tasks:write"])

    response = RedirectResponse(url="/dashboard")
    response.set_cookie(
        key="access_token",
        value=tokens["access_token"],
        httponly=True,
        secure=True,
        samesite="lax",
        max_age=AuthConfig.ACCESS_TOKEN_EXPIRE_MINUTES * 60,
    )
    return response
🔒 Security: Always use Authorization Code flow with PKCE, never the Implicit flow. Store tokens in HTTP-only cookies (not localStorage) so that injected scripts cannot read them if an XSS bug slips through.

4. Session-Based Authentication

For server-rendered web apps, sessions are simpler and more secure than JWT in cookies.

# pip install itsdangerous redis
from itsdangerous import URLSafeTimedSerializer
import redis
import secrets
import json
from datetime import datetime, timezone


class SessionManager:
    """Server-side session management with Redis."""

    def __init__(
        self,
        redis_url: str = "redis://localhost:6379",
        secret_key: str = None,
        max_age: int = 86400,  # 24 hours
    ):
        self.redis = redis.from_url(redis_url)
        self.serializer = URLSafeTimedSerializer(secret_key or secrets.token_hex(32))
        self.max_age = max_age

    def create_session(self, user_id: str, data: dict = None) -> str:
        """Create a new session, return session ID."""
        session_id = secrets.token_urlsafe(32)

        session_data = {
            "user_id": user_id,
            "created_at": datetime.now(timezone.utc).isoformat(),
            **(data or {}),
        }

        # Store in Redis with TTL
        self.redis.setex(
            f"session:{session_id}",
            self.max_age,
            json.dumps(session_data),
        )

        # Sign the session ID (prevents tampering)
        signed = self.serializer.dumps(session_id)
        return signed

    def get_session(self, signed_session_id: str) -> dict | None:
        """Validate and retrieve session data."""
        try:
            session_id = self.serializer.loads(signed_session_id, max_age=self.max_age)
        except Exception:
            return None  # Expired or tampered

        data = self.redis.get(f"session:{session_id}")
        if not data:
            return None

        # Refresh TTL on access (sliding expiration)
        self.redis.expire(f"session:{session_id}", self.max_age)
        return json.loads(data)

    def destroy_session(self, signed_session_id: str):
        """Logout — delete session."""
        try:
            session_id = self.serializer.loads(signed_session_id, max_age=self.max_age)
            self.redis.delete(f"session:{session_id}")
        except Exception:
            pass

    def destroy_all_user_sessions(self, user_id: str):
        """Nuclear logout — kill all sessions for a user."""
        # Requires a user→sessions index
        for key in self.redis.scan_iter(f"session:*"):
            data = self.redis.get(key)
            if data and json.loads(data).get("user_id") == user_id:
                self.redis.delete(key)


# --- FastAPI middleware ---
# NOTE: built with defaults, so it targets redis://localhost:6379 and signs
# session IDs with a random per-process key; pass redis_url and secret_key
# from configuration when running several workers or across restarts.
sessions = SessionManager()


@app.post("/auth/login")
async def login(form: LoginForm = Depends(), db: Session = Depends(get_db)):
    """Authenticate with email and password and start a server-side session.

    Raises:
        HTTPException: 401 if the credentials are rejected.
    """
    user = authenticate_user(db, form.email, form.password)
    if not user:
        raise HTTPException(401, "Invalid credentials")

    signed_id = sessions.create_session(str(user.id), {"role": user.role})

    response = JSONResponse({"message": "Logged in"})
    response.set_cookie(
        "session",
        signed_id,
        httponly=True,
        secure=True,
        samesite="lax",
        # Tie the cookie lifetime to the session lifetime instead of
        # repeating the number, so the two cannot drift apart.
        max_age=sessions.max_age,
    )
    return response


@app.post("/auth/logout")
async def logout(request: Request):
    """End the caller's session, if any, and clear the session cookie."""
    if signed_id := request.cookies.get("session"):
        sessions.destroy_session(signed_id)

    reply = JSONResponse({"message": "Logged out"})
    reply.delete_cookie("session")
    return reply

5. API Key Authentication

import hashlib
import secrets
from fastapi import Security
from fastapi.security import APIKeyHeader

api_key_header = APIKeyHeader(name="X-API-Key")


def generate_api_key() -> tuple[str, str]:
    """Create a new API key.

    Returns:
        ``(raw_key, key_hash)``: the ``atk_``-prefixed key to hand to the
        client, and its SHA-256 hex digest, which is what should be stored.
    """
    # The fixed prefix makes keys recognisable (like GitHub's ghp_).
    token = secrets.token_urlsafe(32)
    raw_key = "atk_" + token
    digest = hashlib.sha256(raw_key.encode())
    return raw_key, digest.hexdigest()


def verify_api_key(raw_key: str, stored_hash: str) -> bool:
    """Verify an API key against its stored SHA-256 hex digest.

    Args:
        raw_key: The key presented by the client.
        stored_hash: The hex digest saved when the key was generated.

    Returns:
        True if the digest of *raw_key* equals *stored_hash*.
    """
    import hmac

    computed = hashlib.sha256(raw_key.encode()).hexdigest()
    # Constant-time comparison: `==` stops at the first differing character
    # and so leaks how much of the digest matched. Comparing bytes also keeps
    # a non-ASCII stored value from raising instead of returning False.
    return hmac.compare_digest(computed.encode(), stored_hash.encode())


# --- FastAPI dependency ---
async def get_api_client(
    api_key: str = Security(api_key_header),
    db: Session = Depends(get_db),
):
    """Resolve the X-API-Key header to an active client and record the call.

    Raises:
        HTTPException: 403 if no active client matches the key.
    """
    presented_hash = hashlib.sha256(api_key.encode()).hexdigest()

    # Only hashes are stored, so the lookup is by digest.
    matching = db.query(APIClient).filter(
        APIClient.key_hash == presented_hash,
        APIClient.is_active == True,
    )
    client = matching.first()

    if not client:
        raise HTTPException(status_code=403, detail="Invalid or revoked API key")

    # Track usage
    used_at = datetime.now(timezone.utc)
    client.last_used_at = used_at
    client.request_count += 1
    db.commit()

    return client


@app.get("/api/v1/data")
async def get_data(client: APIClient = Depends(get_api_client)):
    """Sample endpoint that requires a valid API key."""
    body = {"data": "...", "client": client.name}
    return body

6. Two-Factor Authentication (2FA)

# pip install pyotp qrcode
import pyotp
import qrcode
import io
import base64


class TwoFactorAuth:
    """TOTP-based 2FA helpers (Google Authenticator, Authy, etc.)."""

    @staticmethod
    def generate_secret() -> str:
        """Return a fresh random base32 TOTP secret."""
        return pyotp.random_base32()

    @staticmethod
    def get_provisioning_uri(secret: str, email: str, issuer: str = "MyApp") -> str:
        """Build the provisioning URI that authenticator apps scan as a QR code."""
        return pyotp.TOTP(secret).provisioning_uri(name=email, issuer_name=issuer)

    @staticmethod
    def generate_qr_code(uri: str) -> str:
        """Render *uri* as a PNG QR code and return it base64-encoded."""
        image = qrcode.make(uri)
        png = io.BytesIO()
        image.save(png, format="PNG")
        encoded = base64.b64encode(png.getvalue())
        return encoded.decode()

    @staticmethod
    def verify_code(secret: str, code: str) -> bool:
        """Check a TOTP code against *secret*.

        ``valid_window=1`` widens acceptance beyond the current time step to
        tolerate clock drift between the server and the user's device.
        """
        return pyotp.TOTP(secret).verify(code, valid_window=1)


# --- Enable 2FA flow ---
@app.post("/auth/2fa/setup")
async def setup_2fa(user: User = Depends(get_current_user), db: Session = Depends(get_db)):
    """Start 2FA enrolment: create a pending secret and return its QR code."""
    pending_secret = TwoFactorAuth.generate_secret()
    provisioning_uri = TwoFactorAuth.get_provisioning_uri(pending_secret, user.email)
    encoded_png = TwoFactorAuth.generate_qr_code(provisioning_uri)

    # The secret stays pending until the user proves they can produce a code.
    user.totp_secret_pending = pending_secret
    db.commit()

    reply = {
        "qr_code": f"data:image/png;base64,{encoded_png}",
        "manual_key": pending_secret,
        "message": "Scan QR code with authenticator app, then confirm with a code",
    }
    return reply


@app.post("/auth/2fa/confirm")
async def confirm_2fa(
    code: str,
    user: User = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    """Finish 2FA enrolment by checking a code against the pending secret.

    On success the pending secret becomes active and ten single-use backup
    codes are generated; only their hashes are stored.

    Raises:
        HTTPException: 400 if no setup is pending or the code is wrong.
    """
    if not user.totp_secret_pending:
        raise HTTPException(400, "No pending 2FA setup")

    if not TwoFactorAuth.verify_code(user.totp_secret_pending, code):
        raise HTTPException(400, "Invalid code")

    # Activate 2FA
    user.totp_secret = user.totp_secret_pending
    user.totp_secret_pending = None
    user.two_factor_enabled = True

    # Generate backup codes. They are stored as plain SHA-256 digests, so
    # they need enough entropy (80 bits here) that a leaked table cannot be
    # brute-forced; 32-bit codes could be recovered in seconds.
    backup_codes = [secrets.token_hex(10) for _ in range(10)]
    user.backup_codes = json.dumps([
        hashlib.sha256(c.encode()).hexdigest() for c in backup_codes
    ])
    db.commit()

    return {
        "message": "2FA enabled",
        "backup_codes": backup_codes,  # Show once, never again
    }

7. Token Revocation

JWTs are stateless — you can't "delete" them. But you can blacklist them:

import redis

# Redis client holding revoked token IDs: revoke_token() writes the
# "revoked:<jti>" keys and is_token_revoked() checks for them.
token_blacklist = redis.from_url("redis://localhost:6379")


def revoke_token(jti: str, expires_in: int):
    """Add a token ID to the blacklist for the rest of the token's lifetime.

    Args:
        jti: The token's unique ID.
        expires_in: Seconds until the token expires; used as the key's TTL.
    """
    # An already-expired token needs no entry, and Redis rejects SETEX with a
    # non-positive TTL, so skip the write instead of raising.
    if expires_in <= 0:
        return
    token_blacklist.setex(f"revoked:{jti}", expires_in, "1")


def is_token_revoked(jti: str) -> bool:
    """Return True if a blacklist entry exists for this token ID."""
    hits = token_blacklist.exists(f"revoked:{jti}")
    return hits > 0


# --- In your auth middleware ---
async def get_current_user(token: str = Depends(oauth2_scheme)):
    """Validate the bearer token and reject it if it has been revoked.

    Raises:
        HTTPException: 401 if the token fails verification or is revoked.
    """
    try:
        payload = verify_token(token)
    except AuthError as exc:
        # An invalid token is the client's problem: answer 401, not 500.
        raise HTTPException(
            401,
            "Could not validate credentials",
            headers={"WWW-Authenticate": "Bearer"},
        ) from exc

    if is_token_revoked(payload.jti):
        raise HTTPException(401, "Token has been revoked")

    # ... fetch user


# --- Logout endpoint ---
@app.post("/auth/logout")
async def logout(token: str = Depends(oauth2_scheme)):
    """Revoke the presented access token for the rest of its lifetime.

    Raises:
        HTTPException: 401 if the token fails verification.
    """
    try:
        payload = verify_token(token)
    except AuthError as exc:
        # An invalid token is the client's problem: answer 401, not 500.
        raise HTTPException(
            401,
            "Could not validate credentials",
            headers={"WWW-Authenticate": "Bearer"},
        ) from exc

    # The blacklist entry only needs to outlive the token itself.
    remaining = int((payload.exp - datetime.now(timezone.utc)).total_seconds())
    if remaining > 0:
        revoke_token(payload.jti, remaining)
    return {"message": "Logged out"}

Security Checklist

🚀 Want production-ready auth templates, security tools, and API automation scripts?

Get the AI Agent Toolkit →

Related Articles

Need a secure authentication system for your project? I build Python APIs with production-grade auth. Reach out on Telegram →