Python Microservices — Build, Deploy & Scale
Microservices aren't about splitting a monolith into tiny pieces and hoping for the best. They're about building independent services that can be deployed, scaled, and maintained separately. This guide covers the patterns that actually work — with Python code you can run today.
We'll build a small e-commerce system: an Order Service, an Inventory Service, and a Notification Service. Each runs independently, communicates through well-defined interfaces, and can be deployed in Docker.
When Microservices Make Sense
Before we build anything — an honest assessment:
- Use microservices when: different parts scale independently, teams work on separate services, you need polyglot (mix Python + Go + Rust), you have clear domain boundaries
- Stick with a monolith when: small team (<5 devs), early-stage product, unclear domain boundaries, you don't have DevOps capacity
Project Structure
ecommerce/
├── services/
│ ├── orders/
│ │ ├── app/
│ │ │ ├── __init__.py
│ │ │ ├── main.py # FastAPI app
│ │ │ ├── models.py # SQLAlchemy models
│ │ │ ├── schemas.py # Pydantic schemas
│ │ │ ├── service.py # Business logic
│ │ │ └── events.py # Event publishing
│ │ ├── Dockerfile
│ │ └── requirements.txt
│ ├── inventory/
│ │ ├── app/
│ │ │ ├── main.py
│ │ │ ├── models.py
│ │ │ └── service.py
│ │ ├── Dockerfile
│ │ └── requirements.txt
│ └── notifications/
│ ├── app/
│ │ ├── main.py
│ │ └── handlers.py
│ ├── Dockerfile
│ └── requirements.txt
├── shared/
│ └── events.py # Shared event schemas
├── docker-compose.yml
└── README.md
Service 1: Order Service (REST API)
The Order Service exposes a FastAPI REST API and publishes events when orders are created.
# services/orders/app/schemas.py
from pydantic import BaseModel, Field
from datetime import datetime
from enum import Enum
class OrderStatus(str, Enum):
    """Lifecycle states of an order; the str mixin makes members JSON-serializable."""

    pending = "pending"      # created, inventory not yet confirmed
    confirmed = "confirmed"
    shipped = "shipped"
    cancelled = "cancelled"
class OrderItem(BaseModel):
    """One line item on an order."""

    product_id: str
    quantity: int = Field(..., gt=0)     # must be a positive integer
    unit_price: float = Field(..., gt=0)  # price per unit, must be > 0
class CreateOrder(BaseModel):
    """Request body for POST /orders; at least one item is required."""

    customer_email: str
    items: list[OrderItem] = Field(..., min_length=1)
class OrderResponse(BaseModel):
    """Public representation of a stored order — the contract other services rely on."""

    id: str
    customer_email: str
    items: list[OrderItem]
    status: OrderStatus
    total: float
    created_at: datetime
    # Allow validation straight from ORM/attribute objects (pydantic v2).
    model_config = {"from_attributes": True}
# services/orders/app/service.py
import uuid
from datetime import datetime, timezone
from .schemas import CreateOrder, OrderStatus
class OrderService:
    """Business logic for orders. No framework dependencies."""

    def __init__(self, db_session, event_publisher):
        # Collaborators are injected, so the class stays framework-agnostic
        # and trivially testable with fakes/mocks.
        self.db = db_session
        self.events = event_publisher

    async def create_order(self, data: CreateOrder) -> dict:
        """Persist a new pending order and announce it via ``order.created``."""
        new_id = str(uuid.uuid4())
        line_items = [item.model_dump() for item in data.items]
        grand_total = sum(item.quantity * item.unit_price for item in data.items)
        record = {
            "id": new_id,
            "customer_email": data.customer_email,
            "items": line_items,
            "status": OrderStatus.pending,
            "total": grand_total,
            "created_at": datetime.now(timezone.utc),
        }
        # Save to DB (simplified)
        await self.db.save("orders", record)
        # Publish event — other services react
        payload = {
            "order_id": new_id,
            "customer_email": data.customer_email,
            "items": line_items,
            "total": grand_total,
        }
        await self.events.publish("order.created", payload)
        return record

    async def get_order(self, order_id: str) -> dict | None:
        """Fetch a stored order, or None if it does not exist."""
        return await self.db.get("orders", order_id)

    async def update_status(self, order_id: str, status: OrderStatus):
        """Change an order's status and broadcast ``order.status_changed``."""
        record = await self.db.get("orders", order_id)
        if not record:
            raise ValueError(f"Order {order_id} not found")
        record["status"] = status
        await self.db.save("orders", record)
        event = {
            "order_id": order_id,
            "new_status": status,
            "customer_email": record["customer_email"],
        }
        await self.events.publish("order.status_changed", event)
        return record
# services/orders/app/main.py
from fastapi import FastAPI, HTTPException
from .schemas import CreateOrder, OrderResponse
from .service import OrderService
from .events import RedisEventPublisher
from .db import get_db
app = FastAPI(title="Order Service", version="1.0.0")

# Build the service (and its Redis publisher) once per process.  The original
# version created a new RedisEventPublisher — i.e. a new connection pool — on
# every request, which leaks connections under load.
_order_service = None


def get_order_service() -> OrderService:
    """Return the process-wide OrderService, creating it lazily on first use."""
    global _order_service
    if _order_service is None:
        _order_service = OrderService(
            db_session=get_db(),
            event_publisher=RedisEventPublisher(),
        )
    return _order_service


@app.post("/orders", response_model=OrderResponse, status_code=201)
async def create_order(data: CreateOrder):
    """Create an order; returns 201 with the stored representation."""
    svc = get_order_service()
    return await svc.create_order(data)


@app.get("/orders/{order_id}", response_model=OrderResponse)
async def get_order(order_id: str):
    """Fetch a single order; 404 if it does not exist."""
    svc = get_order_service()
    order = await svc.get_order(order_id)
    if not order:
        raise HTTPException(404, "Order not found")
    return order


@app.get("/health")
def health():
    """Liveness probe used by the gateway / orchestrator."""
    return {"service": "orders", "status": "healthy"}
Inter-Service Communication
Services need to talk. Two main patterns:
| Pattern | When to Use | Tools |
|---|---|---|
| Synchronous (HTTP/gRPC) | Need immediate response | httpx, grpcio |
| Asynchronous (Events) | Fire-and-forget, decoupling | Redis Pub/Sub, RabbitMQ, Kafka |
Event publishing with Redis
# services/orders/app/events.py
import json
import redis.asyncio as redis
class RedisEventPublisher:
    """Publish events to Redis Pub/Sub channels.

    Each event is a JSON envelope — {"event", "data", "timestamp"} — published
    on a channel named after the event type (e.g. "order.created").
    """

    def __init__(self, url: str = "redis://redis:6379"):
        self.redis = redis.from_url(url)

    async def publish(self, event_type: str, data: dict):
        """Serialize *data* into the event envelope and publish it."""
        # A plain import instead of the original __import__("datetime") hack;
        # kept function-local so this snippet stays self-contained (normally
        # it belongs at the top of the module).
        from datetime import datetime, timezone

        message = json.dumps({
            "event": event_type,
            "data": data,
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }, default=str)  # default=str degrades non-JSON types to strings
        await self.redis.publish(event_type, message)

    async def close(self):
        """Release the underlying Redis connection pool."""
        await self.redis.close()
class RedisEventSubscriber:
    """Subscribe to events from Redis Pub/Sub and dispatch them to handlers."""

    def __init__(self, url: str = "redis://redis:6379"):
        self.redis = redis.from_url(url)
        self.pubsub = self.redis.pubsub()
        self._handlers: dict[str, list] = {}

    def on(self, event_type: str):
        """Decorator registering an async handler for *event_type*."""
        def register(handler):
            bucket = self._handlers.setdefault(event_type, [])
            bucket.append(handler)
            return handler
        return register

    async def start(self, *event_types: str):
        """Subscribe to the given channels and process messages forever."""
        await self.pubsub.subscribe(*event_types)
        async for raw in self.pubsub.listen():
            # Skip subscribe/unsubscribe confirmations, keep real messages.
            if raw["type"] != "message":
                continue
            channel = raw["channel"]
            if isinstance(channel, bytes):
                channel = channel.decode()
            envelope = json.loads(raw["data"])
            for handler in self._handlers.get(channel, []):
                try:
                    await handler(envelope["data"])
                except Exception as e:
                    # One failing handler must not kill the consume loop.
                    print(f"Error in handler for {channel}: {e}")
gRPC for synchronous calls
When the Order Service needs to check inventory before confirming an order, use gRPC — binary Protobuf over HTTP/2 is typically faster than JSON over HTTP/1.1, and the .proto contract provides strong typing.
# shared/inventory.proto
syntax = "proto3";
package inventory;
// Synchronous inventory operations consumed by the Order Service.
service InventoryService {
  // Read-only stock lookup for a single product.
  rpc CheckStock (StockRequest) returns (StockResponse);
  // Reserve (decrement) stock for all line items of an order.
  rpc ReserveItems (ReserveRequest) returns (ReserveResponse);
}

message StockRequest {
  string product_id = 1;
}

message StockResponse {
  string product_id = 1;
  int32 available = 2;  // units currently in stock
  bool in_stock = 3;    // convenience flag: available > 0
}

message ReserveRequest {
  string order_id = 1;
  repeated ReserveItem items = 2;
}

message ReserveItem {
  string product_id = 1;
  int32 quantity = 2;
}

message ReserveResponse {
  bool success = 1;   // false if any item had insufficient stock
  string message = 2; // human-readable detail
}
# Generate Python code from proto
# pip install grpcio grpcio-tools
# python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. inventory.proto
# services/inventory/app/grpc_server.py
import grpc
from concurrent import futures
import inventory_pb2
import inventory_pb2_grpc
class InventoryServicer(inventory_pb2_grpc.InventoryServiceServicer):
    """gRPC implementation of inventory service."""

    def __init__(self, db):
        # db is assumed to expose get_product(id) -> dict | None and
        # decrement_stock(id, qty) — concrete type not visible here; confirm.
        self.db = db

    def CheckStock(self, request, context):
        """Return current stock for one product; NOT_FOUND for unknown ids."""
        product = self.db.get_product(request.product_id)
        if not product:
            context.set_code(grpc.StatusCode.NOT_FOUND)
            context.set_details(f"Product {request.product_id} not found")
            return inventory_pb2.StockResponse()
        return inventory_pb2.StockResponse(
            product_id=request.product_id,
            available=product["stock"],
            in_stock=product["stock"] > 0,
        )

    def ReserveItems(self, request, context):
        """Reserve stock for every item of an order, or fail with no changes.

        NOTE(review): the availability-check loop and the decrement loop are
        separate and not guarded by a transaction or lock, so two concurrent
        reservations can both pass the check and oversell. Wrap the whole
        operation in a DB transaction — TODO confirm with the db layer.
        """
        # First pass: verify availability for every line item.
        for item in request.items:
            product = self.db.get_product(item.product_id)
            if not product or product["stock"] < item.quantity:
                return inventory_pb2.ReserveResponse(
                    success=False,
                    message=f"Insufficient stock for {item.product_id}",
                )
        # Reserve (decrement stock)
        for item in request.items:
            self.db.decrement_stock(item.product_id, item.quantity)
        return inventory_pb2.ReserveResponse(
            success=True,
            message=f"Reserved items for order {request.order_id}",
        )
def serve():
    """Start the blocking gRPC server for the inventory service on :50051."""
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    inventory_pb2_grpc.add_InventoryServiceServicer_to_server(
        # NOTE(review): get_db is not imported in this snippet — presumably it
        # comes from the service's db module; verify the import.
        InventoryServicer(db=get_db()), server
    )
    # Insecure port is acceptable inside the private compose network only.
    server.add_insecure_port("[::]:50051")
    server.start()
    print("Inventory gRPC server on :50051")
    server.wait_for_termination()
# services/orders/app/inventory_client.py
import grpc
import inventory_pb2
import inventory_pb2_grpc
class InventoryClient:
    """gRPC client for inventory service."""

    def __init__(self, host: str = "inventory:50051"):
        # Insecure channel is fine inside the private compose network; use
        # TLS credentials for anything crossing a trust boundary.
        self.channel = grpc.insecure_channel(host)
        self.stub = inventory_pb2_grpc.InventoryServiceStub(self.channel)

    def check_stock(self, product_id: str) -> dict | None:
        """Return stock info for *product_id*, or None if the product is unknown.

        The original annotation said ``-> dict`` but the NOT_FOUND branch
        returns None — callers must handle both.

        Raises:
            grpc.RpcError: for any failure other than NOT_FOUND.
        """
        try:
            resp = self.stub.CheckStock(
                inventory_pb2.StockRequest(product_id=product_id),
                timeout=5.0,
            )
        except grpc.RpcError as e:
            if e.code() == grpc.StatusCode.NOT_FOUND:
                return None
            raise
        return {
            "product_id": resp.product_id,
            "available": resp.available,
            "in_stock": resp.in_stock,
        }

    def reserve(self, order_id: str, items: list[dict]) -> bool:
        """Reserve stock for *items*; True iff the whole reservation succeeded.

        Each item dict needs "product_id" and "quantity" keys.
        """
        req = inventory_pb2.ReserveRequest(
            order_id=order_id,
            items=[
                inventory_pb2.ReserveItem(
                    product_id=i["product_id"],
                    quantity=i["quantity"],
                )
                for i in items
            ],
        )
        resp = self.stub.ReserveItems(req, timeout=10.0)
        return resp.success
Service 3: Notification Service (Event Consumer)
This service doesn't expose an API — it listens for events and sends notifications.
# services/notifications/app/main.py
import asyncio
from .handlers import setup_handlers
from .events import RedisEventSubscriber
async def main():
    """Wire up the event handlers, then block forever consuming order events."""
    bus = RedisEventSubscriber()
    setup_handlers(bus)
    print("📬 Notification service started, listening for events...")
    channels = ("order.created", "order.status_changed")
    await bus.start(*channels)


if __name__ == "__main__":
    asyncio.run(main())
# services/notifications/app/handlers.py
import smtplib
from email.mime.text import MIMEText
def setup_handlers(subscriber):
    """Register event handlers for notifications."""

    @subscriber.on("order.created")
    async def on_order_created(data: dict):
        """Send the confirmation email for a freshly placed order."""
        email = data["customer_email"]
        order_id = data["order_id"]
        send_email(
            to=email,
            subject=f"Order {order_id[:8]} confirmed!",
            body=f"Your order for ${data['total']:.2f} has been received. "
            f"We'll notify you when it ships.",
        )
        print(f"📧 Order confirmation sent to {email}")

    @subscriber.on("order.status_changed")
    async def on_status_changed(data: dict):
        """Send a status-update email; statuses without a template are ignored."""
        status = data["new_status"]
        templates = {
            "confirmed": "Your order has been confirmed!",
            "shipped": "Your order is on its way! 🚚",
            "cancelled": "Your order has been cancelled.",
        }
        body = templates.get(status)
        if body is None:
            return
        email = data["customer_email"]
        order_id = data["order_id"]
        send_email(
            to=email,
            subject=f"Order {order_id[:8]} — {status}",
            body=body,
        )
        print(f"📧 Status update ({status}) sent to {email}")
def send_email(to: str, subject: str, body: str):
    """Send email via SMTP (simplified)."""
    message = MIMEText(body)
    message["Subject"] = subject
    message["To"] = to
    message["From"] = "orders@example.com"
    # In production: use env vars, connection pooling, retry
    try:
        with smtplib.SMTP("mailhog:1025") as conn:
            conn.send_message(message)
    except Exception as e:
        # Best-effort delivery: log and carry on rather than crash the consumer.
        print(f"⚠️ Email failed: {e}")
Docker Compose — Run Everything
See our Docker guide for Dockerfile details.
# docker-compose.yml
services:
  # --- Infrastructure ---
  redis:
    # Event bus: Pub/Sub channels between orders and notifications.
    image: redis:7-alpine
    ports: ["6379:6379"]
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 5s
  postgres:
    image: postgres:16-alpine
    environment:
      POSTGRES_DB: ecommerce
      # Dev-only credentials — inject real secrets in production.
      POSTGRES_PASSWORD: postgres
    volumes:
      - pgdata:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U postgres"]
      interval: 5s
  mailhog:
    # Catches all outgoing mail so nothing leaves the dev machine.
    image: mailhog/mailhog
    ports:
      - "8025:8025" # Web UI
      - "1025:1025" # SMTP
  # --- Services ---
  orders:
    build: ./services/orders
    ports: ["8001:8000"]
    environment:
      DATABASE_URL: postgresql://postgres:postgres@postgres:5432/ecommerce
      REDIS_URL: redis://redis:6379
    depends_on:
      # Wait for *healthy* dependencies, not merely started containers.
      redis: { condition: service_healthy }
      postgres: { condition: service_healthy }
  inventory:
    build: ./services/inventory
    ports:
      - "8002:8000" # REST
      - "50051:50051" # gRPC
    environment:
      DATABASE_URL: postgresql://postgres:postgres@postgres:5432/ecommerce
    depends_on:
      postgres: { condition: service_healthy }
  notifications:
    # No published ports — this service only consumes events.
    build: ./services/notifications
    environment:
      REDIS_URL: redis://redis:6379
      SMTP_HOST: mailhog
      SMTP_PORT: "1025"
    depends_on:
      redis: { condition: service_healthy }
volumes:
  pgdata:
# Start everything
docker compose up -d
# Check health
curl http://localhost:8001/health # orders
curl http://localhost:8002/health # inventory
# Create an order
curl -X POST http://localhost:8001/orders \
-H "Content-Type: application/json" \
-d '{
"customer_email": "alice@example.com",
"items": [{"product_id": "WIDGET-1", "quantity": 2, "unit_price": 29.99}]
}'
# Check email at http://localhost:8025 (MailHog UI)
Service Discovery & API Gateway
Simple approach: Docker Compose DNS
# Services reference each other by container name
# orders → inventory:50051 (gRPC)
# orders → redis:6379 (events)
# notifications → redis:6379 (events)
# No service discovery needed!
Production: API Gateway with nginx
# nginx.conf — route requests to services
# Upstream pools — container names resolved by Docker's internal DNS.
upstream orders {
    server orders:8000;
}
upstream inventory {
    server inventory:8000;
}
server {
    listen 80;
    # Route /api/orders* to the Order Service.
    location /api/orders {
        proxy_pass http://orders;
        proxy_set_header Host $host;
        # Propagate a per-request ID so traces span the gateway hop.
        proxy_set_header X-Request-ID $request_id;
    }
    # Route /api/inventory* to the Inventory Service's REST port.
    location /api/inventory {
        proxy_pass http://inventory;
        proxy_set_header Host $host;
        proxy_set_header X-Request-ID $request_id;
    }
    # Gateway's own liveness endpoint (answered without proxying).
    location /health {
        return 200 '{"gateway": "healthy"}';
        add_header Content-Type application/json;
    }
}
Resilience Patterns
Circuit breaker
import time
from enum import Enum
from functools import wraps
class CircuitState(Enum):
    CLOSED = "closed"        # Normal — requests flow
    OPEN = "open"            # Tripped — fast-fail
    HALF_OPEN = "half_open"  # Testing — probe requests allowed


class CircuitBreaker:
    """Prevent cascading failures between services.

    Decorate an async callable. After ``failure_threshold`` consecutive
    failures the circuit opens and further calls fail fast with
    CircuitBreakerOpen. Once ``recovery_timeout`` seconds have elapsed the
    circuit goes half-open and lets probe requests through: 3 consecutive
    successes close it again, and any failure re-opens it immediately.

    Fixes vs. the original: elapsed time is measured with time.monotonic()
    (time.time() can jump on NTP/DST adjustments and break the recovery
    timer), and a half-open failure now re-opens the circuit explicitly
    rather than relying on the stale failure count.
    """

    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: float = 30.0,
        expected_exceptions: tuple = (Exception,),
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.expected_exceptions = expected_exceptions
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.last_failure_time = 0.0  # monotonic timestamp of last failure
        self.success_count = 0

    def __call__(self, fn):
        @wraps(fn)
        async def wrapper(*args, **kwargs):
            if self.state == CircuitState.OPEN:
                if time.monotonic() - self.last_failure_time > self.recovery_timeout:
                    self.state = CircuitState.HALF_OPEN
                    self.success_count = 0
                else:
                    raise CircuitBreakerOpen(
                        f"Circuit open for {fn.__name__}. "
                        f"Retry after {self.recovery_timeout}s"
                    )
            try:
                result = await fn(*args, **kwargs)
            except self.expected_exceptions:
                self._on_failure()
                raise
            self._on_success()
            return result
        return wrapper

    def _on_success(self):
        if self.state == CircuitState.HALF_OPEN:
            self.success_count += 1
            if self.success_count >= 3:  # 3 successes to fully close
                self.state = CircuitState.CLOSED
                self.failure_count = 0
        else:
            self.failure_count = 0

    def _on_failure(self):
        self.failure_count += 1
        self.last_failure_time = time.monotonic()
        if self.state == CircuitState.HALF_OPEN:
            # A probe failed — re-open immediately, whatever the count says.
            self.state = CircuitState.OPEN
        elif self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN


class CircuitBreakerOpen(Exception):
    """Raised when a call is rejected because the circuit is open."""
# --- Usage ---
inventory_breaker = CircuitBreaker(
    failure_threshold=3,      # trip after 3 consecutive failures
    recovery_timeout=15.0,    # probe again after 15 seconds
    expected_exceptions=(ConnectionError, TimeoutError),
)

@inventory_breaker
async def check_inventory(product_id: str):
    """Call inventory service with circuit breaker protection."""
    # NOTE(review): httpx is not imported in this snippet — add
    # `import httpx` at module top for this to run.
    async with httpx.AsyncClient() as client:
        resp = await client.get(
            f"http://inventory:8000/stock/{product_id}",
            timeout=5.0,
        )
        resp.raise_for_status()
        return resp.json()
Retry with exponential backoff
import asyncio
import random
async def retry_async(
    fn,
    max_attempts: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
    jitter: bool = True,
):
    """Retry a zero-argument async callable with exponential backoff + jitter.

    Args:
        fn: async callable taking no arguments.
        max_attempts: total attempts including the first; must be >= 1.
        base_delay: delay before the second attempt, in seconds.
        max_delay: ceiling applied to the backoff delay.
        jitter: randomize each delay to 50-150% to avoid thundering herds.

    Returns:
        Whatever *fn* returns on its first success.

    Raises:
        ValueError: if max_attempts < 1.
        Exception: re-raises the last error once all attempts are exhausted.
    """
    if max_attempts < 1:
        # Without this guard the loop never runs and we'd `raise None`
        # below, producing a confusing TypeError instead of a clear error.
        raise ValueError("max_attempts must be >= 1")
    last_error = None
    for attempt in range(1, max_attempts + 1):
        try:
            return await fn()
        except Exception as e:
            last_error = e
            if attempt == max_attempts:
                break
            # Exponential: base, 2*base, 4*base, ... capped at max_delay.
            delay = min(base_delay * (2 ** (attempt - 1)), max_delay)
            if jitter:
                delay *= (0.5 + random.random())  # 50-150% of delay
            print(f"Attempt {attempt} failed: {e}. Retrying in {delay:.1f}s...")
            await asyncio.sleep(delay)
    raise last_error
Observability
In microservices, you can't just tail -f app.log. You need distributed tracing and structured logging. See our logging & monitoring guide for details.
# Structured logging — every service uses the same format
import structlog
import uuid
logger = structlog.get_logger()
class RequestContext:
    """Propagate trace IDs across service calls."""

    def __init__(self, request_id: str = None):
        # Fall back to a fresh UUID when the caller supplied nothing.
        self.request_id = request_id or str(uuid.uuid4())

    def log(self, event: str, **kwargs):
        """Emit a structured log line stamped with this request's trace ID."""
        logger.info(
            event,
            request_id=self.request_id,
            service="orders",
            **kwargs,
        )
# Middleware: extract/inject trace ID
from fastapi import Request
@app.middleware("http")
async def trace_middleware(request: Request, call_next):
    """Ensure every request carries an X-Request-ID, inbound and outbound."""
    incoming = request.headers.get("X-Request-ID")
    request_id = incoming if incoming is not None else str(uuid.uuid4())
    request.state.ctx = RequestContext(request_id)
    response = await call_next(request)
    # Echo the ID back so clients and downstream logs can correlate.
    response.headers["X-Request-ID"] = request_id
    return response
Testing Microservices
Test at three levels. See our testing guide for pytest fundamentals.
# 1. Unit tests — test service logic in isolation
import pytest
from unittest.mock import AsyncMock
from app.service import OrderService
from app.schemas import CreateOrder, OrderItem
@pytest.mark.asyncio
async def test_create_order():
    """Service computes the total, stores the order, and emits one event."""
    fake_db = AsyncMock()
    fake_bus = AsyncMock()
    service = OrderService(db_session=fake_db, event_publisher=fake_bus)
    payload = CreateOrder(
        customer_email="test@example.com",
        items=[OrderItem(product_id="W-1", quantity=2, unit_price=10.0)],
    )
    created = await service.create_order(payload)
    assert created["total"] == 20.0
    assert created["status"] == "pending"
    fake_bus.publish.assert_called_once()
# 2. Integration tests — test with real Redis/Postgres
@pytest.mark.asyncio
async def test_event_published(redis_client):
    """Verify events reach Redis.

    Skeleton only: subscribes to the channel, then (TODO) creates an order
    through the API and asserts the message arrives on ``order.created``.
    """
    pubsub = redis_client.pubsub()
    await pubsub.subscribe("order.created")
    # Create order...
    # Assert event received on channel
# 3. Contract tests — verify API schema
def test_order_response_schema():
    """Ensure response matches the contract other services expect."""
    from app.schemas import OrderResponse

    sample = {
        "id": "abc-123",
        "customer_email": "a@b.com",
        "items": [{"product_id": "W-1", "quantity": 1, "unit_price": 5.0}],
        "status": "pending",
        "total": 5.0,
        "created_at": "2026-01-01T00:00:00Z",
    }
    # If this fails, you broke the contract
    OrderResponse.model_validate(sample)
Production Checklist
- Health checks — every service has /health that checks dependencies (DB, Redis)
- Graceful shutdown — handle SIGTERM, drain connections, finish in-flight requests
- Idempotency — events can be delivered twice; make handlers idempotent (check "already processed")
- Dead letter queues — failed events go somewhere you can inspect and retry
- Timeouts everywhere — every outbound call has a timeout (HTTP, gRPC, DB)
- Circuit breakers — protect against cascading failures
- Distributed tracing — propagate request IDs across all service calls
- Centralized logging — aggregate logs with ELK, Loki, or Datadog
- Database per service — each service owns its data; no shared databases
- Schema registry — version your event schemas; don't break consumers
🚀 Want production-ready microservice templates, API tools, and automation scripts?
Related Articles
- Build a REST API with FastAPI — the foundation for HTTP microservices
- Dockerize Python Apps — containerize each service
- Python Async Programming — asyncio for high-performance services
- Python Design Patterns — Repository, Factory, Observer patterns used here
- Python Logging & Monitoring — observability for distributed systems
- Python Database Operations — database-per-service patterns
Need help designing or building a microservice architecture? I build Python APIs, distributed systems, and automation tools. Reach out on Telegram →