Build a REST API with FastAPI — Complete Python Guide
FastAPI has become the go-to framework for building APIs in Python. It's fast (on par with Node.js and Go), has automatic OpenAPI docs, and leverages Python type hints for validation. In this guide, you'll build a production-ready REST API from scratch — with a database, authentication, tests, and Docker deployment.
Why FastAPI?
Before we dive in, here's why FastAPI stands out in 2026:
- Performance — built on Starlette and uvicorn (ASGI), it handles async natively
- Type safety — Pydantic v2 validates request/response data automatically
- Auto docs — Swagger UI and ReDoc are generated from your code, zero config
- Dependency injection — clean pattern for auth, DB sessions, shared logic
- Standards-based — built on OpenAPI 3.1 and JSON Schema
Project Setup
Install dependencies
# Create project
mkdir fastapi-tasks && cd fastapi-tasks
python -m venv .venv && source .venv/bin/activate
# Install
pip install "fastapi[standard]" sqlalchemy alembic python-jose[cryptography] passlib[bcrypt] python-multipart
Project structure
fastapi-tasks/
├── app/
│ ├── __init__.py
│ ├── main.py # FastAPI app, routes
│ ├── models.py # SQLAlchemy models
│ ├── schemas.py # Pydantic schemas
│ ├── database.py # DB connection
│ ├── auth.py # JWT authentication
│ └── dependencies.py # Shared dependencies
├── tests/
│ └── test_api.py
├── alembic/ # DB migrations
├── Dockerfile
├── docker-compose.yml
└── requirements.txt
Database Layer
Database connection
# app/database.py
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, DeclarativeBase
DATABASE_URL = "sqlite:///./tasks.db"
# For PostgreSQL:
# DATABASE_URL = "postgresql://user:pass@localhost:5432/tasks"
engine = create_engine(
DATABASE_URL,
connect_args={"check_same_thread": False} # SQLite only
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
class Base(DeclarativeBase):
pass
def get_db():
"""Dependency: yields a DB session, closes after request."""
db = SessionLocal()
try:
yield db
finally:
db.close()
SQLAlchemy models
# app/models.py
from sqlalchemy import Column, Integer, String, Boolean, DateTime, ForeignKey
from sqlalchemy.orm import relationship
from datetime import datetime, timezone
from .database import Base
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
email = Column(String, unique=True, index=True, nullable=False)
hashed_password = Column(String, nullable=False)
is_active = Column(Boolean, default=True)
created_at = Column(DateTime, default=lambda: datetime.now(timezone.utc))
tasks = relationship("Task", back_populates="owner")
class Task(Base):
__tablename__ = "tasks"
id = Column(Integer, primary_key=True, index=True)
title = Column(String(200), nullable=False)
description = Column(String(1000), default="")
completed = Column(Boolean, default=False)
priority = Column(String(10), default="medium") # low, medium, high
created_at = Column(DateTime, default=lambda: datetime.now(timezone.utc))
updated_at = Column(DateTime, default=lambda: datetime.now(timezone.utc),
onupdate=lambda: datetime.now(timezone.utc))
owner_id = Column(Integer, ForeignKey("users.id"))
owner = relationship("User", back_populates="tasks")
Pydantic Schemas
Schemas define what data your API accepts and returns. FastAPI validates everything automatically.
# app/schemas.py
from pydantic import BaseModel, EmailStr, Field
from datetime import datetime
from enum import Enum
class Priority(str, Enum):
low = "low"
medium = "medium"
high = "high"
# --- Task schemas ---
class TaskCreate(BaseModel):
title: str = Field(..., min_length=1, max_length=200)
description: str = Field("", max_length=1000)
priority: Priority = Priority.medium
class TaskUpdate(BaseModel):
title: str | None = Field(None, min_length=1, max_length=200)
description: str | None = Field(None, max_length=1000)
completed: bool | None = None
priority: Priority | None = None
class TaskResponse(BaseModel):
id: int
title: str
description: str
completed: bool
priority: str
created_at: datetime
updated_at: datetime
owner_id: int
model_config = {"from_attributes": True}
# --- User schemas ---
class UserCreate(BaseModel):
email: EmailStr
password: str = Field(..., min_length=8)
class UserResponse(BaseModel):
id: int
email: str
is_active: bool
created_at: datetime
model_config = {"from_attributes": True}
# --- Auth schemas ---
class Token(BaseModel):
access_token: str
token_type: str = "bearer"
Key Pydantic v2 features used here:
- Field(...) — required field with validation constraints
- model_config = {"from_attributes": True} — replaces old orm_mode
- EmailStr — validates email format (requires email-validator)
- str | None — Python 3.10+ union syntax for optional fields
Authentication (JWT)
# app/auth.py
from datetime import datetime, timedelta, timezone
from jose import JWTError, jwt
from passlib.context import CryptContext
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from sqlalchemy.orm import Session
from . import models, schemas
from .database import get_db
# In production, load from environment variables!
SECRET_KEY = "your-secret-key-change-this-in-production"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/login")
def hash_password(password: str) -> str:
return pwd_context.hash(password)
def verify_password(plain: str, hashed: str) -> bool:
return pwd_context.verify(plain, hashed)
def create_access_token(data: dict, expires_delta: timedelta | None = None) -> str:
to_encode = data.copy()
expire = datetime.now(timezone.utc) + (expires_delta or timedelta(minutes=15))
to_encode.update({"exp": expire})
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
def get_current_user(
token: str = Depends(oauth2_scheme),
db: Session = Depends(get_db),
) -> models.User:
"""Dependency: extracts user from JWT token."""
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
email: str = payload.get("sub")
if email is None:
raise credentials_exception
except JWTError:
raise credentials_exception
user = db.query(models.User).filter(models.User.email == email).first()
if user is None:
raise credentials_exception
return user
Building the API
Main application
# app/main.py
from fastapi import FastAPI
from .database import engine, Base
# Create tables
Base.metadata.create_all(bind=engine)
app = FastAPI(
title="Task Manager API",
description="A production-ready task management API",
version="1.0.0",
)
# Import and include routers (defined below)
from . import routes_auth, routes_tasks
app.include_router(routes_auth.router)
app.include_router(routes_tasks.router)
@app.get("/health")
def health_check():
return {"status": "healthy"}
Auth routes
# app/routes_auth.py
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
from datetime import timedelta
from . import models, schemas, auth
from .database import get_db
router = APIRouter(prefix="/auth", tags=["Authentication"])
@router.post("/register", response_model=schemas.UserResponse, status_code=201)
def register(user_data: schemas.UserCreate, db: Session = Depends(get_db)):
# Check if email already exists
existing = db.query(models.User).filter(
models.User.email == user_data.email
).first()
if existing:
raise HTTPException(status_code=400, detail="Email already registered")
user = models.User(
email=user_data.email,
hashed_password=auth.hash_password(user_data.password),
)
db.add(user)
db.commit()
db.refresh(user)
return user
@router.post("/login", response_model=schemas.Token)
def login(
form_data: OAuth2PasswordRequestForm = Depends(),
db: Session = Depends(get_db),
):
user = db.query(models.User).filter(
models.User.email == form_data.username
).first()
if not user or not auth.verify_password(form_data.password, user.hashed_password):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect email or password",
)
access_token = auth.create_access_token(
data={"sub": user.email},
expires_delta=timedelta(minutes=auth.ACCESS_TOKEN_EXPIRE_MINUTES),
)
return {"access_token": access_token, "token_type": "bearer"}
Task CRUD routes
# app/routes_tasks.py
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.orm import Session
from . import models, schemas, auth
from .database import get_db
router = APIRouter(prefix="/tasks", tags=["Tasks"])
@router.post("/", response_model=schemas.TaskResponse, status_code=201)
def create_task(
task_data: schemas.TaskCreate,
db: Session = Depends(get_db),
current_user: models.User = Depends(auth.get_current_user),
):
task = models.Task(**task_data.model_dump(), owner_id=current_user.id)
db.add(task)
db.commit()
db.refresh(task)
return task
@router.get("/", response_model=list[schemas.TaskResponse])
def list_tasks(
completed: bool | None = None,
priority: schemas.Priority | None = None,
skip: int = Query(0, ge=0),
limit: int = Query(20, ge=1, le=100),
db: Session = Depends(get_db),
current_user: models.User = Depends(auth.get_current_user),
):
query = db.query(models.Task).filter(
models.Task.owner_id == current_user.id
)
if completed is not None:
query = query.filter(models.Task.completed == completed)
if priority is not None:
query = query.filter(models.Task.priority == priority.value)
return query.order_by(models.Task.created_at.desc()).offset(skip).limit(limit).all()
@router.get("/{task_id}", response_model=schemas.TaskResponse)
def get_task(
task_id: int,
db: Session = Depends(get_db),
current_user: models.User = Depends(auth.get_current_user),
):
task = db.query(models.Task).filter(
models.Task.id == task_id,
models.Task.owner_id == current_user.id,
).first()
if not task:
raise HTTPException(status_code=404, detail="Task not found")
return task
@router.patch("/{task_id}", response_model=schemas.TaskResponse)
def update_task(
task_id: int,
task_data: schemas.TaskUpdate,
db: Session = Depends(get_db),
current_user: models.User = Depends(auth.get_current_user),
):
task = db.query(models.Task).filter(
models.Task.id == task_id,
models.Task.owner_id == current_user.id,
).first()
if not task:
raise HTTPException(status_code=404, detail="Task not found")
# Only update provided fields
update_data = task_data.model_dump(exclude_unset=True)
for field, value in update_data.items():
setattr(task, field, value.value if hasattr(value, "value") else value)
db.commit()
db.refresh(task)
return task
@router.delete("/{task_id}", status_code=204)
def delete_task(
task_id: int,
db: Session = Depends(get_db),
current_user: models.User = Depends(auth.get_current_user),
):
task = db.query(models.Task).filter(
models.Task.id == task_id,
models.Task.owner_id == current_user.id,
).first()
if not task:
raise HTTPException(status_code=404, detail="Task not found")
db.delete(task)
db.commit()
Middleware and Error Handling
Custom middleware
# Add to app/main.py
import time
import logging
from fastapi.middleware.cors import CORSMiddleware
logger = logging.getLogger(__name__)
# CORS — allow frontend origins
app.add_middleware(
CORSMiddleware,
allow_origins=["http://localhost:3000", "https://yourdomain.com"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
@app.middleware("http")
async def timing_middleware(request, call_next):
"""Log request duration for every endpoint."""
start = time.perf_counter()
response = await call_next(request)
duration = time.perf_counter() - start
response.headers["X-Process-Time"] = f"{duration:.4f}"
if duration > 1.0:
logger.warning(f"Slow request: {request.method} {request.url.path} took {duration:.2f}s")
return response
Global error handler
# Add to app/main.py
from fastapi import Request
from fastapi.responses import JSONResponse
@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
"""Catch unhandled exceptions — don't leak stack traces."""
logger.error(f"Unhandled error: {exc}", exc_info=True)
return JSONResponse(
status_code=500,
content={"detail": "Internal server error"},
)
Database Migrations with Alembic
Never use create_all() in production. Use Alembic for proper migrations:
# Initialize Alembic
alembic init alembic
# Edit alembic/env.py — set target_metadata
from app.database import Base
target_metadata = Base.metadata
# Edit alembic.ini — set sqlalchemy.url
sqlalchemy.url = sqlite:///./tasks.db
# Create and apply migration
alembic revision --autogenerate -m "initial tables"
alembic upgrade head
# After model changes
alembic revision --autogenerate -m "add priority column"
alembic upgrade head
Testing Your API
FastAPI has first-class testing support with TestClient (built on httpx). See also our comprehensive Python testing guide.
# tests/test_api.py
import pytest
from fastapi.testclient import TestClient
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from app.main import app
from app.database import Base, get_db
# Test database — in-memory SQLite
TEST_DB_URL = "sqlite:///./test.db"
engine = create_engine(TEST_DB_URL, connect_args={"check_same_thread": False})
TestSession = sessionmaker(autocommit=False, autoflush=False, bind=engine)
def override_get_db():
db = TestSession()
try:
yield db
finally:
db.close()
app.dependency_overrides[get_db] = override_get_db
client = TestClient(app)
@pytest.fixture(autouse=True)
def setup_db():
"""Create tables before each test, drop after."""
Base.metadata.create_all(bind=engine)
yield
Base.metadata.drop_all(bind=engine)
def test_health():
response = client.get("/health")
assert response.status_code == 200
assert response.json()["status"] == "healthy"
def register_and_login(email="test@example.com", password="securepass123"):
"""Helper: register user and return auth headers."""
client.post("/auth/register", json={"email": email, "password": password})
login = client.post("/auth/login", data={"username": email, "password": password})
token = login.json()["access_token"]
return {"Authorization": f"Bearer {token}"}
def test_register_and_login():
# Register
resp = client.post("/auth/register", json={
"email": "user@test.com", "password": "password123"
})
assert resp.status_code == 201
assert resp.json()["email"] == "user@test.com"
# Login
resp = client.post("/auth/login", data={
"username": "user@test.com", "password": "password123"
})
assert resp.status_code == 200
assert "access_token" in resp.json()
def test_duplicate_email():
client.post("/auth/register", json={"email": "a@b.com", "password": "password123"})
resp = client.post("/auth/register", json={"email": "a@b.com", "password": "password456"})
assert resp.status_code == 400
def test_crud_tasks():
headers = register_and_login()
# Create
resp = client.post("/tasks/", json={
"title": "Write tests", "priority": "high"
}, headers=headers)
assert resp.status_code == 201
task_id = resp.json()["id"]
# Read
resp = client.get(f"/tasks/{task_id}", headers=headers)
assert resp.status_code == 200
assert resp.json()["title"] == "Write tests"
# Update
resp = client.patch(f"/tasks/{task_id}", json={
"completed": True
}, headers=headers)
assert resp.status_code == 200
assert resp.json()["completed"] is True
# List (filter by completed)
resp = client.get("/tasks/?completed=true", headers=headers)
assert len(resp.json()) == 1
# Delete
resp = client.delete(f"/tasks/{task_id}", headers=headers)
assert resp.status_code == 204
def test_unauthorized_access():
resp = client.get("/tasks/")
assert resp.status_code == 401
def test_task_not_found():
headers = register_and_login()
resp = client.get("/tasks/9999", headers=headers)
assert resp.status_code == 404
# Run tests
pytest tests/ -v
# With coverage
pytest tests/ --cov=app --cov-report=term-missing
Docker Deployment
Containerize your API for consistent deployments. See our Docker guide for more details.
# Dockerfile
FROM python:3.12-slim
WORKDIR /app
# Install dependencies first (cache layer)
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application
COPY app/ app/
COPY alembic/ alembic/
COPY alembic.ini .
# Run migrations, then start server
CMD alembic upgrade head && \
uvicorn app.main:app --host 0.0.0.0 --port 8000
# docker-compose.yml
services:
api:
build: .
ports:
- "8000:8000"
environment:
- DATABASE_URL=postgresql://postgres:postgres@db:5432/tasks
- SECRET_KEY=${SECRET_KEY}
depends_on:
db:
condition: service_healthy
db:
image: postgres:16-alpine
environment:
POSTGRES_DB: tasks
POSTGRES_PASSWORD: postgres
volumes:
- pgdata:/var/lib/postgresql/data
healthcheck:
test: ["CMD-SHELL", "pg_isready -U postgres"]
interval: 5s
timeout: 5s
retries: 5
volumes:
pgdata:
# Build and run
docker compose up -d
# Check logs
docker compose logs -f api
# API is live at http://localhost:8000
# Docs at http://localhost:8000/docs
Production Checklist
Before shipping your FastAPI app:
- Environment variables — SECRET_KEY, DATABASE_URL, never hardcode secrets
- HTTPS — use a reverse proxy (nginx, Caddy, Traefik) for TLS
- Rate limiting — add slowapi or use your reverse proxy
- Logging — structured JSON logs with correlation IDs
- Health checks — /health endpoint that also checks DB connectivity
- Graceful shutdown — handle SIGTERM for clean connection draining
- Workers — run with uvicorn --workers 4 or use Gunicorn with uvicorn workers
- Input validation — Pydantic handles most of it, but validate business logic too
- Pagination — always paginate list endpoints (we did: skip/limit)
- Database pooling — configure pool_size and max_overflow for PostgreSQL
Advanced Patterns
Background tasks
from fastapi import BackgroundTasks
def send_notification(email: str, message: str):
"""Send email notification (runs after response is sent)."""
# Your email logic here
print(f"Sending to {email}: {message}")
@router.post("/tasks/", response_model=schemas.TaskResponse, status_code=201)
def create_task(
task_data: schemas.TaskCreate,
background_tasks: BackgroundTasks,
db: Session = Depends(get_db),
current_user: models.User = Depends(auth.get_current_user),
):
task = models.Task(**task_data.model_dump(), owner_id=current_user.id)
db.add(task)
db.commit()
db.refresh(task)
# Fire-and-forget notification
background_tasks.add_task(
send_notification, current_user.email, f"Task '{task.title}' created"
)
return task
Async endpoints
import httpx
@router.get("/tasks/{task_id}/enrich")
async def enrich_task(
task_id: int,
db: Session = Depends(get_db),
current_user: models.User = Depends(auth.get_current_user),
):
"""Fetch external data to enrich a task (async)."""
task = db.query(models.Task).filter(
models.Task.id == task_id,
models.Task.owner_id == current_user.id,
).first()
if not task:
raise HTTPException(status_code=404, detail="Task not found")
# Async HTTP call — doesn't block other requests
async with httpx.AsyncClient() as client:
resp = await client.get(
f"https://api.example.com/suggestions?q={task.title}",
timeout=5.0,
)
suggestions = resp.json() if resp.status_code == 200 else []
return {"task": task, "suggestions": suggestions}
Custom response models
from pydantic import BaseModel
class PaginatedResponse(BaseModel):
items: list[schemas.TaskResponse]
total: int
skip: int
limit: int
has_more: bool
@router.get("/tasks/", response_model=PaginatedResponse)
def list_tasks_paginated(
skip: int = Query(0, ge=0),
limit: int = Query(20, ge=1, le=100),
db: Session = Depends(get_db),
current_user: models.User = Depends(auth.get_current_user),
):
query = db.query(models.Task).filter(
models.Task.owner_id == current_user.id
)
total = query.count()
items = query.order_by(
models.Task.created_at.desc()
).offset(skip).limit(limit).all()
return PaginatedResponse(
items=items,
total=total,
skip=skip,
limit=limit,
has_more=(skip + limit) < total,
)
Quick Reference
| Pattern | Code |
|---|---|
| Path param | @app.get("/items/{id}") |
| Query param | def f(skip: int = 0, limit: int = 10) |
| Request body | def f(data: MySchema) |
| Dependency | def f(db: Session = Depends(get_db)) |
| Auth guard | def f(user = Depends(get_current_user)) |
| File upload | def f(file: UploadFile) |
| Headers | def f(x_token: str = Header(...)) |
| Status code | @app.post("/", status_code=201) |
🚀 Want production-ready Python tools, API templates, and automation scripts?
Related Articles
- WebSockets in Python — Build Real-Time Apps — add real-time features to your FastAPI app
- Automate API Integrations with Python — connecting to external APIs with retry logic
- Python Database Operations — SQLite, PostgreSQL & SQLAlchemy Guide
- Python Testing Guide — pytest, Mocking & CI Integration
- Dockerize Python Apps — From Development to Production
- Build a Professional CLI Tool in Python
Need a custom API built for your project? I build Python APIs, scrapers, and automation tools. Reach out on Telegram →