Python Design Patterns — Write Clean, Maintainable Code

March 2026 · 22 min read · Python, Architecture, Clean Code

Design patterns aren't academic exercises — they're battle-tested solutions to problems you hit every week. But most "design patterns in Python" tutorials translate Java patterns literally, ignoring Python's strengths. This guide covers the patterns that actually matter in Python, with idiomatic implementations you can drop into real projects.

We'll skip the ones Python makes irrelevant (looking at you, Abstract Factory with 6 classes) and focus on patterns that make your code genuinely better.

1. Singleton — One Instance, Guaranteed

Use when you need exactly one instance: database connections, config managers, loggers.

The Pythonic way (module-level)

# config.py — Python modules ARE singletons
import json
from pathlib import Path

class _Config:
    """Internal config class — instantiated once at module level."""

    def __init__(self):
        self._data = {}
        self._loaded = False

    def load(self, path: str = "config.json"):
        if self._loaded:
            return
        config_file = Path(path)
        if config_file.exists():
            self._data = json.loads(config_file.read_text())
        self._loaded = True

    def get(self, key: str, default=None):
        self.load()  # lazy load on first access
        return self._data.get(key, default)

    def __repr__(self):
        return f"Config({len(self._data)} keys)"


# THE singleton — import this, not the class
config = _Config()


# Usage (anywhere in your app):
# from config import config
# db_url = config.get("database_url", "sqlite:///default.db")
💡 Why not __new__? The metaclass / __new__ approach works but it's over-engineered for Python. Module-level instances are simpler, testable (you can mock config), and Pythonic. Django's settings uses this exact pattern.

Thread-safe singleton (when you need it)

import threading

class ConnectionPool:
    _instance = None
    _lock = threading.Lock()

    def __new__(cls, *args, **kwargs):
        if cls._instance is None:
            with cls._lock:
                # Double-checked locking
                if cls._instance is None:
                    cls._instance = super().__new__(cls)
                    cls._instance._initialized = False
        return cls._instance

    def __init__(self, max_connections: int = 10):
        if self._initialized:
            return
        self._initialized = True
        self.max_connections = max_connections
        self._pool = []
        self._semaphore = threading.Semaphore(max_connections)
        print(f"Pool created with {max_connections} connections")

2. Factory — Create Objects Without Hardcoding Types

Use when the object type depends on runtime data: parsing different file formats, connecting to different databases, handling different API providers.

from abc import ABC, abstractmethod


# --- Product interface ---
class Notifier(ABC):
    @abstractmethod
    def send(self, to: str, message: str) -> bool:
        ...


# --- Concrete products ---
class EmailNotifier(Notifier):
    def __init__(self, smtp_host: str):
        self.smtp_host = smtp_host

    def send(self, to: str, message: str) -> bool:
        print(f"📧 Email to {to} via {self.smtp_host}: {message}")
        return True


class SlackNotifier(Notifier):
    def __init__(self, webhook_url: str):
        self.webhook_url = webhook_url

    def send(self, to: str, message: str) -> bool:
        print(f"💬 Slack to #{to}: {message}")
        return True


class TelegramNotifier(Notifier):
    def __init__(self, bot_token: str):
        self.bot_token = bot_token

    def send(self, to: str, message: str) -> bool:
        print(f"✈️ Telegram to {to}: {message}")
        return True


# --- Factory (dict-based, Pythonic) ---
NOTIFIER_REGISTRY: dict[str, type[Notifier]] = {
    "email": EmailNotifier,
    "slack": SlackNotifier,
    "telegram": TelegramNotifier,
}


def create_notifier(channel: str, **kwargs) -> Notifier:
    """Factory function — create notifier by channel name."""
    cls = NOTIFIER_REGISTRY.get(channel)
    if cls is None:
        raise ValueError(
            f"Unknown channel: {channel}. "
            f"Available: {list(NOTIFIER_REGISTRY.keys())}"
        )
    return cls(**kwargs)


# --- Usage ---
notifier = create_notifier("slack", webhook_url="https://hooks.slack.com/...")
notifier.send("alerts", "Server is down!")
💡 Pro tip: Use a decorator to auto-register new notifiers:
def register_notifier(name: str):
    def decorator(cls):
        NOTIFIER_REGISTRY[name] = cls
        return cls
    return decorator

@register_notifier("discord")
class DiscordNotifier(Notifier):
    def send(self, to, message):
        print(f"🎮 Discord: {message}")
        return True

3. Strategy — Swap Algorithms at Runtime

Use when you have multiple ways to do the same thing and want to switch between them: compression algorithms, pricing strategies, search backends.

from abc import ABC, abstractmethod
from typing import Callable


# --- Strategy interface ---
class PricingStrategy(ABC):
    @abstractmethod
    def calculate(self, base_price: float, quantity: int) -> float:
        ...


class RegularPricing(PricingStrategy):
    def calculate(self, base_price: float, quantity: int) -> float:
        return base_price * quantity


class BulkPricing(PricingStrategy):
    """10% off for 10+, 20% off for 50+"""
    def calculate(self, base_price: float, quantity: int) -> float:
        if quantity >= 50:
            return base_price * quantity * 0.80
        elif quantity >= 10:
            return base_price * quantity * 0.90
        return base_price * quantity


class SubscriptionPricing(PricingStrategy):
    """Flat monthly rate with per-unit overage."""
    def __init__(self, monthly_fee: float, included_units: int, overage_rate: float):
        self.monthly_fee = monthly_fee
        self.included_units = included_units
        self.overage_rate = overage_rate

    def calculate(self, base_price: float, quantity: int) -> float:
        overage = max(0, quantity - self.included_units)
        return self.monthly_fee + (overage * self.overage_rate)


# --- Context ---
class Order:
    def __init__(self, pricing: PricingStrategy):
        self.pricing = pricing
        self.items: list[tuple[str, float, int]] = []

    def add_item(self, name: str, price: float, qty: int):
        self.items.append((name, price, qty))

    def total(self) -> float:
        return sum(
            self.pricing.calculate(price, qty)
            for _, price, qty in self.items
        )


# --- Usage: swap strategy without changing Order ---
regular = Order(RegularPricing())
regular.add_item("Widget", 10.0, 5)
print(f"Regular: ${regular.total():.2f}")  # $50.00

bulk = Order(BulkPricing())
bulk.add_item("Widget", 10.0, 50)
print(f"Bulk: ${bulk.total():.2f}")  # $400.00 (20% off)

Pythonic alternative: functions as strategies

# For simple strategies, just use callables
def sort_by_price(items):
    return sorted(items, key=lambda x: x["price"])

def sort_by_rating(items):
    return sorted(items, key=lambda x: -x["rating"])

def sort_by_relevance(items, query):
    return sorted(items, key=lambda x: query.lower() in x["name"].lower(), reverse=True)


# Strategy is just a function parameter
def display_products(items, sort_fn=sort_by_price):
    for item in sort_fn(items):
        print(f"  {item['name']}: ${item['price']} ⭐{item['rating']}")

4. Observer — React to Changes

Use when multiple parts of your system need to react to the same event: user signup triggers welcome email + analytics + Slack notification.

from collections import defaultdict
from typing import Callable, Any


class EventBus:
    """Simple pub/sub event system."""

    def __init__(self):
        self._subscribers: dict[str, list[Callable]] = defaultdict(list)

    def subscribe(self, event: str, callback: Callable):
        self._subscribers[event].append(callback)

    def unsubscribe(self, event: str, callback: Callable):
        self._subscribers[event].remove(callback)

    def emit(self, event: str, **data):
        for callback in self._subscribers.get(event, []):
            try:
                callback(**data)
            except Exception as e:
                print(f"Error in {callback.__name__} for '{event}': {e}")

    def on(self, event: str):
        """Decorator to subscribe a function to an event."""
        def decorator(fn):
            self.subscribe(event, fn)
            return fn
        return decorator


# --- Setup ---
bus = EventBus()


@bus.on("user.signup")
def send_welcome_email(email: str, name: str, **_):
    print(f"📧 Welcome email → {email}")


@bus.on("user.signup")
def track_analytics(email: str, **_):
    print(f"📊 Analytics: new signup {email}")


@bus.on("user.signup")
def notify_team(name: str, **_):
    print(f"💬 Slack: {name} just signed up!")


@bus.on("order.completed")
def send_receipt(email: str, amount: float, **_):
    print(f"🧾 Receipt → {email}: ${amount:.2f}")


# --- Trigger events ---
bus.emit("user.signup", email="alice@example.com", name="Alice")
# 📧 Welcome email → alice@example.com
# 📊 Analytics: new signup alice@example.com
# 💬 Slack: Alice just signed up!

bus.emit("order.completed", email="alice@example.com", amount=99.99)
# 🧾 Receipt → alice@example.com: $99.99
💡 Scaling up: For production async event systems, look at blinker (Flask uses it) or go full message broker with Redis Pub/Sub or RabbitMQ.

5. Decorator Pattern — Add Behavior Without Changing Code

Not to be confused with Python's @decorator syntax (though they're related). The pattern wraps objects to add functionality.

import time
import functools
import hashlib
import json


# --- Function decorators (most common in Python) ---
def retry(max_attempts: int = 3, delay: float = 1.0):
    """Retry a function on failure."""
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            last_error = None
            for attempt in range(1, max_attempts + 1):
                try:
                    return fn(*args, **kwargs)
                except Exception as e:
                    last_error = e
                    if attempt < max_attempts:
                        print(f"Attempt {attempt} failed: {e}. Retrying in {delay}s...")
                        time.sleep(delay)
            raise last_error
        return wrapper
    return decorator


def cache(ttl_seconds: int = 300):
    """Simple TTL cache decorator."""
    def decorator(fn):
        _cache = {}

        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            key = hashlib.md5(
                json.dumps((args, sorted(kwargs.items())), default=str).encode()
            ).hexdigest()

            if key in _cache:
                result, timestamp = _cache[key]
                if time.time() - timestamp < ttl_seconds:
                    return result

            result = fn(*args, **kwargs)
            _cache[key] = (result, time.time())
            return result

        wrapper.clear_cache = lambda: _cache.clear()
        return wrapper
    return decorator


def timing(fn):
    """Log execution time."""
    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        result = fn(*args, **kwargs)
        elapsed = time.perf_counter() - start
        print(f"⏱ {fn.__name__} took {elapsed:.3f}s")
        return result
    return wrapper


# --- Stack decorators ---
@timing
@retry(max_attempts=3, delay=0.5)
@cache(ttl_seconds=60)
def fetch_user(user_id: int) -> dict:
    """Cached + retried + timed API call."""
    import random
    if random.random() < 0.3:
        raise ConnectionError("API timeout")
    return {"id": user_id, "name": f"User {user_id}"}

6. Repository — Separate Data Access from Logic

Use when your business logic shouldn't know (or care) about the database. This is the pattern behind FastAPI and Django's data layer. See also our FastAPI guide and database operations guide.

from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Optional


@dataclass
class Task:
    id: Optional[int] = None
    title: str = ""
    completed: bool = False
    created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))


# --- Repository interface ---
class TaskRepository(ABC):
    @abstractmethod
    def get(self, task_id: int) -> Optional[Task]: ...

    @abstractmethod
    def list(self, completed: Optional[bool] = None) -> list[Task]: ...

    @abstractmethod
    def create(self, task: Task) -> Task: ...

    @abstractmethod
    def update(self, task: Task) -> Task: ...

    @abstractmethod
    def delete(self, task_id: int) -> bool: ...


# --- In-memory implementation (for tests) ---
class InMemoryTaskRepo(TaskRepository):
    def __init__(self):
        self._store: dict[int, Task] = {}
        self._next_id = 1

    def get(self, task_id: int) -> Optional[Task]:
        return self._store.get(task_id)

    def list(self, completed: Optional[bool] = None) -> list[Task]:
        tasks = self._store.values()
        if completed is not None:
            tasks = [t for t in tasks if t.completed == completed]
        return sorted(tasks, key=lambda t: t.created_at, reverse=True)

    def create(self, task: Task) -> Task:
        task.id = self._next_id
        self._next_id += 1
        self._store[task.id] = task
        return task

    def update(self, task: Task) -> Task:
        if task.id not in self._store:
            raise ValueError(f"Task {task.id} not found")
        self._store[task.id] = task
        return task

    def delete(self, task_id: int) -> bool:
        return self._store.pop(task_id, None) is not None


# --- SQLAlchemy implementation (for production) ---
class SQLAlchemyTaskRepo(TaskRepository):
    def __init__(self, session):
        self.session = session

    def get(self, task_id: int):
        return self.session.query(TaskModel).get(task_id)

    def list(self, completed=None):
        q = self.session.query(TaskModel)
        if completed is not None:
            q = q.filter(TaskModel.completed == completed)
        return q.order_by(TaskModel.created_at.desc()).all()

    def create(self, task):
        db_task = TaskModel(**task.__dict__)
        self.session.add(db_task)
        self.session.commit()
        return db_task

    # ... update, delete similarly


# --- Business logic doesn't know about the database ---
class TaskService:
    def __init__(self, repo: TaskRepository):
        self.repo = repo

    def complete_task(self, task_id: int) -> Task:
        task = self.repo.get(task_id)
        if not task:
            raise ValueError("Task not found")
        task.completed = True
        return self.repo.update(task)

    def overdue_tasks(self) -> list[Task]:
        return [t for t in self.repo.list(completed=False)
                if (datetime.now(timezone.utc) - t.created_at).days > 7]


# Tests use InMemoryTaskRepo — fast, no DB needed
# Production uses SQLAlchemyTaskRepo — same interface

7. Pipeline / Chain of Responsibility

Use when data needs to pass through multiple processing steps: data cleaning, request middleware, text processing. This pairs well with ETL pipelines.

from typing import Callable, Any


class Pipeline:
    """Composable processing pipeline."""

    def __init__(self):
        self._steps: list[Callable] = []

    def add(self, step: Callable) -> "Pipeline":
        self._steps.append(step)
        return self  # fluent interface

    def run(self, data: Any) -> Any:
        result = data
        for step in self._steps:
            result = step(result)
            if result is None:
                raise ValueError(f"Step {step.__name__} returned None")
        return result

    def __or__(self, step: Callable) -> "Pipeline":
        """Enable pipe syntax: pipeline | step1 | step2"""
        self.add(step)
        return self


# --- Processing steps ---
def strip_whitespace(text: str) -> str:
    return " ".join(text.split())

def lowercase(text: str) -> str:
    return text.lower()

def remove_punctuation(text: str) -> str:
    import string
    return text.translate(str.maketrans("", "", string.punctuation))

def tokenize(text: str) -> list[str]:
    return text.split()

def remove_stopwords(tokens: list[str]) -> list[str]:
    stops = {"the", "a", "an", "is", "at", "in", "on", "and", "or", "to", "of"}
    return [t for t in tokens if t not in stops]


# --- Build and run ---
clean_text = Pipeline()
clean_text.add(strip_whitespace)
clean_text.add(lowercase)
clean_text.add(remove_punctuation)

result = clean_text.run("  Hello,  World!   This is  A TEST.  ")
print(result)  # "hello world this is a test"

# Or with pipe syntax:
nlp = Pipeline() | strip_whitespace | lowercase | remove_punctuation | tokenize | remove_stopwords
tokens = nlp.run("The Quick Brown Fox Jumps Over the Lazy Dog!")
print(tokens)  # ['quick', 'brown', 'fox', 'jumps', 'over', 'lazy', 'dog']

8. Builder — Construct Complex Objects Step by Step

Use when creating objects with many optional parameters. Python's keyword arguments reduce the need for this, but it's still valuable for complex configs.

from dataclasses import dataclass, field


@dataclass
class QueryBuilder:
    """Build SQL-like queries fluently."""
    _table: str = ""
    _columns: list[str] = field(default_factory=lambda: ["*"])
    _conditions: list[str] = field(default_factory=list)
    _order: list[str] = field(default_factory=list)
    _limit_val: int | None = None
    _offset_val: int | None = None

    def table(self, name: str) -> "QueryBuilder":
        self._table = name
        return self

    def select(self, *columns: str) -> "QueryBuilder":
        self._columns = list(columns)
        return self

    def where(self, condition: str) -> "QueryBuilder":
        self._conditions.append(condition)
        return self

    def order_by(self, column: str, desc: bool = False) -> "QueryBuilder":
        direction = "DESC" if desc else "ASC"
        self._order.append(f"{column} {direction}")
        return self

    def limit(self, n: int) -> "QueryBuilder":
        self._limit_val = n
        return self

    def offset(self, n: int) -> "QueryBuilder":
        self._offset_val = n
        return self

    def build(self) -> str:
        if not self._table:
            raise ValueError("Table name is required")

        parts = [f"SELECT {', '.join(self._columns)} FROM {self._table}"]

        if self._conditions:
            parts.append("WHERE " + " AND ".join(self._conditions))
        if self._order:
            parts.append("ORDER BY " + ", ".join(self._order))
        if self._limit_val is not None:
            parts.append(f"LIMIT {self._limit_val}")
        if self._offset_val is not None:
            parts.append(f"OFFSET {self._offset_val}")

        return " ".join(parts)


# --- Usage ---
query = (
    QueryBuilder()
    .table("users")
    .select("id", "name", "email")
    .where("is_active = TRUE")
    .where("created_at > '2026-01-01'")
    .order_by("created_at", desc=True)
    .limit(20)
    .offset(40)
    .build()
)
print(query)
# SELECT id, name, email FROM users
# WHERE is_active = TRUE AND created_at > '2026-01-01'
# ORDER BY created_at DESC LIMIT 20 OFFSET 40

9. Context Manager — Resource Management Done Right

Python's with statement is the cleanest resource management pattern in any language. Build your own for databases, temp files, timers, and locks.

import time
import tempfile
import shutil
from contextlib import contextmanager
from pathlib import Path


@contextmanager
def timer(label: str = ""):
    """Measure and print execution time."""
    start = time.perf_counter()
    yield
    elapsed = time.perf_counter() - start
    print(f"⏱ {label or 'Block'}: {elapsed:.3f}s")


@contextmanager
def temp_directory(prefix: str = "work_"):
    """Create a temp directory, auto-cleanup on exit."""
    path = Path(tempfile.mkdtemp(prefix=prefix))
    try:
        yield path
    finally:
        shutil.rmtree(path, ignore_errors=True)


@contextmanager
def atomic_write(filepath: str):
    """Write to temp file, rename on success (atomic)."""
    path = Path(filepath)
    tmp = path.with_suffix(".tmp")
    try:
        with open(tmp, "w") as f:
            yield f
        tmp.rename(path)  # atomic on same filesystem
    except Exception:
        tmp.unlink(missing_ok=True)
        raise


# --- Usage ---
with timer("data processing"):
    data = [i ** 2 for i in range(1_000_000)]

with temp_directory("export_") as tmpdir:
    output = tmpdir / "results.csv"
    output.write_text("id,value\n1,42\n")
    print(f"Wrote to {output}")
# Directory auto-deleted here

with atomic_write("config.json") as f:
    import json
    json.dump({"version": 2, "debug": False}, f)
# If json.dump fails, config.json is untouched

When to Use Which Pattern

ProblemPatternPython Approach
One instance of somethingSingletonModule-level instance
Create objects by type nameFactoryDict registry + function
Swap algorithms at runtimeStrategyPass functions / protocol classes
React to eventsObserverEventBus / signals
Add behavior to functionsDecorator@decorator syntax
Abstract data storageRepositoryABC + multiple implementations
Process data in stagesPipelineComposable callables
Build complex objectsBuilderFluent interface / dataclass
Manage resourcesContext Managerwith + @contextmanager
🎯 Rule of thumb: If a Java pattern requires 5+ classes, there's probably a 10-line Python equivalent using functions, decorators, or protocols. Start simple. Add abstractions when the code demands it, not before.

🚀 Want production-ready implementations of these patterns plus 50+ automation scripts?

Get the AI Agent Toolkit →

Related Articles

Need clean, well-architected Python code for your project? I build APIs, automation tools, and data pipelines with solid patterns. Reach out on Telegram →