Python + MongoDB — Build NoSQL Applications

March 2026 · 22 min read · Python, MongoDB, NoSQL, PyMongo

MongoDB is the most popular NoSQL database for a reason — flexible schemas, powerful queries, and natural fit for Python's dict-based data model. This guide covers everything from basic CRUD to production-grade patterns with aggregation pipelines, async operations, and ODM frameworks.

When to Use MongoDB

For relational data with strict schemas, see our SQLite/PostgreSQL guide. For caching and simple key-value, see our Redis guide.

Setup

# Install PyMongo (sync driver), Motor (async driver), and Beanie (async ODM)
pip install pymongo motor beanie

# Run MongoDB with Docker.
# -p exposes the default MongoDB port; -v stores data in a named volume
# so the database survives container restarts.
docker run -d --name mongo \
  -p 27017:27017 \
  -e MONGO_INITDB_ROOT_USERNAME=admin \
  -e MONGO_INITDB_ROOT_PASSWORD=secret \
  -v mongodata:/data/db \
  mongo:7

# Or use MongoDB Atlas (free tier):
# https://cloud.mongodb.com → Create Cluster → Get connection string

PyMongo Basics — Synchronous CRUD

Connect and create

from pymongo import MongoClient
from datetime import datetime, timezone

# Connect — MongoClient is lazy: no network I/O happens until the first operation
client = MongoClient("mongodb://admin:secret@localhost:27017")
db = client["myapp"]          # database
users = db["users"]           # collection

# Insert one — databases and collections are created implicitly on first write
user = {
    "name": "Alice Chen",
    "email": "alice@example.com",
    "role": "admin",
    "skills": ["python", "mongodb", "fastapi"],
    "profile": {
        "bio": "Full-stack developer",
        "location": "San Francisco",
    },
    # store timezone-aware UTC timestamps; naive datetimes are ambiguous
    "created_at": datetime.now(timezone.utc),
}
result = users.insert_one(user)
print(f"Inserted: {result.inserted_id}")  # ObjectId('...')


# Insert many — one round-trip for the whole batch
new_users = [
    {"name": "Bob", "email": "bob@example.com", "role": "user", "skills": ["javascript"]},
    {"name": "Carol", "email": "carol@example.com", "role": "editor", "skills": ["python", "data"]},
    {"name": "Dave", "email": "dave@example.com", "role": "user", "skills": ["rust", "go"]},
]
result = users.insert_many(new_users)
print(f"Inserted {len(result.inserted_ids)} users")

Read (query)

# Find one — returns the first matching document (a plain dict) or None
alice = users.find_one({"email": "alice@example.com"})
print(alice["name"])  # Alice Chen

# Find many with filter — matching a scalar against an array field
# ("skills": "python") matches documents whose array CONTAINS the value
python_devs = users.find({"skills": "python"})
for dev in python_devs:
    print(f"  {dev['name']} — {dev['skills']}")

# Query operators
from datetime import timedelta

# Users created in last 7 days
recent = users.find({
    "created_at": {"$gte": datetime.now(timezone.utc) - timedelta(days=7)}
})

# Users with 3+ skills.
# Note: {"$size": 3} matches an EXACT length of 3 — to express
# "at least 3", compare the computed array size with $expr.
skilled = users.find({
    "$expr": {"$gte": [{"$size": "$skills"}, 3]}
})

# OR query
admins_or_editors = users.find({
    "$or": [
        {"role": "admin"},
        {"role": "editor"},
    ]
})

# Nested field query — dot notation reaches into embedded documents
sf_users = users.find({"profile.location": "San Francisco"})

# Projection — only return specific fields
names_only = users.find(
    {"role": "user"},
    {"name": 1, "email": 1, "_id": 0}  # 1=include, 0=exclude
)

# Sorting + pagination (skip gets slow on deep pages; prefer range filters)
page = users.find().sort("name", 1).skip(20).limit(10)

# Count
total = users.count_documents({"role": "user"})
print(f"Total users: {total}")
💡 Performance tip: Always use find() with a filter and projection. Fetching all fields from all documents is a quick way to kill performance. Index your query fields.

Update

# Update one field — $set only touches the named fields, the rest is untouched
users.update_one(
    {"email": "alice@example.com"},
    {"$set": {"role": "superadmin", "updated_at": datetime.now(timezone.utc)}}
)

# Add to array
users.update_one(
    {"email": "bob@example.com"},
    {"$push": {"skills": "typescript"}}
)

# Remove from array
users.update_one(
    {"email": "bob@example.com"},
    {"$pull": {"skills": "javascript"}}
)

# Increment a counter — atomic on the server, safe under concurrent writers
users.update_one(
    {"email": "alice@example.com"},
    {"$inc": {"login_count": 1}}
)

# Update many — applies to every document matching the filter
users.update_many(
    {"role": "user"},
    {"$set": {"is_verified": False}}
)

# Upsert — update if exists, insert if not
users.update_one(
    {"email": "eve@example.com"},
    {"$set": {"name": "Eve", "role": "user"}},
    upsert=True
)

Delete

# Delete one — removes the first document matching the filter
users.delete_one({"email": "dave@example.com"})

# Delete many
result = users.delete_many({"role": "inactive"})
print(f"Deleted {result.deleted_count} users")

# Delete with condition — e.g. purge documents created before 2025
users.delete_many({
    "created_at": {"$lt": datetime(2025, 1, 1, tzinfo=timezone.utc)}
})

Data Modeling — Embed vs Reference

The biggest design decision in MongoDB: when to embed documents vs when to reference them.

Embed (denormalize) — for tightly coupled data

# Good: order with items — always read together
order = {
    "order_id": "ORD-001",
    # embedded snapshot of the customer at order time —
    # unaffected by later profile edits
    "customer": {
        "name": "Alice",
        "email": "alice@example.com",
    },
    "items": [
        {"product": "Widget A", "qty": 2, "price": 29.99},
        {"product": "Widget B", "qty": 1, "price": 49.99},
    ],
    "total": 109.97,
    "status": "shipped",
    "created_at": datetime.now(timezone.utc),
}
# One read = all data. No JOINs needed.

Reference — for loosely coupled, large, or shared data

# Good: blog post referencing author — authors shared across posts
from bson import ObjectId  # ObjectId ships with PyMongo's bson package

post = {
    "title": "MongoDB Best Practices",
    "content": "...",
    "author_id": ObjectId("65a1b2c3d4e5f6a7b8c9d0e1"),  # reference
    "tags": ["mongodb", "python"],
}

# Lookup author separately — the extra round-trip is the cost of referencing
author = db.authors.find_one({"_id": post["author_id"]})
| Embed when | Reference when |
| --- | --- |
| Data is read together | Data is shared across documents |
| One-to-few relationship | One-to-many (thousands+) |
| Child data is small | Child data is large or grows unbounded |
| Atomic updates needed | Independent update patterns |

Aggregation Pipelines

MongoDB's most powerful feature. Pipelines transform data through stages — like Unix pipes for your database.

# Setup: orders collection
orders = db["orders"]

# Revenue by status — $group emits one output document per distinct _id
pipeline = [
    {"$group": {
        "_id": "$status",
        "total_revenue": {"$sum": "$total"},
        "order_count": {"$sum": 1},
        "avg_order": {"$avg": "$total"},
    }},
    {"$sort": {"total_revenue": -1}},
]
for doc in orders.aggregate(pipeline):
    print(f"{doc['_id']}: ${doc['total_revenue']:.2f} ({doc['order_count']} orders)")


# Top customers by spending — $project reshapes the final output documents
top_customers = [
    {"$group": {
        "_id": "$customer.email",
        "name": {"$first": "$customer.name"},
        "total_spent": {"$sum": "$total"},
        "orders": {"$sum": 1},
    }},
    {"$sort": {"total_spent": -1}},
    {"$limit": 10},
    {"$project": {
        "_id": 0,
        "email": "$_id",
        "name": 1,
        "total_spent": {"$round": ["$total_spent", 2]},
        "orders": 1,
    }},
]


# Unwind arrays — $unwind emits one document per array element,
# so each order item can be grouped and summed individually
popular_products = [
    {"$unwind": "$items"},
    {"$group": {
        "_id": "$items.product",
        "total_qty": {"$sum": "$items.qty"},
        "total_revenue": {"$sum": {"$multiply": ["$items.qty", "$items.price"]}},
    }},
    {"$sort": {"total_revenue": -1}},
    {"$limit": 5},
]


# Date-based aggregation — monthly revenue (compound group key)
monthly_revenue = [
    {"$group": {
        "_id": {
            "year": {"$year": "$created_at"},
            "month": {"$month": "$created_at"},
        },
        "revenue": {"$sum": "$total"},
        "orders": {"$sum": 1},
    }},
    {"$sort": {"_id.year": 1, "_id.month": 1}},
]


# $lookup — JOIN equivalent (left outer join; matches land in an array field)
orders_with_reviews = [
    {"$lookup": {
        "from": "reviews",
        "localField": "order_id",
        "foreignField": "order_id",
        "as": "reviews",
    }},
    {"$match": {"reviews": {"$ne": []}}},  # only orders with reviews
]

Indexing — Make Queries Fast

# Single field index — unique=True also enforces a uniqueness constraint on writes
users.create_index("email", unique=True)

# Compound index (order matters!) — supports queries on any key prefix
orders.create_index([("customer.email", 1), ("created_at", -1)])

# Text index for search (a collection can have at most one text index)
db.articles.create_index([("title", "text"), ("content", "text")])
results = db.articles.find({"$text": {"$search": "python mongodb"}})

# TTL index — auto-delete old documents
db.sessions.create_index("created_at", expireAfterSeconds=3600)  # 1h

# Partial index — only index matching documents (smaller index, cheaper writes)
users.create_index(
    "email",
    partialFilterExpression={"is_active": True}
)

# Check existing indexes
for idx in users.list_indexes():
    print(f"  {idx['name']}: {idx['key']}")

# Explain a query (check if index is used — look for IXSCAN, not COLLSCAN)
explanation = users.find({"email": "alice@example.com"}).explain()
print(explanation["queryPlanner"]["winningPlan"])
🎯 Index rules: Index fields you filter and sort on. Compound indexes support queries on any prefix (index on [a, b, c] covers queries on a, a+b, and a+b+c). Use .explain() to verify your queries use indexes.

Async with Motor

For async Python apps (FastAPI, aiohttp), use Motor — the async MongoDB driver.

import motor.motor_asyncio
import asyncio


async def main():
    """Demonstrate async CRUD with Motor.

    The API mirrors PyMongo, but every operation is awaited and
    cursors are consumed with `async for`.
    """
    # Connect
    client = motor.motor_asyncio.AsyncIOMotorClient(
        "mongodb://admin:secret@localhost:27017"
    )
    db = client["myapp"]
    users = db["users"]

    # Insert
    result = await users.insert_one({
        "name": "Frank",
        "email": "frank@example.com",
        "role": "user",
    })
    print(f"Inserted: {result.inserted_id}")

    # Find
    user = await users.find_one({"email": "frank@example.com"})
    print(user["name"])

    # Find many (async iteration)
    async for doc in users.find({"role": "user"}).sort("name").limit(10):
        print(f"  {doc['name']}")

    # Aggregate
    pipeline = [
        {"$group": {"_id": "$role", "count": {"$sum": 1}}},
    ]
    async for doc in users.aggregate(pipeline):
        print(f"  {doc['_id']}: {doc['count']}")

    # Update
    await users.update_one(
        {"email": "frank@example.com"},
        {"$set": {"role": "admin"}}
    )

    # Delete
    await users.delete_one({"email": "frank@example.com"})


asyncio.run(main())

Motor + FastAPI

# app.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, EmailStr
import motor.motor_asyncio

app = FastAPI()
client = motor.motor_asyncio.AsyncIOMotorClient("mongodb://localhost:27017")
db = client["myapp"]


class UserCreate(BaseModel):
    # Request body schema — EmailStr validates the address format.
    name: str
    email: EmailStr


class UserResponse(BaseModel):
    # Response schema — response_model filters out any extra fields
    # (e.g. the MongoDB _id) before serialization.
    name: str
    email: str
    role: str


@app.post("/users", response_model=UserResponse, status_code=201)
async def create_user(data: UserCreate):
    """Create a user, rejecting duplicate emails with HTTP 400."""
    if await db.users.find_one({"email": data.email}):
        raise HTTPException(400, "Email already exists")

    user = {**data.model_dump(), "role": "user"}
    await db.users.insert_one(user)
    return user


@app.get("/users/{email}", response_model=UserResponse)
async def get_user(email: str):
    """Fetch a user by email; the projection drops the non-JSON-serializable _id."""
    user = await db.users.find_one({"email": email}, {"_id": 0})
    if not user:
        raise HTTPException(404, "User not found")
    return user

Beanie ODM — Pydantic Models for MongoDB

Beanie gives you Pydantic validation + MongoDB persistence — like SQLAlchemy for MongoDB.

from beanie import Document, Indexed, init_beanie
from pydantic import EmailStr, Field
from datetime import datetime, timezone
from typing import Optional
import motor.motor_asyncio


class User(Document):
    # A Beanie Document is a Pydantic model + persistence: each instance
    # maps to one document in the collection named in Settings.
    name: str
    email: Indexed(EmailStr, unique=True)  # declares a unique index on email
    role: str = "user"
    skills: list[str] = []  # Pydantic copies mutable defaults per instance
    profile: Optional[dict] = None
    login_count: int = 0
    created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))

    class Settings:
        name = "users"  # collection name


class Article(Document):
    title: str
    content: str
    author: User  # can embed other documents
    tags: list[str] = []
    views: int = 0
    published: bool = False

    class Settings:
        name = "articles"


# Initialize — must run once at startup, before any Document class is used
async def init():
    client = motor.motor_asyncio.AsyncIOMotorClient("mongodb://localhost:27017")
    await init_beanie(database=client["myapp"], document_models=[User, Article])


# CRUD with Beanie
async def demo():
    await init()

    # Create
    user = User(name="Grace", email="grace@example.com", skills=["python", "mongo"])
    await user.insert()

    # Read — field comparison expressions compile down to MongoDB filters
    grace = await User.find_one(User.email == "grace@example.com")
    print(grace.name)

    # Find many
    python_devs = await User.find(User.skills == "python").to_list()

    # Update (read-modify-write: not atomic across concurrent writers)
    grace.role = "admin"
    grace.login_count += 1
    await grace.save()

    # Or atomic update — executed server-side in one operation
    await User.find_one(User.email == "grace@example.com").update(
        {"$inc": {"login_count": 1}}
    )

    # Aggregation
    role_counts = await User.aggregate(
        [{"$group": {"_id": "$role", "count": {"$sum": 1}}}]
    ).to_list()

    # Delete
    await grace.delete()

Transactions

MongoDB supports multi-document ACID transactions (replica set required).

async def transfer_credits(from_email: str, to_email: str, amount: int):
    """Atomic credit transfer between users.

    Runs all three writes inside one multi-document transaction:
    either every operation commits, or none do.

    Raises:
        ValueError: if the sender doesn't exist or has fewer than
            `amount` credits.
    """
    async with await client.start_session() as session:
        async with session.start_transaction():
            # Deduct from sender — the filter also requires a sufficient
            # balance, so the decrement can never drive credits negative
            sender = await db.users.find_one_and_update(
                {"email": from_email, "credits": {"$gte": amount}},
                {"$inc": {"credits": -amount}},
                session=session,
            )
            if not sender:
                # raising inside the context manager aborts the transaction
                raise ValueError("Insufficient credits")

            # Add to receiver
            await db.users.update_one(
                {"email": to_email},
                {"$inc": {"credits": amount}},
                session=session,
            )

            # Log the transfer
            await db.transfers.insert_one({
                "from": from_email,
                "to": to_email,
                "amount": amount,
                "timestamp": datetime.now(timezone.utc),
            }, session=session)

            # Transaction auto-commits when context manager exits
            # If any operation fails, everything rolls back

Production Patterns

Connection pooling

# PyMongo handles pooling automatically — create ONE MongoClient per process
# and reuse it everywhere (it is thread-safe)
client = MongoClient(
    "mongodb://admin:secret@localhost:27017",
    maxPoolSize=50,        # max concurrent connections
    minPoolSize=10,        # keep warm connections
    maxIdleTimeMS=30000,   # close idle after 30s
    connectTimeoutMS=5000, # connection timeout
    serverSelectionTimeoutMS=5000,
)

Change streams — react to data changes

async def watch_orders():
    """React to new orders in real-time.

    Change streams require a replica set. The $match stage filters
    server-side, so only insert events ever reach this process.
    """
    async with db.orders.watch(
        [{"$match": {"operationType": "insert"}}],
        full_document="updateLookup",
    ) as stream:
        async for change in stream:
            order = change["fullDocument"]
            print(f"New order: {order['order_id']} — ${order['total']}")
            await send_notification(order["customer"]["email"])

Bulk operations

from pymongo import InsertOne, UpdateOne, DeleteOne

# Batch operations for performance — one round-trip instead of four
operations = [
    InsertOne({"name": "User1", "email": "u1@x.com"}),
    InsertOne({"name": "User2", "email": "u2@x.com"}),
    UpdateOne({"email": "alice@example.com"}, {"$set": {"role": "admin"}}),
    DeleteOne({"email": "old@example.com"}),
]

result = users.bulk_write(operations, ordered=False)  # unordered: continues past individual errors
print(f"Inserted: {result.inserted_count}, Modified: {result.modified_count}")

MongoDB vs SQL — Quick Reference

| SQL | MongoDB |
| --- | --- |
| Database | Database |
| Table | Collection |
| Row | Document |
| Column | Field |
| JOIN | $lookup / embed |
| WHERE | find({filter}) |
| GROUP BY | $group |
| ALTER TABLE | Not needed (schema-less) |
| INDEX | create_index() |

🚀 Want production-ready MongoDB scripts, API templates, and data pipelines?

Get the AI Agent Toolkit →

Related Articles

Need a MongoDB-backed API or data pipeline built? I build Python apps, APIs, and automation tools. Reach out on Telegram →