Python Design Patterns — Write Clean, Maintainable Code
Design patterns aren't academic exercises — they're battle-tested solutions to problems you hit every week. But most "design patterns in Python" tutorials translate Java patterns literally, ignoring Python's strengths. This guide covers the patterns that actually matter in Python, with idiomatic implementations you can drop into real projects.
We'll skip the ones Python makes irrelevant (looking at you, Abstract Factory with 6 classes) and focus on patterns that make your code genuinely better.
1. Singleton — One Instance, Guaranteed
Use when you need exactly one instance: database connections, config managers, loggers.
The Pythonic way (module-level)
# config.py — Python modules ARE singletons
import json
from pathlib import Path
class _Config:
    """Lazily loaded view over a JSON configuration file.

    The module creates exactly one instance (``config`` below); callers
    import that instance rather than this class.
    """

    def __init__(self):
        self._data = {}
        self._loaded = False

    def load(self, path: str = "config.json"):
        """Read *path* once; every later call is a no-op.

        A missing file is not an error: the configuration simply stays
        empty. The loaded flag is set either way, so the filesystem is
        probed only on the first call.
        """
        if self._loaded:
            return
        config_file = Path(path)
        if config_file.exists():
            # JSON is UTF-8 by specification; do not depend on the locale's
            # preferred encoding, which is not UTF-8 on every platform.
            self._data = json.loads(config_file.read_text(encoding="utf-8"))
        self._loaded = True

    def get(self, key: str, default=None):
        """Return the value stored under *key*, or *default* when absent."""
        self.load()  # lazy load on first access
        return self._data.get(key, default)

    def __repr__(self):
        return f"Config({len(self._data)} keys)"


# THE singleton — import this, not the class
config = _Config()
# Usage (anywhere in your app):
# from config import config
# db_url = config.get("database_url", "sqlite:///default.db")
Thread-safe singleton (when you need it)
import threading
class ConnectionPool:
    """Process-wide pool object: every construction returns the same instance.

    The first successful ``ConnectionPool(...)`` call fixes the
    configuration; arguments passed to later calls are ignored.
    """

    _instance = None
    _lock = threading.Lock()

    def __new__(cls, *args, **kwargs):
        # Unlocked fast path; the second check under the lock stops two
        # threads from each creating an instance.
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    instance = super().__new__(cls)
                    # Set the flag BEFORE publishing the instance so a thread
                    # on the fast path never sees an object without it.
                    instance._initialized = False
                    cls._instance = instance
        return cls._instance

    def __init__(self, max_connections: int = 10):
        """Initialise the pool once; later calls return immediately.

        Python calls ``__init__`` after every ``__new__``, so the check and
        the initialisation are done under the class lock: two threads
        racing on the first construction cannot both build the state.
        """
        with self._lock:
            if self._initialized:
                return
            self.max_connections = max_connections
            self._pool = []
            self._semaphore = threading.Semaphore(max_connections)
            # Flag is set last: if Semaphore() rejects the value, the pool
            # stays uninitialised and a later call can retry.
            self._initialized = True
        print(f"Pool created with {max_connections} connections")
2. Factory — Create Objects Without Hardcoding Types
Use when the object type depends on runtime data: parsing different file formats, connecting to different databases, handling different API providers.
from abc import ABC, abstractmethod
# --- Product interface ---
class Notifier(ABC):
    """Common interface every notification channel implements."""

    @abstractmethod
    def send(self, to: str, message: str) -> bool:
        """Deliver *message* to recipient *to* and return a ``bool``."""
# --- Concrete products ---
class EmailNotifier(Notifier):
    """Notifier that reports an e-mail routed through a given SMTP host."""

    def __init__(self, smtp_host: str):
        self.smtp_host = smtp_host

    def send(self, to: str, message: str) -> bool:
        """Print the e-mail line for *to* and return ``True``."""
        line = f"📧 Email to {to} via {self.smtp_host}: {message}"
        print(line)
        return True
class SlackNotifier(Notifier):
    """Notifier that reports a Slack message; stores the webhook URL it is given."""

    def __init__(self, webhook_url: str):
        self.webhook_url = webhook_url

    def send(self, to: str, message: str) -> bool:
        """Print the Slack line, treating *to* as a channel name, and return ``True``."""
        line = f"💬 Slack to #{to}: {message}"
        print(line)
        return True
class TelegramNotifier(Notifier):
    """Notifier that reports a Telegram message; stores the bot token it is given."""

    def __init__(self, bot_token: str):
        self.bot_token = bot_token

    def send(self, to: str, message: str) -> bool:
        """Print the Telegram line for *to* and return ``True``."""
        line = f"✈️ Telegram to {to}: {message}"
        print(line)
        return True
# --- Factory (dict-based, Pythonic) ---
# Maps a channel name to the Notifier subclass that handles it.
# create_notifier() looks classes up here; register_notifier() adds entries.
NOTIFIER_REGISTRY: dict[str, type[Notifier]] = {
    "email": EmailNotifier,
    "slack": SlackNotifier,
    "telegram": TelegramNotifier,
}
def create_notifier(channel: str, **kwargs) -> Notifier:
    """Build the notifier registered under *channel*.

    Keyword arguments are forwarded unchanged to the notifier's constructor.
    Raises ``ValueError`` when no notifier is registered for *channel*.
    """
    notifier_cls = NOTIFIER_REGISTRY.get(channel)
    if notifier_cls is not None:
        return notifier_cls(**kwargs)
    known = list(NOTIFIER_REGISTRY.keys())
    raise ValueError(
        f"Unknown channel: {channel}. "
        f"Available: {known}"
    )
# --- Usage ---
# The caller names the channel as a string; the concrete class is looked up
# by create_notifier and never referenced here.
notifier = create_notifier("slack", webhook_url="https://hooks.slack.com/...")
notifier.send("alerts", "Server is down!")
def register_notifier(name: str):
    """Return a class decorator that files the class under *name* in NOTIFIER_REGISTRY.

    The decorated class is returned unchanged; an entry already stored
    under *name* is replaced.
    """
    def _register(notifier_cls):
        NOTIFIER_REGISTRY.update({name: notifier_cls})
        return notifier_cls

    return _register
@register_notifier("discord")
class DiscordNotifier(Notifier):
    """Notifier registered under the ``"discord"`` channel name."""

    def send(self, to: str, message: str) -> bool:
        """Print the Discord line for *message* and return ``True``.

        *to* is accepted to satisfy the ``Notifier`` interface but does not
        appear in the printed line.
        """
        print(f"🎮 Discord: {message}")
        return True
3. Strategy — Swap Algorithms at Runtime
Use when you have multiple ways to do the same thing and want to switch between them: compression algorithms, pricing strategies, search backends.
from abc import ABC, abstractmethod
from typing import Callable
# --- Strategy interface ---
class PricingStrategy(ABC):
    """Interface for interchangeable line-item pricing rules."""

    @abstractmethod
    def calculate(self, base_price: float, quantity: int) -> float:
        """Return the charge for *quantity* units at *base_price* each."""
class RegularPricing(PricingStrategy):
    """List price with no discount."""

    def calculate(self, base_price: float, quantity: int) -> float:
        """Return ``base_price * quantity``."""
        subtotal = base_price * quantity
        return subtotal
class BulkPricing(PricingStrategy):
    """Volume discount: 10% off from 10 units, 20% off from 50 units."""

    def calculate(self, base_price: float, quantity: int) -> float:
        """Return the line total after the volume discount for *quantity*."""
        subtotal = base_price * quantity
        if quantity >= 50:
            factor = 0.80
        elif quantity >= 10:
            factor = 0.90
        else:
            # Below the first tier the undiscounted subtotal is returned as-is.
            return subtotal
        return subtotal * factor
class SubscriptionPricing(PricingStrategy):
    """Flat monthly fee plus a per-unit charge above the included allowance."""

    def __init__(self, monthly_fee: float, included_units: int, overage_rate: float):
        self.monthly_fee = monthly_fee
        self.included_units = included_units
        self.overage_rate = overage_rate

    def calculate(self, base_price: float, quantity: int) -> float:
        """Return the monthly fee plus the overage charge for *quantity*.

        *base_price* is part of the strategy interface but is not used by
        this rule.
        """
        extra_units = max(0, quantity - self.included_units)
        overage_charge = extra_units * self.overage_rate
        return self.monthly_fee + overage_charge
# --- Context ---
class Order:
    """Collects line items and prices them with an injected strategy."""

    def __init__(self, pricing: PricingStrategy):
        self.items: list[tuple[str, float, int]] = []
        self.pricing = pricing

    def add_item(self, name: str, price: float, qty: int):
        """Append one ``(name, price, qty)`` line to the order."""
        line = (name, price, qty)
        self.items.append(line)

    def total(self) -> float:
        """Return the sum of the strategy's charge for every line."""
        running = 0
        for _name, unit_price, count in self.items:
            running += self.pricing.calculate(unit_price, count)
        return running
# --- Usage: swap strategy without changing Order ---
# Same Order class both times; only the injected strategy differs.
regular = Order(RegularPricing())
regular.add_item("Widget", 10.0, 5)
print(f"Regular: ${regular.total():.2f}") # $50.00
bulk = Order(BulkPricing())
bulk.add_item("Widget", 10.0, 50)
print(f"Bulk: ${bulk.total():.2f}") # $400.00 (20% off)
Pythonic alternative: functions as strategies
# For simple strategies, just use callables
def sort_by_price(items):
    """Return a new list of *items* ordered by ascending ``"price"``."""
    def _price(item):
        return item["price"]

    return sorted(items, key=_price)
def sort_by_rating(items):
    """Return a new list of *items* ordered from highest to lowest ``"rating"``."""
    def _negated_rating(item):
        return -item["rating"]

    ranked = list(items)
    ranked.sort(key=_negated_rating)
    return ranked
def sort_by_relevance(items, query):
    """Return a new list with items whose ``"name"`` contains *query* first.

    Matching is a case-insensitive substring test. Items keep their
    original relative order within the matching and non-matching groups.
    """
    # Lower-case the query once instead of once per item.
    needle = query.lower()

    def _matches(item):
        return needle in item["name"].lower()

    return sorted(items, key=_matches, reverse=True)
# Strategy is just a function parameter
def display_products(items, sort_fn=sort_by_price):
    """Print one line per product, in the order produced by ``sort_fn(items)``."""
    ordered = sort_fn(items)
    for product in ordered:
        name, price, rating = product["name"], product["price"], product["rating"]
        print(f" {name}: ${price} ⭐{rating}")
4. Observer — React to Changes
Use when multiple parts of your system need to react to the same event: user signup triggers welcome email + analytics + Slack notification.
from collections import defaultdict
from typing import Callable, Any
class EventBus:
    """Simple in-process publish/subscribe event system."""

    def __init__(self):
        self._subscribers: dict[str, list[Callable]] = defaultdict(list)

    def subscribe(self, event: str, callback: Callable):
        """Register *callback* for *event*; callbacks run in subscription order."""
        self._subscribers[event].append(callback)

    def unsubscribe(self, event: str, callback: Callable):
        """Remove one registration of *callback* from *event*.

        Raises ``ValueError`` if *callback* is not subscribed to *event*.
        """
        self._subscribers[event].remove(callback)

    def emit(self, event: str, **data):
        """Call every subscriber of *event* with ``**data``.

        A subscriber that raises does not stop delivery to the others; the
        error is printed and the next subscriber runs.
        """
        # Iterate over a snapshot: a callback may subscribe or unsubscribe
        # during delivery, and mutating the live list would skip entries.
        for callback in list(self._subscribers.get(event, ())):
            try:
                callback(**data)
            except Exception as e:
                # functools.partial objects and callable instances have no
                # __name__; fall back to repr so reporting cannot itself fail.
                label = getattr(callback, "__name__", repr(callback))
                print(f"Error in {label} for '{event}': {e}")

    def on(self, event: str):
        """Decorator to subscribe a function to an event."""
        def decorator(fn):
            self.subscribe(event, fn)
            return fn

        return decorator
# --- Setup ---
# One shared bus; the @bus.on(...) handlers below register against it.
bus = EventBus()
@bus.on("user.signup")
def send_welcome_email(email: str, name: str, **_):
    """Print the welcome-email line for *email*.

    *name* is required by the signature but not used in the output; any
    other event fields are accepted and ignored.
    """
    line = f"📧 Welcome email → {email}"
    print(line)
@bus.on("user.signup")
def track_analytics(email: str, **_):
    """Print the analytics line for a new signup; extra event fields are ignored."""
    line = f"📊 Analytics: new signup {email}"
    print(line)
@bus.on("user.signup")
def notify_team(name: str, **_):
    """Print the team notification for *name*; extra event fields are ignored."""
    line = f"💬 Slack: {name} just signed up!"
    print(line)
@bus.on("order.completed")
def send_receipt(email: str, amount: float, **_):
    """Print the receipt line for *email* with *amount* shown to two decimals."""
    line = f"🧾 Receipt → {email}: ${amount:.2f}"
    print(line)
# --- Trigger events ---
# Handlers run in the order they were registered; the expected output
# follows each call.
bus.emit("user.signup", email="alice@example.com", name="Alice")
# 📧 Welcome email → alice@example.com
# 📊 Analytics: new signup alice@example.com
# 💬 Slack: Alice just signed up!
bus.emit("order.completed", email="alice@example.com", amount=99.99)
# 🧾 Receipt → alice@example.com: $99.99
5. Decorator Pattern — Add Behavior Without Changing Code
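Not to be confused with Python's @decorator syntax (though they're related). The classic pattern wraps an object to add functionality; in Python the same idea is most often applied to functions with the @decorator syntax, which is what the examples below show.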
import time
import functools
import hashlib
import json
# --- Function decorators (most common in Python) ---
def retry(max_attempts: int = 3, delay: float = 1.0):
    """Return a decorator that retries the wrapped function on failure.

    The function is called up to *max_attempts* times. After each failed
    attempt except the last, a message is printed and the wrapper sleeps
    *delay* seconds. The exception from the final attempt propagates to the
    caller with its original traceback.

    Raises ``ValueError`` if *max_attempts* is less than 1.
    """
    # With zero attempts the loop would never run and there would be no
    # exception to re-raise, so reject the value up front.
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")

    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return fn(*args, **kwargs)
                except Exception as e:
                    if attempt == max_attempts:
                        raise
                    print(f"Attempt {attempt} failed: {e}. Retrying in {delay}s...")
                    time.sleep(delay)

        return wrapper

    return decorator
def cache(ttl_seconds: int = 300):
    """Return a decorator that caches results for *ttl_seconds*.

    The key is derived from the JSON form of the call's positional and
    keyword arguments; values JSON cannot encode are keyed by their
    ``str()``. The wrapper exposes ``clear_cache()`` to drop every entry.
    """
    def decorator(fn):
        _cache = {}

        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            # MD5 only shortens the key; it is not a security boundary, and
            # saying so keeps it usable where MD5 is otherwise restricted.
            key = hashlib.md5(
                json.dumps((args, sorted(kwargs.items())), default=str).encode(),
                usedforsecurity=False,
            ).hexdigest()
            # Monotonic clock: a wall-clock adjustment must not expire
            # entries early or keep them alive past their TTL.
            now = time.monotonic()
            hit = _cache.get(key)
            if hit is not None:
                result, stored_at = hit
                if now - stored_at < ttl_seconds:
                    return result
            result = fn(*args, **kwargs)
            # Drop expired entries on every miss so keys that are never
            # requested again do not accumulate.
            stale = [k for k, (_, ts) in _cache.items() if now - ts >= ttl_seconds]
            for k in stale:
                _cache.pop(k, None)
            _cache[key] = (result, time.monotonic())
            return result

        wrapper.clear_cache = lambda: _cache.clear()
        return wrapper

    return decorator
def timing(fn):
    """Decorate *fn* so every call prints how long it took.

    The elapsed time is printed whether the call returns or raises; an
    exception from *fn* still propagates to the caller.
    """
    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        try:
            return fn(*args, **kwargs)
        finally:
            # Runs on failure too, so slow failing calls are visible.
            elapsed = time.perf_counter() - start
            print(f"⏱ {fn.__name__} took {elapsed:.3f}s")

    return wrapper
# --- Stack decorators ---
@timing
@retry(max_attempts=3, delay=0.5)
@cache(ttl_seconds=60)
def fetch_user(user_id: int) -> dict:
    """Simulated API call that raises ``ConnectionError`` about 30% of the time.

    Decorators apply bottom-up: ``cache`` wraps the call itself, ``retry``
    wraps the cached call, and ``timing`` wraps the retried call.
    """
    import random

    failed = random.random() < 0.3
    if not failed:
        return {"id": user_id, "name": f"User {user_id}"}
    raise ConnectionError("API timeout")
6. Repository — Separate Data Access from Logic
Use when your business logic shouldn't know (or care) about the database. The pattern is a common way to structure the data-access layer of FastAPI and Django applications. See also our FastAPI guide and database operations guide.
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Optional
def _utc_now() -> datetime:
    """Return the current time as a timezone-aware UTC datetime."""
    return datetime.now(timezone.utc)


@dataclass
class Task:
    """A to-do item; ``id`` defaults to ``None``."""

    id: Optional[int] = None
    title: str = ""
    completed: bool = False
    # Evaluated per instance, so each task records its own creation time.
    created_at: datetime = field(default_factory=_utc_now)
# --- Repository interface ---
class TaskRepository(ABC):
    """Storage-agnostic contract for loading and persisting ``Task`` objects."""

    @abstractmethod
    def get(self, task_id: int) -> Optional[Task]:
        """Return the task stored under *task_id*, if any."""

    @abstractmethod
    def list(self, completed: Optional[bool] = None) -> list[Task]:
        """Return tasks, optionally restricted by their *completed* flag."""

    @abstractmethod
    def create(self, task: Task) -> Task:
        """Store *task* and return the stored task."""

    @abstractmethod
    def update(self, task: Task) -> Task:
        """Store a changed *task* and return it."""

    @abstractmethod
    def delete(self, task_id: int) -> bool:
        """Delete the task stored under *task_id* and return a ``bool``."""
# --- In-memory implementation (for tests) ---
class InMemoryTaskRepo(TaskRepository):
    """Dictionary-backed repository that assigns ids from a running counter."""

    def __init__(self):
        self._store: dict[int, Task] = {}
        self._next_id = 1

    def get(self, task_id: int) -> Optional[Task]:
        """Return the stored task object for *task_id*, or ``None``."""
        return self._store.get(task_id)

    def list(self, completed: Optional[bool] = None) -> list[Task]:
        """Return tasks newest-first, filtered by *completed* unless it is ``None``."""
        if completed is None:
            selected = self._store.values()
        else:
            selected = [t for t in self._store.values() if t.completed == completed]
        return sorted(selected, key=lambda t: t.created_at, reverse=True)

    def create(self, task: Task) -> Task:
        """Assign the next id to *task* (overwriting any id it had), store and return it."""
        new_id = self._next_id
        task.id = new_id
        self._next_id = new_id + 1
        self._store[new_id] = task
        return task

    def update(self, task: Task) -> Task:
        """Replace the stored task with the same id; ``ValueError`` if the id is unknown."""
        if task.id in self._store:
            self._store[task.id] = task
            return task
        raise ValueError(f"Task {task.id} not found")

    def delete(self, task_id: int) -> bool:
        """Remove *task_id*; return ``True`` if a task was removed."""
        removed = self._store.pop(task_id, None)
        return removed is not None
# --- SQLAlchemy implementation (for production) ---
class SQLAlchemyTaskRepo(TaskRepository):
    """Repository backed by a SQLAlchemy session and a ``TaskModel`` mapping.

    NOTE(review): ``TaskModel`` is not defined in this snippet, and
    ``update``/``delete`` are elided. Because ``TaskRepository`` declares
    them abstract, this class cannot be instantiated until they are added.
    NOTE(review): ``Query.get`` looks like the legacy SQLAlchemy API —
    confirm against the installed version.
    """

    def __init__(self, session):
        self.session = session

    def get(self, task_id: int):
        """Look up one row by primary key through the session."""
        query = self.session.query(TaskModel)
        return query.get(task_id)

    def list(self, completed=None):
        """Return rows newest-first, filtered by *completed* unless it is ``None``."""
        rows = self.session.query(TaskModel)
        if completed is not None:
            rows = rows.filter(TaskModel.completed == completed)
        newest_first = TaskModel.created_at.desc()
        return rows.order_by(newest_first).all()

    def create(self, task):
        """Insert a row built from *task*'s attributes, commit, and return the row."""
        record = TaskModel(**vars(task))
        self.session.add(record)
        self.session.commit()
        return record

    # ... update, delete similarly
# --- Business logic doesn't know about the database ---
class TaskService:
    """Business rules for tasks, written against the repository interface only."""

    def __init__(self, repo: TaskRepository):
        self.repo = repo

    def complete_task(self, task_id: int) -> Task:
        """Mark the task as completed and persist it.

        Returns whatever ``repo.update`` returns. Raises ``ValueError`` when
        the repository has no task for *task_id*.
        """
        task = self.repo.get(task_id)
        # Test for absence explicitly rather than relying on truthiness.
        if task is None:
            raise ValueError("Task not found")
        task.completed = True
        return self.repo.update(task)

    def overdue_tasks(self, max_age_days: int = 7) -> list[Task]:
        """Return open tasks older than *max_age_days* whole days.

        Age is counted in whole days since ``created_at``, so with the
        default a task qualifies once it is at least eight days old.
        """
        # One timestamp for the whole scan keeps every task on the same cutoff.
        now = datetime.now(timezone.utc)
        return [
            t for t in self.repo.list(completed=False)
            if (now - t.created_at).days > max_age_days
        ]
# Tests use InMemoryTaskRepo — fast, no DB needed
# Production uses SQLAlchemyTaskRepo — same interface
7. Pipeline / Chain of Responsibility
Use when data needs to pass through multiple processing steps: data cleaning, request middleware, text processing. This pairs well with ETL pipelines.
from typing import Callable, Any
class Pipeline:
    """Composable processing pipeline: each step receives the previous step's result."""

    def __init__(self):
        self._steps: list[Callable] = []

    def add(self, step: Callable) -> "Pipeline":
        """Append *step* to this pipeline in place and return ``self`` for chaining."""
        self._steps.append(step)
        return self  # fluent interface

    def run(self, data: Any) -> Any:
        """Feed *data* through every step in order and return the final result.

        Raises ``ValueError`` as soon as a step returns ``None``.
        """
        result = data
        for step in self._steps:
            result = step(result)
            if result is None:
                # Partials and callable objects have no __name__; fall back
                # to repr so the intended ValueError is what the caller sees.
                label = getattr(step, "__name__", repr(step))
                raise ValueError(f"Step {label} returned None")
        return result

    def __or__(self, step: Callable) -> "Pipeline":
        """Enable pipe syntax: pipeline | step1 | step2

        Returns a new pipeline with *step* appended and leaves this one
        unchanged, so two pipelines built from a shared base with ``|`` do
        not leak steps into each other. Use ``add`` to extend in place.
        """
        extended = Pipeline.__new__(type(self))
        extended.__dict__.update(self.__dict__)
        extended._steps = [*self._steps, step]
        return extended
# --- Processing steps ---
def strip_whitespace(text: str) -> str:
    """Collapse every run of whitespace to one space and trim both ends."""
    words = text.split()
    return " ".join(words)
def lowercase(text: str) -> str:
    """Return *text* converted to lowercase."""
    lowered = text.lower()
    return lowered
def remove_punctuation(text: str) -> str:
    """Delete every character listed in ``string.punctuation`` from *text*."""
    import string

    # A mapping of character -> None tells translate() to drop that character.
    deletions = str.maketrans(dict.fromkeys(string.punctuation))
    return text.translate(deletions)
def tokenize(text: str) -> list[str]:
    """Split *text* on whitespace and return the resulting tokens."""
    tokens = text.split()
    return tokens
def remove_stopwords(tokens: list[str]) -> list[str]:
    """Return *tokens* without the fixed stop words, preserving order.

    The comparison is exact, so callers lowercase the tokens first.
    """
    stops = {"the", "a", "an", "is", "at", "in", "on", "and", "or", "to", "of"}
    kept = []
    for token in tokens:
        if token in stops:
            continue
        kept.append(token)
    return kept
# --- Build and run ---
# Steps run in the order they are added; each receives the previous result.
clean_text = Pipeline()
clean_text.add(strip_whitespace)
clean_text.add(lowercase)
clean_text.add(remove_punctuation)
result = clean_text.run(" Hello, World! This is A TEST. ")
print(result) # "hello world this is a test"
# Or with pipe syntax:
# The text steps come first; tokenize turns the string into a list, which is
# what remove_stopwords expects.
nlp = Pipeline() | strip_whitespace | lowercase | remove_punctuation | tokenize | remove_stopwords
tokens = nlp.run("The Quick Brown Fox Jumps Over the Lazy Dog!")
print(tokens) # ['quick', 'brown', 'fox', 'jumps', 'over', 'lazy', 'dog']
8. Builder — Construct Complex Objects Step by Step
Use when creating objects with many optional parameters. Python's keyword arguments reduce the need for this, but it's still valuable for complex configs.
from dataclasses import dataclass, field
@dataclass
class QueryBuilder:
"""Build SQL-like queries fluently."""
_table: str = ""
_columns: list[str] = field(default_factory=lambda: ["*"])
_conditions: list[str] = field(default_factory=list)
_order: list[str] = field(default_factory=list)
_limit_val: int | None = None
_offset_val: int | None = None
def table(self, name: str) -> "QueryBuilder":
self._table = name
return self
def select(self, *columns: str) -> "QueryBuilder":
self._columns = list(columns)
return self
def where(self, condition: str) -> "QueryBuilder":
self._conditions.append(condition)
return self
def order_by(self, column: str, desc: bool = False) -> "QueryBuilder":
direction = "DESC" if desc else "ASC"
self._order.append(f"{column} {direction}")
return self
def limit(self, n: int) -> "QueryBuilder":
self._limit_val = n
return self
def offset(self, n: int) -> "QueryBuilder":
self._offset_val = n
return self
def build(self) -> str:
if not self._table:
raise ValueError("Table name is required")
parts = [f"SELECT {', '.join(self._columns)} FROM {self._table}"]
if self._conditions:
parts.append("WHERE " + " AND ".join(self._conditions))
if self._order:
parts.append("ORDER BY " + ", ".join(self._order))
if self._limit_val is not None:
parts.append(f"LIMIT {self._limit_val}")
if self._offset_val is not None:
parts.append(f"OFFSET {self._offset_val}")
return " ".join(parts)
# --- Usage ---
# Each setter returns the builder, so the whole query is one chained
# expression; build() renders the final string.
query = (
    QueryBuilder()
    .table("users")
    .select("id", "name", "email")
    .where("is_active = TRUE")
    .where("created_at > '2026-01-01'")
    .order_by("created_at", desc=True)
    .limit(20)
    .offset(40)
    .build()
)
print(query)
# Printed as a single line (wrapped here for readability):
# SELECT id, name, email FROM users
# WHERE is_active = TRUE AND created_at > '2026-01-01'
# ORDER BY created_at DESC LIMIT 20 OFFSET 40
9. Context Manager — Resource Management Done Right
Python's with statement is the cleanest resource management pattern in any language. Build your own for databases, temp files, timers, and locks.
import time
import tempfile
import shutil
from contextlib import contextmanager
from pathlib import Path
@contextmanager
def timer(label: str = ""):
    """Measure the ``with`` block and print its execution time.

    The line is printed whether the block finishes or raises; an exception
    from the block still propagates. An empty *label* is shown as "Block".
    """
    start = time.perf_counter()
    try:
        yield
    finally:
        # Without finally, an exception in the block would skip the report.
        elapsed = time.perf_counter() - start
        print(f"⏱ {label or 'Block'}: {elapsed:.3f}s")
@contextmanager
def temp_directory(prefix: str = "work_"):
    """Yield a fresh temporary directory as a ``Path`` and delete it on exit.

    The directory tree is removed even if the ``with`` block raises; errors
    during removal are ignored.
    """
    created = tempfile.mkdtemp(prefix=prefix)
    workdir = Path(created)
    try:
        yield workdir
    finally:
        shutil.rmtree(workdir, ignore_errors=True)
@contextmanager
def atomic_write(filepath: str, encoding: str = "utf-8"):
    """Write to a temp file next to *filepath* and swap it in on success.

    Yields a text file object opened for writing with *encoding*. If the
    ``with`` block finishes, the temp file replaces *filepath*; if it
    raises, the temp file is removed, *filepath* is left untouched and the
    exception propagates.
    """
    path = Path(filepath)
    # Append ".tmp" to the full name: replacing the suffix would make
    # "a.json" and "a.yaml" share the temp file "a.tmp".
    tmp = path.with_name(path.name + ".tmp")
    try:
        with open(tmp, "w", encoding=encoding) as f:
            yield f
        # replace() overwrites an existing target on every platform;
        # rename() fails on Windows when the target already exists.
        tmp.replace(path)  # atomic on same filesystem
    except BaseException:
        # BaseException so the temp file is also removed on KeyboardInterrupt.
        tmp.unlink(missing_ok=True)
        raise
# --- Usage ---
# Time a block of work; the report is printed when the block exits.
with timer("data processing"):
    data = [i * i for i in range(1_000_000)]

# Scratch directory that exists only inside the with block.
with temp_directory("export_") as tmpdir:
    output = tmpdir.joinpath("results.csv")
    output.write_text("id,value\n1,42\n")
    print(f"Wrote to {output}")
# Directory auto-deleted here

# The data goes to a temp file first and is moved into place on success.
with atomic_write("config.json") as f:
    import json

    json.dump({"version": 2, "debug": False}, f)
# If json.dump fails, config.json is untouched
When to Use Which Pattern
| Problem | Pattern | Python Approach |
|---|---|---|
| One instance of something | Singleton | Module-level instance |
| Create objects by type name | Factory | Dict registry + function |
| Swap algorithms at runtime | Strategy | Pass functions / protocol classes |
| React to events | Observer | EventBus / signals |
| Add behavior to functions | Decorator | @decorator syntax |
| Abstract data storage | Repository | ABC + multiple implementations |
| Process data in stages | Pipeline | Composable callables |
| Build complex objects | Builder | Fluent interface / dataclass |
| Manage resources | Context Manager | with + @contextmanager |
🚀 Want production-ready implementations of these patterns plus 50+ automation scripts?
Related Articles
- Build a REST API with FastAPI — uses Repository, Factory, and Dependency Injection patterns
- Build a Data Pipeline in Python — Pipeline pattern in action with ETL
- Python Testing Guide — test your pattern implementations with pytest
- Build a Professional CLI Tool in Python — Builder and Strategy patterns for CLI apps
- Python Database Operations — Repository pattern with SQLAlchemy
Need clean, well-architected Python code for your project? I build APIs, automation tools, and data pipelines with solid patterns. Reach out on Telegram →