Every serious Python application needs a database. Whether you're building an API backend, a data pipeline, or a CLI tool that tracks state — understanding database operations is essential. This guide covers everything from SQLite's zero-config simplicity to production PostgreSQL with SQLAlchemy ORM.
SQLite ships with Python's standard library. No server, no installation, no configuration. It's perfect for prototypes, small apps, local caches, and embedded systems.
import sqlite3
from pathlib import Path
def create_database(db_path: str = "app.db"):
"""Create a database with a users table."""
conn = sqlite3.connect(db_path)
# Return rows as dictionaries instead of tuples
conn.row_factory = sqlite3.Row
conn.execute("""
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
email TEXT UNIQUE NOT NULL,
name TEXT NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
is_active BOOLEAN DEFAULT 1
)
""")
conn.execute("""
CREATE INDEX IF NOT EXISTS idx_users_email
ON users(email)
""")
conn.commit()
return conn
# Basic CRUD operations
conn = create_database()
# INSERT
conn.execute(
"INSERT OR IGNORE INTO users (email, name) VALUES (?, ?)",
("alice@example.com", "Alice")
)
conn.commit()
# SELECT
user = conn.execute(
"SELECT * FROM users WHERE email = ?",
("alice@example.com",)
).fetchone()
if user:
print(f"Found: {user['name']} (id={user['id']})")
# UPDATE
conn.execute(
"UPDATE users SET name = ? WHERE email = ?",
("Alice Smith", "alice@example.com")
)
conn.commit()
# DELETE
conn.execute("DELETE FROM users WHERE is_active = 0")
conn.commit()
Every query above passes values as parameters (the ? placeholders). Never use f-strings or string formatting to build SQL — that's how SQL injection happens.
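A quick illustration of the difference, reusing the conn from the CRUD snippet above (the malicious input is contrived):
# Unsafe: user input is spliced directly into the SQL text
email = "x@example.com' OR '1'='1"
conn.execute(f"SELECT * FROM users WHERE email = '{email}'")  # injectable

# Safe: the value travels separately from the SQL
conn.execute("SELECT * FROM users WHERE email = ?", (email,))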
import sqlite3
from contextlib import contextmanager
@contextmanager
def get_db(db_path: str = "app.db"):
"""Database connection with automatic commit/rollback."""
conn = sqlite3.connect(db_path)
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA journal_mode=WAL") # Better concurrency
conn.execute("PRAGMA foreign_keys=ON") # Enforce FK constraints
try:
yield conn
conn.commit()
except Exception:
conn.rollback()
raise
finally:
conn.close()
# Usage — auto-commits on success, rolls back on error
with get_db() as db:
db.execute("INSERT INTO users (email, name) VALUES (?, ?)",
("bob@example.com", "Bob"))
# Auto-commits here
def bulk_insert_users(users: list[dict], db_path: str = "app.db"):
"""Insert many records efficiently."""
with get_db(db_path) as db:
db.executemany(
"INSERT OR IGNORE INTO users (email, name) VALUES (:email, :name)",
users
)
# Insert 1000 users in one transaction
users = [
{"email": f"user{i}@example.com", "name": f"User {i}"}
for i in range(1000)
]
bulk_insert_users(users) # Fast — single transaction
import json
import sqlite3
class KVStore:
"""Simple key-value store backed by SQLite."""
def __init__(self, db_path: str = "kv.db"):
self.conn = sqlite3.connect(db_path)
self.conn.execute("""
CREATE TABLE IF NOT EXISTS kv (
key TEXT PRIMARY KEY,
value TEXT NOT NULL,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
self.conn.commit()
def get(self, key: str, default=None):
row = self.conn.execute(
"SELECT value FROM kv WHERE key = ?", (key,)
).fetchone()
if row is None:
return default
return json.loads(row[0])
def set(self, key: str, value):
self.conn.execute(
"""INSERT INTO kv (key, value, updated_at)
VALUES (?, ?, CURRENT_TIMESTAMP)
ON CONFLICT(key) DO UPDATE
SET value = excluded.value,
updated_at = CURRENT_TIMESTAMP""",
(key, json.dumps(value))
)
self.conn.commit()
def delete(self, key: str):
self.conn.execute("DELETE FROM kv WHERE key = ?", (key,))
self.conn.commit()
# Usage
store = KVStore()
store.set("config", {"theme": "dark", "lang": "en"})
config = store.get("config")
print(config) # {'theme': 'dark', 'lang': 'en'}
For production applications, PostgreSQL is the standard. psycopg2 is the most popular adapter — battle-tested and widely deployed.
import psycopg2
from psycopg2.extras import RealDictCursor, execute_values
def get_pg_connection(dsn: str | None = None):
"""Connect to PostgreSQL with sensible defaults."""
conn = psycopg2.connect(
dsn or "postgresql://user:pass@localhost:5432/mydb",
cursor_factory=RealDictCursor, # Return dicts, not tuples
)
conn.autocommit = False
return conn
# Basic operations
conn = get_pg_connection()
try:
with conn.cursor() as cur:
# Create table with PostgreSQL-specific features
cur.execute("""
CREATE TABLE IF NOT EXISTS products (
id SERIAL PRIMARY KEY,
                name TEXT UNIQUE NOT NULL,
price NUMERIC(10, 2) NOT NULL,
tags TEXT[] DEFAULT '{}',
metadata JSONB DEFAULT '{}',
created_at TIMESTAMPTZ DEFAULT NOW()
)
""")
# INSERT with RETURNING
cur.execute(
"""INSERT INTO products (name, price, tags, metadata)
VALUES (%s, %s, %s, %s)
RETURNING id, created_at""",
("Widget", 29.99, ["sale", "new"],
'{"color": "blue", "weight": 150}')
)
result = cur.fetchone()
print(f"Created product {result['id']} at {result['created_at']}")
# UPSERT (INSERT ... ON CONFLICT)
cur.execute(
"""INSERT INTO products (name, price)
VALUES (%s, %s)
ON CONFLICT (name) DO UPDATE
SET price = EXCLUDED.price
RETURNING id""",
("Widget", 24.99)
)
# JSONB queries
cur.execute(
"SELECT * FROM products WHERE metadata->>'color' = %s",
("blue",)
)
blue_products = cur.fetchall()
# Array queries
cur.execute(
"SELECT * FROM products WHERE %s = ANY(tags)",
("sale",)
)
on_sale = cur.fetchall()
conn.commit()
except Exception as e:
conn.rollback()
print(f"Error: {e}")
finally:
conn.close()
from psycopg2.extras import execute_values
def bulk_insert_products(products: list[dict]):
"""Fast bulk insert using execute_values."""
conn = get_pg_connection()
try:
with conn.cursor() as cur:
execute_values(
cur,
"""INSERT INTO products (name, price, tags)
VALUES %s
ON CONFLICT (name) DO NOTHING""",
[(p["name"], p["price"], p.get("tags", []))
for p in products],
page_size=1000 # Batch size
)
conn.commit()
print(f"Inserted {len(products)} products")
finally:
conn.close()
# 10x faster than individual INSERTs
products = [
{"name": f"Product {i}", "price": 9.99 + i}
for i in range(5000)
]
bulk_insert_products(products)
Note that psycopg2 uses %s for placeholders, not ? like SQLite. Don't mix them up — it's a common source of bugs.
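A minimal side-by-side sketch of the two placeholder styles (the DSN and data are the placeholder values used earlier):
import sqlite3
import psycopg2

# sqlite3 uses qmark style
lite = sqlite3.connect("app.db")
lite.execute("SELECT * FROM users WHERE email = ?", ("alice@example.com",))

# psycopg2 uses format style: always %s, even for numbers and timestamps
pg = psycopg2.connect("postgresql://user:pass@localhost:5432/mydb")
with pg.cursor() as cur:
    cur.execute("SELECT * FROM products WHERE price > %s", (10,))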
SQLAlchemy lets you work with databases using Python classes instead of raw SQL. It supports SQLite, PostgreSQL, MySQL, and more — switch databases by changing one URL.
from datetime import datetime
from sqlalchemy import create_engine, Column, Integer, String, Float, \
Boolean, DateTime, ForeignKey, Index
from sqlalchemy.orm import declarative_base, sessionmaker, relationship
Base = declarative_base()
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True)
email = Column(String(255), unique=True, nullable=False, index=True)
name = Column(String(100), nullable=False)
is_active = Column(Boolean, default=True)
created_at = Column(DateTime, default=datetime.utcnow)
# Relationship
orders = relationship("Order", back_populates="user",
cascade="all, delete-orphan")
def __repr__(self):
return f"<User {self.email}>"
class Order(Base):
__tablename__ = "orders"
id = Column(Integer, primary_key=True)
user_id = Column(Integer, ForeignKey("users.id"), nullable=False)
total = Column(Float, nullable=False)
status = Column(String(20), default="pending")
created_at = Column(DateTime, default=datetime.utcnow)
user = relationship("User", back_populates="orders")
# Composite index for common queries
__table_args__ = (
Index("idx_orders_user_status", "user_id", "status"),
)
# Setup
engine = create_engine(
"sqlite:///app.db",
echo=False, # Set True for SQL logging
pool_pre_ping=True, # Verify connections before use
)
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
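The URL passed to create_engine is the only part of this setup tied to SQLite; pointing the same models at PostgreSQL means swapping that one string (the credentials below are placeholders, and psycopg2 must be installed):
# Same models, same Session: only the URL changes
engine = create_engine(
    "postgresql+psycopg2://user:pass@localhost:5432/mydb",
    pool_pre_ping=True,
)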
# CREATE
session = Session()
user = User(email="alice@example.com", name="Alice")
session.add(user)
session.commit()
# READ
user = session.query(User).filter_by(email="alice@example.com").first()
active_users = session.query(User).filter(User.is_active == True).all()
# Advanced queries
from sqlalchemy import func, and_, or_
# Count orders per user
results = session.query(
User.name,
func.count(Order.id).label("order_count"),
func.sum(Order.total).label("total_spent")
).join(Order).group_by(User.name).having(
func.count(Order.id) > 5
).all()
# Subquery
big_spenders = session.query(User).filter(
User.id.in_(
session.query(Order.user_id)
.group_by(Order.user_id)
.having(func.sum(Order.total) > 1000)
)
).all()
# UPDATE
user.name = "Alice Smith"
session.commit()
# Bulk update
session.query(User).filter(
User.created_at < datetime(2025, 1, 1)
).update({"is_active": False})
session.commit()
# DELETE
session.delete(user)
session.commit()
session.close()
from contextlib import contextmanager
@contextmanager
def get_session():
"""Provide a transactional scope around operations."""
session = Session()
try:
yield session
session.commit()
except Exception:
session.rollback()
raise
finally:
session.close()
# Clean, safe usage
with get_session() as s:
user = User(email="bob@example.com", name="Bob")
s.add(user)
order = Order(user=user, total=49.99)
s.add(order)
    # Auto-commits on exit, rolls back on error
Alembic tracks schema changes so you can evolve your database safely — version control for your schema.
# Install and initialize
# pip install alembic
# alembic init migrations
# In alembic.ini, set your database URL:
# sqlalchemy.url = postgresql://user:pass@localhost/mydb
# In migrations/env.py, import your models:
# from myapp.models import Base
# target_metadata = Base.metadata
# Auto-generate migration from model changes
# alembic revision --autogenerate -m "add users table"
# Manually create a migration
# alembic revision -m "add email index"
# Example migration file: migrations/versions/001_add_users.py
"""Add users table."""
from alembic import op
import sqlalchemy as sa
def upgrade():
op.create_table(
"users",
sa.Column("id", sa.Integer(), primary_key=True),
sa.Column("email", sa.String(255), unique=True, nullable=False),
sa.Column("name", sa.String(100), nullable=False),
sa.Column("created_at", sa.DateTime(), server_default=sa.func.now()),
)
op.create_index("idx_users_email", "users", ["email"])
def downgrade():
    op.drop_index("idx_users_email", table_name="users")
op.drop_table("users")
# Run migrations
# alembic upgrade head # Apply all pending
# alembic upgrade +1 # Apply next one
# alembic downgrade -1 # Rollback last one
# alembic current # Show current version
# alembic history # Show migration history
Opening a new database connection per request is slow. Connection pooling reuses connections, dramatically improving performance.
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool
# SQLAlchemy with connection pool
engine = create_engine(
"postgresql://user:pass@localhost/mydb",
poolclass=QueuePool,
pool_size=10, # Maintain 10 connections
max_overflow=20, # Allow 20 extra under load
pool_timeout=30, # Wait 30s for a connection
pool_recycle=3600, # Recycle connections after 1 hour
pool_pre_ping=True, # Verify connections are alive
echo_pool="debug", # Log pool events (dev only)
)
# Check pool status
pool = engine.pool
print(f"Pool size: {pool.size()}")
print(f"Checked in: {pool.checkedin()}")
print(f"Checked out: {pool.checkedout()}")
print(f"Overflow: {pool.overflow()}")
from psycopg2 import pool as pg_pool
from contextlib import contextmanager
# Thread-safe connection pool
connection_pool = pg_pool.ThreadedConnectionPool(
minconn=5,
maxconn=20,
dsn="postgresql://user:pass@localhost/mydb",
)
@contextmanager
def get_pooled_connection():
"""Get a connection from the pool, auto-return."""
conn = connection_pool.getconn()
try:
yield conn
conn.commit()
except Exception:
conn.rollback()
raise
finally:
connection_pool.putconn(conn)
# Usage
with get_pooled_connection() as conn:
with conn.cursor() as cur:
cur.execute("SELECT count(*) FROM users")
count = cur.fetchone()[0]
print(f"Total users: {count}")
Transactions ensure data integrity — either all operations succeed, or none do. Here's how to handle them properly.
import psycopg2
from psycopg2 import errors as pg_errors
def transfer_funds(from_id: int, to_id: int, amount: float):
"""Transfer money between accounts with proper transaction handling."""
conn = get_pg_connection()
try:
with conn.cursor() as cur:
# Lock rows to prevent race conditions
cur.execute(
"SELECT balance FROM accounts WHERE id = %s FOR UPDATE",
(from_id,)
)
sender = cur.fetchone()
if not sender or sender["balance"] < amount:
raise ValueError("Insufficient funds")
# Debit sender
cur.execute(
"UPDATE accounts SET balance = balance - %s WHERE id = %s",
(amount, from_id)
)
# Credit receiver
cur.execute(
"UPDATE accounts SET balance = balance + %s WHERE id = %s",
(amount, to_id)
)
# Log the transfer
cur.execute(
"""INSERT INTO transfers (from_id, to_id, amount)
VALUES (%s, %s, %s)""",
(from_id, to_id, amount)
)
conn.commit()
print(f"Transferred ${amount:.2f}: {from_id} → {to_id}")
except ValueError as e:
conn.rollback()
print(f"Business error: {e}")
    except pg_errors.SerializationFailure:
        conn.rollback()
        print("Concurrent modification — retry")
        raise  # re-raise so the caller can retry
    except pg_errors.DeadlockDetected:
        conn.rollback()
        print("Deadlock detected — retry")
        raise  # re-raise so the caller can retry
except Exception as e:
conn.rollback()
print(f"Unexpected error: {e}")
raise
finally:
conn.close()
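Because those two handlers re-raise, a caller can retry the whole transfer. Here is a minimal sketch; transfer_with_retry, the attempt count, and the backoff are illustrative additions, not part of the original example:
import time

def transfer_with_retry(from_id: int, to_id: int, amount: float, attempts: int = 3):
    """Retry a transfer that hit a serialization failure or deadlock."""
    for attempt in range(1, attempts + 1):
        try:
            transfer_funds(from_id, to_id, amount)
            return
        except (pg_errors.SerializationFailure, pg_errors.DeadlockDetected):
            if attempt == attempts:
                raise
            time.sleep(0.1 * attempt)  # brief linear backoff before the next attempt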
def process_batch(items: list[dict]):
"""Process items, skip failures without losing successes."""
conn = get_pg_connection()
processed = 0
try:
with conn.cursor() as cur:
for item in items:
# Create savepoint before each item
cur.execute("SAVEPOINT item_save")
try:
cur.execute(
"""INSERT INTO items (name, value)
VALUES (%s, %s)""",
(item["name"], item["value"])
)
processed += 1
except psycopg2.IntegrityError:
# Rollback just this item, continue with rest
cur.execute("ROLLBACK TO SAVEPOINT item_save")
print(f"Skipped duplicate: {item['name']}")
conn.commit()
print(f"Processed {processed}/{len(items)} items")
except Exception:
conn.rollback()
raise
finally:
conn.close()
For high-concurrency applications (web servers, bots), async database operations prevent blocking.
import asyncio
import aiosqlite
async def async_sqlite_demo():
"""SQLite with async/await."""
async with aiosqlite.connect("async_app.db") as db:
db.row_factory = aiosqlite.Row
await db.execute("""
CREATE TABLE IF NOT EXISTS events (
id INTEGER PRIMARY KEY,
type TEXT NOT NULL,
data TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
# Insert
await db.execute(
"INSERT INTO events (type, data) VALUES (?, ?)",
("login", '{"user": "alice"}')
)
await db.commit()
# Query
async with db.execute(
"SELECT * FROM events ORDER BY created_at DESC LIMIT 10"
) as cursor:
async for row in cursor:
print(f"{row['type']}: {row['data']}")
asyncio.run(async_sqlite_demo())
import asyncio
import asyncpg
async def async_pg_demo():
"""High-performance async PostgreSQL."""
# Connection pool
pool = await asyncpg.create_pool(
"postgresql://user:pass@localhost/mydb",
min_size=5,
max_size=20,
)
async with pool.acquire() as conn:
# Create table
await conn.execute("""
CREATE TABLE IF NOT EXISTS metrics (
id SERIAL PRIMARY KEY,
name TEXT NOT NULL,
value DOUBLE PRECISION NOT NULL,
recorded_at TIMESTAMPTZ DEFAULT NOW()
)
""")
# Insert with RETURNING
row = await conn.fetchrow(
"""INSERT INTO metrics (name, value)
VALUES ($1, $2)
RETURNING id, recorded_at""",
"cpu_usage", 45.2
)
print(f"Inserted metric {row['id']}")
# Bulk insert (very fast)
data = [(f"metric_{i}", float(i)) for i in range(1000)]
await conn.executemany(
"INSERT INTO metrics (name, value) VALUES ($1, $2)",
data
)
# Query
rows = await conn.fetch(
"""SELECT name, avg(value) as avg_value
FROM metrics
GROUP BY name
ORDER BY avg_value DESC
LIMIT 10"""
)
for row in rows:
print(f"{row['name']}: {row['avg_value']:.1f}")
await pool.close()
asyncio.run(async_pg_demo())
In-memory SQLite databases are perfect for testing — fast, isolated, no cleanup needed.
import pytest
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.exc import IntegrityError
@pytest.fixture
def db_session():
"""Create a fresh in-memory database for each test."""
engine = create_engine("sqlite:///:memory:")
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
session = Session()
yield session
session.close()
engine.dispose()
def test_create_user(db_session):
user = User(email="test@example.com", name="Test User")
db_session.add(user)
db_session.commit()
found = db_session.query(User).filter_by(email="test@example.com").first()
assert found is not None
assert found.name == "Test User"
def test_user_orders(db_session):
user = User(email="buyer@example.com", name="Buyer")
order1 = Order(user=user, total=25.00)
order2 = Order(user=user, total=75.00)
db_session.add_all([user, order1, order2])
db_session.commit()
assert len(user.orders) == 2
assert sum(o.total for o in user.orders) == 100.00
def test_duplicate_email_rejected(db_session):
db_session.add(User(email="one@example.com", name="One"))
db_session.commit()
db_session.add(User(email="one@example.com", name="Duplicate"))
    with pytest.raises(IntegrityError):
db_session.commit()
from sqlalchemy import Index, text
# Single column index — for WHERE clauses
Index("idx_users_email", User.email)
# Composite index — for queries filtering on multiple columns
# Column ORDER matters: lead with the column your queries filter on by equality (user_id here)
Index("idx_orders_user_date", Order.user_id, Order.created_at)
# Partial index (PostgreSQL) — index only relevant rows
Index(
"idx_active_users",
User.email,
postgresql_where=text("is_active = true")
)
# Covering index — includes extra columns to avoid table lookups
Index(
"idx_orders_covering",
Order.user_id, Order.status,
postgresql_include=["total"]
)
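An index only helps if the query planner actually chooses it; EXPLAIN is the quickest way to verify. A minimal sketch against the orders table defined earlier (the URL is a placeholder):
from sqlalchemy import create_engine, text

engine = create_engine("postgresql://user:pass@localhost/mydb")
with engine.connect() as conn:
    plan = conn.execute(
        text("EXPLAIN SELECT * FROM orders WHERE user_id = :uid AND status = :st"),
        {"uid": 42, "st": "pending"},
    )
    for row in plan:
        # Expect a line like: Index Scan using idx_orders_user_status on orders
        print(row[0])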
# BAD: N+1 query problem
users = session.query(User).all()
for user in users:
print(user.orders) # Separate query for each user!
# GOOD: Eager loading with joinedload
from sqlalchemy.orm import joinedload
users = session.query(User).options(
joinedload(User.orders)
).all()
# Single query with JOIN — orders already loaded
# GOOD: Subquery loading for large datasets
from sqlalchemy.orm import subqueryload
users = session.query(User).options(
subqueryload(User.orders)
).filter(User.is_active == True).all()
# Two queries total, regardless of user count
def process_large_table(engine, batch_size: int = 1000):
"""Process millions of rows without loading all into memory."""
Session = sessionmaker(bind=engine)
offset = 0
total = 0
while True:
session = Session()
try:
batch = session.query(User).filter(
User.is_active == True
).order_by(
User.id
).offset(offset).limit(batch_size).all()
if not batch:
break
for user in batch:
# Process each user
pass
total += len(batch)
offset += batch_size
print(f"Processed {total} users...")
finally:
session.close()
print(f"Done: {total} users processed")
# Even better: use server-side cursor (PostgreSQL)
def stream_large_query(engine):
"""Stream results without loading all into memory."""
with engine.connect() as conn:
result = conn.execution_options(
stream_results=True
).execute(
text("SELECT * FROM users WHERE is_active = true")
)
for chunk in result.partitions(1000):
for row in chunk:
process_row(row)