GraphQL with Python — Build APIs with Strawberry & FastAPI

Published March 2026 · 20 min read

REST APIs are great, but GraphQL gives clients the power to request exactly the data they need — no over-fetching, no under-fetching. Python's Strawberry framework makes building type-safe GraphQL APIs a joy, especially when paired with FastAPI.

This guide covers everything from basic schema design to production patterns: queries, mutations, subscriptions, authentication, the N+1 problem with DataLoader, file uploads, and testing.

Why GraphQL?

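GraphQL solves real problems that REST APIs create at scale: over-fetching and under-fetching of data, multiple round trips to assemble related resources, and a growing number of endpoints to maintain.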

Project Setup

# Create project
mkdir graphql-api && cd graphql-api
python -m venv .venv && source .venv/bin/activate

# Install dependencies
pip install 'strawberry-graphql[fastapi]' uvicorn sqlalchemy aiosqlite

# Project structure
# graphql-api/
# ├── app/
# │   ├── __init__.py
# │   ├── main.py          # FastAPI + Strawberry
# │   ├── schema.py         # GraphQL types & resolvers
# │   ├── models.py         # SQLAlchemy models
# │   ├── database.py       # DB connection
# │   ├── auth.py           # Authentication
# │   └── dataloaders.py    # DataLoader for N+1
# ├── tests/
# │   └── test_api.py
# └── requirements.txt

Database Layer

SQLAlchemy Models

# app/database.py
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker, DeclarativeBase

# SQLite database file in the working directory, accessed via the aiosqlite driver.
DATABASE_URL = "sqlite+aiosqlite:///./app.db"

# echo=False: do not log every SQL statement that is emitted.
engine = create_async_engine(DATABASE_URL, echo=False)
# expire_on_commit=False: loaded attribute values stay readable after commit,
# which matters because resolvers read objects after committing.
async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)


class Base(DeclarativeBase):
    """Declarative base shared by every ORM model; its metadata drives table creation."""


async def get_session() -> AsyncSession:
    """Yield one session from ``async_session`` and close it afterwards.

    Note that this is an async generator: the session is yielded to the
    consumer and the surrounding ``async with`` closes it when the consumer
    is finished.
    """
    session_scope = async_session()
    async with session_scope as db:
        yield db


async def init_db():
    """Create all tables registered on ``Base.metadata`` inside one transaction."""
    create_tables = Base.metadata.create_all
    async with engine.begin() as connection:
        # create_all is synchronous, so it is bridged through run_sync.
        await connection.run_sync(create_tables)
# app/models.py
from datetime import datetime
from sqlalchemy import Column, Integer, String, DateTime, ForeignKey, Text, Boolean
from sqlalchemy.orm import relationship
from app.database import Base


class User(Base):
    """ORM model for the ``users`` table."""

    __tablename__ = "users"

    id = Column(Integer, primary_key=True)
    username = Column(String(50), unique=True, nullable=False)
    email = Column(String(100), unique=True, nullable=False)
    # Holds the password hash, not the plain-text password.
    hashed_password = Column(String(128), nullable=False)
    is_active = Column(Boolean, default=True)
    # The callable is passed uncalled so it is evaluated at insert time.
    created_at = Column(DateTime, default=datetime.utcnow)

    # lazy="selectin": the collections are fetched eagerly with a follow-up
    # SELECT ... IN query whenever User rows are loaded.
    posts = relationship("Post", back_populates="author", lazy="selectin")
    comments = relationship("Comment", back_populates="author", lazy="selectin")


class Post(Base):
    """ORM model for the ``posts`` table."""

    __tablename__ = "posts"

    id = Column(Integer, primary_key=True)
    title = Column(String(200), nullable=False)
    content = Column(Text, nullable=False)
    published = Column(Boolean, default=False)
    created_at = Column(DateTime, default=datetime.utcnow)
    # onupdate re-evaluates the callable whenever the row is updated.
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
    author_id = Column(Integer, ForeignKey("users.id"), nullable=False)

    # NOTE(review): ``author`` and ``tags`` use the default lazy loading, unlike
    # ``comments``. Under AsyncSession an implicit lazy load cannot run, so
    # queries must eager-load them (e.g. selectinload) before they are read.
    author = relationship("User", back_populates="posts")
    comments = relationship("Comment", back_populates="post", lazy="selectin")
    # Many-to-many through the ``post_tags`` association table.
    tags = relationship("Tag", secondary="post_tags", back_populates="posts")


class Comment(Base):
    """ORM model for the ``comments`` table; each comment belongs to one user and one post."""

    __tablename__ = "comments"

    id = Column(Integer, primary_key=True)
    content = Column(Text, nullable=False)
    created_at = Column(DateTime, default=datetime.utcnow)
    author_id = Column(Integer, ForeignKey("users.id"), nullable=False)
    post_id = Column(Integer, ForeignKey("posts.id"), nullable=False)

    # NOTE(review): both relationships use the default lazy loading; with
    # AsyncSession they must be eager-loaded by the query before being read.
    author = relationship("User", back_populates="comments")
    post = relationship("Post", back_populates="comments")


class Tag(Base):
    """ORM model for the ``tags`` table; tag names are unique."""

    __tablename__ = "tags"

    id = Column(Integer, primary_key=True)
    name = Column(String(50), unique=True, nullable=False)

    # Reverse side of Post.tags, linked through the ``post_tags`` table.
    posts = relationship("Post", secondary="post_tags", back_populates="tags")


# Association table
from sqlalchemy import Table
# Plain association table (no mapped class) joining posts and tags.
# The two foreign keys together form the composite primary key, so a given
# (post, tag) pair can be stored only once.
post_tags = Table(
    "post_tags", Base.metadata,
    Column("post_id", Integer, ForeignKey("posts.id"), primary_key=True),
    Column("tag_id", Integer, ForeignKey("tags.id"), primary_key=True),
)

GraphQL Schema — Types

Strawberry uses Python dataclasses and type hints to define your GraphQL schema. No SDL files — it's all Python:

# app/schema.py
import strawberry
from datetime import datetime
from typing import Optional


@strawberry.type
class UserType:
    # GraphQL object type for a user. Note that no password field is declared,
    # so the stored hash is not part of the schema.
    id: int
    username: str
    email: str
    is_active: bool
    created_at: datetime
    # Quoted forward reference: PostType is defined further down.
    posts: list["PostType"]


@strawberry.type
class PostType:
    # GraphQL object type for a blog post, including its author, comments and tags.
    id: int
    title: str
    content: str
    published: bool
    created_at: datetime
    updated_at: datetime
    author: UserType
    # Quoted forward references: both types are defined further down.
    comments: list["CommentType"]
    tags: list["TagType"]


@strawberry.type
class CommentType:
    # GraphQL object type for a comment; the parent post is not exposed here.
    id: int
    content: str
    created_at: datetime
    author: UserType


@strawberry.type
class TagType:
    # GraphQL object type for a tag (identifier and name only).
    id: int
    name: str
💡 Tip: Strawberry types are just Python classes with type annotations. No string-based schemas, full IDE support and type checking.

Queries — Reading Data

# app/schema.py (continued)
from sqlalchemy import select
from sqlalchemy.orm import selectinload
from app.database import async_session
from app.models import User, Post, Tag


def db_user_to_type(user: User) -> UserType:
    """Build the GraphQL ``UserType`` for an ORM ``User``, converting each of its posts."""
    converted_posts = [db_post_to_type(post) for post in user.posts]
    scalar_fields = {
        "id": user.id,
        "username": user.username,
        "email": user.email,
        "is_active": user.is_active,
        "created_at": user.created_at,
    }
    return UserType(posts=converted_posts, **scalar_fields)


def db_post_to_type(post: Post) -> PostType:
    """Build the GraphQL ``PostType`` for an ORM ``Post``.

    Authors (of the post and of its comments) are emitted with an empty
    ``posts`` list so the conversion does not recurse back into posts.
    """

    def shallow_user(account) -> UserType:
        # A user without its posts; keeps the object graph finite.
        return UserType(
            id=account.id,
            username=account.username,
            email=account.email,
            is_active=account.is_active,
            created_at=account.created_at,
            posts=[],
        )

    post_author = shallow_user(post.author)

    comment_types = []
    for comment in post.comments:
        comment_types.append(
            CommentType(
                id=comment.id,
                content=comment.content,
                created_at=comment.created_at,
                author=shallow_user(comment.author),
            )
        )

    tag_types = [TagType(id=tag.id, name=tag.name) for tag in post.tags]

    return PostType(
        id=post.id,
        title=post.title,
        content=post.content,
        published=post.published,
        created_at=post.created_at,
        updated_at=post.updated_at,
        author=post_author,
        comments=comment_types,
        tags=tag_types,
    )


# Upper bound for the ``limit`` argument of ``posts``.
_MAX_PAGE_SIZE = 100


def _escape_like(term: str) -> str:
    """Escape the LIKE wildcards (and the escape character) in *term*."""
    return term.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")


def _post_eager_options():
    """Return loader options for every relationship ``db_post_to_type`` reads.

    AsyncSession cannot perform implicit lazy loads, so ``author``, ``tags``
    and each comment's ``author`` must be loaded by the query itself.
    """
    from app.models import Comment

    return (
        selectinload(Post.author),
        selectinload(Post.comments).selectinload(Comment.author),
        selectinload(Post.tags),
    )


@strawberry.type
class Query:
    # Root query type: read-only access to users and posts.

    @strawberry.field
    async def users(self) -> list[UserType]:
        """Return every user together with their fully loaded posts."""
        async with async_session() as session:
            result = await session.execute(
                select(User).options(
                    selectinload(User.posts).options(*_post_eager_options())
                )
            )
            return [db_user_to_type(u) for u in result.scalars().all()]

    @strawberry.field
    async def user(self, id: int) -> Optional[UserType]:
        """Return the user with the given id, or ``None`` if there is none."""
        async with async_session() as session:
            user = await session.get(
                User,
                id,
                options=[selectinload(User.posts).options(*_post_eager_options())],
            )
            if user is None:
                return None
            return db_user_to_type(user)

    @strawberry.field
    async def posts(
        self,
        published_only: bool = True,
        limit: int = 20,
        offset: int = 0,
    ) -> list[PostType]:
        """Return posts, newest first.

        ``limit`` is clamped to ``0.._MAX_PAGE_SIZE`` and a negative ``offset``
        is treated as 0, so a client cannot request an unbounded page.
        """
        limit = max(0, min(limit, _MAX_PAGE_SIZE))
        offset = max(0, offset)

        async with async_session() as session:
            query = select(Post).options(*_post_eager_options())
            if published_only:
                query = query.where(Post.published.is_(True))
            query = query.order_by(Post.created_at.desc()).limit(limit).offset(offset)

            result = await session.execute(query)
            return [db_post_to_type(p) for p in result.scalars().all()]

    @strawberry.field
    async def post(self, id: int) -> Optional[PostType]:
        """Return the post with the given id, or ``None`` if there is none."""
        async with async_session() as session:
            result = await session.execute(
                select(Post).where(Post.id == id).options(*_post_eager_options())
            )
            post = result.scalar_one_or_none()
            if post is None:
                return None
            return db_post_to_type(post)

    @strawberry.field
    async def search_posts(self, query: str) -> list[PostType]:
        """Case-insensitive substring search across titles and content.

        This is a LIKE-based match, not a full-text index. Wildcards in the
        user's input are escaped so that e.g. ``%`` is matched literally.
        """
        pattern = f"%{_escape_like(query)}%"
        async with async_session() as session:
            result = await session.execute(
                select(Post)
                .where(
                    Post.title.ilike(pattern, escape="\\")
                    | Post.content.ilike(pattern, escape="\\")
                )
                .options(*_post_eager_options())
            )
            return [db_post_to_type(p) for p in result.scalars().all()]

Now clients can query exactly what they need:

# Fetch only usernames and post titles — no over-fetching
query {
  users {
    username
    posts {
      title
    }
  }
}

# Get a single post with author and comments
query {
  post(id: 1) {
    title
    content
    author {
      username
    }
    comments {
      content
      author { username }
    }
    tags { name }
  }
}

# List published posts with offset pagination
query {
  posts(publishedOnly: true, limit: 10, offset: 0) {
    title
    createdAt
  }
}

Mutations — Writing Data

# app/schema.py (continued)
import bcrypt


@strawberry.input
class CreateUserInput:
    # Input object for the createUser mutation.
    username: str
    email: str
    # Plain-text password as sent by the client; create_user hashes it before storing.
    password: str


@strawberry.input
class CreatePostInput:
    # Input object for the createPost mutation.
    title: str
    content: str
    published: bool = False
    # default_factory gives each instance its own empty list.
    tag_names: list[str] = strawberry.field(default_factory=list)


@strawberry.input
class UpdatePostInput:
    # Input object for the updatePost mutation. Every field is optional;
    # update_post only writes the fields that are not None.
    title: Optional[str] = None
    content: Optional[str] = None
    published: Optional[bool] = None


@strawberry.type
class MutationResult:
    # Generic outcome object; delete_post returns it for both success and failure.
    success: bool
    message: str


async def _reload_post_for_response(session, post_id: int) -> Post:
    """Re-select a post with every relationship ``db_post_to_type`` reads.

    AsyncSession cannot lazily load ``author``, ``tags`` or comment authors,
    so they are eager-loaded here. ``populate_existing`` refreshes the
    instance that is already present in the session.
    """
    from app.models import Comment as CommentModel

    result = await session.execute(
        select(Post)
        .where(Post.id == post_id)
        .options(
            selectinload(Post.author),
            selectinload(Post.comments).selectinload(CommentModel.author),
            selectinload(Post.tags),
        )
        .execution_options(populate_existing=True)
    )
    return result.scalar_one()


def _require_user(info: strawberry.types.Info):
    """Return the authenticated user from the context or raise PermissionError."""
    user = info.context.get("current_user")
    if not user:
        raise PermissionError("Authentication required")
    return user


@strawberry.type
class Mutation:
    # Root mutation type: account creation plus post and comment writes.

    @strawberry.mutation
    async def create_user(self, input: CreateUserInput) -> UserType:
        """Create a user with a bcrypt-hashed password.

        Raises:
            ValueError: if the username or email is already in use.
        """
        async with async_session() as session:
            # Check uniqueness; .first() also copes with two different rows
            # matching (one by username, one by email).
            existing = await session.execute(
                select(User).where(
                    (User.username == input.username) | (User.email == input.email)
                )
            )
            if existing.scalars().first() is not None:
                raise ValueError("Username or email already taken")

            hashed = bcrypt.hashpw(
                input.password.encode(), bcrypt.gensalt()
            ).decode()

            user = User(
                username=input.username,
                email=input.email,
                hashed_password=hashed,
            )
            session.add(user)
            await session.commit()
            await session.refresh(user)

            return UserType(
                id=user.id,
                username=user.username,
                email=user.email,
                is_active=user.is_active,
                created_at=user.created_at,
                posts=[],
            )

    @strawberry.mutation
    async def create_post(
        self, input: CreatePostInput, info: strawberry.types.Info
    ) -> PostType:
        """Create a post for the current user, creating missing tags on the fly.

        Tag names are lower-cased and de-duplicated, so ``["A", "a"]`` yields a
        single tag instead of violating the unique constraint on ``tags.name``.

        Raises:
            PermissionError: if the request is not authenticated.
        """
        user = _require_user(info)

        async with async_session() as session:
            # dict.fromkeys de-duplicates while preserving the client's order.
            names = list(dict.fromkeys(n.lower() for n in input.tag_names))

            known = {}
            if names:
                found = await session.execute(
                    select(Tag).where(Tag.name.in_(names))
                )
                known = {tag.name: tag for tag in found.scalars().all()}

            tags = []
            for name in names:
                tag = known.get(name)
                if tag is None:
                    tag = Tag(name=name)
                    session.add(tag)
                tags.append(tag)

            post = Post(
                title=input.title,
                content=input.content,
                published=input.published,
                author_id=user.id,
                tags=tags,
            )
            session.add(post)
            await session.commit()

            post = await _reload_post_for_response(session, post.id)
            return db_post_to_type(post)

    @strawberry.mutation
    async def update_post(
        self, id: int, input: UpdatePostInput, info: strawberry.types.Info
    ) -> PostType:
        """Apply the non-null fields of ``input`` to one of the user's own posts.

        Raises:
            PermissionError: if unauthenticated or the post belongs to someone else.
            ValueError: if the post does not exist.
        """
        user = _require_user(info)

        async with async_session() as session:
            post = await session.get(Post, id)
            if post is None:
                raise ValueError(f"Post {id} not found")
            if post.author_id != user.id:
                raise PermissionError("Not your post")

            if input.title is not None:
                post.title = input.title
            if input.content is not None:
                post.content = input.content
            if input.published is not None:
                post.published = input.published

            await session.commit()

            post = await _reload_post_for_response(session, post.id)
            return db_post_to_type(post)

    @strawberry.mutation
    async def delete_post(
        self, id: int, info: strawberry.types.Info
    ) -> MutationResult:
        """Delete one of the user's own posts.

        A missing post or a foreign owner is reported through
        ``MutationResult`` rather than an exception.

        Raises:
            PermissionError: if the request is not authenticated.
        """
        user = _require_user(info)

        async with async_session() as session:
            post = await session.get(Post, id)
            if post is None:
                return MutationResult(success=False, message="Post not found")
            if post.author_id != user.id:
                return MutationResult(success=False, message="Not your post")

            await session.delete(post)
            await session.commit()
            return MutationResult(success=True, message="Post deleted")

    @strawberry.mutation
    async def add_comment(
        self, post_id: int, content: str, info: strawberry.types.Info
    ) -> CommentType:
        """Add a comment by the current user to an existing post.

        Raises:
            PermissionError: if the request is not authenticated.
            ValueError: if the post does not exist.
        """
        user = _require_user(info)

        async with async_session() as session:
            from app.models import Comment as CommentModel

            # Check explicitly: SQLite does not enforce foreign keys by
            # default, so a dangling post_id would otherwise be stored.
            if await session.get(Post, post_id) is None:
                raise ValueError(f"Post {post_id} not found")

            comment = CommentModel(
                content=content,
                author_id=user.id,
                post_id=post_id,
            )
            session.add(comment)
            await session.commit()
            await session.refresh(comment)

            return CommentType(
                id=comment.id,
                content=comment.content,
                created_at=comment.created_at,
                author=UserType(
                    id=user.id,
                    username=user.username,
                    email=user.email,
                    is_active=user.is_active,
                    created_at=user.created_at,
                    posts=[],
                ),
            )

Using mutations:

# Create a user
mutation {
  createUser(input: {
    username: "alice"
    email: "alice@example.com"
    password: "secure123"
  }) {
    id
    username
  }
}

# Create a post with tags
mutation {
  createPost(input: {
    title: "GraphQL is awesome"
    content: "Here's why..."
    published: true
    tagNames: ["graphql", "python", "tutorial"]
  }) {
    id
    title
    tags { name }
  }
}

Authentication

# app/auth.py
import jwt
import bcrypt
from datetime import datetime, timedelta
from typing import Optional
from sqlalchemy import select
from app.database import async_session
from app.models import User

# NOTE(review): hard-coded placeholder secret. Anyone who knows this value can
# forge tokens, so load it from configuration or the environment before deploying.
SECRET_KEY = "your-secret-key-change-in-production"
# HMAC-SHA256: the same key signs and verifies tokens.
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 60


def create_access_token(user_id: int) -> str:
    """Return a signed JWT for ``user_id``.

    The token carries the user id as the ``sub`` claim (as a string) and
    expires ``ACCESS_TOKEN_EXPIRE_MINUTES`` minutes from now.
    """
    from datetime import timezone

    # Timezone-aware "now": datetime.utcnow() is deprecated and returns a
    # naive value that is easy to misinterpret as local time.
    expire = datetime.now(timezone.utc) + timedelta(
        minutes=ACCESS_TOKEN_EXPIRE_MINUTES
    )
    payload = {"sub": str(user_id), "exp": expire}
    return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)


def verify_token(token: str) -> Optional[int]:
    """Return the user id stored in ``token``, or ``None`` if it is unusable.

    ``None`` is returned when the signature is invalid, the token has expired,
    or the ``sub`` claim is missing or not an integer.
    """
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except jwt.InvalidTokenError:
        # Also covers ExpiredSignatureError, which is a subclass.
        return None

    try:
        return int(payload["sub"])
    except (KeyError, TypeError, ValueError):
        # A correctly signed token with a malformed subject is still invalid.
        return None


async def authenticate_user(username: str, password: str) -> Optional[User]:
    """Return the user matching the credentials, or ``None``.

    ``None`` is returned when the username is unknown, the password does not
    match the stored bcrypt hash, or the account has been deactivated.
    """
    async with async_session() as session:
        result = await session.execute(
            select(User).where(User.username == username)
        )
        user = result.scalar_one_or_none()

    if user is None:
        return None
    if not bcrypt.checkpw(password.encode(), user.hashed_password.encode()):
        return None
    # A deactivated account must not be able to obtain new tokens.
    if not user.is_active:
        return None
    return user


async def get_user_from_token(token: str) -> Optional[User]:
    """Resolve a JWT to its user, or ``None``.

    ``None`` is returned when the token is invalid, the user no longer
    exists, or the account has been deactivated.
    """
    user_id = verify_token(token)
    # Compare against None explicitly so a falsy id is not mistaken for failure.
    if user_id is None:
        return None

    async with async_session() as session:
        user = await session.get(User, user_id)

    # Tokens issued before deactivation must stop working.
    if user is None or not user.is_active:
        return None
    return user

Login Mutation

# Add to schema.py Mutation class

@strawberry.type
class AuthPayload:
    # Result of the login mutation: the signed access token plus the user it belongs to.
    token: str
    user: UserType


# Inside Mutation class:
@strawberry.mutation
async def login(self, username: str, password: str) -> AuthPayload:
    """Exchange credentials for an access token.

    Raises:
        ValueError: if the credentials are rejected by ``authenticate_user``.
    """
    account = await authenticate_user(username, password)
    if not account:
        raise ValueError("Invalid credentials")

    # The user's posts are deliberately left empty in the login response.
    profile = UserType(
        id=account.id,
        username=account.username,
        email=account.email,
        is_active=account.is_active,
        created_at=account.created_at,
        posts=[],
    )
    access_token = create_access_token(account.id)
    return AuthPayload(token=access_token, user=profile)

FastAPI Integration

# app/main.py
from contextlib import asynccontextmanager
from fastapi import FastAPI, Request
from strawberry.fastapi import GraphQLRouter
import strawberry

from app.schema import Query, Mutation
from app.database import init_db
from app.auth import get_user_from_token


async def get_context(request: Request) -> dict:
    """Build the per-request GraphQL context.

    The context always contains the request; ``current_user`` is added only
    when the Authorization header carries a Bearer token that resolves to a user.
    """
    bearer_prefix = "Bearer "
    header_value = request.headers.get("Authorization", "")
    if not header_value.startswith(bearer_prefix):
        return {"request": request}

    resolved_user = await get_user_from_token(header_value[len(bearer_prefix):])
    if resolved_user:
        return {"request": request, "current_user": resolved_user}
    return {"request": request}


schema = strawberry.Schema(query=Query, mutation=Mutation)
graphql_router = GraphQLRouter(schema, context_getter=get_context)


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan hook: initialise the database before serving requests."""
    # Startup phase: runs once before the application starts handling traffic.
    await init_db()
    # Nothing to clean up on shutdown, so control is simply handed back.
    yield


app = FastAPI(title="GraphQL Blog API", lifespan=lifespan)
app.include_router(graphql_router, prefix="/graphql")


# Optional: REST health check
@app.get("/health")
async def health():
    """Plain REST liveness probe that does not go through GraphQL."""
    status_payload = {"status": "ok"}
    return status_payload
# Run the server
uvicorn app.main:app --reload

# Open GraphiQL playground
# http://localhost:8000/graphql
💡 Tip: Strawberry includes a built-in GraphiQL IDE at /graphql. It has auto-complete, docs explorer, and query history — no external tools needed.

Solving N+1 with DataLoader

The N+1 problem is GraphQL's biggest performance pitfall. If you query 50 posts and each resolves its author separately, that's 51 database queries (1 for the posts + 50 for the authors). DataLoader batches the 50 author lookups into a single query:

# app/dataloaders.py
from typing import Any
from strawberry.dataloader import DataLoader
from sqlalchemy import select
from app.database import async_session
from app.models import User, Post, Comment


async def load_users(keys: list[int]) -> list[User]:
    """Batch function for the user DataLoader.

    Fetches all requested users with one ``IN`` query and returns them
    aligned with ``keys``; ids without a matching row yield ``None``.
    """
    async with async_session() as session:
        rows = await session.execute(select(User).where(User.id.in_(keys)))
        by_id = {}
        for account in rows.scalars().all():
            by_id[account.id] = account
        # A DataLoader batch function must answer in the order of its keys.
        return [by_id.get(user_id) for user_id in keys]


async def load_posts_by_author(keys: list[int]) -> list[list[Post]]:
    """Batch function returning, for each author id in ``keys``, that author's posts.

    Authors without posts get an empty list; the result is aligned with ``keys``.
    """
    async with async_session() as session:
        rows = await session.execute(
            select(Post).where(Post.author_id.in_(keys))
        )
        fetched = rows.scalars().all()

        # Pre-seed one bucket per requested author so nobody is missing.
        buckets: dict[int, list[Post]] = {}
        for author_id in keys:
            buckets[author_id] = []
        for item in fetched:
            buckets[item.author_id].append(item)

        return [buckets[author_id] for author_id in keys]


async def load_comments_by_post(keys: list[int]) -> list[list[Comment]]:
    """Batch function returning, for each post id in ``keys``, that post's comments.

    Posts without comments get an empty list; the result is aligned with ``keys``.
    """
    async with async_session() as session:
        rows = await session.execute(
            select(Comment).where(Comment.post_id.in_(keys))
        )
        fetched = rows.scalars().all()

        # Pre-seed one bucket per requested post so nobody is missing.
        buckets: dict[int, list[Comment]] = {}
        for post_id in keys:
            buckets[post_id] = []
        for item in fetched:
            buckets[item.post_id].append(item)

        return [buckets[post_id] for post_id in keys]


def create_dataloaders() -> dict[str, DataLoader]:
    """Return a new set of DataLoaders, keyed by name.

    Every call builds new loader instances, so each caller gets its own.
    """
    batch_functions = (
        ("user_loader", load_users),
        ("posts_by_author", load_posts_by_author),
        ("comments_by_post", load_comments_by_post),
    )
    return {name: DataLoader(load_fn=fn) for name, fn in batch_functions}

Using DataLoaders in Types

# Updated types using DataLoader for lazy resolution

def _shallow_user_type(user) -> UserType:
    """Convert a loaded user into ``UserType`` without resolving its posts."""
    return UserType(
        id=user.id,
        username=user.username,
        email=user.email,
        is_active=user.is_active,
        created_at=user.created_at,
        posts=[],  # Avoid circular loading
    )


@strawberry.type
class PostTypeWithLoader:
    # Post type whose author and comments are resolved lazily through the
    # per-request DataLoaders stored under info.context["dataloaders"].
    id: int
    title: str
    content: str
    published: bool
    created_at: datetime
    updated_at: datetime
    author_id: strawberry.Private[int]  # Hidden from schema

    @strawberry.field
    async def author(self, info: strawberry.types.Info) -> UserType:
        """Resolve the post's author via the batched user loader.

        Raises:
            ValueError: if the loader finds no user for ``author_id``.
        """
        loader = info.context["dataloaders"]["user_loader"]
        user = await loader.load(self.author_id)
        if user is None:
            # The loader yields None for unknown ids; fail with a clear message
            # instead of an AttributeError on None.
            raise ValueError(f"Author {self.author_id} not found")
        return _shallow_user_type(user)

    @strawberry.field
    async def comments(self, info: strawberry.types.Info) -> list[CommentType]:
        """Resolve the post's comments together with their real authors.

        Authors are fetched through the user loader in one batch, so clients
        receive actual usernames and emails rather than blank placeholders.

        Raises:
            ValueError: if a comment's author cannot be found.
        """
        loaders = info.context["dataloaders"]
        comments = await loaders["comments_by_post"].load(self.id)
        authors = await loaders["user_loader"].load_many(
            [c.author_id for c in comments]
        )

        resolved = []
        for comment, author in zip(comments, authors):
            if author is None:
                raise ValueError(f"Author {comment.author_id} not found")
            resolved.append(
                CommentType(
                    id=comment.id,
                    content=comment.content,
                    created_at=comment.created_at,
                    author=_shallow_user_type(author),
                )
            )
        return resolved
# Update context to include dataloaders
async def get_context(request: Request) -> dict:
    """Build the per-request context: request, new DataLoaders and, if any, the user.

    The loaders are created on every call, so each request gets its own set.
    """
    context = {"request": request, "dataloaders": create_dataloaders()}

    # "Bearer <token>" splits into the scheme, the separating space and the token.
    scheme, separator, token = request.headers.get("Authorization", "").partition(" ")
    if scheme == "Bearer" and separator:
        resolved_user = await get_user_from_token(token)
        if resolved_user:
            context["current_user"] = resolved_user

    return context

Now 50 posts with authors = 2 queries (posts + batch users), not 51.

Subscriptions — Real-Time Data

# app/schema.py
import asyncio
from typing import AsyncGenerator


# Simple in-memory pub/sub (use Redis in production)
class PubSub:
    """Minimal in-process publish/subscribe hub built on ``asyncio.Queue``.

    Each subscriber gets its own queue; publishing puts the payload on every
    queue currently registered for the channel.
    """

    def __init__(self):
        # channel name -> queues of the currently active subscribers
        self.subscribers: dict[str, list[asyncio.Queue]] = {}

    async def publish(self, channel: str, data: object):
        """Deliver ``data`` to every current subscriber of ``channel``."""
        # Iterate over a snapshot: subscribers may leave while we deliver.
        for queue in list(self.subscribers.get(channel, ())):
            await queue.put(data)

    async def subscribe(self, channel: str) -> AsyncGenerator:
        """Yield each payload published on ``channel`` until the generator is closed."""
        queue: asyncio.Queue = asyncio.Queue()
        self.subscribers.setdefault(channel, []).append(queue)
        try:
            while True:
                yield await queue.get()
        finally:
            queues = self.subscribers.get(channel)
            if queues is not None:
                if queue in queues:
                    queues.remove(queue)
                # Drop empty channels so per-entity channel names
                # (e.g. "comment:<id>") do not accumulate forever.
                if not queues:
                    del self.subscribers[channel]


pubsub = PubSub()


@strawberry.type
class Subscription:
    # Root subscription type: streams events published through ``pubsub``
    # plus a polled post counter.

    @strawberry.subscription
    async def new_post(self) -> AsyncGenerator[PostType, None]:
        """Subscribe to new posts as they're created."""
        stream = pubsub.subscribe("new_post")
        async for published_post in stream:
            yield published_post

    @strawberry.subscription
    async def new_comment(
        self, post_id: int
    ) -> AsyncGenerator[CommentType, None]:
        """Subscribe to comments on a specific post."""
        channel = f"comment:{post_id}"
        async for published_comment in pubsub.subscribe(channel):
            yield published_comment

    @strawberry.subscription
    async def post_count(self) -> AsyncGenerator[int, None]:
        """Periodically emit total post count."""
        from sqlalchemy import func

        poll_interval_seconds = 5
        while True:
            async with async_session() as session:
                rows = await session.execute(select(func.count(Post.id)))
                # The value is yielded while the session is still open.
                yield rows.scalar()
            await asyncio.sleep(poll_interval_seconds)


# Update schema to include subscriptions
schema = strawberry.Schema(
    query=Query,
    mutation=Mutation,
    subscription=Subscription,
)


# Publish events from mutations:
# In create_post mutation, after commit:
# await pubsub.publish("new_post", post_type)
// Client-side subscription (JavaScript), using the graphql-transport-ws protocol.
// The subprotocol must be requested explicitly when opening the socket.
const ws = new WebSocket("ws://localhost:8000/graphql", "graphql-transport-ws");

// Every operation needs an id so its results can be matched to it.
const SUBSCRIPTION_ID = "1";

ws.onopen = () => {
  // The protocol requires a handshake before any operation is sent.
  ws.send(JSON.stringify({ type: "connection_init" }));
};

ws.onmessage = (event) => {
  const message = JSON.parse(event.data);

  switch (message.type) {
    case "connection_ack":
      // Handshake accepted: now the subscription can be started.
      ws.send(JSON.stringify({
        id: SUBSCRIPTION_ID,
        type: "subscribe",
        payload: {
          query: `subscription { newPost { title author { username } } }`
        }
      }));
      break;
    case "next":
      console.log("New post:", message.payload.data.newPost);
      break;
    case "error":
      console.error("Subscription error:", message.payload);
      break;
    case "complete":
      ws.close();
      break;
  }
};

Error Handling

# Custom error types for better client UX
@strawberry.type
class FieldError:
    # One validation problem: the name of the offending input field and a
    # human-readable message for it.
    field: str
    message: str


@strawberry.type
class PostResult:
    # Mutation payload carrying either the created post or a list of field errors.
    post: Optional[PostType] = None
    errors: list[FieldError] = strawberry.field(default_factory=list)

    @property
    def success(self) -> bool:
        # True exactly when no errors were recorded.
        return not self.errors


# Use in mutations
@strawberry.mutation
async def create_post_safe(
    self, input: CreatePostInput, info: strawberry.types.Info
) -> PostResult:
    # Variant of create_post that reports problems as FieldError entries in
    # the returned PostResult instead of raising exceptions.
    errors = []

    # Collect every validation failure so the client sees all of them at once.
    if len(input.title) < 3:
        errors.append(FieldError(field="title", message="Title too short (min 3 chars)"))
    if len(input.content) < 10:
        errors.append(FieldError(field="content", message="Content too short (min 10 chars)"))

    if errors:
        return PostResult(errors=errors)

    # Authentication is checked only after input validation has passed.
    user = info.context.get("current_user")
    if not user:
        return PostResult(errors=[
            FieldError(field="auth", message="Authentication required")
        ])

    # ... create post logic
    # NOTE(review): ``post_type`` is not defined in this snippet; it stands for
    # the PostType produced by the elided creation logic above.
    return PostResult(post=post_type)
💡 Tip: Return structured errors in your mutation responses instead of raising exceptions. This lets clients handle specific field errors gracefully (e.g., showing validation messages next to form fields).

Pagination — Cursor-Based

# Relay-style cursor pagination
import base64


@strawberry.type
class PageInfo:
    # Paging metadata returned alongside a page of edges.
    has_next_page: bool
    has_previous_page: bool
    # Cursors of the first and last edge on the page; None when the page is empty.
    start_cursor: Optional[str] = None
    end_cursor: Optional[str] = None


@strawberry.type
class PostEdge:
    # One entry of a paginated result: the post plus the cursor pointing at it.
    cursor: str
    node: PostType


@strawberry.type
class PostConnection:
    # A page of posts: the edges, the paging metadata and the overall count.
    edges: list[PostEdge]
    page_info: PageInfo
    total_count: int


def encode_cursor(id: int) -> str:
    """Return the opaque cursor for a post id: base64 of the text ``post:<id>``."""
    raw = "post:{}".format(id).encode()
    encoded = base64.b64encode(raw)
    return encoded.decode()


def decode_cursor(cursor: str) -> int:
    """Return the post id encoded in ``cursor``.

    Cursors come from clients, so anything that is not valid base64 of
    ``post:<integer>`` is rejected with a single, predictable exception type.

    Raises:
        ValueError: if the cursor is malformed.
    """
    try:
        decoded = base64.b64decode(cursor.encode(), validate=True).decode()
        prefix, separator, raw_id = decoded.partition(":")
        if prefix != "post" or not separator:
            raise ValueError("unexpected cursor payload")
        return int(raw_id)
    except ValueError as exc:
        # binascii.Error and UnicodeDecodeError are ValueError subclasses,
        # so every failure mode above ends up here.
        raise ValueError(f"Invalid cursor: {cursor!r}") from exc


# In Query class:
@strawberry.field
async def posts_paginated(
    self,
    first: int = 10,
    after: Optional[str] = None,
) -> PostConnection:
    """Return one page of published posts using cursor-based pagination.

    Posts are ordered by id. ``after`` is a cursor from a previous page and
    ``first`` is the page size, clamped to the range 0..100. ``total_count``
    counts all published posts regardless of the cursor.

    Raises:
        ValueError: if ``after`` cannot be decoded by ``decode_cursor``.
    """
    # Imported locally: neither name is imported at module level in this file.
    from sqlalchemy import func
    from app.models import Comment

    first = max(0, min(first, 100))
    is_published = Post.published.is_(True)

    async with async_session() as session:
        # Eager-load everything db_post_to_type reads; AsyncSession cannot
        # lazily load relationships on attribute access.
        query = select(Post).where(is_published).options(
            selectinload(Post.author),
            selectinload(Post.comments).selectinload(Comment.author),
            selectinload(Post.tags),
        )

        cursor_id = decode_cursor(after) if after else None
        if cursor_id is not None:
            query = query.where(Post.id > cursor_id)

        # Get total count
        count_result = await session.execute(
            select(func.count(Post.id)).where(is_published)
        )
        total = count_result.scalar()

        # Fetch one extra to check has_next_page
        query = query.order_by(Post.id).limit(first + 1)
        result = await session.execute(query)
        posts = result.scalars().all()

        has_next = len(posts) > first
        posts = posts[:first]

        edges = [
            PostEdge(cursor=encode_cursor(p.id), node=db_post_to_type(p))
            for p in posts
        ]

        return PostConnection(
            edges=edges,
            page_info=PageInfo(
                has_next_page=has_next,
                # Only a cursor that was actually applied implies earlier items.
                has_previous_page=cursor_id is not None,
                start_cursor=edges[0].cursor if edges else None,
                end_cursor=edges[-1].cursor if edges else None,
            ),
            total_count=total,
        )
# Query with cursor pagination
query {
  postsPaginated(first: 5) {
    totalCount
    pageInfo {
      hasNextPage
      endCursor
    }
    edges {
      cursor
      node {
        title
        createdAt
      }
    }
  }
}

# Next page
query {
  postsPaginated(first: 5, after: "cG9zdDo1") {
    edges {
      node { title }
    }
    pageInfo { hasNextPage endCursor }
  }
}

File Uploads

# Strawberry supports multipart file uploads
from strawberry.file_uploads import Upload


@strawberry.mutation
async def upload_avatar(
    self, file: Upload, info: strawberry.types.Info
) -> str:
    """Store the uploaded file as the current user's avatar and return its URL path.

    Raises:
        PermissionError: if the request is not authenticated.
        ValueError: if the upload has no usable file name.
    """
    import os

    user = info.context.get("current_user")
    if not user:
        raise PermissionError("Authentication required")

    # The file name is client-controlled: keep only its final component so
    # values like "../../etc/passwd" cannot escape the upload directory.
    client_name = (file.filename or "").replace("\\", "/")
    safe_name = os.path.basename(client_name)
    if not safe_name:
        raise ValueError("Invalid file name")

    content = await file.read()
    filename = f"avatars/{user.id}_{safe_name}"

    # Save file (use S3/GCS in production)
    import aiofiles
    # Make sure the target directory exists before writing.
    os.makedirs("./uploads/avatars", exist_ok=True)
    async with aiofiles.open(f"./uploads/{filename}", "wb") as f:
        await f.write(content)

    return f"/uploads/{filename}"


# Client usage with curl:
# curl -X POST http://localhost:8000/graphql \
#   -F operations='{"query":"mutation($file: Upload!) { uploadAvatar(file: $file) }","variables":{"file":null}}' \
#   -F map='{"0":["variables.file"]}' \
#   -F 0=@avatar.jpg

Query Complexity & Depth Limiting

# Prevent abusive queries
from strawberry.extensions import QueryDepthLimiter
from strawberry.extensions.query_depth_limiter import QueryDepthLimiter


schema = strawberry.Schema(
    query=Query,
    mutation=Mutation,
    subscription=Subscription,
    extensions=[
        QueryDepthLimiter(max_depth=10),
    ],
)


# Custom cost analysis
from strawberry.extensions import Extension


class QueryCostExtension(Extension):
    """Schema extension that rejects results whose estimated cost is too high.

    The cost is estimated from the result data, so the check runs after the
    operation has executed.
    """

    MAX_COST = 1000

    def on_operation(self):
        # Everything after the yield runs once the operation has finished.
        yield
        outcome = self.execution_context.result
        if not (outcome and outcome.data):
            return
        cost = self._estimate_cost(outcome.data)
        if cost > self.MAX_COST:
            raise ValueError(f"Query cost {cost} exceeds limit {self.MAX_COST}")

    def _estimate_cost(self, data, depth=0) -> int:
        """Count 1 per dict and per leaf value; lists add only their items' cost."""
        child_depth = depth + 1
        if isinstance(data, dict):
            nested = sum(
                self._estimate_cost(value, child_depth) for value in data.values()
            )
            return 1 + nested
        if isinstance(data, list):
            return sum(self._estimate_cost(entry, child_depth) for entry in data)
        return 1

Testing

# tests/test_api.py
import pytest
from httpx import AsyncClient, ASGITransport
from app.main import app
from app.database import init_db, engine, Base


@pytest.fixture(autouse=True)
async def setup_db():
    """Create all tables before each test and drop them again afterwards."""
    metadata = Base.metadata

    async with engine.begin() as setup_conn:
        await setup_conn.run_sync(metadata.create_all)

    yield

    async with engine.begin() as teardown_conn:
        await teardown_conn.run_sync(metadata.drop_all)


@pytest.fixture
async def client():
    """Yield an httpx ``AsyncClient`` wired to the ASGI app via ``ASGITransport``."""
    async with AsyncClient(
        transport=ASGITransport(app=app),
        base_url="http://test",
    ) as http_client:
        yield http_client


@pytest.mark.asyncio
async def test_create_user(client):
    """The createUser mutation echoes back the new user's fields and an id."""
    mutation = """
    mutation {
        createUser(input: {
            username: "testuser"
            email: "test@example.com"
            password: "password123"
        }) {
            id
            username
            email
        }
    }
    """
    payload = {"query": mutation}
    resp = await client.post("/graphql", json=payload)
    assert resp.status_code == 200

    created = resp.json()["data"]["createUser"]
    assert created["username"] == "testuser"
    assert created["email"] == "test@example.com"
    assert "id" in created


@pytest.mark.asyncio
async def test_query_users_empty(client):
    """With no users created, the ``users`` query returns an empty list."""
    payload = {"query": "{ users { id username } }"}
    resp = await client.post("/graphql", json=payload)
    assert resp.status_code == 200
    body = resp.json()
    assert body["data"]["users"] == []


@pytest.mark.asyncio
async def test_login_and_create_post(client):
    """Register a user, log in, then create a tagged post with the bearer token."""
    # Step 1: register the author account.
    signup_payload = {
        "query": """mutation {
            createUser(input: {
                username: "author", email: "a@b.com", password: "pass123"
            }) { id }
        }"""
    }
    await client.post("/graphql", json=signup_payload)

    # Step 2: log in and pull the token out of the response.
    login_payload = {
        "query": """mutation {
            login(username: "author", password: "pass123") {
                token
                user { id }
            }
        }"""
    }
    login_resp = await client.post("/graphql", json=login_payload)
    token = login_resp.json()["data"]["login"]["token"]

    # Step 3: create the post, authenticating with the token.
    create_post_mutation = """mutation {
                createPost(input: {
                    title: "Test Post"
                    content: "Hello GraphQL"
                    published: true
                    tagNames: ["test", "graphql"]
                }) {
                    id title tags { name }
                }
            }"""
    auth_headers = {"Authorization": f"Bearer {token}"}
    post_resp = await client.post(
        "/graphql",
        json={"query": create_post_mutation},
        headers=auth_headers,
    )

    created = post_resp.json()["data"]["createPost"]
    assert created["title"] == "Test Post"
    assert len(created["tags"]) == 2


@pytest.mark.asyncio
async def test_unauthorized_create_post(client):
    """A createPost mutation sent without an Authorization header reports errors."""
    anonymous_mutation = """mutation {
            createPost(input: {title: "Nope", content: "Unauthorized"}) {
                id
            }
        }"""
    resp = await client.post("/graphql", json={"query": anonymous_mutation})
    body = resp.json()
    assert "errors" in body


@pytest.mark.asyncio
async def test_search_posts(client):
    """Searching for "GraphQL" returns exactly the two posts with it in the title."""
    # Arrange: register a user and log in to obtain a token.
    signup_mutation = """mutation {
            createUser(input: {
                username: "searcher", email: "s@b.com", password: "pass"
            }) { id }
        }"""
    await client.post("/graphql", json={"query": signup_mutation})

    login_mutation = """mutation {
            login(username: "searcher", password: "pass") { token }
        }"""
    login = await client.post("/graphql", json={"query": login_mutation})
    token = login.json()["data"]["login"]["token"]
    headers = {"Authorization": f"Bearer {token}"}

    # Arrange: publish three posts, two of which mention GraphQL in the title.
    titles = ("Python GraphQL Guide", "REST API Tutorial", "GraphQL vs REST")
    for title in titles:
        create_post_mutation = f"""mutation {{
                createPost(input: {{title: "{title}", content: "content", published: true}}) {{ id }}
            }}"""
        await client.post(
            "/graphql",
            json={"query": create_post_mutation},
            headers=headers,
        )

    # Act: run the search without authentication headers.
    search_payload = {"query": """{ searchPosts(query: "GraphQL") { title } }"""}
    resp = await client.post("/graphql", json=search_payload)

    # Assert: only the two matching posts come back.
    results = resp.json()["data"]["searchPosts"]
    assert len(results) == 2
    assert all("GraphQL" in r["title"] for r in results)

Docker Deployment

# Dockerfile
# Builds the API image: install the Python dependencies, copy the app
# package, and serve app.main:app with uvicorn on port 8000.
FROM python:3.12-slim

WORKDIR /app
# Requirements are copied and installed before the source code, so this
# layer can be reused from cache when only files under app/ change.
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY app/ app/
EXPOSE 8000

# Bind to 0.0.0.0 so the server is reachable through the published port.
# NOTE(review): no USER instruction is set, so the process runs as the base
# image's default user -- consider switching to a non-root user.
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]
# docker-compose.yml
# Two services: the API (built from the Dockerfile in this directory) and a
# PostgreSQL database whose data lives in the named volume "pgdata".
services:
  api:
    build: .
    ports:
      - "8000:8000"
    environment:
      # NOTE(review): app/database.py as shown earlier hard-codes a SQLite
      # DATABASE_URL. It has to read this variable from the environment (and
      # asyncpg has to be listed in requirements.txt) for the API to use Postgres.
      - DATABASE_URL=postgresql+asyncpg://user:pass@db:5432/graphql_blog
    depends_on:
      db:
        # Start the API only after the db healthcheck below reports healthy.
        condition: service_healthy

  db:
    image: postgres:16-alpine
    environment:
      # Example credentials; they must match the DATABASE_URL above.
      POSTGRES_USER: user
      POSTGRES_PASSWORD: pass
      POSTGRES_DB: graphql_blog
    volumes:
      - pgdata:/var/lib/postgresql/data
    healthcheck:
      # Probe with pg_isready every 5s, up to 5 retries.
      test: ["CMD-SHELL", "pg_isready -U user"]
      interval: 5s
      timeout: 5s
      retries: 5

volumes:
  pgdata:

GraphQL vs REST — When to Use What

Production Checklist

🚀 Build Production Python Apps Faster

Get 50+ ready-to-use Python scripts, templates, and guides — including GraphQL, FastAPI, and more.

Get the AI Toolkit →

Related Articles