Python Error Handling — Build Resilient Applications

March 2026 · 20 min read · Python, Error Handling, Resilience

Most Python tutorials teach try/except and call it a day. Real-world applications need more: custom exception hierarchies, structured error responses, retry strategies, error boundaries, and graceful degradation. This guide covers the patterns that separate hobby scripts from production code.

Beyond Basic try/except

The anti-patterns

# ❌ Bare except — catches EVERYTHING including KeyboardInterrupt
try:
    result = risky_operation()
except:
    # If risky_operation() raised, `result` was never assigned, so the
    # failure resurfaces later as a confusing NameError.
    pass  # Silently swallows all errors

# ❌ Pokemon exception handling — gotta catch 'em all
try:
    data = fetch_data()
    processed = transform(data)
    save_to_db(processed)
except Exception as e:
    # One handler covers three unrelated steps, and it only prints the
    # message: no traceback, no re-raise, so execution simply continues.
    print(f"Something went wrong: {e}")  # Which operation failed?

# ❌ Exception as flow control
try:
    value = my_dict[key]
except KeyError:
    value = default  # Just use .get(key, default)!

The right patterns

# ✅ Specific exceptions, separate handlers
try:
    data = fetch_from_api(url)
except httpx.TimeoutException:
    # Timeout: degrade to cached data instead of failing the caller.
    logger.warning(f"API timeout for {url}, using cached data")
    data = get_cached(url)
except httpx.HTTPStatusError as e:
    if e.response.status_code == 429:
        logger.info("Rate limited, backing off")
        # Sleep for the server-provided Retry-After value, or 60 if absent.
        # NOTE(review): int() assumes the header is a number of seconds; if the
        # API can send an HTTP-date instead, this raises ValueError — confirm.
        time.sleep(int(e.response.headers.get("Retry-After", 60)))
        # This second call has no handler of its own: if it fails too, the
        # exception propagates to the caller.
        data = fetch_from_api(url)  # retry once
    else:
        raise  # Re-raise unexpected HTTP errors

# ✅ Else and finally
# Bind `conn` before the try so `finally` can tell whether connect() succeeded
# without inspecting locals() (which could also pick up a stale `conn`).
conn = None
try:
    conn = database.connect()
except ConnectionError:
    logger.error("Database unavailable")
    raise
else:
    # Only runs if NO exception — use for "success" logic
    logger.info("Connected to database")
    result = conn.execute(query)
finally:
    # ALWAYS runs — cleanup. Only close a connection we actually opened.
    if conn is not None:
        conn.close()
💡 Rule: Catch the most specific exception possible. If you catch Exception, you're probably hiding bugs. If you catch BaseException, you're definitely hiding bugs.

Custom Exception Hierarchies

Your application should define its own exceptions. This separates your error domain from Python's built-in exceptions and makes error handling predictable.

# app/exceptions.py

class AppError(Exception):
    """Base exception for our application.
    All custom exceptions inherit from this.
    """
    def __init__(self, message: str, code: str = "INTERNAL_ERROR", details: dict = None):
        self.message = message
        self.code = code
        self.details = details or {}
        super().__init__(self.message)


# --- Domain errors (business logic) ---
class ValidationError(AppError):
    """Raised when supplied input data is invalid.

    Uses the code ``"VALIDATION_ERROR"``. When a truthy ``field`` is given,
    ``details`` is ``{"field": field}``; otherwise no details are attached.
    """

    def __init__(self, message: str, field: str = None):
        field_info = {"field": field} if field else {}
        super().__init__(message, "VALIDATION_ERROR", field_info)


class NotFoundError(AppError):
    """Raised when a requested resource does not exist.

    Uses the code ``"NOT_FOUND"``; ``details`` records both the resource kind
    and the identifier that was looked up.
    """

    def __init__(self, resource: str, identifier: str):
        lookup = {"resource": resource, "identifier": identifier}
        text = f"{resource} '{identifier}' not found"
        super().__init__(text, "NOT_FOUND", lookup)


class ConflictError(AppError):
    """Raised when an operation clashes with the current state.

    Uses the code ``"CONFLICT"`` and attaches no details.
    """

    def __init__(self, message: str):
        super().__init__(message, "CONFLICT")


class AuthenticationError(AppError):
    """Raised when the caller is not authenticated.

    Uses the code ``"UNAUTHORIZED"``; the message defaults to
    ``"Authentication required"``.
    """

    def __init__(self, message: str = "Authentication required"):
        super().__init__(message, "UNAUTHORIZED")


class PermissionError(AppError):
    """Raised when the user lacks the permissions an action requires.

    Uses the code ``"FORBIDDEN"``; ``details`` records the attempted action
    and the target resource.

    NOTE(review): this name shadows Python's builtin ``PermissionError`` (an
    ``OSError`` subclass) in every module that defines or imports it, so an
    ``except PermissionError`` there no longer catches OS-level permission
    failures. Consider a distinct name in new code.
    """

    def __init__(self, action: str, resource: str):
        attempted = {"action": action, "resource": resource}
        text = f"Permission denied: cannot {action} {resource}"
        super().__init__(text, "FORBIDDEN", attempted)


# --- Infrastructure errors ---
class ExternalServiceError(AppError):
    """Raised when an external API/service is unavailable or errors out.

    Uses the code ``"EXTERNAL_SERVICE_ERROR"``. The message is prefixed with
    the service name, and ``details`` always carries both ``service`` and
    ``retry_after`` (``None`` when no retry hint was given).
    """

    def __init__(self, service: str, message: str, retry_after: int = None):
        context = {"service": service, "retry_after": retry_after}
        text = f"{service}: {message}"
        super().__init__(text, "EXTERNAL_SERVICE_ERROR", context)


class DatabaseError(AppError):
    """Raised when a database operation fails.

    Uses the code ``"DATABASE_ERROR"``; ``details`` records which operation
    was being performed.
    """

    def __init__(self, operation: str, message: str):
        context = {"operation": operation}
        text = f"Database {operation} failed: {message}"
        super().__init__(text, "DATABASE_ERROR", context)

Using the hierarchy

from app.exceptions import (
    ConflictError,
    ExternalServiceError,
    NotFoundError,
    ValidationError,
)


class UserService:
    """User operations that report failures through the app's exceptions.

    The methods read ``self.repo`` and ``self.github_client``; how those are
    provided is not shown in this snippet.
    """

    def get_user(self, user_id: int) -> User:
        """Return the user with ``user_id``.

        Raises:
            NotFoundError: If the repository lookup returns a falsy result.
        """
        user = self.repo.find_by_id(user_id)
        if not user:
            raise NotFoundError("User", str(user_id))
        return user

    def create_user(self, email: str, name: str) -> User:
        """Create and return a new user.

        Raises:
            ValidationError: If ``email`` is empty or contains no ``@``.
            ConflictError: If a user with this email already exists.
        """
        if not email or "@" not in email:
            raise ValidationError("Invalid email format", field="email")

        existing = self.repo.find_by_email(email)
        if existing:
            raise ConflictError(f"User with email {email} already exists")

        return self.repo.create(User(email=email, name=name))

    def enrich_profile(self, user_id: int) -> dict:
        """Return the user's data merged with their GitHub profile.

        Raises:
            NotFoundError: If the user does not exist (via ``get_user``).
            ExternalServiceError: If the GitHub request times out or returns
                an HTTP error status.
        """
        user = self.get_user(user_id)
        try:
            profile = self.github_client.get_profile(user.github_username)
        except httpx.TimeoutException as e:
            # `from e` records the transport error as the explicit cause, so
            # the original traceback stays attached to the domain error.
            raise ExternalServiceError(
                "GitHub", "Request timed out", retry_after=30
            ) from e
        except httpx.HTTPStatusError as e:
            raise ExternalServiceError(
                "GitHub", f"HTTP {e.response.status_code}"
            ) from e
        return {**user.to_dict(), "github": profile}

Structured Error Responses (APIs)

When building APIs with FastAPI, convert your exceptions into consistent JSON responses:

# app/error_handlers.py
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
from app.exceptions import AppError, NotFoundError, ValidationError, AuthenticationError

# Maps an AppError `code` string to the HTTP status returned for it.
# Codes missing from this table are answered with 500 by the handler's
# `.get(exc.code, 500)` lookup.
ERROR_STATUS_CODES = {
    "VALIDATION_ERROR": 422,
    "NOT_FOUND": 404,
    "CONFLICT": 409,
    "UNAUTHORIZED": 401,
    "FORBIDDEN": 403,
    "EXTERNAL_SERVICE_ERROR": 502,
    "DATABASE_ERROR": 500,
    "INTERNAL_ERROR": 500,
}


def register_error_handlers(app: FastAPI):
    """Register the application's exception handlers on ``app``.

    Two handlers are installed:

    * ``AppError`` — rendered as ``{"error": {code, message, details}}`` with
      the status looked up in ``ERROR_STATUS_CODES`` (500 if the code is
      unknown).
    * ``Exception`` — logged with its traceback and answered with a generic
      500 body that reveals nothing about the failure.
    """

    @app.exception_handler(AppError)
    async def app_error_handler(request: Request, exc: AppError):
        """Render an ``AppError`` as the standard JSON error envelope."""
        status_code = ERROR_STATUS_CODES.get(exc.code, 500)
        return JSONResponse(
            status_code=status_code,
            content={
                "error": {
                    "code": exc.code,
                    "message": exc.message,
                    "details": exc.details,
                }
            },
        )

    @app.exception_handler(Exception)
    async def unhandled_error_handler(request: Request, exc: Exception):
        """Log an unexpected exception and return an opaque 500 response."""
        # Log the full traceback — never expose it to the client.
        # Context goes through `extra=` because the stdlib logger rejects
        # arbitrary keyword arguments; `exc_info=exc` attaches the traceback
        # of the exception being handled, whatever the current except state.
        logger.error(
            "Unhandled exception",
            extra={
                "path": request.url.path,
                "method": request.method,
                "error": str(exc),
            },
            exc_info=exc,
        )
        return JSONResponse(
            status_code=500,
            content={
                "error": {
                    "code": "INTERNAL_ERROR",
                    "message": "An unexpected error occurred",
                    "details": {},
                }
            },
        )

Now every error returns a consistent shape:

# 404
{
    "error": {
        "code": "NOT_FOUND",
        "message": "User '42' not found",
        "details": {"resource": "User", "identifier": "42"}
    }
}

# 422
{
    "error": {
        "code": "VALIDATION_ERROR",
        "message": "Invalid email format",
        "details": {"field": "email"}
    }
}

Retry Strategies

Network calls fail. Databases hiccup. APIs rate-limit you. Retry logic is essential.

import functools
import random
import time
from collections.abc import Callable
from typing import Type


def retry(
    max_attempts: int = 3,
    exceptions: tuple[Type[Exception], ...] = (Exception,),
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    backoff_factor: float = 2.0,
    jitter: bool = True,
    on_retry: callable = None,
):
    """Decorator: retry with exponential backoff and jitter."""

    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            last_exception = None

            for attempt in range(1, max_attempts + 1):
                try:
                    return fn(*args, **kwargs)
                except exceptions as e:
                    last_exception = e

                    if attempt == max_attempts:
                        break

                    delay = min(base_delay * (backoff_factor ** (attempt - 1)), max_delay)
                    if jitter:
                        delay *= 0.5 + random.random()

                    if on_retry:
                        on_retry(attempt, max_attempts, e, delay)

                    time.sleep(delay)

            raise last_exception

        return wrapper
    return decorator


# --- Usage ---
# httpx raises its own exception classes, which do not derive from the
# builtin ConnectionError/TimeoutError, so the retry has to name them.
@retry(
    max_attempts=3,
    exceptions=(httpx.ConnectError, httpx.TimeoutException),
    base_delay=1.0,
    on_retry=lambda attempt, max_a, err, delay: print(
        f"Attempt {attempt}/{max_a} failed: {err}. Retrying in {delay:.1f}s"
    ),
)
def call_external_api(endpoint: str) -> dict:
    """GET ``endpoint`` and return the decoded JSON body.

    Connection failures and timeouts are retried up to three times.
    An HTTP error status raises ``httpx.HTTPStatusError`` immediately,
    because that type is not in the retry list.
    """
    resp = httpx.get(endpoint, timeout=10.0)
    resp.raise_for_status()
    return resp.json()

Async retry

import asyncio

def async_retry(max_attempts=3, exceptions=(Exception,), base_delay=1.0, max_delay=60.0):
    """Async version of retry decorator.

    Args:
        max_attempts: Total number of calls to make, including the first.
            Must be at least 1.
        exceptions: Exception types that trigger a retry; anything else
            propagates immediately.
        base_delay: Delay in seconds before the first retry. It doubles after
            each failure and is scaled by a random factor in [0.5, 1.5).
        max_delay: Upper bound in seconds for any single delay.

    Raises:
        ValueError: If ``max_attempts`` is less than 1.

    The decorated coroutine re-raises the last caught exception once all
    attempts are used up.
    """
    if max_attempts < 1:
        # Zero attempts would leave nothing to return and nothing to raise.
        raise ValueError("max_attempts must be at least 1")

    def decorator(fn):
        @functools.wraps(fn)
        async def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return await fn(*args, **kwargs)
                except exceptions:
                    if attempt == max_attempts:
                        # Bare raise keeps the original traceback intact.
                        raise
                    backoff = min(base_delay * (2 ** (attempt - 1)), max_delay)
                    # Jitter can scale by up to 1.5x, so cap again afterwards.
                    delay = min(backoff * (0.5 + random.random()), max_delay)
                    await asyncio.sleep(delay)
        return wrapper
    return decorator


@async_retry(max_attempts=3, exceptions=(httpx.TimeoutException,))
async def fetch_data(url: str):
    """GET ``url`` and return the decoded JSON body, retrying on timeouts.

    An HTTP error status raises ``httpx.HTTPStatusError``; it is not in the
    retry list, so it propagates on the first occurrence.
    """
    async with httpx.AsyncClient() as client:
        resp = await client.get(url, timeout=5.0)
        # Fail loudly on 4xx/5xx instead of trying to JSON-decode an error
        # body, matching the other HTTP helpers in this guide.
        resp.raise_for_status()
        return resp.json()

Error Boundaries

An error boundary contains failures — preventing one bad operation from crashing the entire system.

from contextlib import contextmanager
import logging

logger = logging.getLogger(__name__)


class _BoundaryOutcome:
    """Mutable holder yielded by ``error_boundary``.

    ``error`` is the suppressed exception (``None`` on success) and ``value``
    is the configured fallback once an error has been suppressed.
    """

    __slots__ = ("value", "error")

    def __init__(self):
        self.value = None
        self.error = None


@contextmanager
def error_boundary(
    operation: str,
    fallback=None,
    suppress: tuple[Type[Exception], ...] = (Exception,),
    log_level: int = logging.ERROR,
):
    """Contain errors within a boundary and expose a fallback on failure.

    A ``with`` block cannot return a value, so the boundary yields an outcome
    object instead. When an exception listed in ``suppress`` escapes the
    block, it is logged and swallowed, ``outcome.error`` is set to it, and
    ``outcome.value`` is set to ``fallback``. Callers that ignore the yielded
    object (plain ``with error_boundary(...):``) work unchanged.

    Args:
        operation: Label used in the log message.
        fallback: Value exposed as ``outcome.value`` after a suppressed error.
        suppress: Exception types to swallow; others propagate.
        log_level: Logging level for the suppressed error.
    """
    outcome = _BoundaryOutcome()
    try:
        yield outcome
    except suppress as e:
        logger.log(log_level, "Error in %s: %s", operation, e, exc_info=True)
        outcome.error = e
        outcome.value = fallback


# --- Usage ---
def process_batch(items: list[dict]) -> list[dict]:
    """Process items — failures in individual items don't kill the batch.

    Each item is transformed and validated inside its own error boundary, so
    a failing item is logged and skipped. An empty batch returns ``[]``.

    Raises:
        RuntimeError: If fewer than 50% of the items were processed
            successfully.
    """
    if not items:
        # Nothing to process; also avoids dividing by zero in the
        # success-rate calculation below.
        return []

    results = []

    for item in items:
        with error_boundary(f"processing item {item.get('id')}", fallback=None):
            processed = transform(item)
            validate(processed)
            results.append(processed)

    success_rate = len(results) / len(items) * 100
    logger.info(f"Batch complete: {len(results)}/{len(items)} ({success_rate:.0f}%)")

    if success_rate < 50:
        raise RuntimeError(
            f"Batch failure rate too high: {100 - success_rate:.0f}%. "
            f"Aborting to prevent data quality issues."
        )

    return results

Error boundary for external services

class _CallOutcome:
    """Holder yielded by ``ServiceDegradation.call``.

    The ``with`` body stores the primary result in ``value``. If the body
    fails and a fallback is registered, ``value`` is replaced by the
    fallback's result and ``degraded`` is set to ``True``.
    """

    def __init__(self):
        self.value = None
        self.degraded = False


class ServiceDegradation:
    """Graceful degradation when external services fail."""

    def __init__(self):
        self._fallbacks: dict[str, Callable[[], object]] = {}

    def register_fallback(self, service: str, fallback: Callable[[], object]):
        """Register a zero-argument callable that produces the fallback value."""
        self._fallbacks[service] = fallback

    @contextmanager
    def call(self, service: str):
        """Try the primary service, fall back on failure.

        A context manager cannot return a value into the ``with`` block, so
        the result travels through the yielded outcome object: assign the
        primary result to ``outcome.value`` inside the block and read
        ``outcome.value`` afterwards.

        Raises:
            ExternalServiceError: If the block fails and no fallback is
                registered for ``service``.
        """
        outcome = _CallOutcome()
        try:
            yield outcome
        except Exception as e:
            logger.warning(f"{service} unavailable: {e}")
            fallback = self._fallbacks.get(service)
            if fallback is None:
                # Keep the original failure attached as the explicit cause.
                raise ExternalServiceError(service, str(e)) from e
            logger.info(f"Using fallback for {service}")
            outcome.value = fallback()
            outcome.degraded = True


# --- Usage ---
degradation = ServiceDegradation()
degradation.register_fallback("recommendations", lambda: [])  # empty recs
degradation.register_fallback("weather", lambda: {"temp": "N/A"})

with degradation.call("recommendations") as outcome:
    outcome.value = recommendation_engine.get_for_user(user_id)
recs = outcome.value
# If recommendation engine is down → recs is [], app continues

Result Type Pattern

Instead of raising exceptions, return a Result object. This pattern is popular in Rust and increasingly common in Python for operations that are expected to fail often.

from dataclasses import dataclass
from typing import TypeVar, Generic

T = TypeVar("T")
E = TypeVar("E")


@dataclass(frozen=True)
class Ok(Generic[T]):
    value: T

    def is_ok(self) -> bool:
        return True

    def is_err(self) -> bool:
        return False

    def unwrap(self) -> T:
        return self.value

    def unwrap_or(self, default: T) -> T:
        return self.value


@dataclass(frozen=True)
class Err(Generic[E]):
    error: E

    def is_ok(self) -> bool:
        return False

    def is_err(self) -> bool:
        return True

    def unwrap(self):
        raise ValueError(f"Called unwrap on Err: {self.error}")

    def unwrap_or(self, default):
        return default


Result = Ok | Err


# --- Usage ---
def parse_config(path: str) -> Result:
    """Parse config file — returns Ok(config) or Err(message).

    The file is read as UTF-8 JSON and must contain a JSON object with the
    keys ``database_url`` and ``secret_key``. Every failure (missing or
    unreadable file, invalid JSON, wrong top-level type, missing keys) is
    reported as an ``Err`` with a descriptive message instead of raising.
    """
    from pathlib import Path
    import json

    # Read directly instead of checking exists() first: that avoids a race
    # with the file disappearing and also covers directories and permission
    # errors.
    try:
        raw = Path(path).read_text(encoding="utf-8")
    except FileNotFoundError:
        return Err(f"Config file not found: {path}")
    except (OSError, UnicodeDecodeError) as e:
        return Err(f"Cannot read config file {path}: {e}")

    try:
        data = json.loads(raw)
    except json.JSONDecodeError as e:
        return Err(f"Invalid JSON in {path}: {e}")

    if not isinstance(data, dict):
        # A top-level list or scalar would make the key checks meaningless.
        return Err(
            f"Config in {path} must be a JSON object, got {type(data).__name__}"
        )

    required = ["database_url", "secret_key"]
    missing = [k for k in required if k not in data]
    if missing:
        return Err(f"Missing required keys: {missing}")

    return Ok(data)


# The caller chooses the recovery strategy
result = parse_config("config.json")

if result.is_err():
    # Failure path: report the problem and fall back to built-in settings.
    print(f"Config error: {result.error}")
    config = {"database_url": "sqlite:///default.db", "secret_key": "dev-only"}
else:
    config = result.unwrap()
    print(f"Loaded {len(config)} settings")

# Alternatively, collapse both paths into one expression with unwrap_or:
config = parse_config("config.json").unwrap_or(default_config)

Logging Errors Properly

See our logging guide for full details. Key error-logging patterns:

import logging
import traceback

logger = logging.getLogger(__name__)


# ✅ Log with exc_info for full traceback
try:
    process_payment(order)
except PaymentError as e:
    logger.error("Payment failed for order %s", order.id, exc_info=True)
    # exc_info=True includes the full traceback in logs
    # (logger.exception(...) is shorthand for error(..., exc_info=True)).
    # The error is logged but not re-raised, so execution continues here.


# ✅ Structured context
try:
    sync_inventory(warehouse_id)
except Exception as e:
    # Fields passed via `extra` become attributes on the log record, so
    # formatters and handlers can emit them as structured data.
    # NOTE(review): `attempt` is not defined in this snippet — presumably it
    # comes from an enclosing retry loop; confirm before copying.
    logger.error(
        "Inventory sync failed",
        extra={
            "warehouse_id": warehouse_id,
            "error_type": type(e).__name__,
            "error_message": str(e),
            "retry_count": attempt,
        },
        exc_info=True,
    )


# ✅ Different levels for different severities
except ValidationError as e:
    logger.warning("Validation failed: %s", e)     # Expected, user error
except DatabaseError as e:
    logger.error("Database error: %s", e)           # Unexpected, needs attention
except Exception as e:
    logger.critical("Unhandled error: %s", e)       # Something is very wrong

Exception Groups (Python 3.11+)

Handle multiple errors from concurrent operations:

import asyncio


async def fetch_all(urls: list[str]) -> dict:
    """Fetch multiple URLs concurrently, collecting all errors.

    Returns:
        A dict mapping each URL to its decoded JSON body.

    Raises:
        ExceptionGroup: If any fetch failed. The group holds every individual
            exception; successful results are discarded in that case.
    """

    async def fetch_one(client, url):
        resp = await client.get(url, timeout=10.0)
        resp.raise_for_status()
        return resp.json()

    # Share one client so all requests reuse a single connection pool
    # instead of opening a new one per URL.
    async with httpx.AsyncClient() as client:
        # gather with return_exceptions collects all results/errors, in the
        # same order as `urls`.
        outcomes = await asyncio.gather(
            *(fetch_one(client, url) for url in urls),
            return_exceptions=True,
        )

    results = {}
    errors = []
    for url, outcome in zip(urls, outcomes):
        if isinstance(outcome, Exception):
            errors.append(outcome)
        else:
            results[url] = outcome

    if errors:
        # Raise all errors as a group
        raise ExceptionGroup(
            f"{len(errors)} of {len(urls)} fetches failed",
            errors,
        )

    return results


# Handle with except*
# Each except* clause receives a sub-group containing only the exceptions
# that match its type, and more than one clause can run for the same group.
# Exceptions in the group that match no clause are re-raised afterwards.
try:
    data = asyncio.run(fetch_all(["https://api1.com", "https://api2.com"]))
except* httpx.TimeoutException as eg:
    print(f"{len(eg.exceptions)} timeouts")
except* httpx.HTTPStatusError as eg:
    for e in eg.exceptions:
        print(f"HTTP error: {e.response.status_code}")

Production Patterns Summary

| Pattern | When to Use | Example |
| --- | --- | --- |
| Custom exceptions | Domain-specific errors | `NotFoundError("User", "42")` |
| Error hierarchy | Catch groups of related errors | `except AppError` |
| Retry with backoff | Transient failures (network, rate limits) | `@retry(max_attempts=3)` |
| Error boundary | Isolate failures in batch processing | `with error_boundary("op"):` |
| Result type | Operations that commonly fail | `Ok(value)` / `Err(msg)` |
| Graceful degradation | External service outages | Fallback to cache/defaults |
| Structured responses | API error consistency | JSON with code/message/details |
| Exception groups | Concurrent operation errors | `except* TimeoutError` |

🚀 Want production-ready Python tools with proper error handling built in?

Get the AI Agent Toolkit →

Related Articles

Need resilient Python applications built for production? I build APIs, automation tools, and distributed systems. Reach out on Telegram →