Every developer starts with print("here") debugging. It works — until you're debugging a production issue at 3 AM, your app processes 10,000 requests per second, and you need to find the one that failed. Good logging is the difference between "I found the bug in 5 minutes" and "I've been staring at this for 3 hours."
Python's built-in logging module is powerful but underused. Most developers either ignore it entirely or configure it badly. By the end of this guide, you'll have production-grade observability that makes debugging feel almost pleasant.
Proper logging gives you what print() can't: severity levels you can filter, multiple destinations for the same message, timestamps and source locations for free, and structured context you can search later.
The standard library gives you everything you need to start. Let's build up from the basics:
import logging
# The simplest setup — configure the root logger
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
datefmt="%Y-%m-%d %H:%M:%S"
)
logger = logging.getLogger(__name__)
logger.debug("This won't show — level is INFO")
logger.info("Application started")
logger.warning("Disk space below 10%%")
logger.error("Failed to connect to database")
logger.critical("Data corruption detected — shutting down")
Output:
2026-03-26 10:30:00 [INFO] __main__: Application started
2026-03-26 10:30:00 [WARNING] __main__: Disk space below 10%
2026-03-26 10:30:00 [ERROR] __main__: Failed to connect to database
2026-03-26 10:30:00 [CRITICAL] __main__: Data corruption detected — shutting down
Choose the right level for each message. This matters more than you think — a noisy logger is as useless as no logger:
# DEBUG (10) — Detailed diagnostic information
# Only enable in development or when hunting a specific bug
logger.debug("Processing item %d of %d", current, total)
logger.debug("SQL query: %s with params %s", query, params)
# INFO (20) — Confirmation things are working as expected
# The "normal operation" log level
logger.info("Server started on port %d", port)
logger.info("Processed %d orders in %.2fs", count, elapsed)
# WARNING (30) — Something unexpected but not broken
# The app continues working, but you should look at this
logger.warning("API rate limit at 80%% — throttling")
logger.warning("Config file not found, using defaults")
# ERROR (40) — Something failed, but the app continues
# A specific operation failed; other operations may still work
logger.error("Payment processing failed for order %s", order_id)
logger.error("Failed to send email: %s", str(exc))
# CRITICAL (50) — The application itself is in trouble
# Usually followed by a shutdown or restart
logger.critical("Database connection pool exhausted")
logger.critical("Out of memory — cannot allocate buffer")
Never use f-strings or .format() in log messages. Use the %-style formatting that the logging module provides:
# ❌ Bad — string is always formatted, even if level is filtered out
logger.debug(f"Processing user {user_id} with data {big_dict}")
# ✅ Good — string is only formatted if the message will be emitted
logger.debug("Processing user %s with data %s", user_id, big_dict)
# ✅ Also good: attach extra fields (they show up in output only if your formatter includes them, like the JSON formatter later in this guide)
logger.info("Request completed", extra={"user_id": user_id, "duration_ms": 42})
This isn't just style — it's performance. With %-style, Python skips string formatting entirely when the log level is filtered out. With f-strings, the formatting happens regardless.
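If building the argument itself is expensive (serializing a large object, running a count query), you can go one step further and guard the call with isEnabledFor, which is part of the standard logging API. A minimal sketch, where big_dict stands in for any costly-to-render value:
import json
import logging

logger = logging.getLogger(__name__)
big_dict = {"user": "u_123", "items": list(range(1000))}  # stand-in for a large object

# Lazy %-formatting skips the string formatting, but json.dumps would still run.
# An explicit level check skips the expensive work entirely.
if logger.isEnabledFor(logging.DEBUG):
    logger.debug("Full payload: %s", json.dumps(big_dict, default=str))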
Handlers determine the destination of your log messages. You can attach multiple handlers to a single logger — send errors to a file while keeping debug output in the console:
import logging
import os
import sys
from logging.handlers import RotatingFileHandler

def setup_logging(log_dir: str = "logs", level: str = "INFO"):
    """Configure production-ready logging with multiple handlers."""
    os.makedirs(log_dir, exist_ok=True)  # Make sure the log directory exists
    logger = logging.getLogger()
    logger.setLevel(getattr(logging, level.upper()))
# Format for file logs — detailed
file_fmt = logging.Formatter(
"%(asctime)s [%(levelname)s] %(name)s:%(funcName)s:%(lineno)d — %(message)s",
datefmt="%Y-%m-%d %H:%M:%S"
)
# Format for console — concise
console_fmt = logging.Formatter(
"%(asctime)s [%(levelname)s] %(message)s",
datefmt="%H:%M:%S"
)
# Console handler — INFO and above
console = logging.StreamHandler(sys.stdout)
console.setLevel(logging.INFO)
console.setFormatter(console_fmt)
logger.addHandler(console)
# File handler — ALL levels, rotates at 10MB, keeps 5 backups
file_handler = RotatingFileHandler(
f"{log_dir}/app.log",
maxBytes=10 * 1024 * 1024, # 10 MB
backupCount=5,
encoding="utf-8"
)
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(file_fmt)
logger.addHandler(file_handler)
# Error file — only ERROR and CRITICAL
error_handler = RotatingFileHandler(
f"{log_dir}/error.log",
maxBytes=5 * 1024 * 1024,
backupCount=10,
encoding="utf-8"
)
error_handler.setLevel(logging.ERROR)
error_handler.setFormatter(file_fmt)
logger.addHandler(error_handler)
return logger
For applications that run continuously, rotating logs by time is often more practical than by size:
from logging.handlers import TimedRotatingFileHandler
# Rotate daily at midnight, keep 30 days of logs
handler = TimedRotatingFileHandler(
"logs/app.log",
when="midnight",
interval=1,
backupCount=30,
encoding="utf-8",
utc=True # Use UTC timestamps in filenames
)
handler.suffix = "%Y-%m-%d" # Produces: app.log.2026-03-26
You can also ship logs straight to the network with the built-in handlers:
from logging.handlers import SysLogHandler, HTTPHandler
# Send to a syslog server (Linux)
syslog = SysLogHandler(address=("logserver.internal", 514))
logger.addHandler(syslog)
# Send to an HTTP endpoint (webhook, log aggregator)
http = HTTPHandler(
host="logs.example.com",
url="/api/v1/ingest",
method="POST",
secure=True
)
logger.addHandler(http)
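One caveat: SysLogHandler and HTTPHandler do their network I/O on the calling thread, so a slow log server slows your application. The standard library's QueueHandler and QueueListener decouple the two; a minimal sketch reusing the endpoint above:
import logging
import queue
from logging.handlers import QueueHandler, QueueListener, HTTPHandler

log_queue = queue.Queue(-1)  # unbounded buffer between the app and the listener

# The application thread only pays for a fast queue.put()...
logging.getLogger().addHandler(QueueHandler(log_queue))

# ...while a background thread drains the queue into the slow handler.
http = HTTPHandler(host="logs.example.com", url="/api/v1/ingest", method="POST", secure=True)
listener = QueueListener(log_queue, http, respect_handler_level=True)
listener.start()
# Call listener.stop() at shutdown so buffered records are flushed.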
A good formatter balances human readability with machine parseability. Here are the most useful format variables:
# Available fields in format strings:
# %(asctime)s — Timestamp (2026-03-26 10:30:00)
# %(name)s — Logger name (usually module path)
# %(levelname)s — Level (DEBUG, INFO, WARNING, ERROR, CRITICAL)
# %(funcName)s — Function name that called the logger
# %(lineno)d — Line number in source file
# %(filename)s — Source filename
# %(module)s — Module name (filename without extension)
# %(process)d — Process ID
# %(thread)d — Thread ID
# %(threadName)s — Thread name
# %(message)s — The actual log message
# Development format — maximum detail
dev_format = (
"%(asctime)s.%(msecs)03d [%(levelname)-8s] "
"%(name)s:%(funcName)s:%(lineno)d — %(message)s"
)
# Production format — clean and structured
prod_format = (
"%(asctime)s [%(levelname)s] %(name)s — %(message)s"
)
Add color to your development logs without any dependencies:
class ColorFormatter(logging.Formatter):
"""Adds ANSI color codes to log levels for terminal output."""
COLORS = {
"DEBUG": "\033[36m", # Cyan
"INFO": "\033[32m", # Green
"WARNING": "\033[33m", # Yellow
"ERROR": "\033[31m", # Red
"CRITICAL": "\033[41m", # Red background
}
RESET = "\033[0m"
    def format(self, record):
        original = record.levelname
        color = self.COLORS.get(original, "")
        record.levelname = f"{color}{original:<8}{self.RESET}"
        try:
            return super().format(record)
        finally:
            record.levelname = original  # restore so other handlers don't emit ANSI codes
# Usage
console = logging.StreamHandler()
console.setFormatter(ColorFormatter("%(asctime)s %(levelname)s %(message)s"))
logger.addHandler(console)
Plain text logs are great for humans reading a terminal. But when you're processing millions of log lines through Elasticsearch, Datadog, or CloudWatch, you need structured data. JSON logging makes your logs machine-parseable while remaining human-readable:
import json
import logging
from datetime import datetime, timezone
class JSONFormatter(logging.Formatter):
"""Outputs log records as single-line JSON objects."""
def format(self, record):
log_data = {
"timestamp": datetime.fromtimestamp(
record.created, tz=timezone.utc
).isoformat(),
"level": record.levelname,
"logger": record.name,
"message": record.getMessage(),
"module": record.module,
"function": record.funcName,
"line": record.lineno,
}
# Include exception info if present
if record.exc_info and record.exc_info[0]:
log_data["exception"] = {
"type": record.exc_info[0].__name__,
"message": str(record.exc_info[1]),
"traceback": self.formatException(record.exc_info),
}
# Include extra fields passed via `extra={}` or LoggerAdapter
for key in ("user_id", "request_id", "duration_ms", "status_code",
"method", "path", "ip"):
value = getattr(record, key, None)
if value is not None:
log_data[key] = value
return json.dumps(log_data, default=str, ensure_ascii=False)
# Setup
handler = logging.StreamHandler()
handler.setFormatter(JSONFormatter())
logger = logging.getLogger("myapp")
logger.addHandler(handler)
logger.setLevel(logging.INFO)
# Usage
logger.info("User logged in", extra={"user_id": "u_123", "ip": "192.168.1.1"})
logger.error("Payment failed", extra={"user_id": "u_456", "duration_ms": 2340})
Output:
{"timestamp": "2026-03-26T10:30:00.123456+00:00", "level": "INFO", "logger": "myapp", "message": "User logged in", "module": "auth", "function": "login", "line": 42, "user_id": "u_123", "ip": "192.168.1.1"}
{"timestamp": "2026-03-26T10:30:01.567890+00:00", "level": "ERROR", "logger": "myapp", "message": "Payment failed", "module": "billing", "function": "process_payment", "line": 87, "user_id": "u_456", "duration_ms": 2340}
# Using python-json-logger (pip install python-json-logger)
from pythonjsonlogger import jsonlogger
handler = logging.StreamHandler()
formatter = jsonlogger.JsonFormatter(
"%(asctime)s %(levelname)s %(name)s %(message)s",
rename_fields={"asctime": "timestamp", "levelname": "level"},
datefmt="%Y-%m-%dT%H:%M:%S%z"
)
handler.setFormatter(formatter)
logger.addHandler(handler)
For structured logging, structlog is the most widely used third-party library in the Python ecosystem. It makes structured, context-rich logging feel natural instead of bolted on:
# pip install structlog
import logging

import structlog
# Configure structlog for development (colored, pretty-printed)
structlog.configure(
processors=[
structlog.contextvars.merge_contextvars,
structlog.processors.add_log_level,
structlog.processors.StackInfoRenderer(),
structlog.dev.set_exc_info,
structlog.processors.TimeStamper(fmt="iso"),
structlog.dev.ConsoleRenderer() # Pretty console output
],
wrapper_class=structlog.make_filtering_bound_logger(logging.INFO),
context_class=dict,
logger_factory=structlog.PrintLoggerFactory(),
cache_logger_on_first_use=True,
)
log = structlog.get_logger()
# Clean, expressive API
log.info("server.started", host="0.0.0.0", port=8000)
log.warning("rate_limit.approaching", current=450, max=500, user="u_123")
log.error("payment.failed", order_id="ord_789", reason="card_declined")
For production, switch to JSON output with one config change:
import structlog
import logging
# Production config — JSON output, stdlib integration
structlog.configure(
processors=[
structlog.contextvars.merge_contextvars,
structlog.processors.add_log_level,
structlog.processors.StackInfoRenderer(),
structlog.dev.set_exc_info,
structlog.processors.TimeStamper(fmt="iso"),
# Convert to stdlib for handler routing
structlog.stdlib.ProcessorFormatter.wrap_for_formatter,
],
logger_factory=structlog.stdlib.LoggerFactory(),
cache_logger_on_first_use=True,
)
# Use stdlib formatter to render JSON
formatter = structlog.stdlib.ProcessorFormatter(
processor=structlog.processors.JSONRenderer()
)
handler = logging.StreamHandler()
handler.setFormatter(formatter)
root = logging.getLogger()
root.addHandler(handler)
root.setLevel(logging.INFO)
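Application code doesn't change between the two configurations; each call below renders as one JSON object per line in production (exact keys depend on your processor chain):
log = structlog.get_logger("myapp")
log.info("payment.processed", order_id="ord_123", amount_cents=1999)
# Roughly: {"order_id": "ord_123", "amount_cents": 1999, "event": "payment.processed",
#           "level": "info", "timestamp": "2026-03-26T10:30:00Z"}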
The most useful logs include context — who made the request, what request ID it belongs to, which tenant or account is involved. Here's how to propagate context automatically:
import contextvars
import structlog
import uuid
# Define context variables
request_id_var = contextvars.ContextVar("request_id", default=None)
user_id_var = contextvars.ContextVar("user_id", default=None)
def bind_request_context(request_id: str = None, user_id: str = None):
"""Bind context that will appear in all subsequent log messages."""
if request_id:
structlog.contextvars.bind_contextvars(request_id=request_id)
request_id_var.set(request_id)
if user_id:
structlog.contextvars.bind_contextvars(user_id=user_id)
user_id_var.set(user_id)
def clear_request_context():
"""Clear context at the end of a request."""
structlog.contextvars.clear_contextvars()
request_id_var.set(None)
user_id_var.set(None)
# Middleware example (FastAPI)
from starlette.middleware.base import BaseHTTPMiddleware
class LoggingMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request, call_next):
request_id = request.headers.get("X-Request-ID", str(uuid.uuid4()))
bind_request_context(request_id=request_id)
log = structlog.get_logger()
log.info("request.start", method=request.method, path=request.url.path)
try:
response = await call_next(request)
log.info("request.end", status=response.status_code)
response.headers["X-Request-ID"] = request_id
return response
except Exception as exc:
log.error("request.error", error=str(exc), exc_info=True)
raise
finally:
clear_request_context()
Now every log message within a request automatically includes the request ID (plus any other context you bind, such as a user ID), with no need to pass those values through every function call.
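For example, a service-layer function deep in the call stack (charge_card here is hypothetical) logs as usual, and the bound request_id shows up on its own:
import structlog

async def charge_card(order_id: str, amount_cents: int):
    log = structlog.get_logger()
    # request_id was bound by the middleware; merge_contextvars adds it here automatically.
    log.info("payment.charging", order_id=order_id, amount_cents=amount_cents)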
If you're not using structlog, a LoggerAdapter gives you similar per-request context with only the standard library:
import logging
class RequestLogger(logging.LoggerAdapter):
"""Automatically adds request context to all log messages."""
def process(self, msg, kwargs):
extra = kwargs.setdefault("extra", {})
extra.update(self.extra)
return msg, kwargs
# Per-request logger with context
def handle_request(request):
logger = RequestLogger(
logging.getLogger(__name__),
{"request_id": request.id, "user_id": request.user.id}
)
logger.info("Processing request") # Includes request_id and user_id
logger.error("Something failed") # Also includes context
Logging isn't just for errors — it's your best tool for understanding application performance. Here's how to track timing, resource usage, and bottlenecks:
import time
import functools
import logging
logger = logging.getLogger(__name__)
def log_timing(func=None, *, level=logging.INFO, threshold_ms=None):
"""Log function execution time. Optionally only log if above threshold."""
def decorator(fn):
@functools.wraps(fn)
def wrapper(*args, **kwargs):
start = time.perf_counter()
try:
result = fn(*args, **kwargs)
return result
except Exception:
logger.exception("Function %s raised an exception", fn.__name__)
raise
finally:
elapsed_ms = (time.perf_counter() - start) * 1000
if threshold_ms is None or elapsed_ms >= threshold_ms:
logger.log(
level,
"Function %s completed in %.2f ms",
fn.__name__, elapsed_ms
)
return wrapper
if func is not None:
return decorator(func)
return decorator
# Usage
@log_timing
def process_data(items):
"""Always logs timing."""
return [transform(item) for item in items]
@log_timing(threshold_ms=100, level=logging.WARNING)
def query_database(sql):
"""Only logs if query takes longer than 100ms."""
return db.execute(sql)
import functools
import logging
import time

import httpx  # used by the example below (pip install httpx)
logger = logging.getLogger(__name__)
def log_async_timing(func=None, *, level=logging.INFO, threshold_ms=None):
"""Log async function execution time."""
def decorator(fn):
@functools.wraps(fn)
async def wrapper(*args, **kwargs):
start = time.perf_counter()
try:
result = await fn(*args, **kwargs)
return result
except Exception:
logger.exception("Async function %s raised", fn.__name__)
raise
finally:
elapsed_ms = (time.perf_counter() - start) * 1000
if threshold_ms is None or elapsed_ms >= threshold_ms:
logger.log(
level,
"Async %s completed in %.2f ms",
fn.__name__, elapsed_ms
)
return wrapper
if func is not None:
return decorator(func)
return decorator
@log_async_timing(threshold_ms=500)
async def fetch_user_profile(user_id: str):
async with httpx.AsyncClient() as client:
resp = await client.get(f"https://api.example.com/users/{user_id}")
return resp.json()
import time
import logging
from contextlib import contextmanager
logger = logging.getLogger(__name__)
@contextmanager
def log_block(name: str, level=logging.INFO):
"""Time an arbitrary code block."""
start = time.perf_counter()
logger.log(level, "Starting: %s", name)
try:
yield
except Exception:
elapsed = (time.perf_counter() - start) * 1000
logger.error("Failed: %s after %.2f ms", name, elapsed)
raise
else:
elapsed = (time.perf_counter() - start) * 1000
logger.log(level, "Completed: %s in %.2f ms", name, elapsed)
# Usage
with log_block("data import"):
data = load_csv("big_file.csv")
processed = transform(data)
save_to_db(processed)
# Output: Starting: data import
# Completed: data import in 3421.56 ms
Beyond timing, you can log process-level resource usage with psutil (pip install psutil):
import logging
import os

import psutil
logger = logging.getLogger("monitor")
def log_resource_usage():
"""Log current memory and CPU usage."""
process = psutil.Process(os.getpid())
mem = process.memory_info()
cpu = process.cpu_percent(interval=0.1)
logger.info(
"Resource usage — RSS: %.1f MB, VMS: %.1f MB, CPU: %.1f%%, Threads: %d",
mem.rss / 1024 / 1024,
mem.vms / 1024 / 1024,
cpu,
process.num_threads()
)
def log_if_memory_high(threshold_mb: float = 500):
"""Warn if memory usage exceeds threshold."""
process = psutil.Process(os.getpid())
rss_mb = process.memory_info().rss / 1024 / 1024
if rss_mb > threshold_mb:
logger.warning(
"High memory usage: %.1f MB (threshold: %.1f MB)",
rss_mb, threshold_mb
)
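To capture these numbers on a schedule instead of ad hoc, a daemon timer is usually enough; a small sketch (the 60-second interval is an arbitrary choice):
import threading

def start_resource_monitor(interval_seconds: float = 60.0) -> None:
    """Log resource usage periodically from a background daemon thread."""
    def _tick():
        log_resource_usage()
        log_if_memory_high()
        timer = threading.Timer(interval_seconds, _tick)
        timer.daemon = True  # never keeps the process alive at shutdown
        timer.start()
    _tick()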
Health checks are essential for any production application — they tell your load balancer, orchestrator, or monitoring system whether your app is alive and ready to serve traffic:
import asyncio
import time
import logging
from dataclasses import dataclass, field
from enum import Enum
from typing import Callable, Awaitable
logger = logging.getLogger("health")
class HealthStatus(Enum):
HEALTHY = "healthy"
DEGRADED = "degraded"
UNHEALTHY = "unhealthy"
@dataclass
class CheckResult:
name: str
status: HealthStatus
message: str = ""
duration_ms: float = 0
@dataclass
class HealthChecker:
checks: dict[str, Callable[[], Awaitable[CheckResult]]] = field(
default_factory=dict
)
def register(self, name: str):
"""Decorator to register a health check."""
def decorator(func):
self.checks[name] = func
return func
return decorator
async def run_all(self) -> dict:
"""Run all health checks and return aggregate status."""
results = []
overall = HealthStatus.HEALTHY
for name, check_fn in self.checks.items():
start = time.perf_counter()
try:
result = await asyncio.wait_for(check_fn(), timeout=5.0)
result.duration_ms = (time.perf_counter() - start) * 1000
results.append(result)
except asyncio.TimeoutError:
results.append(CheckResult(
name=name,
status=HealthStatus.UNHEALTHY,
message="Check timed out (>5s)",
duration_ms=5000
))
except Exception as e:
results.append(CheckResult(
name=name,
status=HealthStatus.UNHEALTHY,
message=str(e),
duration_ms=(time.perf_counter() - start) * 1000
))
# Overall status: worst of all checks
for r in results:
if r.status == HealthStatus.UNHEALTHY:
overall = HealthStatus.UNHEALTHY
break
elif r.status == HealthStatus.DEGRADED:
overall = HealthStatus.DEGRADED
status_data = {
"status": overall.value,
"checks": {
r.name: {
"status": r.status.value,
"message": r.message,
"duration_ms": round(r.duration_ms, 2)
}
for r in results
}
}
logger.info("Health check completed: %s", overall.value)
return status_data
# Usage with FastAPI
health = HealthChecker()
@health.register("database")
async def check_database():
try:
await db.execute("SELECT 1")
return CheckResult("database", HealthStatus.HEALTHY, "Connected")
except Exception as e:
return CheckResult("database", HealthStatus.UNHEALTHY, str(e))
@health.register("redis")
async def check_redis():
try:
pong = await redis.ping()
return CheckResult("redis", HealthStatus.HEALTHY, f"PONG={pong}")
except Exception as e:
return CheckResult("redis", HealthStatus.DEGRADED, str(e))
@health.register("disk")
async def check_disk():
import shutil
usage = shutil.disk_usage("/")
free_pct = (usage.free / usage.total) * 100
if free_pct < 5:
return CheckResult("disk", HealthStatus.UNHEALTHY,
f"Only {free_pct:.1f}% free")
elif free_pct < 15:
return CheckResult("disk", HealthStatus.DEGRADED,
f"{free_pct:.1f}% free")
return CheckResult("disk", HealthStatus.HEALTHY, f"{free_pct:.1f}% free")
# FastAPI endpoint
from fastapi import FastAPI
app = FastAPI()
@app.get("/health")
async def health_endpoint():
return await health.run_all()
Response:
{
"status": "healthy",
"checks": {
"database": {"status": "healthy", "message": "Connected", "duration_ms": 1.23},
"redis": {"status": "healthy", "message": "PONG=True", "duration_ms": 0.45},
"disk": {"status": "healthy", "message": "67.3% free", "duration_ms": 0.02}
}
}
Logging errors is the first step. Tracking them systematically — with frequency, context, and alerting — is what separates production-ready apps from "it works on my laptop" apps:
import logging
import traceback
from dataclasses import dataclass, field
from datetime import datetime, timezone, timedelta
from threading import Lock
@dataclass
class ErrorRecord:
error_type: str
message: str
count: int = 1
first_seen: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
last_seen: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
sample_traceback: str = ""
class ErrorTracker:
"""Track error frequency and patterns."""
def __init__(self, alert_threshold: int = 10, window_minutes: int = 5):
self.errors: dict[str, ErrorRecord] = {}
self.alert_threshold = alert_threshold
self.window_minutes = window_minutes
self._lock = Lock()
self.logger = logging.getLogger("error_tracker")
def record(self, exc: Exception, context: dict = None):
"""Record an error occurrence."""
key = f"{type(exc).__name__}:{str(exc)[:100]}"
tb = traceback.format_exception(type(exc), exc, exc.__traceback__)
with self._lock:
if key in self.errors:
record = self.errors[key]
record.count += 1
record.last_seen = datetime.now(timezone.utc)
else:
record = ErrorRecord(
error_type=type(exc).__name__,
message=str(exc),
sample_traceback="".join(tb)
)
self.errors[key] = record
# Check if we need to alert
if record.count == self.alert_threshold:
self.logger.critical(
"Error threshold reached: %s occurred %d times. "
"First: %s, Latest: %s. Context: %s",
key, record.count,
record.first_seen.isoformat(),
record.last_seen.isoformat(),
context or {}
)
def get_summary(self) -> list[dict]:
"""Get error summary sorted by frequency."""
cutoff = datetime.now(timezone.utc) - timedelta(
minutes=self.window_minutes
)
with self._lock:
recent = [
{
"type": r.error_type,
"message": r.message[:200],
"count": r.count,
"first_seen": r.first_seen.isoformat(),
"last_seen": r.last_seen.isoformat(),
}
for r in self.errors.values()
if r.last_seen >= cutoff
]
return sorted(recent, key=lambda x: x["count"], reverse=True)
# Global tracker
error_tracker = ErrorTracker(alert_threshold=5, window_minutes=10)
# Usage in exception handling
try:
result = process_payment(order)
except PaymentError as e:
error_tracker.record(e, context={"order_id": order.id})
logger.error("Payment failed for order %s: %s", order.id, e)
raise
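The summary is also worth exposing over HTTP so on-call engineers can check it without grepping logs; a sketch assuming the FastAPI app object used elsewhere in this guide:
@app.get("/errors")
async def errors_endpoint():
    # Errors seen within the tracker's window, most frequent first
    return error_tracker.get_summary()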
How you log an exception determines how much debugging information survives:
# ❌ Bad — loses traceback information
try:
risky_operation()
except Exception as e:
logger.error(f"Error: {e}")
# ❌ Bad — catches silently
try:
risky_operation()
except Exception:
pass
# ✅ Good — preserves full traceback
try:
risky_operation()
except Exception:
logger.exception("risky_operation failed")
# logger.exception() automatically includes traceback
# ✅ Good — with context for debugging
try:
result = api_client.post(url, data=payload)
except RequestError as e:
logger.error(
"API call failed: url=%s status=%s body=%s",
url, getattr(e, 'status_code', 'N/A'), str(e),
exc_info=True # Include traceback
)
raise
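One last safety net: make sure uncaught exceptions end up in the logs before the process dies, rather than only on stderr. A minimal sketch using the standard sys.excepthook:
import logging
import sys

logger = logging.getLogger("crash")

def log_uncaught(exc_type, exc_value, exc_tb):
    if issubclass(exc_type, KeyboardInterrupt):
        sys.__excepthook__(exc_type, exc_value, exc_tb)  # keep Ctrl-C behavior
        return
    logger.critical("Uncaught exception", exc_info=(exc_type, exc_value, exc_tb))

sys.excepthook = log_uncaught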
In production, you need log files that don't eat your disk, and a way to search across multiple files efficiently:
import logging
import gzip
import os
import shutil
from logging.handlers import RotatingFileHandler
class CompressedRotatingFileHandler(RotatingFileHandler):
    """RotatingFileHandler that compresses rotated files with gzip.

    Uses the namer/rotator hooks so backups keep their .gz names
    across later rollovers instead of being overwritten.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.namer = self._gzip_namer
        self.rotator = self._gzip_rotator

    @staticmethod
    def _gzip_namer(default_name):
        return default_name + ".gz"

    @staticmethod
    def _gzip_rotator(source, dest):
        with open(source, "rb") as f_in, gzip.open(dest, "wb") as f_out:
            shutil.copyfileobj(f_in, f_out)
        os.remove(source)
# Saves disk space — compressed logs are typically 90% smaller
handler = CompressedRotatingFileHandler(
"logs/app.log",
maxBytes=50 * 1024 * 1024, # 50 MB before rotation
backupCount=20, # Keep 20 rotated files
encoding="utf-8"
)
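Compressed archives are still searchable without unpacking them to disk. A small sketch that scans rotated .gz files for one request ID, assuming the JSON line format produced by the JSONFormatter above:
import glob
import gzip
import json

def find_request(log_glob: str, request_id: str):
    """Yield parsed JSON log records matching a request_id across rotated .gz files."""
    for path in sorted(glob.glob(log_glob)):
        with gzip.open(path, "rt", encoding="utf-8") as fh:
            for line in fh:
                try:
                    record = json.loads(line)
                except json.JSONDecodeError:
                    continue  # skip partial or non-JSON lines
                if record.get("request_id") == request_id:
                    yield record

# Usage
for rec in find_request("logs/app.log.*.gz", "req-8c41"):
    print(rec["timestamp"], rec["level"], rec["message"])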
"""
Simple log shipping to a central server.
For production, use tools like Filebeat, Fluentd, or Vector.
"""
import json
import queue
import threading
import logging
import urllib.request
logger = logging.getLogger(__name__)
class AsyncHTTPHandler(logging.Handler):
"""Non-blocking HTTP log handler with batching."""
def __init__(self, url: str, batch_size: int = 50, flush_interval: float = 5.0):
super().__init__()
self.url = url
self.batch_size = batch_size
self.flush_interval = flush_interval
self._queue: queue.Queue = queue.Queue(maxsize=10000)
self._thread = threading.Thread(target=self._worker, daemon=True)
self._thread.start()
def emit(self, record):
try:
log_entry = {
"timestamp": record.created,
"level": record.levelname,
"logger": record.name,
"message": self.format(record),
}
self._queue.put_nowait(log_entry)
except queue.Full:
pass # Drop logs rather than blocking the application
def _worker(self):
batch = []
while True:
try:
entry = self._queue.get(timeout=self.flush_interval)
batch.append(entry)
if len(batch) >= self.batch_size:
self._flush(batch)
batch = []
except queue.Empty:
if batch:
self._flush(batch)
batch = []
def _flush(self, batch):
try:
data = json.dumps(batch).encode("utf-8")
req = urllib.request.Request(
self.url,
data=data,
headers={"Content-Type": "application/json"},
method="POST"
)
urllib.request.urlopen(req, timeout=5)
except Exception:
# Don't log here — infinite recursion risk
pass
Metrics complement logs. Logs tell you what happened; metrics tell you how the system is performing. Here's a lightweight metrics collector you can use without any external dependencies:
import threading
from collections import defaultdict
from dataclasses import dataclass, field
@dataclass
class MetricsCollector:
"""Simple in-process metrics without external dependencies."""
_counters: dict[str, int] = field(default_factory=lambda: defaultdict(int))
_gauges: dict[str, float] = field(default_factory=dict)
_histograms: dict[str, list] = field(
default_factory=lambda: defaultdict(list)
)
_lock: threading.Lock = field(default_factory=threading.Lock)
def increment(self, name: str, value: int = 1, tags: dict = None):
"""Increment a counter (e.g., requests_total, errors_total)."""
key = self._make_key(name, tags)
with self._lock:
self._counters[key] += value
def gauge(self, name: str, value: float, tags: dict = None):
"""Set a gauge value (e.g., active_connections, queue_size)."""
key = self._make_key(name, tags)
with self._lock:
self._gauges[key] = value
def observe(self, name: str, value: float, tags: dict = None):
"""Record a histogram observation (e.g., request_duration_ms)."""
key = self._make_key(name, tags)
with self._lock:
hist = self._histograms[key]
hist.append(value)
# Keep last 1000 observations to bound memory
if len(hist) > 1000:
self._histograms[key] = hist[-1000:]
def get_snapshot(self) -> dict:
"""Get a snapshot of all metrics."""
with self._lock:
snapshot = {
"counters": dict(self._counters),
"gauges": dict(self._gauges),
"histograms": {}
}
for key, values in self._histograms.items():
if values:
sorted_vals = sorted(values)
snapshot["histograms"][key] = {
"count": len(values),
"min": sorted_vals[0],
"max": sorted_vals[-1],
"avg": sum(values) / len(values),
"p50": sorted_vals[len(values) // 2],
"p95": sorted_vals[int(len(values) * 0.95)],
"p99": sorted_vals[int(len(values) * 0.99)],
}
return snapshot
@staticmethod
def _make_key(name: str, tags: dict = None) -> str:
if not tags:
return name
tag_str = ",".join(f"{k}={v}" for k, v in sorted(tags.items()))
return f"{name}{{{tag_str}}}"
# Global metrics instance
metrics = MetricsCollector()
# Usage in your application
metrics.increment("http_requests_total", tags={"method": "GET", "path": "/api/users"})
metrics.increment("http_requests_total", tags={"method": "POST", "path": "/api/orders"})
metrics.observe("http_request_duration_ms", 45.2, tags={"path": "/api/users"})
metrics.gauge("active_connections", 42)
metrics.gauge("queue_size", 156)
# Expose via endpoint
@app.get("/metrics")
async def metrics_endpoint():
return metrics.get_snapshot()
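Logs and metrics pair naturally: one decorator can record a duration histogram and emit a log line at the same time. A small sketch built on the collector above (instrumented and the metric name are choices for illustration, not a standard):
import functools
import logging
import time

logger = logging.getLogger(__name__)

def instrumented(metric_name: str = "function_duration_ms"):
    """Time a function, record the duration as a histogram, and log it."""
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            start = time.perf_counter()
            try:
                return fn(*args, **kwargs)
            finally:
                elapsed_ms = (time.perf_counter() - start) * 1000
                metrics.observe(metric_name, elapsed_ms, tags={"function": fn.__name__})
                logger.info("%s finished in %.2f ms", fn.__name__, elapsed_ms)
        return wrapper
    return decorator

@instrumented()
def generate_report():
    ...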
import logging.config

# Central logging configuration as a dict (it could also be loaded from a YAML or JSON file)
LOGGING_CONFIG = {
"version": 1,
"disable_existing_loggers": False,
"formatters": {
"json": {
"()": "pythonjsonlogger.jsonlogger.JsonFormatter",
"format": "%(asctime)s %(levelname)s %(name)s %(message)s",
},
"simple": {
"format": "%(asctime)s [%(levelname)s] %(name)s: %(message)s"
},
},
"handlers": {
"console": {
"class": "logging.StreamHandler",
"formatter": "simple",
"stream": "ext://sys.stdout",
},
"file": {
"class": "logging.handlers.RotatingFileHandler",
"formatter": "json",
"filename": "logs/app.log",
"maxBytes": 10485760,
"backupCount": 5,
},
"error_file": {
"class": "logging.handlers.RotatingFileHandler",
"formatter": "json",
"filename": "logs/error.log",
"maxBytes": 5242880,
"backupCount": 10,
"level": "ERROR",
},
},
"loggers": {
"myapp": {"level": "INFO", "handlers": ["console", "file"]},
"myapp.db": {"level": "WARNING"}, # Quiet down DB logs
"uvicorn": {"level": "WARNING"},
"httpx": {"level": "WARNING"},
},
"root": {
"level": "INFO",
"handlers": ["console", "file", "error_file"],
},
}
logging.config.dictConfig(LOGGING_CONFIG)
import os
import logging
def configure_logging():
"""Set up logging based on environment."""
env = os.getenv("APP_ENV", "development")
log_level = os.getenv("LOG_LEVEL", "INFO").upper()
if env == "production":
# JSON logs, no debug, ship to aggregator
handler = logging.StreamHandler()
handler.setFormatter(JSONFormatter())
logging.root.addHandler(handler)
logging.root.setLevel(getattr(logging, log_level))
# Silence noisy libraries
logging.getLogger("urllib3").setLevel(logging.WARNING)
logging.getLogger("asyncio").setLevel(logging.WARNING)
elif env == "development":
# Colorful console output, debug enabled
logging.basicConfig(
level=logging.DEBUG,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s"
)
elif env == "testing":
# Minimal output during tests
logging.basicConfig(level=logging.WARNING)
Finally, a few dos and don'ts that show up in almost every codebase:
# ❌ Don't log sensitive data
logger.info("User login: email=%s password=%s", email, password)
# ✅ Do redact or omit sensitive fields
logger.info("User login: email=%s", email)
# ❌ Don't use the root logger directly
logging.info("Something happened")
# ✅ Do use named loggers
logger = logging.getLogger(__name__)
logger.info("Something happened")
# ❌ Don't catch and log without re-raising (swallows errors)
try:
process()
except Exception:
logger.exception("Failed")
# Error is now invisible to the caller!
# ✅ Do re-raise or handle meaningfully
try:
process()
except Exception:
logger.exception("Failed")
raise # Or return an error response
# ❌ Don't log in tight loops (kills performance)
for item in million_items:
logger.debug("Processing %s", item)
# ✅ Do log summaries or use sampling
processed = 0
for item in million_items:
process(item)
processed += 1
if processed % 10000 == 0:
logger.info("Processed %d/%d items", processed, len(million_items))
logger.info("Completed: %d items processed", processed)
When a request crosses service boundaries, a correlation ID ties together the logs from every service it touches:
import uuid
def create_correlation_id() -> str:
"""Generate a unique ID to trace requests across services."""
return str(uuid.uuid4())
# Pass correlation ID in HTTP headers
async def call_downstream_service(client, url, correlation_id):
headers = {"X-Correlation-ID": correlation_id}
logger.info("Calling downstream: %s", url,
extra={"correlation_id": correlation_id})
response = await client.get(url, headers=headers)
return response
# Extract in receiving service
def extract_correlation_id(request) -> str:
return request.headers.get("X-Correlation-ID", create_correlation_id())
Let's put it all together — a FastAPI application with structured logging, health checks, metrics, and performance tracking:
"""
observable_app.py — Production-ready FastAPI with full observability.
"""
import asyncio
import logging
import os
import time
import uuid
from contextlib import asynccontextmanager

import structlog
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
# ─── Logging Configuration ───
def setup_logging():
env = os.getenv("APP_ENV", "development")
structlog.configure(
processors=[
structlog.contextvars.merge_contextvars,
structlog.processors.add_log_level,
structlog.processors.StackInfoRenderer(),
structlog.dev.set_exc_info,
structlog.processors.TimeStamper(fmt="iso"),
structlog.stdlib.ProcessorFormatter.wrap_for_formatter,
],
logger_factory=structlog.stdlib.LoggerFactory(),
cache_logger_on_first_use=True,
)
if env == "production":
renderer = structlog.processors.JSONRenderer()
else:
renderer = structlog.dev.ConsoleRenderer()
formatter = structlog.stdlib.ProcessorFormatter(processor=renderer)
handler = logging.StreamHandler()
handler.setFormatter(formatter)
root = logging.getLogger()
root.handlers.clear()
root.addHandler(handler)
root.setLevel(logging.INFO)
# Quiet noisy loggers
for name in ("uvicorn.access", "httpx", "asyncio"):
logging.getLogger(name).setLevel(logging.WARNING)
# ─── Metrics ───
from collections import defaultdict
from threading import Lock
class AppMetrics:
def __init__(self):
self._lock = Lock()
self.requests_total = defaultdict(int)
self.request_durations = defaultdict(list)
self.errors_total = defaultdict(int)
self.active_requests = 0
def record_request(self, method, path, status, duration_ms):
with self._lock:
key = f"{method} {path} {status}"
self.requests_total[key] += 1
self.request_durations[path].append(duration_ms)
if status >= 500:
self.errors_total[key] += 1
def snapshot(self):
with self._lock:
return {
"requests": dict(self.requests_total),
"errors": dict(self.errors_total),
"active": self.active_requests,
"latency": {
path: {
"count": len(vals),
"avg_ms": round(sum(vals) / len(vals), 2),
"p99_ms": round(
sorted(vals)[int(len(vals) * 0.99)], 2
) if vals else 0,
}
for path, vals in self.request_durations.items()
if vals
},
}
metrics = AppMetrics()
# ─── App Setup ───
@asynccontextmanager
async def lifespan(app: FastAPI):
    setup_logging()
    app.state.started_at = time.time()  # used by /health to report uptime
    log = structlog.get_logger()
    log.info("app.starting", env=os.getenv("APP_ENV", "development"))
    yield
    log.info("app.shutdown")
app = FastAPI(title="Observable App", lifespan=lifespan)
# ─── Middleware ───
@app.middleware("http")
async def observability_middleware(request: Request, call_next):
request_id = request.headers.get("X-Request-ID", str(uuid.uuid4()))
structlog.contextvars.clear_contextvars()
structlog.contextvars.bind_contextvars(
request_id=request_id,
method=request.method,
path=request.url.path,
)
log = structlog.get_logger()
start = time.perf_counter()
metrics.active_requests += 1
try:
response = await call_next(request)
duration_ms = (time.perf_counter() - start) * 1000
log.info(
"request.completed",
status=response.status_code,
duration_ms=round(duration_ms, 2),
)
metrics.record_request(
request.method, request.url.path,
response.status_code, duration_ms
)
response.headers["X-Request-ID"] = request_id
response.headers["X-Response-Time"] = f"{duration_ms:.2f}ms"
return response
except Exception as exc:
duration_ms = (time.perf_counter() - start) * 1000
log.error("request.failed", error=str(exc), duration_ms=duration_ms,
exc_info=True)
metrics.record_request(
request.method, request.url.path, 500, duration_ms
)
return JSONResponse(
status_code=500,
content={"error": "Internal server error", "request_id": request_id}
)
finally:
metrics.active_requests -= 1
# ─── Endpoints ───
@app.get("/health")
async def health():
return {
"status": "healthy",
"uptime": time.time(),
"metrics": metrics.snapshot(),
}
@app.get("/metrics")
async def metrics_endpoint():
return metrics.snapshot()
@app.get("/api/example")
async def example():
log = structlog.get_logger()
log.info("example.processing", step="start")
# Simulate work
await asyncio.sleep(0.01)
log.info("example.done", items_processed=42)
return {"result": "ok", "items": 42}
Want 50+ production-ready Python scripts like this?
The AI Agent Toolkit includes logging configs, monitoring scripts, health check templates, and more — ready to drop into your projects.
Get the AI Agent Toolkit: 50+ Python scripts, prompts & templates for $19 →