REST APIs are great, but GraphQL gives clients the power to request exactly the data they need — no over-fetching, no under-fetching. Python's Strawberry framework makes building type-safe GraphQL APIs a joy, especially when paired with FastAPI.
This guide covers everything from basic schema design to production patterns: queries, mutations, subscriptions, authentication, the N+1 problem with DataLoader, file uploads, and testing.
GraphQL solves real problems that REST APIs create at scale:
# Create project
mkdir graphql-api && cd graphql-api
python -m venv .venv && source .venv/bin/activate
# Install dependencies
pip install 'strawberry-graphql[fastapi]' uvicorn sqlalchemy aiosqlite
# Project structure
# graphql-api/
# ├── app/
# │ ├── __init__.py
# │ ├── main.py # FastAPI + Strawberry
# │ ├── schema.py # GraphQL types & resolvers
# │ ├── models.py # SQLAlchemy models
# │ ├── database.py # DB connection
# │ ├── auth.py # Authentication
# │ └── dataloaders.py # DataLoader for N+1
# ├── tests/
# │ └── test_api.py
# └── requirements.txt
# app/database.py
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker, DeclarativeBase
DATABASE_URL = "sqlite+aiosqlite:///./app.db"
engine = create_async_engine(DATABASE_URL, echo=False)
async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
class Base(DeclarativeBase):
pass
async def get_session() -> AsyncSession:
async with async_session() as session:
yield session
async def init_db():
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
# app/models.py
from datetime import datetime
from sqlalchemy import Column, Integer, String, DateTime, ForeignKey, Text, Boolean
from sqlalchemy.orm import relationship
from app.database import Base
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True)
username = Column(String(50), unique=True, nullable=False)
email = Column(String(100), unique=True, nullable=False)
hashed_password = Column(String(128), nullable=False)
is_active = Column(Boolean, default=True)
created_at = Column(DateTime, default=datetime.utcnow)
posts = relationship("Post", back_populates="author", lazy="selectin")
comments = relationship("Comment", back_populates="author", lazy="selectin")
class Post(Base):
__tablename__ = "posts"
id = Column(Integer, primary_key=True)
title = Column(String(200), nullable=False)
content = Column(Text, nullable=False)
published = Column(Boolean, default=False)
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
author_id = Column(Integer, ForeignKey("users.id"), nullable=False)
author = relationship("User", back_populates="posts")
comments = relationship("Comment", back_populates="post", lazy="selectin")
tags = relationship("Tag", secondary="post_tags", back_populates="posts")
class Comment(Base):
__tablename__ = "comments"
id = Column(Integer, primary_key=True)
content = Column(Text, nullable=False)
created_at = Column(DateTime, default=datetime.utcnow)
author_id = Column(Integer, ForeignKey("users.id"), nullable=False)
post_id = Column(Integer, ForeignKey("posts.id"), nullable=False)
author = relationship("User", back_populates="comments")
post = relationship("Post", back_populates="comments")
class Tag(Base):
__tablename__ = "tags"
id = Column(Integer, primary_key=True)
name = Column(String(50), unique=True, nullable=False)
posts = relationship("Post", secondary="post_tags", back_populates="tags")
# Association table
from sqlalchemy import Table
post_tags = Table(
"post_tags", Base.metadata,
Column("post_id", Integer, ForeignKey("posts.id"), primary_key=True),
Column("tag_id", Integer, ForeignKey("tags.id"), primary_key=True),
)
Strawberry uses Python dataclasses and type hints to define your GraphQL schema. No SDL files — it's all Python:
# app/schema.py
import strawberry
from datetime import datetime
from typing import Optional
@strawberry.type
class UserType:
id: int
username: str
email: str
is_active: bool
created_at: datetime
posts: list["PostType"]
@strawberry.type
class PostType:
id: int
title: str
content: str
published: bool
created_at: datetime
updated_at: datetime
author: UserType
comments: list["CommentType"]
tags: list["TagType"]
@strawberry.type
class CommentType:
id: int
content: str
created_at: datetime
author: UserType
@strawberry.type
class TagType:
id: int
name: str
# app/schema.py (continued)
from sqlalchemy import select
from sqlalchemy.orm import selectinload
from app.database import async_session
from app.models import User, Post, Tag
def db_user_to_type(user: User) -> UserType:
"""Convert SQLAlchemy model to Strawberry type."""
return UserType(
id=user.id,
username=user.username,
email=user.email,
is_active=user.is_active,
created_at=user.created_at,
posts=[db_post_to_type(p) for p in user.posts],
)
def db_post_to_type(post: Post) -> PostType:
return PostType(
id=post.id,
title=post.title,
content=post.content,
published=post.published,
created_at=post.created_at,
updated_at=post.updated_at,
author=UserType(
id=post.author.id,
username=post.author.username,
email=post.author.email,
is_active=post.author.is_active,
created_at=post.author.created_at,
posts=[],
),
comments=[
CommentType(
id=c.id,
content=c.content,
created_at=c.created_at,
author=UserType(
id=c.author.id,
username=c.author.username,
email=c.author.email,
is_active=c.author.is_active,
created_at=c.author.created_at,
posts=[],
),
)
for c in post.comments
],
tags=[TagType(id=t.id, name=t.name) for t in post.tags],
)
@strawberry.type
class Query:
@strawberry.field
async def users(self) -> list[UserType]:
async with async_session() as session:
result = await session.execute(
select(User).options(selectinload(User.posts))
)
users = result.scalars().all()
return [db_user_to_type(u) for u in users]
@strawberry.field
async def user(self, id: int) -> Optional[UserType]:
async with async_session() as session:
user = await session.get(User, id)
if not user:
return None
return db_user_to_type(user)
@strawberry.field
async def posts(
self,
published_only: bool = True,
limit: int = 20,
offset: int = 0,
) -> list[PostType]:
async with async_session() as session:
query = select(Post).options(
selectinload(Post.author),
selectinload(Post.comments),
selectinload(Post.tags),
)
if published_only:
query = query.where(Post.published == True)
query = query.order_by(Post.created_at.desc()).limit(limit).offset(offset)
result = await session.execute(query)
posts = result.scalars().all()
return [db_post_to_type(p) for p in posts]
@strawberry.field
async def post(self, id: int) -> Optional[PostType]:
async with async_session() as session:
result = await session.execute(
select(Post)
.where(Post.id == id)
.options(
selectinload(Post.author),
selectinload(Post.comments),
selectinload(Post.tags),
)
)
post = result.scalar_one_or_none()
if not post:
return None
return db_post_to_type(post)
@strawberry.field
async def search_posts(self, query: str) -> list[PostType]:
"""Full-text search across titles and content."""
async with async_session() as session:
result = await session.execute(
select(Post)
.where(
(Post.title.ilike(f"%{query}%"))
| (Post.content.ilike(f"%{query}%"))
)
.options(selectinload(Post.author), selectinload(Post.tags))
)
posts = result.scalars().all()
return [db_post_to_type(p) for p in posts]
Now clients can query exactly what they need:
# Fetch only usernames and post titles — no over-fetching
query {
users {
username
posts {
title
}
}
}
# Get a single post with author and comments
query {
post(id: 1) {
title
content
author {
username
}
comments {
content
author { username }
}
tags { name }
}
}
# Search with pagination
query {
posts(publishedOnly: true, limit: 10, offset: 0) {
title
createdAt
}
}
# app/schema.py (continued)
import bcrypt
@strawberry.input
class CreateUserInput:
username: str
email: str
password: str
@strawberry.input
class CreatePostInput:
title: str
content: str
published: bool = False
tag_names: list[str] = strawberry.field(default_factory=list)
@strawberry.input
class UpdatePostInput:
title: Optional[str] = None
content: Optional[str] = None
published: Optional[bool] = None
@strawberry.type
class MutationResult:
success: bool
message: str
@strawberry.type
class Mutation:
@strawberry.mutation
async def create_user(self, input: CreateUserInput) -> UserType:
async with async_session() as session:
# Check uniqueness
existing = await session.execute(
select(User).where(
(User.username == input.username) | (User.email == input.email)
)
)
if existing.scalar_one_or_none():
raise ValueError("Username or email already taken")
hashed = bcrypt.hashpw(
input.password.encode(), bcrypt.gensalt()
).decode()
user = User(
username=input.username,
email=input.email,
hashed_password=hashed,
)
session.add(user)
await session.commit()
await session.refresh(user)
return UserType(
id=user.id,
username=user.username,
email=user.email,
is_active=user.is_active,
created_at=user.created_at,
posts=[],
)
@strawberry.mutation
async def create_post(
self, input: CreatePostInput, info: strawberry.types.Info
) -> PostType:
# Get current user from context
user = info.context.get("current_user")
if not user:
raise PermissionError("Authentication required")
async with async_session() as session:
# Get or create tags
tags = []
for tag_name in input.tag_names:
result = await session.execute(
select(Tag).where(Tag.name == tag_name.lower())
)
tag = result.scalar_one_or_none()
if not tag:
tag = Tag(name=tag_name.lower())
session.add(tag)
tags.append(tag)
post = Post(
title=input.title,
content=input.content,
published=input.published,
author_id=user.id,
tags=tags,
)
session.add(post)
await session.commit()
await session.refresh(post)
return db_post_to_type(post)
@strawberry.mutation
async def update_post(
self, id: int, input: UpdatePostInput, info: strawberry.types.Info
) -> PostType:
user = info.context.get("current_user")
if not user:
raise PermissionError("Authentication required")
async with async_session() as session:
post = await session.get(Post, id)
if not post:
raise ValueError(f"Post {id} not found")
if post.author_id != user.id:
raise PermissionError("Not your post")
if input.title is not None:
post.title = input.title
if input.content is not None:
post.content = input.content
if input.published is not None:
post.published = input.published
await session.commit()
await session.refresh(post)
return db_post_to_type(post)
@strawberry.mutation
async def delete_post(
self, id: int, info: strawberry.types.Info
) -> MutationResult:
user = info.context.get("current_user")
if not user:
raise PermissionError("Authentication required")
async with async_session() as session:
post = await session.get(Post, id)
if not post:
return MutationResult(success=False, message="Post not found")
if post.author_id != user.id:
return MutationResult(success=False, message="Not your post")
await session.delete(post)
await session.commit()
return MutationResult(success=True, message="Post deleted")
@strawberry.mutation
async def add_comment(
self, post_id: int, content: str, info: strawberry.types.Info
) -> CommentType:
user = info.context.get("current_user")
if not user:
raise PermissionError("Authentication required")
async with async_session() as session:
from app.models import Comment as CommentModel
comment = CommentModel(
content=content,
author_id=user.id,
post_id=post_id,
)
session.add(comment)
await session.commit()
await session.refresh(comment)
return CommentType(
id=comment.id,
content=comment.content,
created_at=comment.created_at,
author=UserType(
id=user.id,
username=user.username,
email=user.email,
is_active=user.is_active,
created_at=user.created_at,
posts=[],
),
)
Using mutations:
# Create a user
mutation {
createUser(input: {
username: "alice"
email: "alice@example.com"
password: "secure123"
}) {
id
username
}
}
# Create a post with tags
mutation {
createPost(input: {
title: "GraphQL is awesome"
content: "Here's why..."
published: true
tagNames: ["graphql", "python", "tutorial"]
}) {
id
title
tags { name }
}
}
# app/auth.py
import jwt
import bcrypt
from datetime import datetime, timedelta
from typing import Optional
from sqlalchemy import select
from app.database import async_session
from app.models import User
SECRET_KEY = "your-secret-key-change-in-production"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 60
def create_access_token(user_id: int) -> str:
expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
payload = {"sub": str(user_id), "exp": expire}
return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)
def verify_token(token: str) -> Optional[int]:
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
return int(payload["sub"])
except (jwt.ExpiredSignatureError, jwt.InvalidTokenError):
return None
async def authenticate_user(username: str, password: str) -> Optional[User]:
async with async_session() as session:
result = await session.execute(
select(User).where(User.username == username)
)
user = result.scalar_one_or_none()
if not user:
return None
if not bcrypt.checkpw(password.encode(), user.hashed_password.encode()):
return None
return user
async def get_user_from_token(token: str) -> Optional[User]:
user_id = verify_token(token)
if not user_id:
return None
async with async_session() as session:
return await session.get(User, user_id)
# Add to schema.py Mutation class
@strawberry.type
class AuthPayload:
token: str
user: UserType
# Inside Mutation class:
@strawberry.mutation
async def login(self, username: str, password: str) -> AuthPayload:
user = await authenticate_user(username, password)
if not user:
raise ValueError("Invalid credentials")
token = create_access_token(user.id)
return AuthPayload(
token=token,
user=UserType(
id=user.id,
username=user.username,
email=user.email,
is_active=user.is_active,
created_at=user.created_at,
posts=[],
),
)
# app/main.py
from contextlib import asynccontextmanager
from fastapi import FastAPI, Request
from strawberry.fastapi import GraphQLRouter
import strawberry
from app.schema import Query, Mutation
from app.database import init_db
from app.auth import get_user_from_token
async def get_context(request: Request) -> dict:
"""Extract auth token and resolve current user."""
context = {"request": request}
auth_header = request.headers.get("Authorization", "")
if auth_header.startswith("Bearer "):
token = auth_header[7:]
user = await get_user_from_token(token)
if user:
context["current_user"] = user
return context
schema = strawberry.Schema(query=Query, mutation=Mutation)
graphql_router = GraphQLRouter(schema, context_getter=get_context)
@asynccontextmanager
async def lifespan(app: FastAPI):
await init_db()
yield
app = FastAPI(title="GraphQL Blog API", lifespan=lifespan)
app.include_router(graphql_router, prefix="/graphql")
# Optional: REST health check
@app.get("/health")
async def health():
return {"status": "ok"}
# Run the server
uvicorn app.main:app --reload
# Open GraphiQL playground
# http://localhost:8000/graphql
The N+1 problem is GraphQL's biggest performance pitfall. If you query 50 posts and each resolves its author, that's 51 database queries. DataLoader batches them into one:
# app/dataloaders.py
from typing import Any
from strawberry.dataloader import DataLoader
from sqlalchemy import select
from app.database import async_session
from app.models import User, Post, Comment
async def load_users(keys: list[int]) -> list[User]:
"""Batch-load users by ID."""
async with async_session() as session:
result = await session.execute(
select(User).where(User.id.in_(keys))
)
users = {u.id: u for u in result.scalars().all()}
# Return in same order as keys, None for missing
return [users.get(key) for key in keys]
async def load_posts_by_author(keys: list[int]) -> list[list[Post]]:
"""Batch-load posts grouped by author_id."""
async with async_session() as session:
result = await session.execute(
select(Post).where(Post.author_id.in_(keys))
)
posts = result.scalars().all()
# Group by author
grouped: dict[int, list[Post]] = {key: [] for key in keys}
for post in posts:
grouped[post.author_id].append(post)
return [grouped[key] for key in keys]
async def load_comments_by_post(keys: list[int]) -> list[list[Comment]]:
"""Batch-load comments grouped by post_id."""
async with async_session() as session:
result = await session.execute(
select(Comment).where(Comment.post_id.in_(keys))
)
comments = result.scalars().all()
grouped: dict[int, list[Comment]] = {key: [] for key in keys}
for comment in comments:
grouped[comment.post_id].append(comment)
return [grouped[key] for key in keys]
def create_dataloaders() -> dict[str, DataLoader]:
"""Create fresh DataLoaders for each request."""
return {
"user_loader": DataLoader(load_fn=load_users),
"posts_by_author": DataLoader(load_fn=load_posts_by_author),
"comments_by_post": DataLoader(load_fn=load_comments_by_post),
}
# Updated types using DataLoader for lazy resolution
@strawberry.type
class PostTypeWithLoader:
id: int
title: str
content: str
published: bool
created_at: datetime
updated_at: datetime
author_id: strawberry.Private[int] # Hidden from schema
@strawberry.field
async def author(self, info: strawberry.types.Info) -> UserType:
loader = info.context["dataloaders"]["user_loader"]
user = await loader.load(self.author_id)
return UserType(
id=user.id,
username=user.username,
email=user.email,
is_active=user.is_active,
created_at=user.created_at,
posts=[], # Avoid circular loading
)
@strawberry.field
async def comments(self, info: strawberry.types.Info) -> list[CommentType]:
loader = info.context["dataloaders"]["comments_by_post"]
comments = await loader.load(self.id)
return [
CommentType(
id=c.id,
content=c.content,
created_at=c.created_at,
author=UserType(
id=c.author_id, username="", email="",
is_active=True, created_at=c.created_at, posts=[],
),
)
for c in comments
]
# Update context to include dataloaders
async def get_context(request: Request) -> dict:
context = {
"request": request,
"dataloaders": create_dataloaders(), # Fresh per request!
}
auth_header = request.headers.get("Authorization", "")
if auth_header.startswith("Bearer "):
token = auth_header[7:]
user = await get_user_from_token(token)
if user:
context["current_user"] = user
return context
Now 50 posts with authors = 2 queries (posts + batch users), not 51.
# app/schema.py
import asyncio
from typing import AsyncGenerator
# Simple in-memory pub/sub (use Redis in production)
class PubSub:
def __init__(self):
self.subscribers: dict[str, list[asyncio.Queue]] = {}
async def publish(self, channel: str, data: Any):
for queue in self.subscribers.get(channel, []):
await queue.put(data)
async def subscribe(self, channel: str) -> AsyncGenerator:
queue: asyncio.Queue = asyncio.Queue()
self.subscribers.setdefault(channel, []).append(queue)
try:
while True:
data = await queue.get()
yield data
finally:
self.subscribers[channel].remove(queue)
pubsub = PubSub()
@strawberry.type
class Subscription:
@strawberry.subscription
async def new_post(self) -> AsyncGenerator[PostType, None]:
"""Subscribe to new posts as they're created."""
async for post_data in pubsub.subscribe("new_post"):
yield post_data
@strawberry.subscription
async def new_comment(
self, post_id: int
) -> AsyncGenerator[CommentType, None]:
"""Subscribe to comments on a specific post."""
async for comment_data in pubsub.subscribe(f"comment:{post_id}"):
yield comment_data
@strawberry.subscription
async def post_count(self) -> AsyncGenerator[int, None]:
"""Periodically emit total post count."""
while True:
async with async_session() as session:
from sqlalchemy import func
result = await session.execute(
select(func.count(Post.id))
)
count = result.scalar()
yield count
await asyncio.sleep(5)
# Update schema to include subscriptions
schema = strawberry.Schema(
query=Query,
mutation=Mutation,
subscription=Subscription,
)
# Publish events from mutations:
# In create_post mutation, after commit:
# await pubsub.publish("new_post", post_type)
# Client-side subscription (JavaScript)
const ws = new WebSocket("ws://localhost:8000/graphql");
ws.onopen = () => {
ws.send(JSON.stringify({
type: "subscribe",
payload: {
query: `subscription { newPost { title author { username } } }`
}
}));
};
ws.onmessage = (event) => {
const data = JSON.parse(event.data);
console.log("New post:", data.payload.data.newPost);
};
# Custom error types for better client UX
@strawberry.type
class FieldError:
field: str
message: str
@strawberry.type
class PostResult:
post: Optional[PostType] = None
errors: list[FieldError] = strawberry.field(default_factory=list)
@property
def success(self) -> bool:
return len(self.errors) == 0
# Use in mutations
@strawberry.mutation
async def create_post_safe(
self, input: CreatePostInput, info: strawberry.types.Info
) -> PostResult:
errors = []
if len(input.title) < 3:
errors.append(FieldError(field="title", message="Title too short (min 3 chars)"))
if len(input.content) < 10:
errors.append(FieldError(field="content", message="Content too short (min 10 chars)"))
if errors:
return PostResult(errors=errors)
user = info.context.get("current_user")
if not user:
return PostResult(errors=[
FieldError(field="auth", message="Authentication required")
])
# ... create post logic
return PostResult(post=post_type)
# Relay-style cursor pagination
import base64
@strawberry.type
class PageInfo:
has_next_page: bool
has_previous_page: bool
start_cursor: Optional[str] = None
end_cursor: Optional[str] = None
@strawberry.type
class PostEdge:
cursor: str
node: PostType
@strawberry.type
class PostConnection:
edges: list[PostEdge]
page_info: PageInfo
total_count: int
def encode_cursor(id: int) -> str:
return base64.b64encode(f"post:{id}".encode()).decode()
def decode_cursor(cursor: str) -> int:
decoded = base64.b64decode(cursor.encode()).decode()
return int(decoded.split(":")[1])
# In Query class:
@strawberry.field
async def posts_paginated(
self,
first: int = 10,
after: Optional[str] = None,
) -> PostConnection:
async with async_session() as session:
query = select(Post).where(Post.published == True)
if after:
cursor_id = decode_cursor(after)
query = query.where(Post.id > cursor_id)
# Get total count
count_result = await session.execute(
select(func.count(Post.id)).where(Post.published == True)
)
total = count_result.scalar()
# Fetch one extra to check has_next_page
query = query.order_by(Post.id).limit(first + 1)
result = await session.execute(query)
posts = result.scalars().all()
has_next = len(posts) > first
posts = posts[:first]
edges = [
PostEdge(cursor=encode_cursor(p.id), node=db_post_to_type(p))
for p in posts
]
return PostConnection(
edges=edges,
page_info=PageInfo(
has_next_page=has_next,
has_previous_page=after is not None,
start_cursor=edges[0].cursor if edges else None,
end_cursor=edges[-1].cursor if edges else None,
),
total_count=total,
)
# Query with cursor pagination
query {
postsPaginated(first: 5) {
totalCount
pageInfo {
hasNextPage
endCursor
}
edges {
cursor
node {
title
createdAt
}
}
}
}
# Next page
query {
postsPaginated(first: 5, after: "cG9zdDo1") {
edges {
node { title }
}
pageInfo { hasNextPage endCursor }
}
}
# Strawberry supports multipart file uploads
from strawberry.file_uploads import Upload
@strawberry.mutation
async def upload_avatar(
self, file: Upload, info: strawberry.types.Info
) -> str:
user = info.context.get("current_user")
if not user:
raise PermissionError("Authentication required")
content = await file.read()
filename = f"avatars/{user.id}_{file.filename}"
# Save file (use S3/GCS in production)
import aiofiles
async with aiofiles.open(f"./uploads/{filename}", "wb") as f:
await f.write(content)
return f"/uploads/{filename}"
# Client usage with curl:
# curl -X POST http://localhost:8000/graphql \
# -F operations='{"query":"mutation($file: Upload!) { uploadAvatar(file: $file) }","variables":{"file":null}}' \
# -F map='{"0":["variables.file"]}' \
# -F 0=@avatar.jpg
# Prevent abusive queries
from strawberry.extensions import QueryDepthLimiter
from strawberry.extensions.query_depth_limiter import QueryDepthLimiter
schema = strawberry.Schema(
query=Query,
mutation=Mutation,
subscription=Subscription,
extensions=[
QueryDepthLimiter(max_depth=10),
],
)
# Custom cost analysis
from strawberry.extensions import Extension
class QueryCostExtension(Extension):
MAX_COST = 1000
def on_operation(self):
yield
# After execution, log the cost
result = self.execution_context.result
if result and result.data:
cost = self._estimate_cost(result.data)
if cost > self.MAX_COST:
raise ValueError(f"Query cost {cost} exceeds limit {self.MAX_COST}")
def _estimate_cost(self, data, depth=0) -> int:
if isinstance(data, list):
return sum(self._estimate_cost(item, depth + 1) for item in data)
if isinstance(data, dict):
return 1 + sum(
self._estimate_cost(v, depth + 1) for v in data.values()
)
return 1
# tests/test_api.py
import pytest
from httpx import AsyncClient, ASGITransport
from app.main import app
from app.database import init_db, engine, Base
@pytest.fixture(autouse=True)
async def setup_db():
"""Fresh database for each test."""
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
yield
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.drop_all)
@pytest.fixture
async def client():
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://test") as ac:
yield ac
@pytest.mark.asyncio
async def test_create_user(client):
query = """
mutation {
createUser(input: {
username: "testuser"
email: "test@example.com"
password: "password123"
}) {
id
username
email
}
}
"""
response = await client.post("/graphql", json={"query": query})
assert response.status_code == 200
data = response.json()["data"]["createUser"]
assert data["username"] == "testuser"
assert data["email"] == "test@example.com"
assert "id" in data
@pytest.mark.asyncio
async def test_query_users_empty(client):
query = "{ users { id username } }"
response = await client.post("/graphql", json={"query": query})
assert response.status_code == 200
assert response.json()["data"]["users"] == []
@pytest.mark.asyncio
async def test_login_and_create_post(client):
# 1. Create user
await client.post("/graphql", json={
"query": """mutation {
createUser(input: {
username: "author", email: "a@b.com", password: "pass123"
}) { id }
}"""
})
# 2. Login
login_resp = await client.post("/graphql", json={
"query": """mutation {
login(username: "author", password: "pass123") {
token
user { id }
}
}"""
})
token = login_resp.json()["data"]["login"]["token"]
# 3. Create post with auth
post_resp = await client.post(
"/graphql",
json={
"query": """mutation {
createPost(input: {
title: "Test Post"
content: "Hello GraphQL"
published: true
tagNames: ["test", "graphql"]
}) {
id title tags { name }
}
}"""
},
headers={"Authorization": f"Bearer {token}"},
)
data = post_resp.json()["data"]["createPost"]
assert data["title"] == "Test Post"
assert len(data["tags"]) == 2
@pytest.mark.asyncio
async def test_unauthorized_create_post(client):
"""Creating a post without auth should fail."""
resp = await client.post("/graphql", json={
"query": """mutation {
createPost(input: {title: "Nope", content: "Unauthorized"}) {
id
}
}"""
})
assert "errors" in resp.json()
@pytest.mark.asyncio
async def test_search_posts(client):
# Setup: create user, login, create posts
await client.post("/graphql", json={
"query": """mutation {
createUser(input: {
username: "searcher", email: "s@b.com", password: "pass"
}) { id }
}"""
})
login = await client.post("/graphql", json={
"query": """mutation {
login(username: "searcher", password: "pass") { token }
}"""
})
token = login.json()["data"]["login"]["token"]
headers = {"Authorization": f"Bearer {token}"}
# Create posts
for title in ["Python GraphQL Guide", "REST API Tutorial", "GraphQL vs REST"]:
await client.post("/graphql", json={
"query": f"""mutation {{
createPost(input: {{title: "{title}", content: "content", published: true}}) {{ id }}
}}"""
}, headers=headers)
# Search
resp = await client.post("/graphql", json={
"query": """{ searchPosts(query: "GraphQL") { title } }"""
})
results = resp.json()["data"]["searchPosts"]
assert len(results) == 2
assert all("GraphQL" in r["title"] for r in results)
# Dockerfile
FROM python:3.12-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY app/ app/
EXPOSE 8000
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]
# docker-compose.yml
services:
api:
build: .
ports:
- "8000:8000"
environment:
- DATABASE_URL=postgresql+asyncpg://user:pass@db:5432/graphql_blog
depends_on:
db:
condition: service_healthy
db:
image: postgres:16-alpine
environment:
POSTGRES_USER: user
POSTGRES_PASSWORD: pass
POSTGRES_DB: graphql_blog
volumes:
- pgdata:/var/lib/postgresql/data
healthcheck:
test: ["CMD-SHELL", "pg_isready -U user"]
interval: 5s
timeout: 5s
retries: 5
volumes:
pgdata:
🚀 Build Production Python Apps Faster
Get 50+ ready-to-use Python scripts, templates, and guides — including GraphQL, FastAPI, and more.