Python Microservices — Build, Deploy & Scale

March 2026 · 25 min read · Python, Microservices, Architecture, Docker

Microservices aren't about splitting a monolith into tiny pieces and hoping for the best. They're about building independent services that can be deployed, scaled, and maintained separately. This guide covers the patterns that actually work — with Python code you can run today.

We'll build a small e-commerce system: an Order Service, an Inventory Service, and a Notification Service. Each runs independently, communicates through well-defined interfaces, and can be deployed in Docker.

When Microservices Make Sense

Before we build anything — an honest assessment:

💡 Rule of thumb: Start with a well-structured monolith. Extract services only when you hit a real scaling or organizational bottleneck. Premature microservices are the #1 architecture mistake.

Project Structure

ecommerce/
├── services/
│   ├── orders/
│   │   ├── app/
│   │   │   ├── __init__.py
│   │   │   ├── main.py        # FastAPI app
│   │   │   ├── models.py      # SQLAlchemy models
│   │   │   ├── schemas.py     # Pydantic schemas
│   │   │   ├── service.py     # Business logic
│   │   │   └── events.py      # Event publishing
│   │   ├── Dockerfile
│   │   └── requirements.txt
│   ├── inventory/
│   │   ├── app/
│   │   │   ├── main.py
│   │   │   ├── models.py
│   │   │   └── service.py
│   │   ├── Dockerfile
│   │   └── requirements.txt
│   └── notifications/
│       ├── app/
│       │   ├── main.py
│       │   └── handlers.py
│       ├── Dockerfile
│       └── requirements.txt
├── shared/
│   └── events.py              # Shared event schemas
├── docker-compose.yml
└── README.md

Service 1: Order Service (REST API)

The Order Service exposes a FastAPI REST API and publishes events when orders are created.

# services/orders/app/schemas.py
from pydantic import BaseModel, Field
from datetime import datetime
from enum import Enum


class OrderStatus(str, Enum):
    """Status values an order can hold.

    The ``str`` mixin makes every member a real string, so a member compares
    equal to its literal value (``OrderStatus.pending == "pending"``).
    """

    pending = "pending"  # status assigned by OrderService.create_order
    confirmed = "confirmed"
    shipped = "shipped"
    cancelled = "cancelled"


class OrderItem(BaseModel):
    """One order line: a product, the number of units, and the price per unit."""

    product_id: str
    quantity: int = Field(..., gt=0)  # required; must be greater than 0
    unit_price: float = Field(..., gt=0)  # required; must be greater than 0


class CreateOrder(BaseModel):
    """Request body accepted when creating an order."""

    # Typed as a plain string: no e-mail format validation is declared here.
    customer_email: str
    items: list[OrderItem] = Field(..., min_length=1)  # required; at least one item


class OrderResponse(BaseModel):
    """Order representation returned by the API (used as ``response_model``)."""

    id: str
    customer_email: str
    items: list[OrderItem]
    status: OrderStatus
    total: float
    created_at: datetime

    # Pydantic v2 option: allow building the model from an object's attributes
    # (ORM-style) in addition to plain dicts.
    model_config = {"from_attributes": True}
# services/orders/app/service.py
import uuid
from datetime import datetime, timezone

from .schemas import CreateOrder, OrderStatus


class OrderService:
    """Order use-cases, kept free of any web-framework imports.

    Both collaborators are injected. ``db_session`` must offer awaitable
    ``save(collection, record)`` and ``get(collection, key)``;
    ``event_publisher`` must offer awaitable ``publish(event_type, payload)``.
    """

    def __init__(self, db_session, event_publisher):
        self.db, self.events = db_session, event_publisher

    async def create_order(self, data: CreateOrder) -> dict:
        """Store a new pending order, announce it, and return the stored record.

        The record is saved first; the ``order.created`` event is published
        only after the save has completed.
        """
        line_items = [line.model_dump() for line in data.items]
        order_total = sum(line.quantity * line.unit_price for line in data.items)
        new_id = str(uuid.uuid4())

        record = {
            "id": new_id,
            "customer_email": data.customer_email,
            "items": line_items,
            "status": OrderStatus.pending,
            "total": order_total,
            "created_at": datetime.now(timezone.utc),
        }
        await self.db.save("orders", record)

        # Downstream services react to this event.
        created_event = {
            "order_id": new_id,
            "customer_email": data.customer_email,
            "items": line_items,
            "total": order_total,
        }
        await self.events.publish("order.created", created_event)

        return record

    async def get_order(self, order_id: str) -> dict | None:
        """Fetch one order record by id; returns whatever the store yields."""
        found = await self.db.get("orders", order_id)
        return found

    async def update_status(self, order_id: str, status: OrderStatus):
        """Set an order's status, persist it, and publish ``order.status_changed``.

        Raises:
            ValueError: if the store has no (truthy) record for ``order_id``.
        """
        record = await self.db.get("orders", order_id)
        if record:
            record["status"] = status
            await self.db.save("orders", record)

            change_event = {
                "order_id": order_id,
                "new_status": status,
                "customer_email": record["customer_email"],
            }
            await self.events.publish("order.status_changed", change_event)
            return record

        raise ValueError(f"Order {order_id} not found")
# services/orders/app/main.py
from fastapi import FastAPI, HTTPException
from .schemas import CreateOrder, OrderResponse
from .service import OrderService
from .events import RedisEventPublisher
from .db import get_db

app = FastAPI(title="Order Service", version="1.0.0")


# Shared by all requests: creating a RedisEventPublisher builds a new Redis
# client, so constructing one per request would pile up unclosed clients.
_event_publisher = None


def get_order_service():
    """Build an ``OrderService`` wired to the database and the event publisher.

    The publisher is created lazily on first use and then reused for every
    subsequent call; a fresh ``get_db()`` result is still obtained per call.

    Returns:
        A new ``OrderService`` instance.
    """
    global _event_publisher
    if _event_publisher is None:
        _event_publisher = RedisEventPublisher()

    return OrderService(
        db_session=get_db(),
        event_publisher=_event_publisher,
    )


@app.post("/orders", response_model=OrderResponse, status_code=201)
async def create_order(data: CreateOrder):
    """Create an order from the request body; responds with HTTP 201."""
    created = await get_order_service().create_order(data)
    return created


@app.get("/orders/{order_id}", response_model=OrderResponse)
async def get_order(order_id: str):
    """Return the order with the given id, or respond 404 when none is found."""
    found = await get_order_service().get_order(order_id)
    if found:
        return found
    raise HTTPException(status_code=404, detail="Order not found")


@app.get("/health")
def health():
    """Health probe: report the service name and a static healthy status."""
    payload = {"service": "orders", "status": "healthy"}
    return payload

Inter-Service Communication

Services need to talk. Two main patterns:

| Pattern | When to Use | Tools |
|---|---|---|
| Synchronous (HTTP/gRPC) | Need immediate response | httpx, grpcio |
| Asynchronous (Events) | Fire-and-forget, decoupling | Redis Pub/Sub, RabbitMQ, Kafka |

Event publishing with Redis

# services/orders/app/events.py
import json
import redis.asyncio as redis


class RedisEventPublisher:
    """Publish events to Redis Pub/Sub channels."""

    def __init__(self, url: str = "redis://redis:6379"):
        self.redis = redis.from_url(url)

    async def publish(self, event_type: str, data: dict):
        message = json.dumps({
            "event": event_type,
            "data": data,
            "timestamp": __import__("datetime").datetime.now(
                __import__("datetime").timezone.utc
            ).isoformat(),
        }, default=str)
        await self.redis.publish(event_type, message)

    async def close(self):
        await self.redis.close()


class RedisEventSubscriber:
    """Subscribe to events from Redis Pub/Sub."""

    def __init__(self, url: str = "redis://redis:6379"):
        self.redis = redis.from_url(url)
        self.pubsub = self.redis.pubsub()
        self._handlers: dict[str, list] = {}

    def on(self, event_type: str):
        """Decorator to register event handlers."""
        def decorator(fn):
            self._handlers.setdefault(event_type, []).append(fn)
            return fn
        return decorator

    async def start(self, *event_types: str):
        """Subscribe and start processing events."""
        await self.pubsub.subscribe(*event_types)

        async for message in self.pubsub.listen():
            if message["type"] != "message":
                continue

            channel = message["channel"]
            if isinstance(channel, bytes):
                channel = channel.decode()

            data = json.loads(message["data"])
            handlers = self._handlers.get(channel, [])

            for handler in handlers:
                try:
                    await handler(data["data"])
                except Exception as e:
                    print(f"Error in handler for {channel}: {e}")

gRPC for synchronous calls

When the Order Service needs to check inventory before confirming an order, use gRPC — its binary Protocol Buffers encoding over HTTP/2 is typically faster than JSON over REST, and the .proto contract provides strong typing.

# shared/inventory.proto
syntax = "proto3";

package inventory;

// Inventory operations exposed over gRPC.
service InventoryService {
    // Look up the stock level of a single product.
    rpc CheckStock (StockRequest) returns (StockResponse);
    // Reserve all items of one order in a single call.
    rpc ReserveItems (ReserveRequest) returns (ReserveResponse);
}

// Identifies the product whose stock is being queried.
message StockRequest {
    string product_id = 1;
}

// Stock level reported for one product.
message StockResponse {
    string product_id = 1;
    int32 available = 2;
    bool in_stock = 3;
}

// The order being reserved for, plus the lines to reserve.
message ReserveRequest {
    string order_id = 1;
    repeated ReserveItem items = 2;
}

// One reservation line: a product and the number of units.
message ReserveItem {
    string product_id = 1;
    int32 quantity = 2;
}

// Outcome of a reservation, with a human-readable explanation.
message ReserveResponse {
    bool success = 1;
    string message = 2;
}
# Generate Python code from proto
# pip install grpcio grpcio-tools
# python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. inventory.proto
# services/inventory/app/grpc_server.py
import grpc
from concurrent import futures
import inventory_pb2
import inventory_pb2_grpc


class InventoryServicer(inventory_pb2_grpc.InventoryServiceServicer):
    """gRPC implementation of inventory service.

    ``db`` must provide ``get_product(product_id)``, returning a mapping with
    a ``"stock"`` entry or a falsy value for an unknown product, and
    ``decrement_stock(product_id, quantity)``.
    """

    def __init__(self, db):
        self.db = db

    def CheckStock(self, request, context):
        """Report the available stock for ``request.product_id``.

        An unknown product sets the gRPC status to ``NOT_FOUND`` and returns
        an empty ``StockResponse``.
        """
        product = self.db.get_product(request.product_id)
        if not product:
            context.set_code(grpc.StatusCode.NOT_FOUND)
            context.set_details(f"Product {request.product_id} not found")
            return inventory_pb2.StockResponse()

        return inventory_pb2.StockResponse(
            product_id=request.product_id,
            available=product["stock"],
            in_stock=product["stock"] > 0,
        )

    def ReserveItems(self, request, context):
        """Reserve every requested item, or nothing at all.

        All lines are validated before any stock is decremented. Lines that
        name the same product are summed first, so the stock check sees the
        total requested quantity for that product.

        Returns:
            ``ReserveResponse`` with ``success=False`` and an explanatory
            message if a quantity is not positive, a product is unknown, or
            stock is insufficient; otherwise ``success=True``.
        """
        from collections import Counter

        requested = Counter()
        for item in request.items:
            # A non-positive quantity would pass the stock comparison below
            # and a negative one would *increase* stock when "decremented".
            if item.quantity <= 0:
                return inventory_pb2.ReserveResponse(
                    success=False,
                    message=f"Invalid quantity for {item.product_id}",
                )
            requested[item.product_id] += item.quantity

        for product_id, quantity in requested.items():
            product = self.db.get_product(product_id)
            if not product or product["stock"] < quantity:
                return inventory_pb2.ReserveResponse(
                    success=False,
                    message=f"Insufficient stock for {product_id}",
                )

        # Reserve (decrement stock).
        # NOTE(review): the check above and this decrement are separate db
        # calls; confirm the db layer makes the pair atomic, since concurrent
        # requests could otherwise interleave between them.
        for product_id, quantity in requested.items():
            self.db.decrement_stock(product_id, quantity)

        return inventory_pb2.ReserveResponse(
            success=True,
            message=f"Reserved items for order {request.order_id}",
        )


def serve():
    """Start the inventory gRPC server on port 50051 and block until it terminates."""
    worker_pool = futures.ThreadPoolExecutor(max_workers=10)
    grpc_server = grpc.server(worker_pool)

    servicer = InventoryServicer(db=get_db())
    inventory_pb2_grpc.add_InventoryServiceServicer_to_server(servicer, grpc_server)

    # Plaintext listener (no TLS) on all interfaces.
    grpc_server.add_insecure_port("[::]:50051")
    grpc_server.start()
    print("Inventory gRPC server on :50051")
    grpc_server.wait_for_termination()
# services/orders/app/inventory_client.py
import grpc
import inventory_pb2
import inventory_pb2_grpc


class InventoryClient:
    """gRPC client for inventory service.

    The client owns its channel. Call :meth:`close` when finished, or use the
    client as a context manager so the channel is closed automatically.
    """

    def __init__(self, host: str = "inventory:50051"):
        self.channel = grpc.insecure_channel(host)
        self.stub = inventory_pb2_grpc.InventoryServiceStub(self.channel)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()

    def close(self):
        """Close the underlying gRPC channel."""
        self.channel.close()

    def check_stock(self, product_id: str) -> dict | None:
        """Return stock information for a product.

        Returns:
            A dict with ``product_id``, ``available`` and ``in_stock``, or
            ``None`` when the service answers ``NOT_FOUND``.

        Raises:
            grpc.RpcError: for any failure other than ``NOT_FOUND``,
                including the 5-second deadline being exceeded.
        """
        try:
            resp = self.stub.CheckStock(
                inventory_pb2.StockRequest(product_id=product_id),
                timeout=5.0,
            )
        except grpc.RpcError as e:
            if e.code() == grpc.StatusCode.NOT_FOUND:
                return None
            raise

        return {
            "product_id": resp.product_id,
            "available": resp.available,
            "in_stock": resp.in_stock,
        }

    def reserve(self, order_id: str, items: list[dict]) -> bool:
        """Ask the inventory service to reserve all ``items`` for an order.

        Args:
            order_id: Order the reservation belongs to.
            items: Dicts that each carry ``product_id`` and ``quantity``.

        Returns:
            The ``success`` flag of the service's response.

        Raises:
            grpc.RpcError: if the call fails or the 10-second deadline passes.
        """
        req = inventory_pb2.ReserveRequest(
            order_id=order_id,
            items=[
                inventory_pb2.ReserveItem(
                    product_id=i["product_id"],
                    quantity=i["quantity"],
                )
                for i in items
            ],
        )
        resp = self.stub.ReserveItems(req, timeout=10.0)
        return resp.success

Service 3: Notification Service (Event Consumer)

This service doesn't expose an API — it listens for events and sends notifications.

# services/notifications/app/main.py
import asyncio
from .handlers import setup_handlers
from .events import RedisEventSubscriber


async def main():
    """Register the notification handlers and consume order events."""
    bus = RedisEventSubscriber()
    setup_handlers(bus)

    print("📬 Notification service started, listening for events...")
    channels = ("order.created", "order.status_changed")
    await bus.start(*channels)


if __name__ == "__main__":
    asyncio.run(main())
# services/notifications/app/handlers.py
import smtplib
from email.mime.text import MIMEText


def setup_handlers(subscriber):
    """Register event handlers for notifications.

    Args:
        subscriber: Object whose ``on(event_type)`` returns a decorator that
            registers an async handler for that event type.

    ``send_email`` is blocking, so each handler runs it in a worker thread
    via ``asyncio.to_thread`` to keep the event loop free while SMTP I/O is
    in progress.
    """
    import asyncio

    @subscriber.on("order.created")
    async def on_order_created(data: dict):
        """E-mail the customer that a newly created order was received."""
        email = data["customer_email"]
        order_id = data["order_id"]
        total = data["total"]

        await asyncio.to_thread(
            send_email,
            to=email,
            subject=f"Order {order_id[:8]} confirmed!",
            body=f"Your order for ${total:.2f} has been received. "
                 f"We'll notify you when it ships.",
        )
        print(f"📧 Order confirmation sent to {email}")

    @subscriber.on("order.status_changed")
    async def on_status_changed(data: dict):
        """E-mail the customer about a status change we have a message for."""
        status = data["new_status"]
        email = data["customer_email"]
        order_id = data["order_id"]

        # Statuses without an entry here (e.g. "pending") send nothing.
        messages = {
            "confirmed": "Your order has been confirmed!",
            "shipped": "Your order is on its way! 🚚",
            "cancelled": "Your order has been cancelled.",
        }

        if status in messages:
            await asyncio.to_thread(
                send_email,
                to=email,
                subject=f"Order {order_id[:8]} — {status}",
                body=messages[status],
            )
            print(f"📧 Status update ({status}) sent to {email}")


def send_email(to: str, subject: str, body: str) -> bool:
    """Send a plain-text e-mail via SMTP (simplified, best-effort).

    The server is taken from the ``SMTP_HOST`` and ``SMTP_PORT`` environment
    variables, defaulting to ``mailhog`` and ``1025``.

    Args:
        to: Recipient address.
        subject: Subject line.
        body: Plain-text message body.

    Returns:
        ``True`` if the message was handed to the SMTP server, ``False`` if
        sending failed. Failures are printed, never raised.
    """
    import os

    msg = MIMEText(body)
    msg["Subject"] = subject
    msg["To"] = to
    msg["From"] = "orders@example.com"

    # In production: connection pooling, retry
    try:
        host = os.environ.get("SMTP_HOST", "mailhog")
        port = int(os.environ.get("SMTP_PORT", "1025"))
        # The timeout keeps an unreachable server from blocking indefinitely.
        with smtplib.SMTP(host, port, timeout=10) as server:
            server.send_message(msg)
    except Exception as e:
        print(f"⚠️ Email failed: {e}")
        return False
    return True

Docker Compose — Run Everything

See our Docker guide for Dockerfile details.

# docker-compose.yml
services:
  # --- Infrastructure ---
  # Redis carries the Pub/Sub events between services.
  redis:
    image: redis:7-alpine
    ports: ["6379:6379"]
    # Dependents using `condition: service_healthy` wait for this check to pass.
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 5s

  postgres:
    image: postgres:16-alpine
    environment:
      POSTGRES_DB: ecommerce
      # Development-only credential; matches the DATABASE_URL values below.
      POSTGRES_PASSWORD: postgres
    volumes:
      # Named volume so database files survive container re-creation.
      - pgdata:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U postgres"]
      interval: 5s

  # Local mail catcher: SMTP on 1025, web inbox on 8025.
  mailhog:
    image: mailhog/mailhog
    ports:
      - "8025:8025"  # Web UI
      - "1025:1025"  # SMTP

  # --- Services ---
  orders:
    build: ./services/orders
    # Host port 8001 -> container port 8000.
    ports: ["8001:8000"]
    environment:
      DATABASE_URL: postgresql://postgres:postgres@postgres:5432/ecommerce
      REDIS_URL: redis://redis:6379
    # Start only after both healthchecks report healthy.
    depends_on:
      redis: { condition: service_healthy }
      postgres: { condition: service_healthy }

  inventory:
    build: ./services/inventory
    ports:
      - "8002:8000"   # REST
      - "50051:50051"  # gRPC
    environment:
      DATABASE_URL: postgresql://postgres:postgres@postgres:5432/ecommerce
    depends_on:
      postgres: { condition: service_healthy }

  # Event consumer only: no ports are published for this service.
  notifications:
    build: ./services/notifications
    environment:
      REDIS_URL: redis://redis:6379
      SMTP_HOST: mailhog
      SMTP_PORT: "1025"
    depends_on:
      redis: { condition: service_healthy }

volumes:
  pgdata:
# Start everything (-d: detached, containers keep running in the background)
docker compose up -d

# Check health (8001 and 8002 are the host ports published in docker-compose.yml)
curl http://localhost:8001/health  # orders
curl http://localhost:8002/health  # inventory

# Create an order (the JSON body matches the CreateOrder schema)
curl -X POST http://localhost:8001/orders \
  -H "Content-Type: application/json" \
  -d '{
    "customer_email": "alice@example.com",
    "items": [{"product_id": "WIDGET-1", "quantity": 2, "unit_price": 29.99}]
  }'

# Check email at http://localhost:8025 (MailHog UI)

Service Discovery & API Gateway

Simple approach: Docker Compose DNS

# Services reference each other by Compose service name
# orders → inventory:50051 (gRPC)
# orders → redis:6379 (events)
# notifications → redis:6379 (events)
# No service discovery needed!

Production: API Gateway with nginx

# nginx.conf — route requests to services
upstream orders {
    server orders:8000;
}
upstream inventory {
    server inventory:8000;
}

server {
    listen 80;

    # The Order Service routes live under /orders, not /api/orders. Giving
    # proxy_pass a URI makes nginx replace the matched location prefix:
    #   /api/orders      -> /orders
    #   /api/orders/<id> -> /orders/<id>
    location /api/orders {
        proxy_pass http://orders/orders;
        proxy_set_header Host $host;
        proxy_set_header X-Request-ID $request_id;
    }

    # Strip the /api/inventory/ prefix before forwarding:
    #   /api/inventory/stock/<id> -> /stock/<id>
    location /api/inventory/ {
        proxy_pass http://inventory/;
        proxy_set_header Host $host;
        proxy_set_header X-Request-ID $request_id;
    }

    location /health {
        # default_type sets the Content-Type of the `return` body; using
        # add_header here would emit a second Content-Type header.
        default_type application/json;
        return 200 '{"gateway": "healthy"}';
    }
}

Resilience Patterns

Circuit breaker

import time
from enum import Enum
from functools import wraps


class CircuitState(Enum):
    """States of a circuit breaker."""

    CLOSED = "closed"      # Normal — requests flow
    OPEN = "open"          # Tripped — fast-fail
    HALF_OPEN = "half_open"  # Testing — trial requests decide the next state


class CircuitBreakerOpen(Exception):
    """Raised instead of calling the wrapped function while the circuit is open."""


class CircuitBreaker:
    """Prevent cascading failures between services.

    Use an instance as a decorator on an ``async`` function. After
    ``failure_threshold`` consecutive expected failures the circuit opens and
    calls fail fast with ``CircuitBreakerOpen``. Once ``recovery_timeout``
    seconds have passed, the next call is let through (half-open): a failure
    re-opens the circuit immediately, while ``success_threshold`` successes
    close it again.
    """

    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: float = 30.0,
        expected_exceptions: tuple = (Exception,),
        success_threshold: int = 3,
    ):
        """Configure the breaker.

        Args:
            failure_threshold: Consecutive failures that open the circuit.
            recovery_timeout: Seconds to stay open before allowing a trial call.
            expected_exceptions: Exception types counted as failures; any
                other exception propagates without affecting the breaker.
            success_threshold: Half-open successes required to close again.
        """
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.expected_exceptions = expected_exceptions
        self.success_threshold = success_threshold
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        # A time.monotonic() reading, not a wall-clock timestamp.
        self.last_failure_time = 0.0
        self.success_count = 0

    def __call__(self, fn):
        """Wrap the async callable ``fn`` with this breaker."""
        @wraps(fn)
        async def wrapper(*args, **kwargs):
            if self.state == CircuitState.OPEN:
                # Monotonic clock: unaffected by system clock adjustments.
                if time.monotonic() - self.last_failure_time > self.recovery_timeout:
                    self.state = CircuitState.HALF_OPEN
                    self.success_count = 0
                else:
                    raise CircuitBreakerOpen(
                        f"Circuit open for {fn.__name__}. "
                        f"Retry after {self.recovery_timeout}s"
                    )

            try:
                result = await fn(*args, **kwargs)
            except self.expected_exceptions:
                self._on_failure()
                raise

            self._on_success()
            return result

        return wrapper

    def _on_success(self):
        """Record a success; close the circuit after enough half-open successes."""
        if self.state == CircuitState.HALF_OPEN:
            self.success_count += 1
            if self.success_count >= self.success_threshold:
                self.state = CircuitState.CLOSED
                self.failure_count = 0
        else:
            self.failure_count = 0

    def _on_failure(self):
        """Record a failure; open the circuit when warranted."""
        self.failure_count += 1
        self.last_failure_time = time.monotonic()
        # Any failure during a half-open trial re-opens immediately, without
        # relying on the failure counter still being above the threshold.
        if (
            self.state == CircuitState.HALF_OPEN
            or self.failure_count >= self.failure_threshold
        ):
            self.state = CircuitState.OPEN


# --- Usage ---
import httpx  # needed by check_inventory below

inventory_breaker = CircuitBreaker(
    failure_threshold=3,
    recovery_timeout=15.0,
    # httpx raises its own exception hierarchy; connection failures and
    # timeouts derive from httpx.TransportError, not from the builtin
    # ConnectionError/TimeoutError, so the builtins would never match.
    # HTTP status errors from raise_for_status() are deliberately not
    # counted: a 4xx reply does not mean the service is down.
    expected_exceptions=(httpx.TransportError,),
)

@inventory_breaker
async def check_inventory(product_id: str):
    """Call inventory service with circuit breaker protection.

    Returns:
        The decoded JSON body of ``GET /stock/{product_id}``.

    Raises:
        CircuitBreakerOpen: while the breaker is open.
        httpx.TransportError: on connection failure or timeout.
        httpx.HTTPStatusError: if the response status is 4xx or 5xx.
    """
    async with httpx.AsyncClient() as client:
        resp = await client.get(
            f"http://inventory:8000/stock/{product_id}",
            timeout=5.0,
        )
        resp.raise_for_status()
        return resp.json()

Retry with exponential backoff

import asyncio
import random


async def retry_async(
    fn,
    max_attempts: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
    jitter: bool = True,
):
    """Retry with exponential backoff + jitter.

    Args:
        fn: Zero-argument callable returning an awaitable; called once per attempt.
        max_attempts: Total number of attempts (must be at least 1).
        base_delay: Delay in seconds before the second attempt; doubles each retry.
        max_delay: Upper bound for the delay before jitter is applied.
        jitter: If true, scale each delay by a random factor in [0.5, 1.5).

    Returns:
        The result of the first successful attempt.

    Raises:
        ValueError: if ``max_attempts`` is less than 1.
        Exception: whatever the final attempt raised.
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")

    for attempt in range(1, max_attempts + 1):
        try:
            return await fn()
        except Exception as e:
            if attempt == max_attempts:
                # Out of attempts: re-raise in place to keep the traceback.
                raise

            delay = min(base_delay * (2 ** (attempt - 1)), max_delay)
            if jitter:
                delay *= (0.5 + random.random())  # 50-150% of delay

            print(f"Attempt {attempt} failed: {e}. Retrying in {delay:.1f}s...")
            await asyncio.sleep(delay)

Observability

In microservices, you can't just tail -f app.log. You need distributed tracing and structured logging. See our logging & monitoring guide for details.

# Structured logging — every service uses the same format
import structlog
import uuid

logger = structlog.get_logger()


class RequestContext:
    """Propagate trace IDs across service calls."""

    def __init__(self, request_id: str = None):
        self.request_id = request_id or str(uuid.uuid4())

    def log(self, event: str, **kwargs):
        logger.info(
            event,
            request_id=self.request_id,
            service="orders",
            **kwargs,
        )


# Middleware: extract/inject trace ID
from fastapi import Request

@app.middleware("http")
async def trace_middleware(request: Request, call_next):
    """Attach a RequestContext to the request and echo its id on the response.

    The id is read from the incoming ``X-Request-ID`` header; when the header
    is absent a new UUID4 string is used instead.
    """
    fallback_id = str(uuid.uuid4())
    trace_id = request.headers.get("X-Request-ID", fallback_id)
    request.state.ctx = RequestContext(trace_id)

    downstream_response = await call_next(request)
    downstream_response.headers["X-Request-ID"] = trace_id
    return downstream_response

Testing Microservices

Test at three levels. See our testing guide for pytest fundamentals.

# 1. Unit tests — test service logic in isolation
import pytest
from unittest.mock import AsyncMock

from app.service import OrderService
from app.schemas import CreateOrder, OrderItem


@pytest.mark.asyncio
async def test_create_order():
    """create_order totals the line items, starts as pending, and publishes once."""
    db_double, bus_double = AsyncMock(), AsyncMock()
    payload = CreateOrder(
        customer_email="test@example.com",
        items=[OrderItem(product_id="W-1", quantity=2, unit_price=10.0)],
    )
    service = OrderService(db_session=db_double, event_publisher=bus_double)

    created = await service.create_order(payload)

    assert created["total"] == 20.0
    assert created["status"] == "pending"
    bus_double.publish.assert_called_once()


# 2. Integration tests — test with real Redis/Postgres
@pytest.mark.asyncio
async def test_event_published(redis_client):
    """Verify events reach Redis.

    Skeleton only: it subscribes to the channel, and the remaining steps are
    still placeholders, so the test currently asserts nothing.
    """
    # `redis_client` is presumably a fixture yielding an async Redis client
    # connected to a real server; confirm against the project's conftest.
    pubsub = redis_client.pubsub()
    await pubsub.subscribe("order.created")

    # TODO: create an order through the service under test.
    # TODO: read from `pubsub` and assert the "order.created" event arrived.


# 3. Contract tests — verify API schema
def test_order_response_schema():
    """Contract check: a representative payload must validate as OrderResponse."""
    from app.schemas import OrderResponse

    line = {"product_id": "W-1", "quantity": 1, "unit_price": 5.0}
    contract_payload = dict(
        id="abc-123",
        customer_email="a@b.com",
        items=[line],
        status="pending",
        total=5.0,
        created_at="2026-01-01T00:00:00Z",
    )

    # A validation error here means the contract consumers rely on was broken.
    OrderResponse.model_validate(contract_payload)

Production Checklist

🚀 Want production-ready microservice templates, API tools, and automation scripts?

Get the AI Agent Toolkit →

Related Articles

Need help designing or building a microservice architecture? I build Python APIs, distributed systems, and automation tools. Reach out on Telegram →