Automate API Integrations with Python
Every modern application talks to APIs. CRMs push data to analytics platforms, payment gateways sync with accounting tools, and monitoring services fire webhooks when things break. Doing this manually doesn't scale.
This guide covers practical patterns for automating API integrations in Python — from simple REST calls to production-grade pipelines with retries, rate limiting, and error handling. All code is copy-paste ready.
1. The Foundation: Making API Calls That Don't Break
Most tutorials show you requests.get(url) and call it a day. In production, you need retries, timeouts, and proper error handling.
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
def create_api_client(base_url, api_key=None, max_retries=3):
"""Create a requests session with retry logic and auth."""
session = requests.Session()
# Retry on 429 (rate limit), 500, 502, 503, 504
retry_strategy = Retry(
total=max_retries,
backoff_factor=1, # 1s, 2s, 4s between retries
status_forcelist=[429, 500, 502, 503, 504],
allowed_methods=["GET", "POST", "PUT", "DELETE"],
)
adapter = HTTPAdapter(max_retries=retry_strategy)
session.mount("https://", adapter)
session.mount("http://", adapter)
# Default headers
session.headers.update({
"Content-Type": "application/json",
"User-Agent": "PythonAPIClient/1.0",
})
if api_key:
session.headers["Authorization"] = f"Bearer {api_key}"
    # Always set a default timeout so a hung API can't block the script forever
    original_request = session.request
    def request_with_timeout(method, url, **kwargs):
        kwargs.setdefault("timeout", 30)
        return original_request(method, url, **kwargs)
    session.request = request_with_timeout
return session
# Usage
client = create_api_client(
"https://api.example.com",
api_key="sk-your-key-here"
)
response = client.get("https://api.example.com/v1/users")
data = response.json()
Always set a timeout. Without one, a hung API will block your script forever. Use timeout=(5, 30): 5 seconds to connect, 30 seconds to read.
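Passed per request, the tuple form looks like this (the endpoint below is a placeholder):
# (connect timeout, read timeout): fail fast on an unreachable host,
# but give a slow endpoint up to 30 seconds to return data
response = client.get("https://api.example.com/v1/users", timeout=(5, 30))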
2. Handle Rate Limits Like a Pro
Every API has rate limits. Ignoring them gets your key banned. Here's a rate limiter that respects API limits automatically:
import time
from collections import deque
class RateLimiter:
"""Token bucket rate limiter."""
def __init__(self, calls_per_second=10):
self.calls_per_second = calls_per_second
self.min_interval = 1.0 / calls_per_second
self.timestamps = deque()
def wait(self):
"""Block until we can make another call."""
now = time.monotonic()
# Remove timestamps older than 1 second
while self.timestamps and now - self.timestamps[0] > 1.0:
self.timestamps.popleft()
if len(self.timestamps) >= self.calls_per_second:
sleep_time = 1.0 - (now - self.timestamps[0])
if sleep_time > 0:
time.sleep(sleep_time)
self.timestamps.append(time.monotonic())
def api_call_with_rate_limit(client, url, limiter, **kwargs):
"""Make an API call respecting rate limits."""
limiter.wait()
response = client.get(url, **kwargs)
# Handle 429 with Retry-After header
if response.status_code == 429:
retry_after = int(response.headers.get("Retry-After", 60))
print(f"Rate limited. Waiting {retry_after}s...")
time.sleep(retry_after)
return api_call_with_rate_limit(client, url, limiter, **kwargs)
response.raise_for_status()
return response.json()
# Usage
limiter = RateLimiter(calls_per_second=5)
for page in range(1, 100):
data = api_call_with_rate_limit(
client,
f"https://api.example.com/v1/items?page={page}",
limiter
)
3. Paginate Through Everything
APIs return data in pages. You need all of it. Here's a universal paginator that handles the three most common pagination styles:
def paginate_offset(client, url, limiter, page_size=100):
"""Offset-based pagination (page=1&per_page=100)."""
page = 1
while True:
data = api_call_with_rate_limit(
client, url, limiter,
params={"page": page, "per_page": page_size}
)
items = data if isinstance(data, list) else data.get("results", [])
if not items:
break
yield from items
page += 1
def paginate_cursor(client, url, limiter, cursor_field="next_cursor"):
"""Cursor-based pagination (Notion, Slack, Stripe style)."""
cursor = None
while True:
params = {}
if cursor:
params["cursor"] = cursor
data = api_call_with_rate_limit(client, url, limiter, params=params)
yield from data.get("results", data.get("data", []))
cursor = data.get(cursor_field)
if not cursor:
break
def paginate_link(client, limiter, url):
"""Link header pagination (GitHub style)."""
while url:
limiter.wait()
response = client.get(url)
response.raise_for_status()
yield from response.json()
# Parse Link header for next URL
link_header = response.headers.get("Link", "")
url = None
for part in link_header.split(","):
if 'rel="next"' in part:
url = part.split(";")[0].strip(" <>")
# Usage — get ALL items from an API
all_users = list(paginate_offset(
client, "https://api.example.com/v1/users", limiter
))
print(f"Fetched {len(all_users)} users")
4. Sync Data Between Two APIs
The most common integration task: read from API A, transform, write to API B. Here's a production-ready pattern:
import json
import hashlib
from datetime import datetime, timezone
class APISync:
"""Sync records between two APIs with change detection."""
def __init__(self, source_client, target_client, state_file="sync_state.json"):
self.source = source_client
self.target = target_client
self.state_file = state_file
self.state = self._load_state()
def _load_state(self):
try:
with open(self.state_file) as f:
return json.load(f)
except FileNotFoundError:
return {"last_sync": None, "synced_hashes": {}}
def _save_state(self):
with open(self.state_file, "w") as f:
json.dump(self.state, f, indent=2, default=str)
def _hash_record(self, record):
"""Create a hash to detect changes."""
return hashlib.md5(
json.dumps(record, sort_keys=True).encode()
).hexdigest()
def sync(self, source_url, target_url, transform_fn, id_field="id"):
"""
Full sync with change detection.
Args:
source_url: API endpoint to read from
target_url: API endpoint to write to
transform_fn: function to transform source → target format
id_field: unique identifier field name
"""
stats = {"created": 0, "updated": 0, "skipped": 0, "errors": 0}
# Fetch all source records
source_data = self.source.get(source_url).json()
records = source_data if isinstance(source_data, list) else source_data.get("results", [])
for record in records:
record_id = str(record[id_field])
record_hash = self._hash_record(record)
# Skip unchanged records
if self.state["synced_hashes"].get(record_id) == record_hash:
stats["skipped"] += 1
continue
try:
# Transform source format → target format
transformed = transform_fn(record)
# Upsert to target API
if record_id in self.state["synced_hashes"]:
self.target.put(f"{target_url}/{record_id}", json=transformed)
stats["updated"] += 1
else:
self.target.post(target_url, json=transformed)
stats["created"] += 1
self.state["synced_hashes"][record_id] = record_hash
except Exception as e:
print(f"Error syncing {record_id}: {e}")
stats["errors"] += 1
self.state["last_sync"] = datetime.now(timezone.utc).isoformat()
self._save_state()
return stats
# Usage: sync HubSpot contacts → Notion database
def hubspot_to_notion(contact):
"""Transform HubSpot contact to Notion page format."""
props = contact["properties"]
return {
"Name": props.get("firstname", "") + " " + props.get("lastname", ""),
"Email": props.get("email", ""),
"Company": props.get("company", ""),
"Deal Stage": props.get("dealstage", "new"),
}
syncer = APISync(hubspot_client, notion_client)
result = syncer.sync(
source_url="https://api.hubapi.com/crm/v3/objects/contacts",
target_url="https://api.notion.com/v1/pages",
transform_fn=hubspot_to_notion,
)
print(f"Sync complete: {result}")
5. Process Webhooks Reliably
Webhooks are the reverse — APIs calling you. Here's a minimal but production-ready webhook receiver:
from fastapi import FastAPI, Request, HTTPException
import hmac
import hashlib
app = FastAPI()
WEBHOOK_SECRET = "your-webhook-secret"
def verify_signature(payload: bytes, signature: str, secret: str) -> bool:
"""Verify webhook signature (works with Stripe, GitHub, etc.)."""
expected = hmac.new(
secret.encode(), payload, hashlib.sha256
).hexdigest()
return hmac.compare_digest(f"sha256={expected}", signature)
@app.post("/webhooks/payments")
async def handle_payment_webhook(request: Request):
"""Process incoming payment webhooks."""
body = await request.body()
signature = request.headers.get("X-Signature-256", "")
# Always verify signatures
if not verify_signature(body, signature, WEBHOOK_SECRET):
raise HTTPException(status_code=401, detail="Invalid signature")
event = await request.json()
# Route by event type
handlers = {
"payment.completed": handle_payment_completed,
"payment.failed": handle_payment_failed,
"subscription.cancelled": handle_subscription_cancelled,
}
handler = handlers.get(event["type"])
if handler:
await handler(event["data"])
# Always return 200 quickly — process async if needed
return {"status": "ok"}
async def handle_payment_completed(data):
"""Process a completed payment."""
print(f"Payment received: ${data['amount']} from {data['customer_email']}")
# Update your database, send confirmation, etc.
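If processing takes more than a second or two, hand the work off and return 200 right away. One option is FastAPI's built-in BackgroundTasks; a minimal sketch reusing the handlers above (the route name is just an example, not part of any provider's spec):
from fastapi import BackgroundTasks
@app.post("/webhooks/payments-async")
async def handle_payment_webhook_async(request: Request, background_tasks: BackgroundTasks):
    """Verify, queue the work, and respond immediately."""
    body = await request.body()
    signature = request.headers.get("X-Signature-256", "")
    if not verify_signature(body, signature, WEBHOOK_SECRET):
        raise HTTPException(status_code=401, detail="Invalid signature")
    event = await request.json()
    # Runs after the response is sent, so the provider never sees a timeout
    background_tasks.add_task(handle_payment_completed, event["data"])
    return {"status": "queued"}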
6. Async API Calls for Speed
When you need to call 100+ endpoints, synchronous requests are too slow. Use aiohttp for parallel calls:
import aiohttp
import asyncio
async def fetch_all(urls, api_key, max_concurrent=10):
"""Fetch multiple URLs concurrently with rate limiting."""
semaphore = asyncio.Semaphore(max_concurrent)
results = []
    async def fetch_one(session, url):
        async with semaphore:
            # Retry 429s in a loop so we keep the same semaphore slot
            while True:
                async with session.get(url) as response:
                    if response.status == 429:
                        retry_after = int(response.headers.get("Retry-After", 5))
                        await asyncio.sleep(retry_after)
                        continue
                    response.raise_for_status()
                    return await response.json()
headers = {"Authorization": f"Bearer {api_key}"}
async with aiohttp.ClientSession(headers=headers) as session:
tasks = [fetch_one(session, url) for url in urls]
results = await asyncio.gather(*tasks, return_exceptions=True)
return [r for r in results if not isinstance(r, Exception)]
# Usage: fetch 500 user profiles in parallel
urls = [f"https://api.example.com/users/{i}" for i in range(500)]
data = asyncio.run(fetch_all(urls, api_key="sk-xxx", max_concurrent=20))
print(f"Fetched {len(data)} profiles")
Tune max_concurrent to match the API's rate limit: 20 concurrent calls with a 0.1s average response time works out to roughly 200 requests per second.
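A rough way to pick the value (the limit and latency below are assumptions; substitute the API's documented limit and your measured response time):
# throughput ≈ concurrency / average latency, so solve for concurrency
rate_limit_per_second = 10        # from the API docs
average_latency_seconds = 0.3     # measured for your endpoints
max_concurrent = max(1, int(rate_limit_per_second * average_latency_seconds))
print(max_concurrent)  # 3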
7. Error Handling That Actually Works
Production integrations fail in creative ways. Here's a pattern that handles the common failure modes:
import logging
import time
import requests
logger = logging.getLogger(__name__)
class APIError(Exception):
def __init__(self, status_code, message, response_body=None):
self.status_code = status_code
self.message = message
self.response_body = response_body
super().__init__(f"HTTP {status_code}: {message}")
def safe_api_call(client, method, url, max_retries=3, **kwargs):
"""
Make an API call with comprehensive error handling.
Returns (data, None) on success, (None, error) on failure.
"""
last_error = None
for attempt in range(max_retries):
try:
response = getattr(client, method)(url, timeout=30, **kwargs)
if response.status_code == 204:
return None, None # Success, no content
if response.ok:
return response.json(), None
# Client errors (4xx) — don't retry (except 429)
if 400 <= response.status_code < 500 and response.status_code != 429:
error = APIError(
response.status_code,
response.text[:200],
response.text
)
logger.error(f"Client error: {error}")
return None, error
# Rate limit — wait and retry
if response.status_code == 429:
wait = int(response.headers.get("Retry-After", 2 ** attempt))
logger.warning(f"Rate limited. Waiting {wait}s (attempt {attempt + 1})")
time.sleep(wait)
continue
# Server errors (5xx) — retry with backoff
last_error = APIError(response.status_code, response.text[:200])
logger.warning(f"Server error: {last_error} (attempt {attempt + 1})")
time.sleep(2 ** attempt)
except requests.exceptions.Timeout:
last_error = APIError(0, "Request timed out")
logger.warning(f"Timeout (attempt {attempt + 1})")
time.sleep(2 ** attempt)
except requests.exceptions.ConnectionError:
last_error = APIError(0, "Connection failed")
logger.warning(f"Connection error (attempt {attempt + 1})")
time.sleep(2 ** attempt)
return None, last_error
# Usage
data, error = safe_api_call(client, "get", "https://api.example.com/v1/data")
if error:
print(f"Failed: {error}")
else:
print(f"Got {len(data)} records")
Putting It All Together
Here's a real-world example combining everything — syncing Stripe payments to a Notion database with full error handling:
# sync_stripe_to_notion.py
import os
from datetime import datetime, timezone
# Setup clients
stripe_client = create_api_client(
"https://api.stripe.com",
api_key=os.environ["STRIPE_SECRET_KEY"]
)
notion_client = create_api_client(
"https://api.notion.com",
api_key=os.environ["NOTION_API_KEY"]
)
notion_client.headers["Notion-Version"] = "2022-06-28"
limiter = RateLimiter(calls_per_second=5)
# Fetch all payments from Stripe
payments = list(paginate_cursor(
stripe_client,
"https://api.stripe.com/v1/charges",
limiter,
cursor_field="next_page"
))
print(f"Found {len(payments)} payments")
# Transform and sync to Notion
def stripe_to_notion(charge):
return {
"parent": {"database_id": "your-db-id"},
"properties": {
"Amount": {"number": charge["amount"] / 100},
"Currency": {"select": {"name": charge["currency"].upper()}},
"Status": {"select": {"name": charge["status"]}},
"Customer": {"email": charge.get("billing_details", {}).get("email", "")},
"Date": {"date": {"start": datetime.fromtimestamp(
charge["created"], tz=timezone.utc
).isoformat()}},
}
}
syncer = APISync(stripe_client, notion_client)
result = syncer.sync(
source_url="https://api.stripe.com/v1/charges",
target_url="https://api.notion.com/v1/pages",
transform_fn=stripe_to_notion,
)
print(f"Sync done: {result}")
# → Sync done: {'created': 42, 'updated': 3, 'skipped': 155, 'errors': 0}
Want 50+ production-ready Python scripts like these?
The AI Agent Toolkit includes API integrators, web scrapers, data pipelines, and automation workflows — all tested and documented.
Key Takeaways
- Always use retry logic — APIs fail. Your code shouldn't.
- Respect rate limits — both your own (token bucket) and theirs (Retry-After header).
- Paginate everything — never assume one page is enough.
- Change detection — hash records to avoid unnecessary writes.
- Async for speed — aiohttp + semaphore for parallel calls with rate control.
- Log everything — when integrations break at 3 AM, logs are all you have.
Related Articles
- Build a REST API with FastAPI — Complete Python Guide — build your own API endpoints with FastAPI, SQLAlchemy, and JWT auth
- Python Database Operations — SQLite, PostgreSQL & SQLAlchemy Guide
Need a custom API integration built for your workflow? I build Python automation tools, scrapers, and data pipelines. Reach out on Telegram →