Python + AWS Lambda — Build Serverless Functions
AWS Lambda lets you run Python code without managing servers. You pay only for what you use, it scales automatically, and it integrates with everything in AWS. This guide takes you from a basic handler to a production-ready serverless API with DynamoDB, S3 triggers, layers, and CI/CD.
How Lambda Works
The execution model is simple:
- Something triggers your function (HTTP request, S3 upload, cron schedule, SQS message)
- AWS spins up a container (a sandboxed execution environment) with your code
- Your handler runs and returns a response
- The container stays warm for a few minutes and is reused for the next invocation
- If idle too long, the container is destroyed (cold start next time)
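The warm-reuse step above is exactly why initialization belongs at module scope: module-level code runs once per execution environment (the cold start), and warm invocations skip it. A minimal local sketch — no AWS involved, the handler is just a plain function here:

```python
import time

# Module scope runs once per execution environment — the cold start.
# Put expensive setup (clients, config, model loads) here, not in the handler.
START = time.time()
INVOCATION_COUNT = 0

def handler(event, context):
    global INVOCATION_COUNT
    INVOCATION_COUNT += 1
    return {
        "cold_start": INVOCATION_COUNT == 1,   # True only for the first call
        "invocation": INVOCATION_COUNT,
        "env_age_s": round(time.time() - START, 3),
    }

# Two invocations landing on the same warm environment:
first = handler({}, None)
second = handler({}, None)
print(first["cold_start"], second["cold_start"])   # → True False
```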
| Limit | Value |
|---|---|
| Max execution time | 15 minutes |
| Memory | 128 MB — 10,240 MB |
| Package size (zip) | 50 MB (250 MB unzipped) |
| Package size (container) | 10 GB |
| Concurrent executions | 1,000 (default, can increase) |
| Temp storage (/tmp) | 512 MB — 10,240 MB |
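The 15-minute cap is a hard kill, so long-running work should check the context object's remaining-time counter and stop early. A sketch of the pattern — `process_batch`, `SAFETY_MARGIN_MS`, and the stand-in `FakeContext` are illustrative names, since the real context only exists inside Lambda:

```python
SAFETY_MARGIN_MS = 5_000  # stop early, leaving time to checkpoint and return

def process_batch(records, context):
    """Process until time runs low; return (done, leftover) so a follow-up
    invocation can pick up where this one stopped."""
    done = []
    for i, record in enumerate(records):
        if context.get_remaining_time_in_millis() < SAFETY_MARGIN_MS:
            return done, records[i:]
        done.append(record.upper())           # stand-in for real work
    return done, []

class FakeContext:                            # stand-in for the runtime's context
    def __init__(self, remaining_ms):
        self.remaining_ms = remaining_ms
    def get_remaining_time_in_millis(self):
        return self.remaining_ms

done, leftover = process_batch(["a", "b"], FakeContext(remaining_ms=60_000))
print(done, leftover)   # → ['A', 'B'] []
```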
Your First Lambda Function
Project setup with SAM CLI
# Install SAM CLI
pip install aws-sam-cli
# Create project
sam init --runtime python3.12 --name my-api --app-template hello-world
# Project structure
my-api/
├── template.yaml # SAM/CloudFormation template
├── hello_world/
│ ├── __init__.py
│ ├── app.py # Lambda handler
│ └── requirements.txt
├── tests/
│ └── unit/
│ └── test_handler.py
└── samconfig.toml
The handler
# hello_world/app.py
import json
def lambda_handler(event, context):
"""
Lambda entry point.
Args:
event: Trigger data (API Gateway request, S3 event, etc.)
context: Runtime info (function name, memory, time remaining)
Returns:
dict: API Gateway response format
"""
return {
"statusCode": 200,
"headers": {
"Content-Type": "application/json",
"Access-Control-Allow-Origin": "*",
},
"body": json.dumps({
"message": "Hello from Lambda!",
"function": context.function_name,
"memory_mb": context.memory_limit_in_mb,
"remaining_ms": context.get_remaining_time_in_millis(),
}),
}
# Run locally
sam local start-api
# → http://127.0.0.1:3000/hello
# Invoke directly
sam local invoke HelloWorldFunction --event events/event.json
REST API with API Gateway
Build a CRUD API for a task manager — similar to our FastAPI guide, but serverless.
SAM template
# template.yaml
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Globals:
Function:
Timeout: 30
Runtime: python3.12
MemorySize: 256
Environment:
Variables:
TABLE_NAME: !Ref TasksTable
LOG_LEVEL: INFO
Resources:
# --- API ---
TasksApi:
Type: AWS::Serverless::Api
Properties:
StageName: prod
Cors:
AllowOrigin: "'*'"
AllowMethods: "'GET,POST,PUT,DELETE,OPTIONS'"
AllowHeaders: "'Content-Type,Authorization'"
# --- Functions ---
CreateTask:
Type: AWS::Serverless::Function
Properties:
CodeUri: functions/
Handler: tasks.create_handler
Policies:
- DynamoDBCrudPolicy:
TableName: !Ref TasksTable
Events:
Api:
Type: Api
Properties:
RestApiId: !Ref TasksApi
Path: /tasks
Method: POST
ListTasks:
Type: AWS::Serverless::Function
Properties:
CodeUri: functions/
Handler: tasks.list_handler
Policies:
- DynamoDBReadPolicy:
TableName: !Ref TasksTable
Events:
Api:
Type: Api
Properties:
RestApiId: !Ref TasksApi
Path: /tasks
Method: GET
GetTask:
Type: AWS::Serverless::Function
Properties:
CodeUri: functions/
Handler: tasks.get_handler
Policies:
- DynamoDBReadPolicy:
TableName: !Ref TasksTable
Events:
Api:
Type: Api
Properties:
RestApiId: !Ref TasksApi
Path: /tasks/{task_id}
Method: GET
UpdateTask:
Type: AWS::Serverless::Function
Properties:
CodeUri: functions/
Handler: tasks.update_handler
Policies:
- DynamoDBCrudPolicy:
TableName: !Ref TasksTable
Events:
Api:
Type: Api
Properties:
RestApiId: !Ref TasksApi
Path: /tasks/{task_id}
Method: PUT
DeleteTask:
Type: AWS::Serverless::Function
Properties:
CodeUri: functions/
Handler: tasks.delete_handler
Policies:
- DynamoDBCrudPolicy:
TableName: !Ref TasksTable
Events:
Api:
Type: Api
Properties:
RestApiId: !Ref TasksApi
Path: /tasks/{task_id}
Method: DELETE
# --- DynamoDB ---
TasksTable:
Type: AWS::DynamoDB::Table
Properties:
TableName: tasks
BillingMode: PAY_PER_REQUEST
AttributeDefinitions:
- AttributeName: id
AttributeType: S
KeySchema:
- AttributeName: id
KeyType: HASH
Outputs:
ApiUrl:
Value: !Sub "https://${TasksApi}.execute-api.${AWS::Region}.amazonaws.com/prod"
Handler code
# functions/tasks.py
import json
import os
import uuid
import logging
from datetime import datetime, timezone
from decimal import Decimal
import boto3
from boto3.dynamodb.conditions import Key
logger = logging.getLogger()
logger.setLevel(os.environ.get("LOG_LEVEL", "INFO"))
# Initialize outside handler — reused across warm invocations
dynamodb = boto3.resource("dynamodb")
table = dynamodb.Table(os.environ["TABLE_NAME"])
class DecimalEncoder(json.JSONEncoder):
"""DynamoDB returns Decimal — convert to float for JSON."""
def default(self, obj):
if isinstance(obj, Decimal):
return float(obj)
return super().default(obj)
def response(status_code: int, body: dict) -> dict:
return {
"statusCode": status_code,
"headers": {
"Content-Type": "application/json",
"Access-Control-Allow-Origin": "*",
},
"body": json.dumps(body, cls=DecimalEncoder),
}
def create_handler(event, context):
"""POST /tasks — create a new task."""
try:
        data = json.loads(event.get("body") or "{}")  # body is None for empty requests
except json.JSONDecodeError:
return response(400, {"error": "Invalid JSON"})
title = data.get("title", "").strip()
if not title:
return response(400, {"error": "title is required"})
task = {
"id": str(uuid.uuid4()),
"title": title,
"description": data.get("description", ""),
"completed": False,
"priority": data.get("priority", "medium"),
"created_at": datetime.now(timezone.utc).isoformat(),
"updated_at": datetime.now(timezone.utc).isoformat(),
}
table.put_item(Item=task)
logger.info("Task created", extra={"task_id": task["id"]})
return response(201, task)
def list_handler(event, context):
"""GET /tasks — list all tasks."""
    result = table.scan(Limit=100)  # NOTE: Limit caps items evaluated, not returned — follow LastEvaluatedKey to paginate
tasks = sorted(result.get("Items", []), key=lambda t: t["created_at"], reverse=True)
return response(200, {"tasks": tasks, "count": len(tasks)})
def get_handler(event, context):
"""GET /tasks/{task_id} — get a single task."""
task_id = event["pathParameters"]["task_id"]
result = table.get_item(Key={"id": task_id})
task = result.get("Item")
if not task:
return response(404, {"error": "Task not found"})
return response(200, task)
def update_handler(event, context):
"""PUT /tasks/{task_id} — update a task."""
task_id = event["pathParameters"]["task_id"]
try:
        data = json.loads(event.get("body") or "{}")  # body is None for empty requests
except json.JSONDecodeError:
return response(400, {"error": "Invalid JSON"})
# Build update expression dynamically
update_parts = []
values = {}
names = {}
allowed_fields = {"title", "description", "completed", "priority"}
for field in allowed_fields:
if field in data:
placeholder = f"#{field}"
value_key = f":{field}"
update_parts.append(f"{placeholder} = {value_key}")
values[value_key] = data[field]
names[placeholder] = field
if not update_parts:
return response(400, {"error": "No valid fields to update"})
# Always update timestamp
update_parts.append("#updated_at = :updated_at")
values[":updated_at"] = datetime.now(timezone.utc).isoformat()
names["#updated_at"] = "updated_at"
try:
result = table.update_item(
Key={"id": task_id},
UpdateExpression="SET " + ", ".join(update_parts),
ExpressionAttributeValues=values,
ExpressionAttributeNames=names,
ReturnValues="ALL_NEW",
ConditionExpression="attribute_exists(id)",
)
return response(200, result["Attributes"])
except dynamodb.meta.client.exceptions.ConditionalCheckFailedException:
return response(404, {"error": "Task not found"})
def delete_handler(event, context):
"""DELETE /tasks/{task_id} — delete a task."""
task_id = event["pathParameters"]["task_id"]
try:
        table.delete_item(
            Key={"id": task_id},
            ConditionExpression="attribute_exists(id)",
        )
        # 204 No Content must not carry a body
        return {"statusCode": 204, "headers": {"Access-Control-Allow-Origin": "*"}, "body": ""}
except dynamodb.meta.client.exceptions.ConditionalCheckFailedException:
return response(404, {"error": "Task not found"})
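With the template and handlers in place, a quick local smoke test of every route (paths match the template above; `$TASK_ID` is a placeholder for an id returned by the create call):

```shell
# Start the API locally
sam local start-api   # → http://127.0.0.1:3000

# Create
curl -s -X POST http://127.0.0.1:3000/tasks \
  -H "Content-Type: application/json" \
  -d '{"title": "Try Lambda", "priority": "high"}'

# List / fetch one
curl -s http://127.0.0.1:3000/tasks
curl -s http://127.0.0.1:3000/tasks/$TASK_ID

# Update / delete
curl -s -X PUT http://127.0.0.1:3000/tasks/$TASK_ID -d '{"completed": true}'
curl -s -X DELETE http://127.0.0.1:3000/tasks/$TASK_ID
```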
S3 Event Triggers
Process files automatically when they're uploaded to S3:
# functions/process_upload.py
import boto3
import json
import csv
import io
import logging
s3 = boto3.client("s3")
logger = logging.getLogger()
logger.setLevel(logging.INFO)  # without an explicit level, INFO logs are suppressed
def handler(event, context):
"""Triggered when a file is uploaded to S3."""
for record in event["Records"]:
bucket = record["s3"]["bucket"]["name"]
key = record["s3"]["object"]["key"]
size = record["s3"]["object"]["size"]
logger.info(f"Processing {key} ({size} bytes) from {bucket}")
# Download file
obj = s3.get_object(Bucket=bucket, Key=key)
content = obj["Body"].read().decode("utf-8")
if key.endswith(".csv"):
process_csv(bucket, key, content)
elif key.endswith(".json"):
process_json(bucket, key, content)
else:
logger.warning(f"Unsupported file type: {key}")
def process_csv(bucket: str, key: str, content: str):
"""Parse CSV and store results."""
reader = csv.DictReader(io.StringIO(content))
rows = list(reader)
logger.info(f"Parsed {len(rows)} rows from {key}")
# Store results (e.g., write to DynamoDB or another S3 path)
result = json.dumps({"source": key, "row_count": len(rows), "sample": rows[:3]})
s3.put_object(
Bucket=bucket,
Key=f"processed/{key.replace('.csv', '.json')}",
Body=result,
ContentType="application/json",
)
def process_json(bucket: str, key: str, content: str):
data = json.loads(content)
logger.info(f"Parsed JSON with {len(data) if isinstance(data, list) else 1} items")
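To exercise this handler with `sam local invoke`, save a trimmed event like the one below as a JSON file (real S3 notifications carry many more fields — only the keys the handler reads are included, and the bucket/key names are placeholders):

```python
import json

# A trimmed S3 "ObjectCreated" test event — same shape the handler expects.
sample_event = {
    "Records": [
        {
            "s3": {
                "bucket": {"name": "my-upload-bucket"},
                "object": {"key": "uploads/report.csv", "size": 2048},
            }
        }
    ]
}

# The same extraction the handler performs:
record = sample_event["Records"][0]
bucket = record["s3"]["bucket"]["name"]
key = record["s3"]["object"]["key"]
print(bucket, key)   # → my-upload-bucket uploads/report.csv

# Write it out for: sam local invoke ProcessUpload --event events/s3_put.json
print(json.dumps(sample_event, indent=2))
```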
# template.yaml — S3 trigger
ProcessUpload:
Type: AWS::Serverless::Function
Properties:
CodeUri: functions/
Handler: process_upload.handler
Timeout: 300
MemorySize: 512
Policies:
- S3ReadPolicy:
BucketName: !Ref UploadBucket
- S3CrudPolicy:
BucketName: !Ref UploadBucket
Events:
S3Upload:
Type: S3
Properties:
Bucket: !Ref UploadBucket
Events: s3:ObjectCreated:*
Filter:
S3Key:
Rules:
- Name: prefix
Value: uploads/
UploadBucket:
Type: AWS::S3::Bucket
Scheduled Functions (Cron)
# functions/cleanup.py
import boto3
from datetime import datetime, timedelta, timezone
import logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)  # without an explicit level, INFO logs are suppressed
dynamodb = boto3.resource("dynamodb")
def handler(event, context):
"""Run daily — archive completed tasks older than 30 days."""
table = dynamodb.Table("tasks")
cutoff = (datetime.now(timezone.utc) - timedelta(days=30)).isoformat()
# Scan for old completed tasks
result = table.scan(
FilterExpression="completed = :c AND created_at < :cutoff",
ExpressionAttributeValues={
":c": True,
":cutoff": cutoff,
},
)
archived = 0
with table.batch_writer() as batch:
for item in result.get("Items", []):
batch.delete_item(Key={"id": item["id"]})
archived += 1
logger.info(f"Archived {archived} tasks older than 30 days")
return {"archived": archived}
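One caveat with the cleanup above: a single `scan` call returns at most about 1 MB of data, so on a large table it would silently miss items. The fix is to follow `LastEvaluatedKey` until the scan is exhausted — a sketch of the loop, checked against a stubbed table so it runs without AWS:

```python
def scan_all(table, **kwargs):
    """Keep scanning until LastEvaluatedKey disappears."""
    items = []
    resp = table.scan(**kwargs)
    items.extend(resp.get("Items", []))
    while "LastEvaluatedKey" in resp:
        resp = table.scan(ExclusiveStartKey=resp["LastEvaluatedKey"], **kwargs)
        items.extend(resp.get("Items", []))
    return items

class FakeTable:
    """Stub that serves two pages, mimicking a paginated DynamoDB scan."""
    def __init__(self):
        self.calls = 0
    def scan(self, **kwargs):
        self.calls += 1
        if self.calls == 1:
            return {"Items": [{"id": "a"}], "LastEvaluatedKey": {"id": "a"}}
        return {"Items": [{"id": "b"}]}

print(len(scan_all(FakeTable())))   # → 2
```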
# template.yaml — cron trigger
CleanupFunction:
Type: AWS::Serverless::Function
Properties:
CodeUri: functions/
Handler: cleanup.handler
Timeout: 300
Policies:
- DynamoDBCrudPolicy:
TableName: !Ref TasksTable
Events:
DailyCleanup:
Type: Schedule
Properties:
Schedule: cron(0 3 * * ? *) # 3 AM UTC daily
Description: Archive old completed tasks
Lambda Layers — Share Dependencies
# Create a layer for shared dependencies
mkdir -p layers/common/python
pip install requests pydantic -t layers/common/python/  # boto3 ships with the runtime; boto3-stubs are dev-only type hints
# template.yaml
CommonLayer:
Type: AWS::Serverless::LayerVersion
Properties:
LayerName: common-deps
ContentUri: layers/common/
CompatibleRuntimes:
- python3.12
Metadata:
BuildMethod: python3.12
# Reference in function
CreateTask:
Type: AWS::Serverless::Function
Properties:
Layers:
- !Ref CommonLayer
# ...
Docker-based Lambda
For large dependencies (pandas, numpy, ML models), use container images:
# Dockerfile
FROM public.ecr.aws/lambda/python:3.12
# Install heavy dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy function code
COPY functions/ ${LAMBDA_TASK_ROOT}/
CMD ["tasks.create_handler"]
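The AWS base image bundles the Runtime Interface Emulator, so you can smoke-test the container locally before pushing it (the image name `tasks-fn` is arbitrary):

```shell
# Build and run the image locally
docker build -t tasks-fn .
docker run -p 9000:8080 tasks-fn

# In another terminal: invoke through the emulator's Lambda endpoint
curl -X POST "http://localhost:9000/2015-03-31/functions/function/invocations" \
  -d '{"body": "{\"title\": \"From a container\"}"}'
```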
# template.yaml — container function
MLFunction:
Type: AWS::Serverless::Function
Properties:
PackageType: Image
MemorySize: 1024
Timeout: 60
Metadata:
DockerTag: python3.12
DockerContext: .
Dockerfile: Dockerfile
Testing Lambda Functions
Test locally without deploying. See our testing guide for pytest fundamentals.
# tests/test_tasks.py
import json
import os
import pytest
import boto3
from moto import mock_aws  # moto >= 5; earlier releases used mock_dynamodb
@pytest.fixture
def dynamodb_table():
    """Create a mock DynamoDB table."""
    with mock_aws():
client = boto3.resource("dynamodb", region_name="us-east-1")
table = client.create_table(
TableName="tasks",
KeySchema=[{"AttributeName": "id", "KeyType": "HASH"}],
AttributeDefinitions=[{"AttributeName": "id", "AttributeType": "S"}],
BillingMode="PAY_PER_REQUEST",
)
os.environ["TABLE_NAME"] = "tasks"
yield table
def make_event(method="GET", path="/tasks", body=None, path_params=None):
"""Build an API Gateway event."""
event = {
"httpMethod": method,
"path": path,
"headers": {"Content-Type": "application/json"},
"pathParameters": path_params or {},
"body": json.dumps(body) if body else None,
}
return event
class FakeContext:
function_name = "test"
memory_limit_in_mb = 256
def get_remaining_time_in_millis(self):
return 30000
def test_create_task(dynamodb_table):
from functions.tasks import create_handler
event = make_event("POST", "/tasks", body={
"title": "Test task",
"priority": "high",
})
result = create_handler(event, FakeContext())
assert result["statusCode"] == 201
body = json.loads(result["body"])
assert body["title"] == "Test task"
assert body["priority"] == "high"
assert body["completed"] is False
def test_create_task_no_title(dynamodb_table):
from functions.tasks import create_handler
event = make_event("POST", "/tasks", body={"description": "no title"})
result = create_handler(event, FakeContext())
assert result["statusCode"] == 400
def test_list_tasks(dynamodb_table):
from functions.tasks import create_handler, list_handler
# Create 3 tasks
for i in range(3):
create_handler(
make_event("POST", body={"title": f"Task {i}"}),
FakeContext(),
)
result = list_handler(make_event("GET", "/tasks"), FakeContext())
body = json.loads(result["body"])
assert body["count"] == 3
def test_get_task_not_found(dynamodb_table):
from functions.tasks import get_handler
event = make_event("GET", path_params={"task_id": "nonexistent"})
result = get_handler(event, FakeContext())
assert result["statusCode"] == 404
# Run tests
pip install moto pytest
pytest tests/ -v
Deploy
# Build and deploy
sam build
sam deploy --guided
# First deploy asks for config — stored in samconfig.toml
# Subsequent deploys:
sam deploy
# View logs
sam logs -n CreateTask --tail
# Test the deployed API
API_URL=$(sam list stack-outputs --output json | python3 -c "
import sys, json
outputs = json.load(sys.stdin)
print(next(o['OutputValue'] for o in outputs if 'Api' in o['OutputKey']))
")
curl -X POST "$API_URL/tasks" \
-H "Content-Type: application/json" \
-d '{"title": "Deployed task", "priority": "high"}'
Cold Start Optimization
- Keep functions small — fewer imports = faster cold start
- Use Provisioned Concurrency — pre-warm N instances (costs money, eliminates cold starts)
- Lazy imports — import heavy libraries inside the handler, not at module level
- ARM64 (Graviton2) — 20% cheaper and often faster than x86
- SnapStart — originally Java-only, now also available for Python 3.12+ and .NET
# Lazy import pattern
def handler(event, context):
# Only import pandas when this function actually runs
import pandas as pd
# ...
# Provisioned concurrency in template.yaml
CreateTask:
Type: AWS::Serverless::Function
Properties:
AutoPublishAlias: live
ProvisionedConcurrencyConfig:
ProvisionedConcurrentExecutions: 5
Cost Optimization
| Strategy | Impact |
|---|---|
| Use ARM64 (Graviton2) | 20% cheaper per ms |
| Right-size memory | 128 MB vs 1024 MB = 8x cheaper (but slower) |
| Batch processing | 1 invocation for 100 records vs 100 invocations |
| Reserved concurrency | Prevents runaway costs from traffic spikes |
| DynamoDB on-demand | Pay per request — great for variable traffic |
| API Gateway caching | Cache responses for GET endpoints |
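To see how these levers interact, here's a back-of-the-envelope calculator. The rates are us-east-1 list prices at the time of writing and ignore the free tier — treat them as assumptions and check the current pricing page:

```python
# Per-GB-second and per-request rates (assumed: us-east-1, free tier ignored).
PRICE_PER_GB_S = {"x86_64": 0.0000166667, "arm64": 0.0000133334}
PRICE_PER_REQUEST = 0.20 / 1_000_000

def monthly_cost(invocations, avg_ms, memory_mb, arch="x86_64"):
    """Compute duration cost (GB-seconds * rate) plus request cost."""
    gb_seconds = invocations * (avg_ms / 1000) * (memory_mb / 1024)
    return gb_seconds * PRICE_PER_GB_S[arch] + invocations * PRICE_PER_REQUEST

# 1M requests/month, 200 ms average, 256 MB:
print(f"x86:   ${monthly_cost(1_000_000, 200, 256):.2f}")           # → x86:   $1.03
print(f"arm64: ${monthly_cost(1_000_000, 200, 256, 'arm64'):.2f}")  # → arm64: $0.87
```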
Related Articles
- Build a REST API with FastAPI — when you want a server-based API instead
- Python Microservices — containerized microservices vs serverless
- Dockerize Python Apps — Docker-based Lambda functions
- Python Testing Guide — mocking AWS services with moto
- Python Database Operations — DynamoDB vs SQL databases
- Python Environment & Config Management — managing Lambda environment variables
Need serverless architecture for your project? I build Python APIs, automation tools, and cloud-native systems. Reach out on Telegram →