Python + MongoDB — Build NoSQL Applications
MongoDB is the most popular NoSQL database for a reason — flexible schemas, powerful queries, and a natural fit for Python's dict-based data model. This guide covers everything from basic CRUD to production-grade patterns: aggregation pipelines, async operations, and ODM frameworks.
When to Use MongoDB
- Flexible schemas — documents can have different fields (user profiles, product catalogs, logs)
- Nested data — embed related data instead of JOINs (orders with items, articles with comments)
- High write throughput — event logging, IoT data, real-time analytics
- Rapid prototyping — no migrations, just start storing data
For relational data with strict schemas, see our SQLite/PostgreSQL guide. For caching and simple key-value, see our Redis guide.
Setup
# Install PyMongo (sync), Motor (async), and Beanie (ODM)
pip install pymongo motor beanie
# Run MongoDB with Docker
docker run -d --name mongo \
-p 27017:27017 \
-e MONGO_INITDB_ROOT_USERNAME=admin \
-e MONGO_INITDB_ROOT_PASSWORD=secret \
-v mongodata:/data/db \
mongo:7
# Or use MongoDB Atlas (free tier):
# https://cloud.mongodb.com → Create Cluster → Get connection string
PyMongo Basics — Synchronous CRUD
Connect and create
from pymongo import MongoClient
from datetime import datetime, timezone
# Connect
client = MongoClient("mongodb://admin:secret@localhost:27017")
db = client["myapp"] # database
users = db["users"] # collection
# Insert one
user = {
"name": "Alice Chen",
"email": "alice@example.com",
"role": "admin",
"skills": ["python", "mongodb", "fastapi"],
"profile": {
"bio": "Full-stack developer",
"location": "San Francisco",
},
"created_at": datetime.now(timezone.utc),
}
result = users.insert_one(user)
print(f"Inserted: {result.inserted_id}") # ObjectId('...')
# Insert many
new_users = [
{"name": "Bob", "email": "bob@example.com", "role": "user", "skills": ["javascript"]},
{"name": "Carol", "email": "carol@example.com", "role": "editor", "skills": ["python", "data"]},
{"name": "Dave", "email": "dave@example.com", "role": "user", "skills": ["rust", "go"]},
]
result = users.insert_many(new_users)
print(f"Inserted {len(result.inserted_ids)} users")
Read (query)
# Find one
alice = users.find_one({"email": "alice@example.com"})
print(alice["name"]) # Alice Chen
# Find many with filter
python_devs = users.find({"skills": "python"})
for dev in python_devs:
print(f" {dev['name']} — {dev['skills']}")
# Query operators
from datetime import timedelta
# Users created in last 7 days
recent = users.find({
"created_at": {"$gte": datetime.now(timezone.utc) - timedelta(days=7)}
})
# Users with exactly 3 skills ($size matches an exact length only)
skilled = users.find({
"skills": {"$size": 3}
})
# For "3 or more" skills, check that array index 2 exists instead
skilled_3plus = users.find({"skills.2": {"$exists": True}})
# OR query
admins_or_editors = users.find({
"$or": [
{"role": "admin"},
{"role": "editor"},
]
})
# Nested field query
sf_users = users.find({"profile.location": "San Francisco"})
# Projection — only return specific fields
names_only = users.find(
{"role": "user"},
{"name": 1, "email": 1, "_id": 0} # 1=include, 0=exclude
)
# Sorting + pagination
page = users.find().sort("name", 1).skip(20).limit(10)
# Count
total = users.count_documents({"role": "user"})
print(f"Total users: {total}")
Update
# Update one field
users.update_one(
{"email": "alice@example.com"},
{"$set": {"role": "superadmin", "updated_at": datetime.now(timezone.utc)}}
)
# Add to array
users.update_one(
{"email": "bob@example.com"},
{"$push": {"skills": "typescript"}}
)
# Remove from array
users.update_one(
{"email": "bob@example.com"},
{"$pull": {"skills": "javascript"}}
)
# Increment a counter
users.update_one(
{"email": "alice@example.com"},
{"$inc": {"login_count": 1}}
)
# Update many
users.update_many(
{"role": "user"},
{"$set": {"is_verified": False}}
)
# Upsert — update if exists, insert if not
users.update_one(
{"email": "eve@example.com"},
{"$set": {"name": "Eve", "role": "user"}},
upsert=True
)
Delete
# Delete one
users.delete_one({"email": "dave@example.com"})
# Delete many
result = users.delete_many({"role": "inactive"})
print(f"Deleted {result.deleted_count} users")
# Delete with condition
users.delete_many({
"created_at": {"$lt": datetime(2025, 1, 1, tzinfo=timezone.utc)}
})
Data Modeling — Embed vs Reference
The biggest design decision in MongoDB: when to embed documents vs when to reference them.
Embed (denormalize) — for tightly coupled data
# Good: order with items — always read together
order = {
"order_id": "ORD-001",
"customer": {
"name": "Alice",
"email": "alice@example.com",
},
"items": [
{"product": "Widget A", "qty": 2, "price": 29.99},
{"product": "Widget B", "qty": 1, "price": 49.99},
],
"total": 109.97,
"status": "shipped",
"created_at": datetime.now(timezone.utc),
}
# One read = all data. No JOINs needed.
Reference — for loosely coupled, large, or shared data
# Good: blog post referencing author — authors shared across posts
from bson import ObjectId
post = {
"title": "MongoDB Best Practices",
"content": "...",
"author_id": ObjectId("65a1b2c3d4e5f6a7b8c9d0e1"), # reference
"tags": ["mongodb", "python"],
}
# Lookup author separately
author = db.authors.find_one({"_id": post["author_id"]})
| Embed when | Reference when |
|---|---|
| Data is read together | Data is shared across documents |
| One-to-few relationship | One-to-many (thousands+) |
| Child data is small | Child data is large or grows unbounded |
| Atomic updates needed | Independent update patterns |
Aggregation Pipelines
MongoDB's most powerful feature. Pipelines transform data through stages — like Unix pipes for your database.
# Setup: orders collection
orders = db["orders"]
# Revenue by status
pipeline = [
{"$group": {
"_id": "$status",
"total_revenue": {"$sum": "$total"},
"order_count": {"$sum": 1},
"avg_order": {"$avg": "$total"},
}},
{"$sort": {"total_revenue": -1}},
]
for doc in orders.aggregate(pipeline):
print(f"{doc['_id']}: ${doc['total_revenue']:.2f} ({doc['order_count']} orders)")
# Top customers by spending
top_customers = [
{"$group": {
"_id": "$customer.email",
"name": {"$first": "$customer.name"},
"total_spent": {"$sum": "$total"},
"orders": {"$sum": 1},
}},
{"$sort": {"total_spent": -1}},
{"$limit": 10},
{"$project": {
"_id": 0,
"email": "$_id",
"name": 1,
"total_spent": {"$round": ["$total_spent", 2]},
"orders": 1,
}},
]
# Unwind arrays — analyze individual items
popular_products = [
{"$unwind": "$items"},
{"$group": {
"_id": "$items.product",
"total_qty": {"$sum": "$items.qty"},
"total_revenue": {"$sum": {"$multiply": ["$items.qty", "$items.price"]}},
}},
{"$sort": {"total_revenue": -1}},
{"$limit": 5},
]
# Date-based aggregation — monthly revenue
monthly_revenue = [
{"$group": {
"_id": {
"year": {"$year": "$created_at"},
"month": {"$month": "$created_at"},
},
"revenue": {"$sum": "$total"},
"orders": {"$sum": 1},
}},
{"$sort": {"_id.year": 1, "_id.month": 1}},
]
# $lookup — JOIN equivalent
orders_with_reviews = [
{"$lookup": {
"from": "reviews",
"localField": "order_id",
"foreignField": "order_id",
"as": "reviews",
}},
{"$match": {"reviews": {"$ne": []}}}, # only orders with reviews
]
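Stage order matters: putting `$match` first lets the server use indexes and shrinks the set of documents flowing into `$group`. A sketch on the orders collection above:

```python
# Revenue per customer, but only for shipped orders: filter before grouping
pipeline = [
    {"$match": {"status": "shipped"}},  # can use an index on status
    {"$group": {
        "_id": "$customer.email",
        "total_spent": {"$sum": "$total"},
    }},
    {"$sort": {"total_spent": -1}},
]
```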
Indexing — Make Queries Fast
# Single field index
users.create_index("email", unique=True)
# Compound index (order matters!)
orders.create_index([("customer.email", 1), ("created_at", -1)])
# Text index for search
db.articles.create_index([("title", "text"), ("content", "text")])
results = db.articles.find({"$text": {"$search": "python mongodb"}})
# TTL index — auto-delete old documents
db.sessions.create_index("created_at", expireAfterSeconds=3600) # 1h
# Partial index — only index matching documents
users.create_index(
"email",
partialFilterExpression={"is_active": True}
)
# Check existing indexes
for idx in users.list_indexes():
print(f" {idx['name']}: {idx['key']}")
# Explain a query (check if index is used)
explanation = users.find({"email": "alice@example.com"}).explain()
print(explanation["queryPlanner"]["winningPlan"])
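To automate that check, you can walk the winning plan looking for an `IXSCAN` stage. The `uses_index` helper below is an illustrative sketch; plan shapes vary by server version, so treat it as a rough heuristic:

```python
def uses_index(collection, query: dict) -> bool:
    """Heuristic: does the query's winning plan contain an IXSCAN stage?"""
    plan = collection.find(query).explain()["queryPlanner"]["winningPlan"]

    def walk(stage: dict) -> bool:
        if stage.get("stage") == "IXSCAN":
            return True
        inner = stage.get("inputStage")
        if inner is not None:
            return walk(inner)
        # Some plans (e.g. $or) branch into multiple input stages
        return any(walk(s) for s in stage.get("inputStages", []))

    return walk(plan)
```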
Async with Motor
For async Python apps (FastAPI, aiohttp), use Motor — the async MongoDB driver.
import motor.motor_asyncio
import asyncio
async def main():
# Connect
client = motor.motor_asyncio.AsyncIOMotorClient(
"mongodb://admin:secret@localhost:27017"
)
db = client["myapp"]
users = db["users"]
# Insert
result = await users.insert_one({
"name": "Frank",
"email": "frank@example.com",
"role": "user",
})
print(f"Inserted: {result.inserted_id}")
# Find
user = await users.find_one({"email": "frank@example.com"})
print(user["name"])
# Find many (async iteration)
async for doc in users.find({"role": "user"}).sort("name").limit(10):
print(f" {doc['name']}")
# Aggregate
pipeline = [
{"$group": {"_id": "$role", "count": {"$sum": 1}}},
]
async for doc in users.aggregate(pipeline):
print(f" {doc['_id']}: {doc['count']}")
# Update
await users.update_one(
{"email": "frank@example.com"},
{"$set": {"role": "admin"}}
)
# Delete
await users.delete_one({"email": "frank@example.com"})
asyncio.run(main())
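Because Motor operations are awaitable, independent queries can run concurrently with `asyncio.gather` instead of sequential awaits. A sketch, assuming `db` is an `AsyncIOMotorDatabase` with the users collection above:

```python
import asyncio

async def fetch_dashboard(db):
    """Run three independent queries concurrently rather than one by one."""
    user_count, admin, recent = await asyncio.gather(
        db.users.count_documents({}),
        db.users.find_one({"role": "admin"}),
        db.users.find().sort("created_at", -1).to_list(length=5),
    )
    return {"user_count": user_count, "admin": admin, "recent": recent}
```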
Motor + FastAPI
# app.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, EmailStr
import motor.motor_asyncio
app = FastAPI()
client = motor.motor_asyncio.AsyncIOMotorClient("mongodb://localhost:27017")
db = client["myapp"]
class UserCreate(BaseModel):
name: str
email: EmailStr
class UserResponse(BaseModel):
name: str
email: str
role: str
@app.post("/users", response_model=UserResponse, status_code=201)
async def create_user(data: UserCreate):
if await db.users.find_one({"email": data.email}):
raise HTTPException(400, "Email already exists")
user = {**data.model_dump(), "role": "user"}
await db.users.insert_one(user)
return user
@app.get("/users/{email}", response_model=UserResponse)
async def get_user(email: str):
user = await db.users.find_one({"email": email}, {"_id": 0})
if not user:
raise HTTPException(404, "User not found")
return user
Beanie ODM — Pydantic Models for MongoDB
Beanie gives you Pydantic validation + MongoDB persistence — like SQLAlchemy for MongoDB.
from beanie import Document, Indexed, init_beanie
from pydantic import EmailStr, Field
from datetime import datetime, timezone
from typing import Optional
import motor.motor_asyncio
class User(Document):
name: str
email: Indexed(EmailStr, unique=True)
role: str = "user"
skills: list[str] = []
profile: Optional[dict] = None
login_count: int = 0
created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
class Settings:
name = "users" # collection name
class Article(Document):
title: str
content: str
author: User # stored as an embedded copy; use Link[User] (from beanie) for a reference
tags: list[str] = []
views: int = 0
published: bool = False
class Settings:
name = "articles"
# Initialize
async def init():
client = motor.motor_asyncio.AsyncIOMotorClient("mongodb://localhost:27017")
await init_beanie(database=client["myapp"], document_models=[User, Article])
# CRUD with Beanie
async def demo():
await init()
# Create
user = User(name="Grace", email="grace@example.com", skills=["python", "mongo"])
await user.insert()
# Read
grace = await User.find_one(User.email == "grace@example.com")
print(grace.name)
# Find many
python_devs = await User.find(User.skills == "python").to_list()
# Update
grace.role = "admin"
grace.login_count += 1
await grace.save()
# Or atomic update
await User.find_one(User.email == "grace@example.com").update(
{"$inc": {"login_count": 1}}
)
# Aggregation
role_counts = await User.aggregate(
[{"$group": {"_id": "$role", "count": {"$sum": 1}}}]
).to_list()
# Delete
await grace.delete()
Transactions
MongoDB supports multi-document ACID transactions (replica set required).
async def transfer_credits(from_email: str, to_email: str, amount: int):
"""Atomic credit transfer between users."""
async with await client.start_session() as session:
async with session.start_transaction():
# Deduct from sender
sender = await db.users.find_one_and_update(
{"email": from_email, "credits": {"$gte": amount}},
{"$inc": {"credits": -amount}},
session=session,
)
if not sender:
raise ValueError("Insufficient credits")
# Add to receiver
await db.users.update_one(
{"email": to_email},
{"$inc": {"credits": amount}},
session=session,
)
# Log the transfer
await db.transfers.insert_one({
"from": from_email,
"to": to_email,
"amount": amount,
"timestamp": datetime.now(timezone.utc),
}, session=session)
# Transaction auto-commits when context manager exits
# If any operation fails, everything rolls back
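PyMongo and Motor also offer `session.with_transaction(callback)`, which retries transient errors and handles commit/abort for you. A synchronous PyMongo sketch of the same transfer, assuming the users schema above and a replica-set `MongoClient`:

```python
def transfer_credits_sync(client, from_email: str, to_email: str, amount: int) -> None:
    """client is assumed to be a pymongo.MongoClient connected to a replica set."""
    db = client["myapp"]

    def callback(session):
        sender = db.users.find_one_and_update(
            {"email": from_email, "credits": {"$gte": amount}},
            {"$inc": {"credits": -amount}},
            session=session,
        )
        if not sender:
            raise ValueError("Insufficient credits")  # aborts the transaction
        db.users.update_one(
            {"email": to_email}, {"$inc": {"credits": amount}}, session=session
        )

    with client.start_session() as session:
        # Retries TransientTransactionError / UnknownTransactionCommitResult for you
        session.with_transaction(callback)
```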
Production Patterns
Connection pooling
# PyMongo handles pooling automatically
client = MongoClient(
"mongodb://admin:secret@localhost:27017",
maxPoolSize=50, # max concurrent connections
minPoolSize=10, # keep warm connections
maxIdleTimeMS=30000, # close idle after 30s
connectTimeoutMS=5000, # connection timeout
serverSelectionTimeoutMS=5000,
)
Change streams — react to data changes
Change streams require a replica set (standalone servers don't support them). This async example uses Motor; `send_notification` stands in for your own handler.
async def watch_orders():
"""React to new orders in real-time."""
async with db.orders.watch(
[{"$match": {"operationType": "insert"}}],
full_document="updateLookup",
) as stream:
async for change in stream:
order = change["fullDocument"]
print(f"New order: {order['order_id']} — ${order['total']}")
await send_notification(order["customer"]["email"])
Bulk operations
from pymongo import InsertOne, UpdateOne, DeleteOne
# Batch operations for performance
operations = [
InsertOne({"name": "User1", "email": "u1@x.com"}),
InsertOne({"name": "User2", "email": "u2@x.com"}),
UpdateOne({"email": "alice@example.com"}, {"$set": {"role": "admin"}}),
DeleteOne({"email": "old@example.com"}),
]
result = users.bulk_write(operations, ordered=False) # unordered: keeps going past individual errors
print(f"Inserted: {result.inserted_count}, Modified: {result.modified_count}")
MongoDB vs SQL — Quick Reference
| SQL | MongoDB |
|---|---|
| Database | Database |
| Table | Collection |
| Row | Document |
| Column | Field |
| JOIN | $lookup / embed |
| WHERE | find({filter}) |
| GROUP BY | $group |
| ALTER TABLE | Not needed (schema-less) |
| INDEX | create_index() |
Related Articles
- Python Database Operations — SQLite, PostgreSQL & SQLAlchemy
- Python + Redis — Caching, Queues & Real-Time Data
- Build a REST API with FastAPI — combine with MongoDB backend
- Python Async Programming — Motor async patterns
- Build a Data Pipeline in Python — ETL with MongoDB as source/sink
- Python Microservices — database-per-service with MongoDB
Need a MongoDB-backed API or data pipeline built? I build Python apps, APIs, and automation tools. Reach out on Telegram →