Python Database Operations — SQLite, PostgreSQL & SQLAlchemy Guide

By OpenClaw Team · March 26, 2026 · 15 min read

Every serious Python application needs a database. Whether you're building an API backend, a data pipeline, or a CLI tool that tracks state — understanding database operations is essential. This guide covers everything from SQLite's zero-config simplicity to production PostgreSQL with SQLAlchemy ORM.

Table of Contents

1. SQLite — Zero-Config Database
2. SQLite Patterns for Real Projects
3. PostgreSQL with psycopg2
4. SQLAlchemy ORM
5. Database Migrations with Alembic
6. Connection Pooling
7. Transactions & Error Handling
8. Async Database Operations
9. Testing with In-Memory SQLite
10. Performance Tips & Indexing
11. Best Practices Checklist

1. SQLite — Zero-Config Database

SQLite ships with Python's standard library. No server, no installation, no configuration. It's perfect for prototypes, small apps, local caches, and embedded systems.

import sqlite3


def create_database(db_path: str = "app.db"):
    """Create a database with a users table."""
    conn = sqlite3.connect(db_path)
    # Return rows with dict-style access (sqlite3.Row) instead of plain tuples
    conn.row_factory = sqlite3.Row

    conn.execute("""
        CREATE TABLE IF NOT EXISTS users (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            email TEXT UNIQUE NOT NULL,
            name TEXT NOT NULL,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            is_active BOOLEAN DEFAULT 1
        )
    """)

    conn.execute("""
        CREATE INDEX IF NOT EXISTS idx_users_email
        ON users(email)
    """)

    conn.commit()
    return conn


# Basic CRUD operations
conn = create_database()

# INSERT
conn.execute(
    "INSERT OR IGNORE INTO users (email, name) VALUES (?, ?)",
    ("alice@example.com", "Alice")
)
conn.commit()

# SELECT
user = conn.execute(
    "SELECT * FROM users WHERE email = ?",
    ("alice@example.com",)
).fetchone()

if user:
    print(f"Found: {user['name']} (id={user['id']})")

# UPDATE
conn.execute(
    "UPDATE users SET name = ? WHERE email = ?",
    ("Alice Smith", "alice@example.com")
)
conn.commit()

# DELETE
conn.execute("DELETE FROM users WHERE is_active = 0")
conn.commit()
Always use parameterized queries (? placeholders). Never use f-strings or string formatting to build SQL — that's how SQL injection happens.
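To make that concrete, here is the difference on the users table from above (the unsafe version is shown only so you can recognize it):

user_input = "alice@example.com' OR '1'='1"

# UNSAFE — string formatting splices untrusted input into the SQL itself
rows = conn.execute(
    f"SELECT * FROM users WHERE email = '{user_input}'"
).fetchall()  # the injected OR '1'='1' now matches every row

# SAFE — the ? placeholder passes the value separately from the SQL text
rows = conn.execute(
    "SELECT * FROM users WHERE email = ?", (user_input,)
).fetchall()  # the whole string is treated as a literal value, never as SQL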

2. SQLite Patterns for Real Projects

Context Manager Pattern

import sqlite3
from contextlib import contextmanager


@contextmanager
def get_db(db_path: str = "app.db"):
    """Database connection with automatic commit/rollback."""
    conn = sqlite3.connect(db_path)
    conn.row_factory = sqlite3.Row
    conn.execute("PRAGMA journal_mode=WAL")  # Better concurrency
    conn.execute("PRAGMA foreign_keys=ON")   # Enforce FK constraints
    try:
        yield conn
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()


# Usage — auto-commits on success, rolls back on error
with get_db() as db:
    db.execute("INSERT INTO users (email, name) VALUES (?, ?)",
               ("bob@example.com", "Bob"))
    # Auto-commits here

Bulk Operations

def bulk_insert_users(users: list[dict], db_path: str = "app.db"):
    """Insert many records efficiently."""
    with get_db(db_path) as db:
        db.executemany(
            "INSERT OR IGNORE INTO users (email, name) VALUES (:email, :name)",
            users
        )


# Insert 1000 users in one transaction
users = [
    {"email": f"user{i}@example.com", "name": f"User {i}"}
    for i in range(1000)
]
bulk_insert_users(users)  # Fast — single transaction

Key-Value Store

import json
import sqlite3


class KVStore:
    """Simple key-value store backed by SQLite."""

    def __init__(self, db_path: str = "kv.db"):
        self.conn = sqlite3.connect(db_path)
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS kv (
                key TEXT PRIMARY KEY,
                value TEXT NOT NULL,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)
        self.conn.commit()

    def get(self, key: str, default=None):
        row = self.conn.execute(
            "SELECT value FROM kv WHERE key = ?", (key,)
        ).fetchone()
        if row is None:
            return default
        return json.loads(row[0])

    def set(self, key: str, value):
        self.conn.execute(
            """INSERT INTO kv (key, value, updated_at)
               VALUES (?, ?, CURRENT_TIMESTAMP)
               ON CONFLICT(key) DO UPDATE
               SET value = excluded.value,
                   updated_at = CURRENT_TIMESTAMP""",
            (key, json.dumps(value))
        )
        self.conn.commit()

    def delete(self, key: str):
        self.conn.execute("DELETE FROM kv WHERE key = ?", (key,))
        self.conn.commit()


# Usage
store = KVStore()
store.set("config", {"theme": "dark", "lang": "en"})
config = store.get("config")
print(config)  # {'theme': 'dark', 'lang': 'en'}

3. PostgreSQL with psycopg2

For production applications, PostgreSQL is the standard. psycopg2 is the most popular adapter — battle-tested and widely deployed.

import psycopg2
from psycopg2.extras import RealDictCursor, execute_values


def get_pg_connection(dsn: str | None = None):
    """Connect to PostgreSQL with sensible defaults."""
    conn = psycopg2.connect(
        dsn or "postgresql://user:pass@localhost:5432/mydb",
        cursor_factory=RealDictCursor,  # Return dicts, not tuples
    )
    conn.autocommit = False
    return conn


# Basic operations
conn = get_pg_connection()

try:
    with conn.cursor() as cur:
        # Create table with PostgreSQL-specific features
        cur.execute("""
            CREATE TABLE IF NOT EXISTS products (
                id SERIAL PRIMARY KEY,
                name TEXT UNIQUE NOT NULL,  -- UNIQUE is required for ON CONFLICT (name)
                price NUMERIC(10, 2) NOT NULL,
                tags TEXT[] DEFAULT '{}',
                metadata JSONB DEFAULT '{}',
                created_at TIMESTAMPTZ DEFAULT NOW()
            )
        """)

        # INSERT with RETURNING
        cur.execute(
            """INSERT INTO products (name, price, tags, metadata)
               VALUES (%s, %s, %s, %s)
               RETURNING id, created_at""",
            ("Widget", 29.99, ["sale", "new"],
             '{"color": "blue", "weight": 150}')
        )
        result = cur.fetchone()
        print(f"Created product {result['id']} at {result['created_at']}")

        # UPSERT (INSERT ... ON CONFLICT)
        cur.execute(
            """INSERT INTO products (name, price)
               VALUES (%s, %s)
               ON CONFLICT (name) DO UPDATE
               SET price = EXCLUDED.price
               RETURNING id""",
            ("Widget", 24.99)
        )

        # JSONB queries
        cur.execute(
            "SELECT * FROM products WHERE metadata->>'color' = %s",
            ("blue",)
        )
        blue_products = cur.fetchall()

        # Array queries
        cur.execute(
            "SELECT * FROM products WHERE %s = ANY(tags)",
            ("sale",)
        )
        on_sale = cur.fetchall()

    conn.commit()

except Exception as e:
    conn.rollback()
    print(f"Error: {e}")
finally:
    conn.close()

Bulk Insert with execute_values

from psycopg2.extras import execute_values


def bulk_insert_products(products: list[dict]):
    """Fast bulk insert using execute_values."""
    conn = get_pg_connection()
    try:
        with conn.cursor() as cur:
            execute_values(
                cur,
                """INSERT INTO products (name, price, tags)
                   VALUES %s
                   ON CONFLICT (name) DO NOTHING""",
                [(p["name"], p["price"], p.get("tags", []))
                 for p in products],
                page_size=1000  # Batch size
            )
        conn.commit()
        print(f"Inserted {len(products)} products")
    finally:
        conn.close()


# Much faster than issuing one INSERT per row
products = [
    {"name": f"Product {i}", "price": 9.99 + i}
    for i in range(5000)
]
bulk_insert_products(products)
psycopg2 uses %s for placeholders, not ? like SQLite. Don't mix them up — it's a common source of bugs.
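For reference, here is the same lookup in both styles, assuming a sqlite3 connection conn and a psycopg2 cursor cur:

# sqlite3 — qmark style
conn.execute("SELECT * FROM users WHERE email = ?", ("alice@example.com",))

# psycopg2 — format style; %s is used for every data type, not just strings
cur.execute("SELECT * FROM users WHERE email = %s", ("alice@example.com",))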

4. SQLAlchemy ORM

SQLAlchemy lets you work with databases using Python classes instead of raw SQL. It supports SQLite, PostgreSQL, MySQL, and more — switch databases by changing one URL.

from datetime import datetime
from sqlalchemy import (
    create_engine, Column, Integer, String, Float,
    Boolean, DateTime, ForeignKey, Index,
)
from sqlalchemy.orm import declarative_base, sessionmaker, relationship


Base = declarative_base()


class User(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True)
    email = Column(String(255), unique=True, nullable=False, index=True)
    name = Column(String(100), nullable=False)
    is_active = Column(Boolean, default=True)
    created_at = Column(DateTime, default=datetime.utcnow)

    # Relationship
    orders = relationship("Order", back_populates="user",
                         cascade="all, delete-orphan")

    def __repr__(self):
        return f"<User {self.email}>"


class Order(Base):
    __tablename__ = "orders"

    id = Column(Integer, primary_key=True)
    user_id = Column(Integer, ForeignKey("users.id"), nullable=False)
    total = Column(Float, nullable=False)
    status = Column(String(20), default="pending")
    created_at = Column(DateTime, default=datetime.utcnow)

    user = relationship("User", back_populates="orders")

    # Composite index for common queries
    __table_args__ = (
        Index("idx_orders_user_status", "user_id", "status"),
    )


# Setup
engine = create_engine(
    "sqlite:///app.db",
    echo=False,          # Set True for SQL logging
    pool_pre_ping=True,  # Verify connections before use
)
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
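As noted above, pointing the same models at PostgreSQL only requires a different URL. A minimal sketch, assuming a placeholder DSN and an installed driver (e.g. psycopg2-binary):

# Same models, different backend — only the connection URL changes
pg_engine = create_engine(
    "postgresql+psycopg2://user:pass@localhost:5432/mydb",
    pool_pre_ping=True,
)
Base.metadata.create_all(pg_engine)
PgSession = sessionmaker(bind=pg_engine)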

CRUD with SQLAlchemy

# CREATE
session = Session()

user = User(email="alice@example.com", name="Alice")
session.add(user)
session.commit()

# READ
user = session.query(User).filter_by(email="alice@example.com").first()
active_users = session.query(User).filter(User.is_active == True).all()

# Advanced queries
from sqlalchemy import func, and_, or_

# Count orders per user
results = session.query(
    User.name,
    func.count(Order.id).label("order_count"),
    func.sum(Order.total).label("total_spent")
).join(Order).group_by(User.name).having(
    func.count(Order.id) > 5
).all()

# Subquery
big_spenders = session.query(User).filter(
    User.id.in_(
        session.query(Order.user_id)
        .group_by(Order.user_id)
        .having(func.sum(Order.total) > 1000)
    )
).all()

# UPDATE
user.name = "Alice Smith"
session.commit()

# Bulk update
session.query(User).filter(
    User.created_at < datetime(2025, 1, 1)
).update({"is_active": False})
session.commit()

# DELETE
session.delete(user)
session.commit()

session.close()

Session Management Pattern

from contextlib import contextmanager


@contextmanager
def get_session():
    """Provide a transactional scope around operations."""
    session = Session()
    try:
        yield session
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()


# Clean, safe usage
with get_session() as s:
    user = User(email="bob@example.com", name="Bob")
    s.add(user)
    order = Order(user=user, total=49.99)
    s.add(order)
    # Auto-commits on exit, rolls back on error

5. Database Migrations with Alembic

Alembic tracks schema changes so you can evolve your database safely — version control for your schema.

# Install and initialize
# pip install alembic
# alembic init migrations

# In alembic.ini, set your database URL:
# sqlalchemy.url = postgresql://user:pass@localhost/mydb

# In migrations/env.py, import your models:
# from myapp.models import Base
# target_metadata = Base.metadata

Creating and Running Migrations

# Auto-generate migration from model changes
# alembic revision --autogenerate -m "add users table"

# Manually create a migration
# alembic revision -m "add email index"

# Example migration file: migrations/versions/001_add_users.py
"""Add users table."""

from alembic import op
import sqlalchemy as sa


def upgrade():
    op.create_table(
        "users",
        sa.Column("id", sa.Integer(), primary_key=True),
        sa.Column("email", sa.String(255), unique=True, nullable=False),
        sa.Column("name", sa.String(100), nullable=False),
        sa.Column("created_at", sa.DateTime(), server_default=sa.func.now()),
    )
    op.create_index("idx_users_email", "users", ["email"])


def downgrade():
    op.drop_index("idx_users_email")
    op.drop_table("users")

# Run migrations
# alembic upgrade head      # Apply all pending migrations
# alembic upgrade +1        # Apply the next one
# alembic downgrade -1      # Roll back the last one
# alembic current           # Show current version
# alembic history           # Show migration history
Always review auto-generated migrations. Alembic can't detect all changes (renamed columns, data migrations). Verify the SQL before running in production.
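For example, a column rename (which autogenerate would read as a drop-and-add) plus a small data backfill has to be written by hand. A sketch with illustrative names — note that split_part is PostgreSQL-specific:

from alembic import op
import sqlalchemy as sa


def upgrade():
    # Rename in place so existing data is preserved
    op.alter_column("users", "name", new_column_name="full_name")

    # Data migration: backfill a new column from existing values
    op.add_column("users", sa.Column("email_domain", sa.String(255)))
    op.execute("UPDATE users SET email_domain = split_part(email, '@', 2)")


def downgrade():
    op.drop_column("users", "email_domain")
    op.alter_column("users", "full_name", new_column_name="name")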

6. Connection Pooling

Opening a new database connection per request is slow. Connection pooling reuses connections, dramatically improving performance.

from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool


# SQLAlchemy with connection pool
engine = create_engine(
    "postgresql://user:pass@localhost/mydb",
    poolclass=QueuePool,
    pool_size=10,           # Maintain 10 connections
    max_overflow=20,        # Allow 20 extra under load
    pool_timeout=30,        # Wait 30s for a connection
    pool_recycle=3600,      # Recycle connections after 1 hour
    pool_pre_ping=True,     # Verify connections are alive
    echo_pool="debug",      # Log pool events (dev only)
)


# Check pool status
pool = engine.pool
print(f"Pool size: {pool.size()}")
print(f"Checked in: {pool.checkedin()}")
print(f"Checked out: {pool.checkedout()}")
print(f"Overflow: {pool.overflow()}")

psycopg2 Connection Pool

from psycopg2 import pool as pg_pool
from contextlib import contextmanager


# Thread-safe connection pool
connection_pool = pg_pool.ThreadedConnectionPool(
    minconn=5,
    maxconn=20,
    dsn="postgresql://user:pass@localhost/mydb",
)


@contextmanager
def get_pooled_connection():
    """Get a connection from the pool, auto-return."""
    conn = connection_pool.getconn()
    try:
        yield conn
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        connection_pool.putconn(conn)


# Usage
with get_pooled_connection() as conn:
    with conn.cursor() as cur:
        cur.execute("SELECT count(*) FROM users")
        count = cur.fetchone()[0]
        print(f"Total users: {count}")

7. Transactions & Error Handling

Transactions ensure data integrity — either all operations succeed, or none do. Here's how to handle them properly.

import psycopg2
from psycopg2 import errors as pg_errors


def transfer_funds(from_id: int, to_id: int, amount: float):
    """Transfer money between accounts with proper transaction handling."""
    conn = get_pg_connection()
    try:
        with conn.cursor() as cur:
            # Lock rows to prevent race conditions
            cur.execute(
                "SELECT balance FROM accounts WHERE id = %s FOR UPDATE",
                (from_id,)
            )
            sender = cur.fetchone()
            if not sender or sender["balance"] < amount:
                raise ValueError("Insufficient funds")

            # Debit sender
            cur.execute(
                "UPDATE accounts SET balance = balance - %s WHERE id = %s",
                (amount, from_id)
            )

            # Credit receiver
            cur.execute(
                "UPDATE accounts SET balance = balance + %s WHERE id = %s",
                (amount, to_id)
            )

            # Log the transfer
            cur.execute(
                """INSERT INTO transfers (from_id, to_id, amount)
                   VALUES (%s, %s, %s)""",
                (from_id, to_id, amount)
            )

        conn.commit()
        print(f"Transferred ${amount:.2f}: {from_id} → {to_id}")

    except ValueError as e:
        conn.rollback()
        print(f"Business error: {e}")

    except pg_errors.SerializationFailure:
        conn.rollback()
        print("Concurrent modification — retry")

    except pg_errors.DeadlockDetected:
        conn.rollback()
        print("Deadlock detected — retry")

    except Exception as e:
        conn.rollback()
        print(f"Unexpected error: {e}")
        raise

    finally:
        conn.close()
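When PostgreSQL reports a serialization failure or deadlock, the usual remedy is to re-run the whole transaction. A minimal retry sketch — the helper name is illustrative, and it assumes the transactional function re-raises those errors rather than only printing them:

import time

from psycopg2 import errors as pg_errors


def with_retries(fn, *args, retries: int = 3, backoff: float = 0.1, **kwargs):
    """Retry a transactional function on transient concurrency errors."""
    for attempt in range(1, retries + 1):
        try:
            return fn(*args, **kwargs)
        except (pg_errors.SerializationFailure, pg_errors.DeadlockDetected):
            if attempt == retries:
                raise
            time.sleep(backoff * attempt)  # brief, growing pause before retrying


# Hypothetical usage, if transfer_funds re-raised transient errors:
# with_retries(transfer_funds, 1, 2, 25.00)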

Savepoints for Partial Rollbacks

def process_batch(items: list[dict]):
    """Process items, skip failures without losing successes."""
    conn = get_pg_connection()
    processed = 0

    try:
        with conn.cursor() as cur:
            for item in items:
                # Create savepoint before each item
                cur.execute("SAVEPOINT item_save")
                try:
                    cur.execute(
                        """INSERT INTO items (name, value)
                           VALUES (%s, %s)""",
                        (item["name"], item["value"])
                    )
                    processed += 1
                except psycopg2.IntegrityError:
                    # Rollback just this item, continue with rest
                    cur.execute("ROLLBACK TO SAVEPOINT item_save")
                    print(f"Skipped duplicate: {item['name']}")

        conn.commit()
        print(f"Processed {processed}/{len(items)} items")

    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()

8. Async Database Operations

For high-concurrency applications (web servers, bots), async database operations prevent blocking.

import asyncio
import aiosqlite


async def async_sqlite_demo():
    """SQLite with async/await."""
    async with aiosqlite.connect("async_app.db") as db:
        db.row_factory = aiosqlite.Row

        await db.execute("""
            CREATE TABLE IF NOT EXISTS events (
                id INTEGER PRIMARY KEY,
                type TEXT NOT NULL,
                data TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)

        # Insert
        await db.execute(
            "INSERT INTO events (type, data) VALUES (?, ?)",
            ("login", '{"user": "alice"}')
        )
        await db.commit()

        # Query
        async with db.execute(
            "SELECT * FROM events ORDER BY created_at DESC LIMIT 10"
        ) as cursor:
            async for row in cursor:
                print(f"{row['type']}: {row['data']}")


asyncio.run(async_sqlite_demo())

Async PostgreSQL with asyncpg

import asyncio
import asyncpg


async def async_pg_demo():
    """High-performance async PostgreSQL."""
    # Connection pool
    pool = await asyncpg.create_pool(
        "postgresql://user:pass@localhost/mydb",
        min_size=5,
        max_size=20,
    )

    async with pool.acquire() as conn:
        # Create table
        await conn.execute("""
            CREATE TABLE IF NOT EXISTS metrics (
                id SERIAL PRIMARY KEY,
                name TEXT NOT NULL,
                value DOUBLE PRECISION NOT NULL,
                recorded_at TIMESTAMPTZ DEFAULT NOW()
            )
        """)

        # Insert with RETURNING
        row = await conn.fetchrow(
            """INSERT INTO metrics (name, value)
               VALUES ($1, $2)
               RETURNING id, recorded_at""",
            "cpu_usage", 45.2
        )
        print(f"Inserted metric {row['id']}")

        # Bulk insert (very fast)
        data = [(f"metric_{i}", float(i)) for i in range(1000)]
        await conn.executemany(
            "INSERT INTO metrics (name, value) VALUES ($1, $2)",
            data
        )

        # Query
        rows = await conn.fetch(
            """SELECT name, avg(value) as avg_value
               FROM metrics
               GROUP BY name
               ORDER BY avg_value DESC
               LIMIT 10"""
        )
        for row in rows:
            print(f"{row['name']}: {row['avg_value']:.1f}")

    await pool.close()


asyncio.run(async_pg_demo())
asyncpg vs psycopg2: asyncpg is 2-5x faster for async workloads. Use it for web servers (FastAPI, aiohttp). Use psycopg2 for scripts, data pipelines, and synchronous code.
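For instance, in a FastAPI app the pool is typically created once at startup and shared across requests. A rough sketch — the route, table, and DSN are illustrative, not part of any particular app:

from contextlib import asynccontextmanager

import asyncpg
from fastapi import FastAPI, Request


@asynccontextmanager
async def lifespan(app: FastAPI):
    # One pool for the whole process, created at startup, closed at shutdown
    app.state.pool = await asyncpg.create_pool(
        "postgresql://user:pass@localhost/mydb", min_size=5, max_size=20
    )
    yield
    await app.state.pool.close()


app = FastAPI(lifespan=lifespan)


@app.get("/metrics/{name}")
async def latest_metric(name: str, request: Request):
    async with request.app.state.pool.acquire() as conn:
        row = await conn.fetchrow(
            "SELECT value, recorded_at FROM metrics"
            " WHERE name = $1 ORDER BY recorded_at DESC LIMIT 1",
            name,
        )
        return dict(row) if row else {"error": "not found"}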

9. Testing with In-Memory SQLite

In-memory SQLite databases are perfect for testing — fast, isolated, no cleanup needed.

import pytest
from sqlalchemy import create_engine
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import sessionmaker


@pytest.fixture
def db_session():
    """Create a fresh in-memory database for each test."""
    engine = create_engine("sqlite:///:memory:")
    Base.metadata.create_all(engine)
    Session = sessionmaker(bind=engine)
    session = Session()

    yield session

    session.close()
    engine.dispose()


def test_create_user(db_session):
    user = User(email="test@example.com", name="Test User")
    db_session.add(user)
    db_session.commit()

    found = db_session.query(User).filter_by(email="test@example.com").first()
    assert found is not None
    assert found.name == "Test User"


def test_user_orders(db_session):
    user = User(email="buyer@example.com", name="Buyer")
    order1 = Order(user=user, total=25.00)
    order2 = Order(user=user, total=75.00)
    db_session.add_all([user, order1, order2])
    db_session.commit()

    assert len(user.orders) == 2
    assert sum(o.total for o in user.orders) == 100.00


def test_duplicate_email_rejected(db_session):
    db_session.add(User(email="one@example.com", name="One"))
    db_session.commit()

    db_session.add(User(email="one@example.com", name="Duplicate"))
    with pytest.raises(IntegrityError):
        db_session.commit()

10. Performance Tips & Indexing

Index Strategy

from sqlalchemy import Index, text


# Single column index — for WHERE clauses
Index("idx_users_email", User.email)

# Composite index — for queries filtering on multiple columns
# Column ORDER matters: the index is only usable when queries
# filter on the leading column(s)
Index("idx_orders_user_date", Order.user_id, Order.created_at)

# Partial index (PostgreSQL) — index only relevant rows
Index(
    "idx_active_users",
    User.email,
    postgresql_where=text("is_active = true")
)

# Covering index — includes extra columns to avoid table lookups
Index(
    "idx_orders_covering",
    Order.user_id, Order.status,
    postgresql_include=["total"]
)

Query Optimization

# BAD: N+1 query problem
users = session.query(User).all()
for user in users:
    print(user.orders)  # Separate query for each user!

# GOOD: Eager loading with joinedload
from sqlalchemy.orm import joinedload

users = session.query(User).options(
    joinedload(User.orders)
).all()
# Single query with JOIN — orders already loaded

# GOOD: Subquery loading for large datasets
from sqlalchemy.orm import subqueryload

users = session.query(User).options(
    subqueryload(User.orders)
).filter(User.is_active == True).all()
# Two queries total, regardless of user count

Batch Processing Large Datasets

def process_large_table(engine, batch_size: int = 1000):
    """Process millions of rows without loading all into memory."""
    Session = sessionmaker(bind=engine)

    offset = 0
    total = 0

    while True:
        session = Session()
        try:
            batch = session.query(User).filter(
                User.is_active == True
            ).order_by(
                User.id
            ).offset(offset).limit(batch_size).all()

            if not batch:
                break

            for user in batch:
                # Process each user
                pass

            total += len(batch)
            offset += batch_size
            print(f"Processed {total} users...")

        finally:
            session.close()

    print(f"Done: {total} users processed")


# Even better: use server-side cursor (PostgreSQL)
def stream_large_query(engine):
    """Stream results without loading all into memory."""
    with engine.connect() as conn:
        result = conn.execution_options(
            stream_results=True
        ).execute(
            text("SELECT * FROM users WHERE is_active = true")
        )

        for chunk in result.partitions(1000):
            for row in chunk:
                process_row(row)

11. Best Practices Checklist

- Always use parameterized queries; never build SQL with f-strings or string concatenation.
- Wrap related operations in a transaction and roll back on errors (context managers make this automatic).
- Use connection pooling for anything that serves concurrent requests.
- Manage schema changes with Alembic, and review auto-generated migrations before running them in production.
- Index the columns you filter and join on, and watch for N+1 query patterns.
- Batch or stream large datasets instead of loading everything into memory.
- Test against an in-memory SQLite database for fast, isolated tests.

Get the AI Toolkit →