Python + AWS Lambda — Build Serverless Functions

March 2026 · 22 min read · Python, AWS, Serverless, Cloud

AWS Lambda lets you run Python code without managing servers. You pay only for what you use, it scales automatically, and it integrates with everything in AWS. This guide takes you from a basic handler to a production-ready serverless API with DynamoDB, S3 triggers, layers, and CI/CD.

How Lambda Works

The execution model is simple:

  1. Something triggers your function (HTTP request, S3 upload, cron schedule, SQS message)
  2. AWS spins up a container with your code
  3. Your handler runs and returns a response
  4. The container stays warm for a few minutes and is reused for the next invocation
  5. If the container sits idle too long, it is destroyed, and the next invocation pays a cold start (see the sketch after the limits table)
Limit                      Value
Max execution time         15 minutes
Memory                     128 MB to 10,240 MB
Package size (zip)         50 MB (250 MB unzipped)
Package size (container)   10 GB
Concurrent executions      1,000 (default, can be increased)
Temp storage (/tmp)        512 MB to 10,240 MB
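
You can observe the warm-container behavior directly: anything created at module level survives between invocations of the same container. A minimal sketch (the module and field names are just for illustration):

# warm_demo.py: observe container reuse between invocations
import time

# Module-level code runs once, at cold start
CONTAINER_STARTED = time.time()
invocation_count = 0


def lambda_handler(event, context):
    global invocation_count
    invocation_count += 1
    # On warm invocations the count climbs and the container age grows;
    # after a cold start both reset.
    return {
        "invocation": invocation_count,
        "container_age_seconds": round(time.time() - CONTAINER_STARTED, 1),
    }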

Your First Lambda Function

Project setup with SAM CLI

# Install SAM CLI
pip install aws-sam-cli

# Create project
sam init --runtime python3.12 --name my-api --app-template hello-world

# Project structure
my-api/
├── template.yaml          # SAM/CloudFormation template
├── hello_world/
│   ├── __init__.py
│   ├── app.py             # Lambda handler
│   └── requirements.txt
├── tests/
│   └── unit/
│       └── test_handler.py
└── samconfig.toml

The handler

# hello_world/app.py
import json


def lambda_handler(event, context):
    """
    Lambda entry point.

    Args:
        event: Trigger data (API Gateway request, S3 event, etc.)
        context: Runtime info (function name, memory, time remaining)

    Returns:
        dict: API Gateway response format
    """
    return {
        "statusCode": 200,
        "headers": {
            "Content-Type": "application/json",
            "Access-Control-Allow-Origin": "*",
        },
        "body": json.dumps({
            "message": "Hello from Lambda!",
            "function": context.function_name,
            "memory_mb": context.memory_limit_in_mb,
            "remaining_ms": context.get_remaining_time_in_millis(),
        }),
    }
# Run locally
sam local start-api
# → http://127.0.0.1:3000/hello

# Invoke directly
sam local invoke HelloWorldFunction --event events/event.json

REST API with API Gateway

Build a CRUD API for a task manager — similar to our FastAPI guide, but serverless.
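
Every handler in this API receives an API Gateway proxy event. The fields the code below relies on (body, pathParameters) look roughly like this; a trimmed sketch, not the full event:

# Simplified shape of an API Gateway (REST) proxy event
sample_event = {
    "httpMethod": "POST",
    "path": "/tasks",
    "headers": {"Content-Type": "application/json"},
    "pathParameters": None,  # e.g. {"task_id": "..."} for /tasks/{task_id}
    "queryStringParameters": None,
    "body": '{"title": "Write the report", "priority": "high"}',  # raw JSON string
}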

SAM template

# template.yaml
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31

Globals:
  Function:
    Timeout: 30
    Runtime: python3.12
    MemorySize: 256
    Environment:
      Variables:
        TABLE_NAME: !Ref TasksTable
        LOG_LEVEL: INFO

Resources:
  # --- API ---
  TasksApi:
    Type: AWS::Serverless::Api
    Properties:
      StageName: prod
      Cors:
        AllowOrigin: "'*'"
        AllowMethods: "'GET,POST,PUT,DELETE,OPTIONS'"
        AllowHeaders: "'Content-Type,Authorization'"

  # --- Functions ---
  CreateTask:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: functions/
      Handler: tasks.create_handler
      Policies:
        - DynamoDBCrudPolicy:
            TableName: !Ref TasksTable
      Events:
        Api:
          Type: Api
          Properties:
            RestApiId: !Ref TasksApi
            Path: /tasks
            Method: POST

  ListTasks:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: functions/
      Handler: tasks.list_handler
      Policies:
        - DynamoDBReadPolicy:
            TableName: !Ref TasksTable
      Events:
        Api:
          Type: Api
          Properties:
            RestApiId: !Ref TasksApi
            Path: /tasks
            Method: GET

  GetTask:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: functions/
      Handler: tasks.get_handler
      Policies:
        - DynamoDBReadPolicy:
            TableName: !Ref TasksTable
      Events:
        Api:
          Type: Api
          Properties:
            RestApiId: !Ref TasksApi
            Path: /tasks/{task_id}
            Method: GET

  UpdateTask:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: functions/
      Handler: tasks.update_handler
      Policies:
        - DynamoDBCrudPolicy:
            TableName: !Ref TasksTable
      Events:
        Api:
          Type: Api
          Properties:
            RestApiId: !Ref TasksApi
            Path: /tasks/{task_id}
            Method: PUT

  DeleteTask:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: functions/
      Handler: tasks.delete_handler
      Policies:
        - DynamoDBCrudPolicy:
            TableName: !Ref TasksTable
      Events:
        Api:
          Type: Api
          Properties:
            RestApiId: !Ref TasksApi
            Path: /tasks/{task_id}
            Method: DELETE

  # --- DynamoDB ---
  TasksTable:
    Type: AWS::DynamoDB::Table
    Properties:
      TableName: tasks
      BillingMode: PAY_PER_REQUEST
      AttributeDefinitions:
        - AttributeName: id
          AttributeType: S
      KeySchema:
        - AttributeName: id
          KeyType: HASH

Outputs:
  ApiUrl:
    Value: !Sub "https://${TasksApi}.execute-api.${AWS::Region}.amazonaws.com/prod"

Handler code

# functions/tasks.py
import json
import os
import uuid
import logging
from datetime import datetime, timezone
from decimal import Decimal

import boto3
from boto3.dynamodb.conditions import Key

logger = logging.getLogger()
logger.setLevel(os.environ.get("LOG_LEVEL", "INFO"))

# Initialize outside handler — reused across warm invocations
dynamodb = boto3.resource("dynamodb")
table = dynamodb.Table(os.environ["TABLE_NAME"])


class DecimalEncoder(json.JSONEncoder):
    """DynamoDB returns Decimal — convert to float for JSON."""
    def default(self, obj):
        if isinstance(obj, Decimal):
            return float(obj)
        return super().default(obj)


def response(status_code: int, body: dict) -> dict:
    return {
        "statusCode": status_code,
        "headers": {
            "Content-Type": "application/json",
            "Access-Control-Allow-Origin": "*",
        },
        "body": json.dumps(body, cls=DecimalEncoder),
    }


def create_handler(event, context):
    """POST /tasks — create a new task."""
    try:
        # API Gateway sends body=None when there is no payload
        data = json.loads(event.get("body") or "{}")
    except json.JSONDecodeError:
        return response(400, {"error": "Invalid JSON"})

    title = data.get("title", "").strip()
    if not title:
        return response(400, {"error": "title is required"})

    task = {
        "id": str(uuid.uuid4()),
        "title": title,
        "description": data.get("description", ""),
        "completed": False,
        "priority": data.get("priority", "medium"),
        "created_at": datetime.now(timezone.utc).isoformat(),
        "updated_at": datetime.now(timezone.utc).isoformat(),
    }

    table.put_item(Item=task)
    logger.info("Task created", extra={"task_id": task["id"]})
    return response(201, task)


def list_handler(event, context):
    """GET /tasks — list all tasks."""
    result = table.scan(Limit=100)
    tasks = sorted(result.get("Items", []), key=lambda t: t["created_at"], reverse=True)
    return response(200, {"tasks": tasks, "count": len(tasks)})


def get_handler(event, context):
    """GET /tasks/{task_id} — get a single task."""
    task_id = event["pathParameters"]["task_id"]
    result = table.get_item(Key={"id": task_id})
    task = result.get("Item")

    if not task:
        return response(404, {"error": "Task not found"})
    return response(200, task)


def update_handler(event, context):
    """PUT /tasks/{task_id} — update a task."""
    task_id = event["pathParameters"]["task_id"]

    try:
        data = json.loads(event.get("body") or "{}")
    except json.JSONDecodeError:
        return response(400, {"error": "Invalid JSON"})

    # Build update expression dynamically
    update_parts = []
    values = {}
    names = {}

    allowed_fields = {"title", "description", "completed", "priority"}
    for field in allowed_fields:
        if field in data:
            placeholder = f"#{field}"
            value_key = f":{field}"
            update_parts.append(f"{placeholder} = {value_key}")
            values[value_key] = data[field]
            names[placeholder] = field

    if not update_parts:
        return response(400, {"error": "No valid fields to update"})

    # Always update timestamp
    update_parts.append("#updated_at = :updated_at")
    values[":updated_at"] = datetime.now(timezone.utc).isoformat()
    names["#updated_at"] = "updated_at"

    try:
        result = table.update_item(
            Key={"id": task_id},
            UpdateExpression="SET " + ", ".join(update_parts),
            ExpressionAttributeValues=values,
            ExpressionAttributeNames=names,
            ReturnValues="ALL_NEW",
            ConditionExpression="attribute_exists(id)",
        )
        return response(200, result["Attributes"])
    except dynamodb.meta.client.exceptions.ConditionalCheckFailedException:
        return response(404, {"error": "Task not found"})


def delete_handler(event, context):
    """DELETE /tasks/{task_id} — delete a task."""
    task_id = event["pathParameters"]["task_id"]

    try:
        table.delete_item(
            Key={"id": task_id},
            ConditionExpression="attribute_exists(id)",
        )
        return response(204, {})
    except dynamodb.meta.client.exceptions.ConditionalCheckFailedException:
        return response(404, {"error": "Task not found"})
💡 Performance tip: Initialize AWS clients (boto3) outside the handler function. They persist across warm invocations, saving ~100ms per request.

S3 Event Triggers

Process files automatically when they're uploaded to S3:

# functions/process_upload.py
import boto3
import json
import csv
import io
import logging

s3 = boto3.client("s3")
logger = logging.getLogger()


def handler(event, context):
    """Triggered when a file is uploaded to S3."""
    for record in event["Records"]:
        bucket = record["s3"]["bucket"]["name"]
        key = record["s3"]["object"]["key"]
        size = record["s3"]["object"]["size"]

        logger.info(f"Processing {key} ({size} bytes) from {bucket}")

        # Download file
        obj = s3.get_object(Bucket=bucket, Key=key)
        content = obj["Body"].read().decode("utf-8")

        if key.endswith(".csv"):
            process_csv(bucket, key, content)
        elif key.endswith(".json"):
            process_json(bucket, key, content)
        else:
            logger.warning(f"Unsupported file type: {key}")


def process_csv(bucket: str, key: str, content: str):
    """Parse CSV and store results."""
    reader = csv.DictReader(io.StringIO(content))
    rows = list(reader)
    logger.info(f"Parsed {len(rows)} rows from {key}")

    # Store results (e.g., write to DynamoDB or another S3 path)
    result = json.dumps({"source": key, "row_count": len(rows), "sample": rows[:3]})
    s3.put_object(
        Bucket=bucket,
        Key=f"processed/{key.replace('.csv', '.json')}",
        Body=result,
        ContentType="application/json",
    )


def process_json(bucket: str, key: str, content: str):
    data = json.loads(content)
    logger.info(f"Parsed JSON with {len(data) if isinstance(data, list) else 1} items")
# template.yaml — S3 trigger
  ProcessUpload:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: functions/
      Handler: process_upload.handler
      Timeout: 300
      MemorySize: 512
      Policies:
        - S3ReadPolicy:
            BucketName: !Ref UploadBucket
        - S3CrudPolicy:
            BucketName: !Ref UploadBucket
      Events:
        S3Upload:
          Type: S3
          Properties:
            Bucket: !Ref UploadBucket
            Events: s3:ObjectCreated:*
            Filter:
              S3Key:
                Rules:
                  - Name: prefix
                    Value: uploads/

  UploadBucket:
    Type: AWS::S3::Bucket
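
One gotcha with S3 events: object keys arrive URL-encoded (a space becomes +), so decode them before calling get_object or the lookup fails for keys with spaces or special characters. A small sketch; the helper name is just an example:

# S3 event keys are URL-encoded; decode before using them
from urllib.parse import unquote_plus


def extract_key(record: dict) -> str:
    """Return the decoded object key from an S3 event record."""
    return unquote_plus(record["s3"]["object"]["key"])

In the handler above you would call extract_key(record) instead of reading the key directly.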

Scheduled Functions (Cron)

# functions/cleanup.py
import boto3
from datetime import datetime, timedelta, timezone
import logging

logger = logging.getLogger()
dynamodb = boto3.resource("dynamodb")


def handler(event, context):
    """Run daily — archive completed tasks older than 30 days."""
    table = dynamodb.Table("tasks")
    cutoff = (datetime.now(timezone.utc) - timedelta(days=30)).isoformat()

    # Scan for old completed tasks
    result = table.scan(
        FilterExpression="completed = :c AND created_at < :cutoff",
        ExpressionAttributeValues={
            ":c": True,
            ":cutoff": cutoff,
        },
    )

    archived = 0
    with table.batch_writer() as batch:
        for item in result.get("Items", []):
            batch.delete_item(Key={"id": item["id"]})
            archived += 1

    logger.info(f"Archived {archived} tasks older than 30 days")
    return {"archived": archived}
# template.yaml — cron trigger
  CleanupFunction:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: functions/
      Handler: cleanup.handler
      Timeout: 300
      Policies:
        - DynamoDBCrudPolicy:
            TableName: !Ref TasksTable
      Events:
        DailyCleanup:
          Type: Schedule
          Properties:
            Schedule: cron(0 3 * * ? *)  # 3 AM UTC daily
            Description: Archive old completed tasks
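
One caveat: a single scan call returns at most 1 MB of data, so once the table grows the cleanup above silently misses items. Paginating with LastEvaluatedKey covers the whole table; a sketch under the same schema, with a hypothetical helper name:

# Follow pagination so large tables are fully scanned
def scan_old_completed(table, cutoff: str) -> list:
    """Collect all completed tasks older than cutoff, page by page."""
    items = []
    kwargs = {
        "FilterExpression": "completed = :c AND created_at < :cutoff",
        "ExpressionAttributeValues": {":c": True, ":cutoff": cutoff},
    }
    while True:
        page = table.scan(**kwargs)
        items.extend(page.get("Items", []))
        if "LastEvaluatedKey" not in page:
            break
        kwargs["ExclusiveStartKey"] = page["LastEvaluatedKey"]
    return items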

Lambda Layers — Share Dependencies

# Create a layer for shared dependencies
mkdir -p layers/common/python
pip install requests boto3-stubs pydantic -t layers/common/python/
# (With the BuildMethod metadata below, you can instead keep a requirements.txt
#  in layers/common/ and let `sam build` install it for you.)

# template.yaml
  CommonLayer:
    Type: AWS::Serverless::LayerVersion
    Properties:
      LayerName: common-deps
      ContentUri: layers/common/
      CompatibleRuntimes:
        - python3.12
    Metadata:
      BuildMethod: python3.12

  # Reference in function
  CreateTask:
    Type: AWS::Serverless::Function
    Properties:
      Layers:
        - !Ref CommonLayer
      # ...
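
At runtime the layer's python/ directory is unpacked under /opt and added to sys.path, so packages installed there import like anything else in the deployment package. A quick sanity check you can deploy (the function itself is just an example):

# layer_check.py: confirm layer packages resolve at runtime
import sys

import requests  # comes from the layer, not the function package


def handler(event, context):
    return {
        "requests_version": requests.__version__,
        "loaded_from": requests.__file__,  # should point under /opt/python
        "opt_on_path": [p for p in sys.path if p.startswith("/opt")],
    }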

Docker-based Lambda

For large dependencies (pandas, numpy, ML models), use container images:

# Dockerfile
FROM public.ecr.aws/lambda/python:3.12

# Install heavy dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy function code
COPY functions/ ${LAMBDA_TASK_ROOT}/

CMD ["tasks.create_handler"]
# template.yaml — container function
  MLFunction:
    Type: AWS::Serverless::Function
    Properties:
      PackageType: Image
      MemorySize: 1024
      Timeout: 60
    Metadata:
      DockerTag: python3.12
      DockerContext: .
      Dockerfile: Dockerfile
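
With the image approach, heavy libraries import normally inside the handler because they are baked into the image. A hypothetical handler for MLFunction, assuming pandas is listed in requirements.txt (the Dockerfile CMD would then point at report.handler):

# functions/report.py: example handler for the container image
import io
import json

import pandas as pd


def handler(event, context):
    """Summarize a CSV payload sent in the request body."""
    df = pd.read_csv(io.StringIO(event["body"]))
    return {
        "statusCode": 200,
        "body": json.dumps(
            {"rows": len(df), "summary": df.describe().to_dict()},
            default=str,
        ),
    }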

Testing Lambda Functions

Test locally without deploying. See our testing guide for pytest fundamentals.

# tests/test_tasks.py
import json
import os
import pytest
import boto3
from moto import mock_aws  # moto >= 5 merged the per-service mocks into mock_aws


@pytest.fixture
def dynamodb_table():
    """Create a mock DynamoDB table."""
    with mock_aws():
        client = boto3.resource("dynamodb", region_name="us-east-1")
        table = client.create_table(
            TableName="tasks",
            KeySchema=[{"AttributeName": "id", "KeyType": "HASH"}],
            AttributeDefinitions=[{"AttributeName": "id", "AttributeType": "S"}],
            BillingMode="PAY_PER_REQUEST",
        )
        os.environ["TABLE_NAME"] = "tasks"
        yield table


def make_event(method="GET", path="/tasks", body=None, path_params=None):
    """Build an API Gateway event."""
    event = {
        "httpMethod": method,
        "path": path,
        "headers": {"Content-Type": "application/json"},
        "pathParameters": path_params or {},
        "body": json.dumps(body) if body else None,
    }
    return event


class FakeContext:
    function_name = "test"
    memory_limit_in_mb = 256
    def get_remaining_time_in_millis(self):
        return 30000


def test_create_task(dynamodb_table):
    from functions.tasks import create_handler

    event = make_event("POST", "/tasks", body={
        "title": "Test task",
        "priority": "high",
    })

    result = create_handler(event, FakeContext())
    assert result["statusCode"] == 201

    body = json.loads(result["body"])
    assert body["title"] == "Test task"
    assert body["priority"] == "high"
    assert body["completed"] is False


def test_create_task_no_title(dynamodb_table):
    from functions.tasks import create_handler

    event = make_event("POST", "/tasks", body={"description": "no title"})
    result = create_handler(event, FakeContext())
    assert result["statusCode"] == 400


def test_list_tasks(dynamodb_table):
    from functions.tasks import create_handler, list_handler

    # Create 3 tasks
    for i in range(3):
        create_handler(
            make_event("POST", body={"title": f"Task {i}"}),
            FakeContext(),
        )

    result = list_handler(make_event("GET", "/tasks"), FakeContext())
    body = json.loads(result["body"])
    assert body["count"] == 3


def test_get_task_not_found(dynamodb_table):
    from functions.tasks import get_handler

    event = make_event("GET", path_params={"task_id": "nonexistent"})
    result = get_handler(event, FakeContext())
    assert result["statusCode"] == 404
# Run tests
pip install moto pytest
pytest tests/ -v

Deploy

# Build and deploy
sam build
sam deploy --guided

# First deploy asks for config — stored in samconfig.toml
# Subsequent deploys:
sam deploy

# View logs
sam logs -n CreateTask --tail

# Test the deployed API
API_URL=$(sam list stack-outputs --output json | python3 -c "
import sys, json
outputs = json.load(sys.stdin)
print(next(o['OutputValue'] for o in outputs if 'Api' in o['OutputKey']))
")

curl -X POST "$API_URL/tasks" \
  -H "Content-Type: application/json" \
  -d '{"title": "Deployed task", "priority": "high"}'

Cold Start Optimization

# Lazy import pattern
def handler(event, context):
    # Only import pandas when this function actually runs
    import pandas as pd
    # ...

# Provisioned concurrency in template.yaml
  CreateTask:
    Type: AWS::Serverless::Function
    Properties:
      AutoPublishAlias: live
      ProvisionedConcurrencyConfig:
        ProvisionedConcurrentExecutions: 5
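
To measure how often cold starts actually hit your traffic, flip a module-level flag: module code runs once per container, so the first invocation in each container is the cold one. A minimal sketch:

# cold_start_logging.py: tag the first invocation in each container
import logging

logger = logging.getLogger()
_cold = True


def handler(event, context):
    global _cold
    if _cold:
        logger.info("Cold start for request %s", context.aws_request_id)
        _cold = False
    return {"statusCode": 200}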

Cost Optimization

Strategy                   Impact
Use ARM64 (Graviton2)      20% cheaper per ms
Right-size memory          128 MB vs 1024 MB = 8x cheaper (but slower)
Batch processing           1 invocation for 100 records vs 100 invocations
Reserved concurrency       Prevents runaway costs from traffic spikes
DynamoDB on-demand         Pay per request; a good fit for variable traffic
API Gateway caching        Cache responses for GET endpoints
💰 Free tier reminder: Lambda gives you 1M free requests and 400,000 GB-seconds per month. Most small projects run for free.
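
To see what a given memory setting really costs, estimate the month from request count, average duration, and memory. A back-of-the-envelope sketch; the rates below are approximate us-east-1 x86 prices, so check the current pricing page before relying on them:

# Rough monthly cost estimate (ignores the free tier, API Gateway, DynamoDB)
def lambda_monthly_cost(requests: int, avg_ms: float, memory_mb: int) -> float:
    price_per_request = 0.20 / 1_000_000   # ~$0.20 per 1M requests
    price_per_gb_second = 0.0000166667     # x86 rate; ARM is ~20% lower
    gb_seconds = requests * (avg_ms / 1000) * (memory_mb / 1024)
    return requests * price_per_request + gb_seconds * price_per_gb_second


# 5M requests/month at 120 ms average on 256 MB comes to a few dollars
print(round(lambda_monthly_cost(5_000_000, 120, 256), 2))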

🚀 Want serverless templates, API tools, and 50+ Python automation scripts?

Get the AI Agent Toolkit →

Need serverless architecture for your project? I build Python APIs, automation tools, and cloud-native systems. Reach out on Telegram →