Build a REST API with FastAPI — Complete Python Guide

March 2026 · 20 min read · Python, FastAPI, REST, SQLAlchemy

FastAPI has become the go-to framework for building APIs in Python. It's fast (on par with Node.js and Go), has automatic OpenAPI docs, and leverages Python type hints for validation. In this guide, you'll build a production-ready REST API from scratch — with a database, authentication, tests, and Docker deployment.

Why FastAPI?

Before we dive in, here's why FastAPI stands out in 2026:

Project Setup

Install dependencies

# Create project
mkdir fastapi-tasks && cd fastapi-tasks
python -m venv .venv && source .venv/bin/activate

# Install (every extras spec is quoted so shells such as zsh do not
# try to expand the [...] part as a glob pattern)
pip install "fastapi[standard]" sqlalchemy alembic "python-jose[cryptography]" "passlib[bcrypt]" python-multipart

Project structure

fastapi-tasks/
├── app/
│   ├── __init__.py
│   ├── main.py          # FastAPI app, router wiring, middleware
│   ├── routes_auth.py    # Register / login endpoints
│   ├── routes_tasks.py   # Task CRUD endpoints
│   ├── models.py         # SQLAlchemy models
│   ├── schemas.py        # Pydantic schemas
│   ├── database.py       # DB connection
│   ├── auth.py           # JWT authentication
│   └── dependencies.py   # Shared dependencies
├── tests/
│   └── test_api.py
├── alembic/              # DB migrations
├── alembic.ini
├── Dockerfile
├── docker-compose.yml
└── requirements.txt

Database Layer

Database connection

# app/database.py
import os

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, DeclarativeBase

# Connection URL comes from the DATABASE_URL environment variable (this is
# what docker-compose.yml sets); an unset or empty value falls back to a
# local SQLite file for development.
# PostgreSQL example: postgresql://user:pass@localhost:5432/tasks
DATABASE_URL = os.environ.get("DATABASE_URL") or "sqlite:///./tasks.db"

# check_same_thread is an option of the sqlite3 driver only, so it is passed
# just for SQLite URLs and omitted for every other database.
_connect_args = (
    {"check_same_thread": False} if DATABASE_URL.startswith("sqlite") else {}
)

engine = create_engine(DATABASE_URL, connect_args=_connect_args)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)


class Base(DeclarativeBase):
    """Declarative base that the ORM models in app/models.py inherit from."""
    pass


def get_db():
    """Dependency: hand out one DB session and close it when the request ends."""
    session = SessionLocal()
    try:
        yield session
    finally:
        # Runs whether the request handler succeeded or raised.
        session.close()

SQLAlchemy models

# app/models.py
from sqlalchemy import Column, Integer, String, Boolean, DateTime, ForeignKey
from sqlalchemy.orm import relationship
from datetime import datetime, timezone

from .database import Base


class User(Base):
    """ORM model for the ``users`` table; one row per registered account."""

    __tablename__ = "users"

    id = Column(Integer, primary_key=True, index=True)
    # Unique and required; indexed because it is filtered on at login.
    email = Column(String, unique=True, index=True, nullable=False)
    # Stores the password hash, never the plain-text password.
    hashed_password = Column(String, nullable=False)
    # NOTE(review): none of the auth code shown in this guide checks this flag — confirm intent.
    is_active = Column(Boolean, default=True)
    # Default is evaluated per insert (lambda), using the current UTC time.
    created_at = Column(DateTime, default=lambda: datetime.now(timezone.utc))

    # Inverse side of Task.owner.
    tasks = relationship("Task", back_populates="owner")


class Task(Base):
    """ORM model for the ``tasks`` table; each task references its owning user."""

    __tablename__ = "tasks"

    id = Column(Integer, primary_key=True, index=True)
    # Length limits mirror the max_length constraints in the Pydantic schemas.
    title = Column(String(200), nullable=False)
    description = Column(String(1000), default="")
    completed = Column(Boolean, default=False)
    priority = Column(String(10), default="medium")  # low, medium, high
    # Both timestamps default to the current UTC time at insert.
    created_at = Column(DateTime, default=lambda: datetime.now(timezone.utc))
    # onupdate refreshes the value whenever the row is updated.
    updated_at = Column(DateTime, default=lambda: datetime.now(timezone.utc),
                        onupdate=lambda: datetime.now(timezone.utc))
    owner_id = Column(Integer, ForeignKey("users.id"))

    # Inverse side of User.tasks.
    owner = relationship("User", back_populates="tasks")
💡 Tip: For production, use SQLAlchemy with PostgreSQL — SQLite is great for development but doesn't handle concurrent writes well.

Pydantic Schemas

Schemas define what data your API accepts and returns. FastAPI validates everything automatically.

# app/schemas.py
from pydantic import BaseModel, EmailStr, Field
from datetime import datetime
from enum import Enum


class Priority(str, Enum):
    """Allowed task priority values; members are also plain strings."""

    low = "low"
    medium = "medium"
    high = "high"


# --- Task schemas ---
class TaskCreate(BaseModel):
    """Request body for creating a task; only ``title`` is required."""

    title: str = Field(..., min_length=1, max_length=200)
    description: str = Field("", max_length=1000)
    priority: Priority = Priority.medium


class TaskUpdate(BaseModel):
    """Request body for a partial task update; every field is optional."""

    # All fields default to None so a PATCH body may contain any subset.
    title: str | None = Field(None, min_length=1, max_length=200)
    description: str | None = Field(None, max_length=1000)
    completed: bool | None = None
    priority: Priority | None = None


class TaskResponse(BaseModel):
    """Task representation returned by the task endpoints."""

    id: int
    title: str
    description: str
    completed: bool
    priority: str
    created_at: datetime
    updated_at: datetime
    owner_id: int

    # Lets the model be populated from object attributes (ORM instances)
    # rather than only from dicts.
    model_config = {"from_attributes": True}


# --- User schemas ---
class UserCreate(BaseModel):
    """Request body for registration: an email and a password of 8+ characters."""

    email: EmailStr
    password: str = Field(..., min_length=8)


class UserResponse(BaseModel):
    """User representation returned by the API; has no password field."""

    id: int
    email: str
    is_active: bool
    created_at: datetime

    # Lets the model be populated from object attributes (ORM instances).
    model_config = {"from_attributes": True}


# --- Auth schemas ---
class Token(BaseModel):
    """Login response carrying the access token and its type."""

    access_token: str
    token_type: str = "bearer"

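Key Pydantic v2 features used here:

- Field(...) constraints (min_length, max_length) for declarative validation
- X | None unions so every TaskUpdate field is optional
- model_config = {"from_attributes": True} so response models can be built directly from SQLAlchemy objects
- EmailStr for email validation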

Authentication (JWT)

# app/auth.py
import os
from datetime import datetime, timedelta, timezone
from jose import JWTError, jwt
from passlib.context import CryptContext
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from sqlalchemy.orm import Session

from . import models, schemas
from .database import get_db

# JWT signing key, read from the SECRET_KEY environment variable (this is
# what docker-compose.yml passes in). `or` also covers an empty value, which
# compose produces when the host variable is unset. The literal fallback is
# for local development only and must never be used in production.
SECRET_KEY = os.environ.get("SECRET_KEY") or "your-secret-key-change-this-in-production"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

# Password hashing context (bcrypt) and the bearer-token extractor; tokenUrl
# points at the login route so the interactive docs can obtain a token.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/login")


def hash_password(password: str) -> str:
    """Return the hash of *password* produced by the module's passlib context."""
    hashed = pwd_context.hash(password)
    return hashed


def verify_password(plain: str, hashed: str) -> bool:
    """Report whether *plain* matches the stored hash *hashed*."""
    is_match = pwd_context.verify(plain, hashed)
    return is_match


def create_access_token(data: dict, expires_delta: timedelta | None = None) -> str:
    """Encode *data* plus an ``exp`` claim into a signed JWT.

    Args:
        data: Claims to embed; the dict itself is not modified.
        expires_delta: Token lifetime. A missing (or falsy) value falls
            back to 15 minutes.

    Returns:
        The encoded token string.
    """
    lifetime = expires_delta if expires_delta else timedelta(minutes=15)
    claims = {**data, "exp": datetime.now(timezone.utc) + lifetime}
    return jwt.encode(claims, SECRET_KEY, algorithm=ALGORITHM)


def get_current_user(
    token: str = Depends(oauth2_scheme),
    db: Session = Depends(get_db),
) -> models.User:
    """Dependency: resolve the bearer token to its User row.

    Raises:
        HTTPException: 401 when the token cannot be decoded, carries no
            ``sub`` claim, or names an email with no matching user.
    """
    unauthorized = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Invalid authentication credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    try:
        claims = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        raise unauthorized

    # The login route stores the user's email in the "sub" claim.
    subject = claims.get("sub")
    if subject is None:
        raise unauthorized

    account = db.query(models.User).filter(models.User.email == subject).first()
    if account is None:
        raise unauthorized
    return account
🔒 Security note: Always store SECRET_KEY in environment variables. Use openssl rand -hex 32 to generate a strong key.

Building the API

Main application

# app/main.py
from fastapi import FastAPI

from .database import engine, Base
# These imports must come BEFORE create_all(): importing models (directly and
# via the route modules) is what registers the User and Task tables on
# Base.metadata. With the imports placed after create_all(), the metadata is
# still empty at that point and no tables are created.
from . import models, routes_auth, routes_tasks  # noqa: F401

# Create tables
Base.metadata.create_all(bind=engine)

app = FastAPI(
    title="Task Manager API",
    description="A production-ready task management API",
    version="1.0.0",
)

# Include routers (defined below)
app.include_router(routes_auth.router)
app.include_router(routes_tasks.router)


@app.get("/health")
def health_check():
    """Liveness probe: always reports a healthy status."""
    payload = {"status": "healthy"}
    return payload

Auth routes

# app/routes_auth.py
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
from datetime import timedelta

from . import models, schemas, auth
from .database import get_db

router = APIRouter(prefix="/auth", tags=["Authentication"])


@router.post("/register", response_model=schemas.UserResponse, status_code=201)
def register(user_data: schemas.UserCreate, db: Session = Depends(get_db)):
    """Create a new user account.

    Raises:
        HTTPException: 400 when the email is already registered.
    """
    email_taken = (
        db.query(models.User)
        .filter(models.User.email == user_data.email)
        .first()
    )
    if email_taken:
        raise HTTPException(status_code=400, detail="Email already registered")

    # Only the hash is persisted, never the submitted password.
    password_hash = auth.hash_password(user_data.password)
    new_user = models.User(email=user_data.email, hashed_password=password_hash)
    db.add(new_user)
    db.commit()
    db.refresh(new_user)
    return new_user


@router.post("/login", response_model=schemas.Token)
def login(
    form_data: OAuth2PasswordRequestForm = Depends(),
    db: Session = Depends(get_db),
):
    """Exchange email + password (OAuth2 password form) for a bearer token.

    The form's ``username`` field carries the user's email.

    Raises:
        HTTPException: 401 when the email is unknown or the password does
            not match. The same message is used for both cases.
    """
    user = db.query(models.User).filter(
        models.User.email == form_data.username
    ).first()
    if not user or not auth.verify_password(form_data.password, user.hashed_password):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect email or password",
            # Same challenge header that get_current_user sends on its 401s.
            headers={"WWW-Authenticate": "Bearer"},
        )

    # "sub" holds the email; get_current_user looks the user up by it.
    access_token = auth.create_access_token(
        data={"sub": user.email},
        expires_delta=timedelta(minutes=auth.ACCESS_TOKEN_EXPIRE_MINUTES),
    )
    return {"access_token": access_token, "token_type": "bearer"}

Task CRUD routes

# app/routes_tasks.py
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.orm import Session

from . import models, schemas, auth
from .database import get_db

router = APIRouter(prefix="/tasks", tags=["Tasks"])


@router.post("/", response_model=schemas.TaskResponse, status_code=201)
def create_task(
    task_data: schemas.TaskCreate,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(auth.get_current_user),
):
    """Create a task owned by the authenticated user and return it."""
    fields = task_data.model_dump()
    new_task = models.Task(owner_id=current_user.id, **fields)
    db.add(new_task)
    db.commit()
    # Reload so database-generated values (id, timestamps) are populated.
    db.refresh(new_task)
    return new_task


@router.get("/", response_model=list[schemas.TaskResponse])
def list_tasks(
    completed: bool | None = None,
    priority: schemas.Priority | None = None,
    skip: int = Query(0, ge=0),
    limit: int = Query(20, ge=1, le=100),
    db: Session = Depends(get_db),
    current_user: models.User = Depends(auth.get_current_user),
):
    """List the caller's tasks, newest first.

    ``completed`` and ``priority`` are optional filters; ``skip`` and
    ``limit`` page through the result.
    """
    # Always scope to the caller; add the optional filters only when given.
    conditions = [models.Task.owner_id == current_user.id]
    if completed is not None:
        conditions.append(models.Task.completed == completed)
    if priority is not None:
        conditions.append(models.Task.priority == priority.value)

    newest_first = (
        db.query(models.Task)
        .filter(*conditions)
        .order_by(models.Task.created_at.desc())
    )
    return newest_first.offset(skip).limit(limit).all()


@router.get("/{task_id}", response_model=schemas.TaskResponse)
def get_task(
    task_id: int,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(auth.get_current_user),
):
    """Return one of the caller's tasks by id.

    Raises:
        HTTPException: 404 when no task with this id belongs to the caller.
    """
    owned_match = (
        db.query(models.Task)
        .filter(
            models.Task.id == task_id,
            models.Task.owner_id == current_user.id,
        )
        .first()
    )
    if owned_match is None:
        raise HTTPException(status_code=404, detail="Task not found")
    return owned_match


@router.patch("/{task_id}", response_model=schemas.TaskResponse)
def update_task(
    task_id: int,
    task_data: schemas.TaskUpdate,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(auth.get_current_user),
):
    """Partially update one of the caller's tasks.

    Only fields present in the request body are changed. A field sent as
    JSON ``null`` is treated the same as an omitted field.

    Raises:
        HTTPException: 404 when no task with this id belongs to the caller.
    """
    task = db.query(models.Task).filter(
        models.Task.id == task_id,
        models.Task.owner_id == current_user.id,
    ).first()
    if not task:
        raise HTTPException(status_code=404, detail="Task not found")

    # exclude_unset keeps omitted fields untouched. exclude_none also drops
    # explicit nulls: TaskUpdate accepts None for every field, but writing
    # None would violate the NOT NULL title column or produce a row that
    # TaskResponse (non-optional fields) cannot serialize, so the request
    # would end in a 500.
    update_data = task_data.model_dump(exclude_unset=True, exclude_none=True)
    for field, value in update_data.items():
        # The priority column stores the plain string, not the enum member.
        if isinstance(value, schemas.Priority):
            value = value.value
        setattr(task, field, value)

    db.commit()
    db.refresh(task)
    return task


@router.delete("/{task_id}", status_code=204)
def delete_task(
    task_id: int,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(auth.get_current_user),
):
    """Delete one of the caller's tasks; responds 204 with no body.

    Raises:
        HTTPException: 404 when no task with this id belongs to the caller.
    """
    doomed = (
        db.query(models.Task)
        .filter(
            models.Task.id == task_id,
            models.Task.owner_id == current_user.id,
        )
        .first()
    )
    if doomed is None:
        raise HTTPException(status_code=404, detail="Task not found")

    db.delete(doomed)
    db.commit()

Middleware and Error Handling

Custom middleware

# Add to app/main.py
import time
import logging
from fastapi.middleware.cors import CORSMiddleware

# Module-level logger named after this module; used by the middleware and
# error handler added to main.py.
logger = logging.getLogger(__name__)

# CORS — allow frontend origins
# Only the two origins listed below are allowed; for those, credentials are
# permitted and no restriction is placed on methods or headers ("*").
app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:3000", "https://yourdomain.com"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


@app.middleware("http")
async def timing_middleware(request, call_next):
    """Time every request and expose the duration in ``X-Process-Time``.

    The header is set on every response (seconds, four decimals). A warning
    is logged only for requests that take longer than one second.
    """
    start = time.perf_counter()
    response = await call_next(request)
    duration = time.perf_counter() - start
    response.headers["X-Process-Time"] = f"{duration:.4f}"
    if duration > 1.0:
        # Lazy %-style arguments: the message is only formatted if the
        # record is actually emitted.
        logger.warning(
            "Slow request: %s %s took %.2fs",
            request.method,
            request.url.path,
            duration,
        )
    return response

Global error handler

# Add to app/main.py
from fastapi import Request
from fastapi.responses import JSONResponse


@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
    """Catch unhandled exceptions — don't leak stack traces.

    The full traceback goes to the log; the client only receives a generic
    500 response body.
    """
    # Pass the exception object itself so the traceback is logged regardless
    # of whether an exception is "currently being handled" at this point.
    logger.error("Unhandled error: %s", exc, exc_info=exc)
    return JSONResponse(
        status_code=500,
        content={"detail": "Internal server error"},
    )

Database Migrations with Alembic

Never use create_all() in production. Use Alembic for proper migrations:

# Initialize Alembic
alembic init alembic

# Edit alembic/env.py — set target_metadata
from app.database import Base
# Importing the models module registers the User and Task tables on
# Base.metadata; without it, --autogenerate compares against empty metadata.
from app import models  # noqa: F401
target_metadata = Base.metadata

# Edit alembic.ini — set sqlalchemy.url
sqlalchemy.url = sqlite:///./tasks.db

# Create and apply migration
alembic revision --autogenerate -m "initial tables"
alembic upgrade head

# After model changes
alembic revision --autogenerate -m "add priority column"
alembic upgrade head
💡 Tip: Alembic auto-detects model changes and generates migration scripts. Always review the generated migration before applying it.

Testing Your API

FastAPI has first-class testing support with TestClient (built on httpx). See also our comprehensive Python testing guide.

# tests/test_api.py
import pytest
from fastapi.testclient import TestClient
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

from app.main import app
from app.database import Base, get_db

# Test database — file-backed SQLite (./test.db), separate from the app's tasks.db
TEST_DB_URL = "sqlite:///./test.db"
engine = create_engine(TEST_DB_URL, connect_args={"check_same_thread": False})
# Session factory bound to the test engine; used by override_get_db below.
TestSession = sessionmaker(autocommit=False, autoflush=False, bind=engine)


def override_get_db():
    """Stand-in for get_db that yields a session bound to the test database."""
    session = TestSession()
    try:
        yield session
    finally:
        session.close()


# Replace the app's get_db dependency with the test-database version, then
# create the client all tests in this module share.
app.dependency_overrides[get_db] = override_get_db
client = TestClient(app)


@pytest.fixture(autouse=True)
def setup_db():
    """Create tables before each test, drop after.

    autouse=True applies this to every test in the module, so each test
    starts with empty tables.
    """
    Base.metadata.create_all(bind=engine)
    yield
    Base.metadata.drop_all(bind=engine)


def test_health():
    """The health endpoint answers 200 with a healthy status."""
    resp = client.get("/health")
    assert resp.status_code == 200
    body = resp.json()
    assert body["status"] == "healthy"


def register_and_login(email="test@example.com", password="securepass123"):
    """Helper: register a user, log in, and return the Authorization header dict."""
    signup_body = {"email": email, "password": password}
    client.post("/auth/register", json=signup_body)

    # The login route reads OAuth2 form fields, so the email goes in "username".
    login_form = {"username": email, "password": password}
    login_resp = client.post("/auth/login", data=login_form)
    token = login_resp.json()["access_token"]
    return {"Authorization": f"Bearer {token}"}


def test_register_and_login():
    """A new user can register (201) and then log in (200) to get a token."""
    # Register
    signup = client.post(
        "/auth/register",
        json={"email": "user@test.com", "password": "password123"},
    )
    assert signup.status_code == 201
    assert signup.json()["email"] == "user@test.com"

    # Login
    signin = client.post(
        "/auth/login",
        data={"username": "user@test.com", "password": "password123"},
    )
    assert signin.status_code == 200
    assert "access_token" in signin.json()


def test_duplicate_email():
    """Registering the same email a second time is rejected with 400."""
    first_signup = {"email": "a@b.com", "password": "password123"}
    client.post("/auth/register", json=first_signup)

    second_signup = {"email": "a@b.com", "password": "password456"}
    resp = client.post("/auth/register", json=second_signup)
    assert resp.status_code == 400


def test_crud_tasks():
    """Walk one task through create, read, update, list, and delete."""
    headers = register_and_login()

    # Create
    resp = client.post("/tasks/", json={
        "title": "Write tests", "priority": "high"
    }, headers=headers)
    assert resp.status_code == 201
    task_id = resp.json()["id"]

    # Read
    resp = client.get(f"/tasks/{task_id}", headers=headers)
    assert resp.status_code == 200
    assert resp.json()["title"] == "Write tests"

    # Update
    resp = client.patch(f"/tasks/{task_id}", json={
        "completed": True
    }, headers=headers)
    assert resp.status_code == 200
    assert resp.json()["completed"] is True

    # List (filter by completed) — check the status first so an error
    # response fails here rather than in the length check.
    resp = client.get("/tasks/?completed=true", headers=headers)
    assert resp.status_code == 200
    assert len(resp.json()) == 1

    # Delete
    resp = client.delete(f"/tasks/{task_id}", headers=headers)
    assert resp.status_code == 204

    # The task must really be gone afterwards
    resp = client.get(f"/tasks/{task_id}", headers=headers)
    assert resp.status_code == 404


def test_unauthorized_access():
    """Listing tasks without an Authorization header yields 401."""
    anonymous = client.get("/tasks/")
    assert anonymous.status_code == 401


def test_task_not_found():
    """Requesting a task id that does not exist yields 404."""
    auth_headers = register_and_login()
    missing = client.get("/tasks/9999", headers=auth_headers)
    assert missing.status_code == 404
# Run tests
pytest tests/ -v

# With coverage
pytest tests/ --cov=app --cov-report=term-missing

Docker Deployment

Containerize your API for consistent deployments. See our Docker guide for more details.

# Dockerfile
FROM python:3.12-slim

WORKDIR /app

# Install dependencies first (cache layer)
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application
COPY app/ app/
COPY alembic/ alembic/
COPY alembic.ini .

# Run migrations, then start server.
# `exec` makes uvicorn replace the shell, so it receives the stop signal
# from `docker stop` directly and can shut down gracefully instead of being
# killed after the timeout.
CMD ["sh", "-c", "alembic upgrade head && exec uvicorn app.main:app --host 0.0.0.0 --port 8000"]
# docker-compose.yml
# Two services: the API container built from the local Dockerfile, and a
# PostgreSQL database it depends on.
services:
  api:
    build: .
    ports:
      - "8000:8000"
    # These settings only take effect if the application reads DATABASE_URL
    # and SECRET_KEY from its environment. SECRET_KEY is taken from the
    # host environment (or a .env file) when compose starts.
    environment:
      - DATABASE_URL=postgresql://postgres:postgres@db:5432/tasks
      - SECRET_KEY=${SECRET_KEY}
    # Start the API only after the db healthcheck below reports healthy.
    depends_on:
      db:
        condition: service_healthy

  db:
    image: postgres:16-alpine
    environment:
      POSTGRES_DB: tasks
      POSTGRES_PASSWORD: postgres
    # Named volume so database files survive container re-creation.
    volumes:
      - pgdata:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U postgres"]
      interval: 5s
      timeout: 5s
      retries: 5

volumes:
  pgdata:
# Build and run
docker compose up -d

# Check logs
docker compose logs -f api

# API is live at http://localhost:8000
# Docs at http://localhost:8000/docs

Production Checklist

Before shipping your FastAPI app:

Advanced Patterns

Background tasks

from fastapi import BackgroundTasks

def send_notification(email: str, message: str):
    """Deliver *message* to *email*; intended to run as a background task.

    Placeholder implementation: it only prints the notification.
    """
    # Your email logic here
    line = f"Sending to {email}: {message}"
    print(line)


@router.post("/tasks/", response_model=schemas.TaskResponse, status_code=201)
def create_task(
    task_data: schemas.TaskCreate,
    background_tasks: BackgroundTasks,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(auth.get_current_user),
):
    """Create a task for the caller and queue a notification about it."""
    # NOTE(review): if `router` is the prefix="/tasks" router from
    # app/routes_tasks.py, this path resolves to /tasks/tasks/ — confirm
    # which router this snippet is meant to be attached to.
    fields = task_data.model_dump()
    new_task = models.Task(owner_id=current_user.id, **fields)
    db.add(new_task)
    db.commit()
    db.refresh(new_task)

    # Fire-and-forget notification
    notice = f"Task '{new_task.title}' created"
    background_tasks.add_task(send_notification, current_user.email, notice)
    return new_task

Async endpoints

import httpx

@router.get("/tasks/{task_id}/enrich")
async def enrich_task(
    task_id: int,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(auth.get_current_user),
):
    """Fetch external suggestions to enrich one of the caller's tasks.

    The external lookup is best-effort: any failure (non-200 status,
    network error, timeout, invalid JSON) yields an empty suggestion list
    rather than an error response.

    Raises:
        HTTPException: 404 when no task with this id belongs to the caller.
    """
    # NOTE(review): if `router` is the prefix="/tasks" router from
    # app/routes_tasks.py, this path resolves to /tasks/tasks/{task_id}/enrich
    # — confirm which router this snippet is meant to be attached to.
    # NOTE: db.query() is a synchronous Session call; only the HTTP request
    # below is awaited.
    task = db.query(models.Task).filter(
        models.Task.id == task_id,
        models.Task.owner_id == current_user.id,
    ).first()
    if not task:
        raise HTTPException(status_code=404, detail="Task not found")

    suggestions = []
    try:
        # Async HTTP call — doesn't block other requests
        async with httpx.AsyncClient() as client:
            resp = await client.get(
                "https://api.example.com/suggestions",
                # Passing the title via params URL-encodes it; interpolating
                # it into the URL would let characters such as & or # in a
                # user-supplied title alter the query string.
                params={"q": task.title},
                timeout=5.0,
            )
        if resp.status_code == 200:
            suggestions = resp.json()
    except (httpx.HTTPError, ValueError):
        # httpx.HTTPError covers connection errors and timeouts; ValueError
        # covers a body that is not valid JSON.
        suggestions = []

    return {"task": task, "suggestions": suggestions}

Custom response models

from pydantic import BaseModel


class PaginatedResponse(BaseModel):
    """One page of tasks plus the paging metadata needed to fetch the next."""

    items: list[schemas.TaskResponse]
    # Total number of matching tasks, independent of skip/limit.
    total: int
    skip: int
    limit: int
    # True when more items exist beyond this page.
    has_more: bool


@router.get("/tasks/", response_model=PaginatedResponse)
def list_tasks_paginated(
    skip: int = Query(0, ge=0),
    limit: int = Query(20, ge=1, le=100),
    db: Session = Depends(get_db),
    current_user: models.User = Depends(auth.get_current_user),
):
    """Return one page of the caller's tasks (newest first) with paging metadata."""
    # NOTE(review): if `router` is the prefix="/tasks" router from
    # app/routes_tasks.py, this path resolves to /tasks/tasks/ — confirm
    # which router this snippet is meant to be attached to.
    owned = db.query(models.Task).filter(
        models.Task.owner_id == current_user.id
    )
    # Count before paging so `total` reflects every matching task.
    total = owned.count()
    newest_first = owned.order_by(models.Task.created_at.desc())
    page = newest_first.offset(skip).limit(limit).all()

    return PaginatedResponse(
        items=page,
        total=total,
        skip=skip,
        limit=limit,
        has_more=total > skip + limit,
    )

Quick Reference

| Pattern | Code |
| --- | --- |
| Path param | `@app.get("/items/{id}")` |
| Query param | `def f(skip: int = 0, limit: int = 10)` |
| Request body | `def f(data: MySchema)` |
| Dependency | `def f(db: Session = Depends(get_db))` |
| Auth guard | `def f(user = Depends(get_current_user))` |
| File upload | `def f(file: UploadFile)` |
| Headers | `def f(x_token: str = Header(...))` |
| Status code | `@app.post("/", status_code=201)` |

🚀 Want production-ready Python tools, API templates, and automation scripts?

Get the AI Agent Toolkit →

Related Articles

Need a custom API built for your project? I build Python APIs, scrapers, and automation tools. Reach out on Telegram →