Python Authentication Guide — OAuth2, JWT, Sessions & API Keys
Authentication is the part of your app that attackers hit first. Get it wrong and nothing else matters — not your beautiful UI, not your clever algorithms, not your scale. This guide covers every auth pattern you'll need in Python, with production-ready code and the security pitfalls to avoid.
Choosing the Right Auth Strategy
| Strategy | Best For | Tradeoffs |
|---|---|---|
| JWT (stateless) | APIs, microservices, mobile | Can't revoke easily, larger tokens |
| Sessions (stateful) | Web apps, SSR | Requires server-side storage |
| OAuth2 | "Login with Google/GitHub" | Complex flow, dependency on provider |
| API Keys | Service-to-service, public APIs | No user context, hard to rotate |
| mTLS | Internal services, zero-trust | Certificate management overhead |
Most apps use a combination. Web frontend → sessions. Mobile app → JWT. Third-party integrations → API keys. Internal services → mTLS or JWT.
1. Password Hashing — The Foundation
Never store passwords in plaintext. Never use MD5 or SHA-256 for passwords. Use a purpose-built password hashing algorithm.
# pip install passlib[bcrypt] argon2-cffi
from passlib.context import CryptContext
# Argon2id is the current best practice (winner of Password Hashing Competition)
# bcrypt is still solid and widely supported
# Shared passlib context used by hash_password / verify_password / needs_rehash.
# Two schemes are accepted; "argon2" is the default for new hashes.
pwd_context = CryptContext(
    schemes=["argon2", "bcrypt"],
    default="argon2",
    deprecated="auto",
    # Argon2 tuning (adjust based on your server)
    argon2__memory_cost=65536,  # 64 MB
    argon2__time_cost=3,  # 3 iterations
    argon2__parallelism=4,  # 4 threads
)
def hash_password(password: str) -> str:
    """Return the hash of ``password`` to persist instead of the password itself.

    The scheme and cost settings come from the module-level ``pwd_context``.
    """
    stored_form = pwd_context.hash(password)
    return stored_form
def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Check a candidate password against a stored hash.

    Args:
        plain_password: Password supplied by the user.
        hashed_password: Hash previously produced by ``hash_password``.

    Returns:
        True only when the password matches the hash. A missing, empty, or
        unrecognisable stored hash counts as a failed verification instead of
        raising, so a corrupt row cannot turn a login attempt into a 500.
    """
    if not hashed_password:
        return False
    try:
        return pwd_context.verify(plain_password, hashed_password)
    except (ValueError, TypeError):
        # passlib signals unidentifiable hashes and invalid password values
        # with ValueError subclasses, and wrong argument types with TypeError.
        # Fail closed in all of those cases.
        return False
def needs_rehash(hashed_password: str) -> bool:
    """Report whether ``pwd_context`` considers this stored hash out of date.

    Callers use a True result to re-hash the password and store the new hash.
    """
    outdated = pwd_context.needs_update(hashed_password)
    return outdated
# --- Usage ---
hashed = hash_password("my-secure-password-123")
print(hashed)  # e.g. $argon2id$v=19$m=65536,t=3,p=4$...

# The correct password verifies; any other password does not.
assert verify_password("my-secure-password-123", hashed)
assert not verify_password("wrong-password", hashed)

# On login, while the plaintext is still available, upgrade outdated hashes.
if needs_rehash(hashed):
    new_hash = hash_password("my-secure-password-123")
    # Persist new_hash here, e.g. update_user_hash(user_id, new_hash)
2. JWT Authentication
JSON Web Tokens are the standard for stateless API auth. The token contains the user's identity and is signed (not encrypted) by the server. See our FastAPI guide for the basic setup.
Robust JWT implementation
# pip install python-jose[cryptography]
from datetime import datetime, timedelta, timezone
from jose import JWTError, jwt
from pydantic import BaseModel
import secrets
import os
# --- Configuration ---
class AuthConfig:
    """Settings shared by the JWT helpers in this module.

    NOTE: when JWT_SECRET_KEY is not set, a random key is generated while this
    class body is evaluated, so tokens signed by one process cannot be verified
    by another process or after a restart. Set the variable in any real
    deployment.
    """

    # Signing key: taken from the environment, random fallback otherwise.
    SECRET_KEY: str = os.environ.get("JWT_SECRET_KEY", secrets.token_hex(32))
    ALGORITHM: str = "HS256"
    # Access tokens are short-lived; refresh tokens are long-lived.
    ACCESS_TOKEN_EXPIRE_MINUTES: int = 15
    REFRESH_TOKEN_EXPIRE_DAYS: int = 30
class TokenPayload(BaseModel):
    # Claims that verify_token hands back to callers after decoding a JWT.

    # Subject (user ID or email)
    sub: str
    # Expiration
    exp: datetime
    # Issued at
    iat: datetime
    # Unique token ID (for revocation)
    jti: str
    # Permissions; tokens issued without scopes decode to an empty list
    scopes: list[str] = []
# --- Token creation ---
def create_access_token(
    subject: str,
    scopes: list[str] | None = None,
    expires_delta: timedelta | None = None,
) -> str:
    """Create a signed, short-lived access token.

    Args:
        subject: Value stored in the ``sub`` claim (user ID or email).
        scopes: Permissions to embed; ``None`` means no scopes.
        expires_delta: Lifetime of the token. ``None`` selects
            ``AuthConfig.ACCESS_TOKEN_EXPIRE_MINUTES``. An explicit zero
            ``timedelta`` is honoured rather than replaced by the default.

    Returns:
        The encoded JWT string.
    """
    now = datetime.now(timezone.utc)
    # Compare against None: timedelta(0) is falsy, so `expires_delta or ...`
    # would silently swap a zero lifetime for the default one.
    if expires_delta is None:
        expires_delta = timedelta(minutes=AuthConfig.ACCESS_TOKEN_EXPIRE_MINUTES)
    payload = {
        "sub": subject,
        "exp": now + expires_delta,
        "iat": now,
        "jti": secrets.token_hex(16),  # Unique ID, used for revocation
        "scopes": list(scopes) if scopes is not None else [],
        "type": "access",
    }
    return jwt.encode(payload, AuthConfig.SECRET_KEY, algorithm=AuthConfig.ALGORITHM)
def create_refresh_token(subject: str) -> str:
    """Create a signed refresh token for ``subject``.

    The token expires ``AuthConfig.REFRESH_TOKEN_EXPIRE_DAYS`` days from now and
    is marked with ``type="refresh"`` so it cannot pass as an access token.
    """
    issued_at = datetime.now(timezone.utc)
    lifetime = timedelta(days=AuthConfig.REFRESH_TOKEN_EXPIRE_DAYS)
    claims = dict(
        sub=subject,
        exp=issued_at + lifetime,
        iat=issued_at,
        jti=secrets.token_hex(16),
        type="refresh",
    )
    return jwt.encode(claims, AuthConfig.SECRET_KEY, algorithm=AuthConfig.ALGORITHM)
# --- Token verification ---
def verify_token(token: str, expected_type: str = "access") -> TokenPayload:
    """Verify and decode a JWT token.

    Args:
        token: The encoded JWT.
        expected_type: Required value of the ``type`` claim
            (``"access"`` or ``"refresh"``).

    Returns:
        The decoded claims as a ``TokenPayload``.

    Raises:
        AuthError: If decoding fails, the ``type`` claim does not match, or
            the claims do not fit ``TokenPayload`` (e.g. a missing ``sub``).
    """
    try:
        payload = jwt.decode(
            token,
            AuthConfig.SECRET_KEY,
            algorithms=[AuthConfig.ALGORITHM],
        )
    except JWTError as e:
        # Chain the cause so the original failure stays visible in tracebacks.
        raise AuthError(f"Invalid token: {e}") from e

    token_type = payload.get("type")
    if token_type != expected_type:
        raise AuthError(f"Expected {expected_type} token, got {token_type}")

    try:
        return TokenPayload(**payload)
    except (TypeError, ValueError) as e:
        # pydantic's validation error derives from ValueError. Convert it so
        # callers that only handle AuthError do not leak a 500 for a
        # correctly signed token with malformed claims.
        raise AuthError(f"Invalid token payload: {e}") from e
class AuthError(Exception):
    """Raised by the token helpers when a token fails decoding or has the wrong type."""
# --- Token pair (access + refresh) ---
def create_token_pair(user_id: str, scopes: list[str] = None) -> dict:
    """Issue an access token and a refresh token for ``user_id``.

    Only the access token carries ``scopes``. The result is a dict with the
    keys ``access_token``, ``refresh_token`` and ``token_type``.
    """
    access = create_access_token(user_id, scopes)
    refresh = create_refresh_token(user_id)
    pair = {"access_token": access, "refresh_token": refresh}
    pair["token_type"] = "bearer"
    return pair
# --- Refresh flow ---
def refresh_access_token(refresh_token: str, scopes: list[str] | None = None) -> dict:
    """Exchange a valid refresh token for a new access token.

    Args:
        refresh_token: An encoded token whose ``type`` claim is ``"refresh"``.
        scopes: Scopes to grant on the new access token, typically re-derived
            from the user's current permissions. When omitted, any scopes
            carried by the refresh token are reused. Previously the new
            access token was always issued with no scopes at all.

    Returns:
        A dict with ``access_token`` and ``token_type``.

    Raises:
        AuthError: If the refresh token is invalid or not a refresh token.
    """
    payload = verify_token(refresh_token, expected_type="refresh")
    # Optional: check if refresh token is revoked (stored in Redis/DB)
    # if is_token_revoked(payload.jti): raise AuthError("Token revoked")
    granted = payload.scopes if scopes is None else scopes
    return {
        "access_token": create_access_token(payload.sub, granted),
        "token_type": "bearer",
    }
FastAPI integration
from fastapi import Depends, HTTPException, status, Security
from fastapi.security import OAuth2PasswordBearer, SecurityScopes
# Bearer-token scheme used as a dependency by the protected routes.
# tokenUrl names the login endpoint; the scopes mapping lists each scope
# string with its human-readable description.
oauth2_scheme = OAuth2PasswordBearer(
    tokenUrl="/auth/login",
    scopes={
        "tasks:read": "Read tasks",
        "tasks:write": "Create and modify tasks",
        "admin": "Admin access",
    },
)
async def get_current_user(
    security_scopes: SecurityScopes,
    token: str = Depends(oauth2_scheme),
    db: Session = Depends(get_db),
):
    """Dependency: validate the JWT, check required scopes, and load the user.

    Raises:
        HTTPException: 401 when the token is invalid or the user is missing
            or inactive (always with a ``WWW-Authenticate: Bearer`` header);
            403 when the token lacks a scope the route requires.
    """
    bearer_challenge = {"WWW-Authenticate": "Bearer"}

    try:
        payload = verify_token(token, expected_type="access")
    except AuthError as exc:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Could not validate credentials",
            headers=bearer_challenge,
        ) from exc

    # Report the first required scope that the token does not carry.
    missing = next(
        (scope for scope in security_scopes.scopes if scope not in payload.scopes),
        None,
    )
    if missing is not None:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=f"Missing required scope: {missing}",
        )

    user = db.query(User).filter(User.id == payload.sub).first()
    if not user or not user.is_active:
        # Same challenge header as the other 401 above, so clients see a
        # consistent response for every authentication failure.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="User not found or inactive",
            headers=bearer_challenge,
        )
    return user
# --- Route with scope requirements ---
@app.get("/admin/users")
async def list_users(
    user: User = Security(get_current_user, scopes=["admin"]),
    db: Session = Depends(get_db),
):
    """Admin-only endpoint: return every user.

    The database session is injected explicitly; the handler previously
    referenced ``db`` without declaring it.
    """
    return db.query(User).all()
@app.get("/tasks")
async def list_tasks(
    user: User = Security(get_current_user, scopes=["tasks:read"]),
    db: Session = Depends(get_db),
):
    """Return the caller's own tasks. Requires the tasks:read scope.

    The database session is injected explicitly; the handler previously
    referenced ``db`` without declaring it.
    """
    return db.query(Task).filter(Task.owner_id == user.id).all()
3. OAuth2 — "Login with Google/GitHub"
OAuth2 lets users authenticate with existing accounts. Here's the Authorization Code flow; to get PKCE (the secure variant) with Authlib, add `"code_challenge_method": "S256"` to the provider's `client_kwargs`.
# pip install httpx authlib
from authlib.integrations.starlette_client import OAuth
from starlette.config import Config
# Provider credentials are read from .env through Starlette's Config.
config = Config(".env")
oauth = OAuth(config)

# Google: endpoints are discovered from the OpenID Connect metadata document.
oauth.register(
    name="google",
    client_id=config("GOOGLE_CLIENT_ID"),
    client_secret=config("GOOGLE_CLIENT_SECRET"),
    server_metadata_url="https://accounts.google.com/.well-known/openid-configuration",
    client_kwargs={
        "scope": "openid email profile",
        # Enables PKCE: Authlib only sends a code challenge when this is set.
        "code_challenge_method": "S256",
    },
)

# GitHub: plain OAuth2, so the endpoints are listed explicitly.
# NOTE(review): PKCE is not enabled for this provider here. Confirm GitHub's
# current support before adding "code_challenge_method".
oauth.register(
    name="github",
    client_id=config("GITHUB_CLIENT_ID"),
    client_secret=config("GITHUB_CLIENT_SECRET"),
    authorize_url="https://github.com/login/oauth/authorize",
    access_token_url="https://github.com/login/oauth/access_token",
    api_base_url="https://api.github.com/",
    client_kwargs={"scope": "user:email"},
)
# FastAPI OAuth2 routes
from fastapi import Request
from fastapi.responses import JSONResponse, RedirectResponse
@app.get("/auth/google/login")
async def google_login(request: Request):
    """Start the Google sign-in by redirecting the browser to Google.

    Google is asked to send the user back to the ``google_callback`` route.
    """
    callback_url = request.url_for("google_callback")
    google = oauth.google
    return await google.authorize_redirect(request, callback_url)
@app.get("/auth/google/callback")
async def google_callback(request: Request, db: Session = Depends(get_db)):
    """Handle Google's callback after the user consents.

    Exchanges the authorization code for tokens, finds or creates the local
    user by verified email, and sets our own access token as an HTTP-only
    cookie before redirecting to the dashboard.

    Raises:
        HTTPException: 400 when the OAuth exchange fails, no user info is
            returned, or the Google email is missing or unverified; 403 when
            the matching local account is inactive.
    """
    from authlib.integrations.starlette_client import OAuthError

    try:
        token = await oauth.google.authorize_access_token(request)
    except OAuthError as exc:
        # Raised e.g. when the user denies consent or the state check fails.
        raise HTTPException(400, "Google sign-in failed") from exc

    user_info = token.get("userinfo")
    if not user_info:
        raise HTTPException(400, "Failed to get user info from Google")

    # Accounts are matched by email, so an unverified address must never be
    # trusted. Otherwise anyone could claim another user's account.
    email = user_info.get("email")
    if not email or not user_info.get("email_verified"):
        raise HTTPException(400, "Google account has no verified email")

    # Find or create user
    user = db.query(User).filter(User.email == email).first()
    if not user:
        user = User(
            email=email,
            name=user_info.get("name", ""),
            oauth_provider="google",
            oauth_id=user_info["sub"],
            is_active=True,
        )
        db.add(user)
        db.commit()
        db.refresh(user)
    elif not user.is_active:
        raise HTTPException(403, "Account is inactive")

    # Issue our own JWT tokens
    tokens = create_token_pair(str(user.id), scopes=["tasks:read", "tasks:write"])

    # In production: set HTTP-only cookie or redirect with token
    response = RedirectResponse(url="/dashboard")
    response.set_cookie(
        key="access_token",
        value=tokens["access_token"],
        httponly=True,
        secure=True,
        samesite="lax",
        max_age=AuthConfig.ACCESS_TOKEN_EXPIRE_MINUTES * 60,
    )
    return response
@app.get("/auth/github/login")
async def github_login(request: Request):
    """Start the GitHub sign-in by redirecting the browser to GitHub.

    GitHub is asked to send the user back to the ``github_callback`` route.
    """
    callback_url = request.url_for("github_callback")
    github = oauth.github
    return await github.authorize_redirect(request, callback_url)
@app.get("/auth/github/callback")
async def github_callback(request: Request, db: Session = Depends(get_db)):
    """Handle GitHub's callback after the user consents.

    Exchanges the authorization code, resolves the user's verified primary
    email through the GitHub API, finds or creates the local user, and sets
    our own access token as an HTTP-only cookie before redirecting.

    Raises:
        HTTPException: 400 when the OAuth exchange fails, the profile cannot
            be fetched, or no verified primary email is available.
    """
    from authlib.integrations.starlette_client import OAuthError

    try:
        token = await oauth.github.authorize_access_token(request)
    except OAuthError as exc:
        # Raised e.g. when the user denies consent or the state check fails.
        raise HTTPException(400, "GitHub sign-in failed") from exc

    # GitHub doesn't include email in token — fetch from API
    resp = await oauth.github.get("user", token=token)
    if resp.status_code != 200:
        raise HTTPException(400, "Failed to get user info from GitHub")
    github_user = resp.json()

    # Only a primary address that GitHub marks as verified is accepted,
    # because the email is what links the login to a local account.
    emails_resp = await oauth.github.get("user/emails", token=token)
    emails = emails_resp.json() if emails_resp.status_code == 200 else []
    primary_email = next(
        (e["email"] for e in emails if e.get("primary") and e.get("verified")),
        None,
    )
    if not primary_email:
        raise HTTPException(400, "GitHub account has no verified primary email")

    # Find or create user (same pattern as Google)
    user = find_or_create_oauth_user(db, primary_email, "github", str(github_user["id"]))
    tokens = create_token_pair(str(user.id))

    # Same cookie-and-redirect response as the Google callback.
    response = RedirectResponse(url="/dashboard")
    response.set_cookie(
        key="access_token",
        value=tokens["access_token"],
        httponly=True,
        secure=True,
        samesite="lax",
        max_age=AuthConfig.ACCESS_TOKEN_EXPIRE_MINUTES * 60,
    )
    return response
4. Session-Based Authentication
For server-rendered web apps, sessions are simpler and more secure than JWT in cookies.
# pip install itsdangerous redis
from itsdangerous import URLSafeTimedSerializer
import redis
import secrets
import json
from datetime import datetime, timezone
class SessionManager:
"""Server-side session management with Redis."""
def __init__(
self,
redis_url: str = "redis://localhost:6379",
secret_key: str = None,
max_age: int = 86400, # 24 hours
):
self.redis = redis.from_url(redis_url)
self.serializer = URLSafeTimedSerializer(secret_key or secrets.token_hex(32))
self.max_age = max_age
def create_session(self, user_id: str, data: dict = None) -> str:
"""Create a new session, return session ID."""
session_id = secrets.token_urlsafe(32)
session_data = {
"user_id": user_id,
"created_at": datetime.now(timezone.utc).isoformat(),
**(data or {}),
}
# Store in Redis with TTL
self.redis.setex(
f"session:{session_id}",
self.max_age,
json.dumps(session_data),
)
# Sign the session ID (prevents tampering)
signed = self.serializer.dumps(session_id)
return signed
def get_session(self, signed_session_id: str) -> dict | None:
"""Validate and retrieve session data."""
try:
session_id = self.serializer.loads(signed_session_id, max_age=self.max_age)
except Exception:
return None # Expired or tampered
data = self.redis.get(f"session:{session_id}")
if not data:
return None
# Refresh TTL on access (sliding expiration)
self.redis.expire(f"session:{session_id}", self.max_age)
return json.loads(data)
def destroy_session(self, signed_session_id: str):
"""Logout — delete session."""
try:
session_id = self.serializer.loads(signed_session_id, max_age=self.max_age)
self.redis.delete(f"session:{session_id}")
except Exception:
pass
def destroy_all_user_sessions(self, user_id: str):
"""Nuclear logout — kill all sessions for a user."""
# Requires a user→sessions index
for key in self.redis.scan_iter(f"session:*"):
data = self.redis.get(key)
if data and json.loads(data).get("user_id") == user_id:
self.redis.delete(key)
# --- FastAPI routes ---
# Take the signing key from the environment so session cookies stay valid
# across restarts and between worker processes. When SESSION_SECRET_KEY is
# unset, SessionManager falls back to its own default, as before.
sessions = SessionManager(secret_key=os.getenv("SESSION_SECRET_KEY"))
@app.post("/auth/login")
async def login(form: LoginForm = Depends(), db: Session = Depends(get_db)):
    """Authenticate with email and password and start a server-side session.

    On success the signed session ID is returned in an HTTP-only cookie.

    Raises:
        HTTPException: 401 when the credentials are rejected.
    """
    user = authenticate_user(db, form.email, form.password)
    if not user:
        raise HTTPException(401, "Invalid credentials")

    signed_id = sessions.create_session(str(user.id), {"role": user.role})

    response = JSONResponse({"message": "Logged in"})
    response.set_cookie(
        "session",
        signed_id,
        httponly=True,
        secure=True,
        samesite="lax",
        # Follow the session manager's lifetime instead of repeating the
        # number, so cookie and server-side expiry cannot drift apart.
        max_age=sessions.max_age,
    )
    return response
@app.post("/auth/logout")
async def logout(request: Request):
    """End the caller's session and clear the ``session`` cookie.

    A request without a session cookie still gets the same response.
    """
    signed_id = request.cookies.get("session")
    if signed_id:
        sessions.destroy_session(signed_id)

    reply = JSONResponse({"message": "Logged out"})
    reply.delete_cookie("session")
    return reply
5. API Key Authentication
import hashlib
import secrets
from fastapi import Security
from fastapi.security import APIKeyHeader
# Security scheme that reads the API key from the X-API-Key request header.
api_key_header = APIKeyHeader(name="X-API-Key")
def generate_api_key() -> tuple[str, str]:
    """Create a new API key.

    Returns:
        ``(raw_key, key_hash)``: the key to hand to the client, and the
        SHA-256 hex digest to store in its place.
    """
    # The "atk_" prefix makes keys identifiable (like GitHub's ghp_).
    token = secrets.token_urlsafe(32)
    raw_key = "atk_" + token
    # Only the digest is meant for storage, never the key itself.
    digest = hashlib.sha256(raw_key.encode())
    return raw_key, digest.hexdigest()
def verify_api_key(raw_key: str, stored_hash: str) -> bool:
    """Verify an API key against its stored SHA-256 hex digest.

    Args:
        raw_key: The key presented by the client.
        stored_hash: The hex digest saved when the key was generated.

    Returns:
        True when the key hashes to ``stored_hash``, otherwise False
        (including when ``stored_hash`` is not a string).
    """
    import hmac

    if not isinstance(stored_hash, str):
        return False
    candidate = hashlib.sha256(raw_key.encode()).hexdigest()
    # Constant-time comparison: `==` can return early on the first differing
    # character, leaking timing information. Comparing bytes also keeps
    # compare_digest from raising on non-ASCII input.
    return hmac.compare_digest(candidate.encode(), stored_hash.encode())
# --- FastAPI dependency ---
async def get_api_client(
    api_key: str = Security(api_key_header),
    db: Session = Depends(get_db),
):
    """Dependency: resolve the X-API-Key header to an active API client.

    The presented key is hashed and looked up by digest. On success the
    client's usage fields are updated and committed.

    Raises:
        HTTPException: 403 when no active client matches the key.
    """
    digest = hashlib.sha256(api_key.encode()).hexdigest()
    lookup = db.query(APIClient).filter(
        APIClient.key_hash == digest,
        APIClient.is_active == True,
    )
    client = lookup.first()

    if not client:
        raise HTTPException(
            status_code=403,
            detail="Invalid or revoked API key",
        )

    # Track usage
    client.last_used_at = datetime.now(timezone.utc)
    client.request_count += 1
    db.commit()
    return client
@app.get("/api/v1/data")
async def get_data(client: APIClient = Depends(get_api_client)):
    """Example endpoint guarded by an API key; echoes the client's name."""
    body = {"data": "..."}
    body["client"] = client.name
    return body
6. Two-Factor Authentication (2FA)
# pip install pyotp qrcode
import pyotp
import qrcode
import io
import base64
class TwoFactorAuth:
    """TOTP-based 2FA (compatible with Google Authenticator, Authy, etc.)."""

    @staticmethod
    def generate_secret() -> str:
        """Return a fresh base32 TOTP secret for one user."""
        secret = pyotp.random_base32()
        return secret

    @staticmethod
    def get_provisioning_uri(secret: str, email: str, issuer: str = "MyApp") -> str:
        """Build the provisioning URI that authenticator apps read from a QR code."""
        return pyotp.TOTP(secret).provisioning_uri(name=email, issuer_name=issuer)

    @staticmethod
    def generate_qr_code(uri: str) -> str:
        """Render ``uri`` as a QR code and return the PNG bytes base64-encoded."""
        image = qrcode.make(uri)
        png = io.BytesIO()
        image.save(png, format="PNG")
        encoded = base64.b64encode(png.getvalue())
        return encoded.decode()

    @staticmethod
    def verify_code(secret: str, code: str) -> bool:
        """Check a TOTP code against ``secret``.

        ``valid_window=1`` is passed to pyotp. Presumably this also accepts
        the neighbouring time steps to tolerate clock drift; confirm against
        the pyotp documentation.
        """
        verifier = pyotp.TOTP(secret)
        return verifier.verify(code, valid_window=1)
# --- Enable 2FA flow ---
@app.post("/auth/2fa/setup")
async def setup_2fa(user: User = Depends(get_current_user), db: Session = Depends(get_db)):
    """Begin 2FA enrolment for the current user.

    A new secret is stored as pending and returned both as a QR code data
    URI and as a manually enterable key. It only becomes active once a code
    is confirmed.
    """
    secret = TwoFactorAuth.generate_secret()
    provisioning_uri = TwoFactorAuth.get_provisioning_uri(secret, user.email)
    qr_png_b64 = TwoFactorAuth.generate_qr_code(provisioning_uri)

    # Keep the secret pending until the first successful code.
    user.totp_secret_pending = secret
    db.commit()

    payload = {
        "qr_code": f"data:image/png;base64,{qr_png_b64}",
        "manual_key": secret,
        "message": "Scan QR code with authenticator app, then confirm with a code",
    }
    return payload
@app.post("/auth/2fa/confirm")
async def confirm_2fa(
code: str,
user: User = Depends(get_current_user),
db: Session = Depends(get_db),
):
if not user.totp_secret_pending:
raise HTTPException(400, "No pending 2FA setup")
if not TwoFactorAuth.verify_code(user.totp_secret_pending, code):
raise HTTPException(400, "Invalid code")
# Activate 2FA
user.totp_secret = user.totp_secret_pending
user.totp_secret_pending = None
user.two_factor_enabled = True
# Generate backup codes
backup_codes = [secrets.token_hex(4) for _ in range(10)]
user.backup_codes = json.dumps([
hashlib.sha256(c.encode()).hexdigest() for c in backup_codes
])
db.commit()
return {
"message": "2FA enabled",
"backup_codes": backup_codes, # Show once, never again
}
7. Token Revocation
JWTs are stateless — you can't "delete" them. But you can blacklist them:
import redis
# Redis client that holds revoked token IDs under "revoked:<jti>" keys.
token_blacklist = redis.from_url("redis://localhost:6379")
def revoke_token(jti: str, expires_in: int):
    """Add a token to the blacklist for the rest of its lifetime.

    Args:
        jti: The token's unique ID.
        expires_in: Remaining lifetime in seconds, used as the Redis TTL.
            A non-positive value means the token has already expired, so
            nothing is stored (Redis rejects ``SETEX`` with such a TTL).
    """
    if expires_in <= 0:
        return
    token_blacklist.setex(f"revoked:{jti}", expires_in, "1")
def is_token_revoked(jti: str) -> bool:
    """Return True when a ``revoked:<jti>`` entry exists in the blacklist."""
    matches = token_blacklist.exists(f"revoked:{jti}")
    return bool(matches)
# --- In your auth middleware ---
async def get_current_user(token: str = Depends(oauth2_scheme)):
    """Dependency: validate the access token and reject revoked ones.

    Raises:
        HTTPException: 401 when the token is invalid or has been revoked.
    """
    try:
        payload = verify_token(token)
    except AuthError as exc:
        # Without this, an invalid or expired token surfaces as a 500.
        raise HTTPException(
            status_code=401,
            detail="Could not validate credentials",
            headers={"WWW-Authenticate": "Bearer"},
        ) from exc
    if is_token_revoked(payload.jti):
        raise HTTPException(401, "Token has been revoked")
    # ... fetch user
# --- Logout endpoint ---
@app.post("/auth/logout")
async def logout(token: str = Depends(oauth2_scheme)):
    """Log out by blacklisting the presented access token.

    The token's ID is blacklisted for exactly its remaining lifetime.

    Raises:
        HTTPException: 401 when the token is invalid.
    """
    try:
        payload = verify_token(token)
    except AuthError as exc:
        # Without this, an invalid or expired token surfaces as a 500.
        raise HTTPException(
            status_code=401,
            detail="Could not validate credentials",
            headers={"WWW-Authenticate": "Bearer"},
        ) from exc
    remaining = int((payload.exp - datetime.now(timezone.utc)).total_seconds())
    if remaining > 0:
        revoke_token(payload.jti, remaining)
    return {"message": "Logged out"}
Security Checklist
- Passwords: Argon2id or bcrypt, never plain SHA/MD5
- JWTs: Short expiry (15 min), use refresh tokens, sign with HS256 or RS256
- Cookies: httponly=True, secure=True, samesite="lax"
- CORS: Whitelist specific origins, never use * with credentials
- Rate limiting: Limit login attempts (5/min per IP, 10/min per account)
- HTTPS: Always. No exceptions. Use HSTS headers.
- Secrets: Environment variables, never in code. Rotate regularly.
- Password rules: Minimum 8 chars, check against breached password lists (HaveIBeenPwned API)
- Account lockout: After N failed attempts, lock temporarily (not permanently — DoS risk)
- Audit logging: Log all auth events (login, logout, failed attempts, 2FA changes)
🚀 Want production-ready auth templates, security tools, and API automation scripts?
Related Articles
- Build a REST API with FastAPI — JWT auth basics integrated into a full API
- Python Security Best Practices — broader security guide
- Python Microservices — auth across distributed services
- Python Redis Guide — Redis for sessions and token blacklists
- Python Environment & Config Management — storing secrets safely
Need a secure authentication system for your project? I build Python APIs with production-grade auth. Reach out on Telegram →