Automate API Integrations with Python

    March 25, 2026 · 12 min read

    Every modern application talks to APIs. CRMs push data to analytics platforms, payment gateways sync with accounting tools, and monitoring services fire webhooks when things break. Doing this manually doesn't scale.

    This guide covers practical patterns for automating API integrations in Python — from simple REST calls to production-grade pipelines with retries, rate limiting, and error handling. All code is copy-paste ready.

    1. The Foundation: Making API Calls That Don't Break

    Most tutorials show you requests.get(url) and call it a day. In production, you need retries, timeouts, and proper error handling.

    import requests
    from requests.adapters import HTTPAdapter
    from urllib3.util.retry import Retry
    
    def create_api_client(base_url, api_key=None, max_retries=3):
        """Create a requests session with retry logic and auth.

        base_url is informational only: requests.Session has no native
        base-URL support, so pass full URLs to each call.
        """
        session = requests.Session()
    
        # Retry on 429 (rate limit), 500, 502, 503, 504
        retry_strategy = Retry(
            total=max_retries,
            backoff_factor=1,  # 1s, 2s, 4s between retries
            status_forcelist=[429, 500, 502, 503, 504],
            allowed_methods=["GET", "POST", "PUT", "DELETE"],
        )
    
        adapter = HTTPAdapter(max_retries=retry_strategy)
        session.mount("https://", adapter)
        session.mount("http://", adapter)
    
        # Default headers
        session.headers.update({
            "Content-Type": "application/json",
            "User-Agent": "PythonAPIClient/1.0",
        })
    
        if api_key:
            session.headers["Authorization"] = f"Bearer {api_key}"
    
        # Always set a default timeout (callers can still override per call)
        original_request = session.request

        def request_with_timeout(method, url, **kwargs):
            kwargs.setdefault("timeout", 30)
            return original_request(method, url, **kwargs)

        session.request = request_with_timeout
    
        return session
    
    # Usage
    client = create_api_client(
        "https://api.example.com",
        api_key="sk-your-key-here"
    )
    
    response = client.get("https://api.example.com/v1/users")
    data = response.json()

    Pro tip: Always set a timeout. Without one, a hung API will block your script forever. Use timeout=(5, 30): 5 seconds for connection, 30 for read.
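
    As a quick illustration, here's a minimal sketch of the tuple form with explicit timeout handling (the endpoint is a placeholder):

    import requests

    try:
        # (connect timeout, read timeout): fail fast on unreachable hosts,
        # but give slow endpoints time to stream their response
        response = client.get("https://api.example.com/v1/report", timeout=(5, 30))
        response.raise_for_status()
    except requests.exceptions.ConnectTimeout:
        print("Could not connect within 5s, host may be down")
    except requests.exceptions.ReadTimeout:
        print("Connected, but no response within 30s")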

    2. Handle Rate Limits Like a Pro

    Every API has rate limits. Ignoring them gets your key banned. Here's a rate limiter that respects API limits automatically:

    import time
    from collections import deque
    
    class RateLimiter:
        """Token bucket rate limiter."""
    
        def __init__(self, calls_per_second=10):
            self.calls_per_second = calls_per_second
            self.min_interval = 1.0 / calls_per_second
            self.timestamps = deque()
    
        def wait(self):
            """Block until we can make another call."""
            now = time.monotonic()
    
            # Remove timestamps older than 1 second
            while self.timestamps and now - self.timestamps[0] > 1.0:
                self.timestamps.popleft()
    
            if len(self.timestamps) >= self.calls_per_second:
                sleep_time = 1.0 - (now - self.timestamps[0])
                if sleep_time > 0:
                    time.sleep(sleep_time)
    
            self.timestamps.append(time.monotonic())
    
    def api_call_with_rate_limit(client, url, limiter, **kwargs):
        """Make an API call respecting rate limits."""
        limiter.wait()
        response = client.get(url, **kwargs)
    
        # Handle 429 with Retry-After header
        if response.status_code == 429:
            retry_after = int(response.headers.get("Retry-After", 60))
            print(f"Rate limited. Waiting {retry_after}s...")
            time.sleep(retry_after)
            return api_call_with_rate_limit(client, url, limiter, **kwargs)
    
        response.raise_for_status()
        return response.json()
    
    # Usage
    limiter = RateLimiter(calls_per_second=5)
    for page in range(1, 100):
        data = api_call_with_rate_limit(
            client,
            f"https://api.example.com/v1/items?page={page}",
            limiter
        )
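
    If threading limiter through every call feels noisy, the same idea packs into a decorator. A small sketch, assuming the client and limiter defined above:

    import functools

    def rate_limited(limiter):
        """Wrap a function so every call waits for the limiter first."""
        def decorator(fn):
            @functools.wraps(fn)
            def wrapper(*args, **kwargs):
                limiter.wait()
                return fn(*args, **kwargs)
            return wrapper
        return decorator

    @rate_limited(RateLimiter(calls_per_second=5))
    def fetch_item(item_id):
        response = client.get(f"https://api.example.com/v1/items/{item_id}")
        response.raise_for_status()
        return response.json()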

    3. Paginate Through Everything

    APIs return data in pages. You need all of it. Here's a universal paginator that handles the three most common pagination styles:

    def paginate_offset(client, url, limiter, page_size=100):
        """Offset-based pagination (page=1&per_page=100)."""
        page = 1
        while True:
            data = api_call_with_rate_limit(
                client, url, limiter,
                params={"page": page, "per_page": page_size}
            )
    
            items = data if isinstance(data, list) else data.get("results", [])
            if not items:
                break
    
            yield from items
            page += 1
    
    def paginate_cursor(client, url, limiter, cursor_field="next_cursor"):
        """Cursor-based pagination (Notion, Slack, Stripe style)."""
        cursor = None
        while True:
            params = {}
            if cursor:
                params["cursor"] = cursor
    
            data = api_call_with_rate_limit(client, url, limiter, params=params)
    
            yield from data.get("results", data.get("data", []))
    
            cursor = data.get(cursor_field)
            if not cursor:
                break
    
    def paginate_link(client, url, limiter):
        """Link header pagination (GitHub style)."""
        while url:
            limiter.wait()
            response = client.get(url)
            response.raise_for_status()
    
            yield from response.json()
    
            # Parse Link header for next URL
            link_header = response.headers.get("Link", "")
            url = None
            for part in link_header.split(","):
                if 'rel="next"' in part:
                    url = part.split(";")[0].strip(" <>")
    
    # Usage — get ALL items from an API
    all_users = list(paginate_offset(
        client, "https://api.example.com/v1/users", limiter
    ))
    print(f"Fetched {len(all_users)} users")

    4. Sync Data Between Two APIs

    The most common integration task: read from API A, transform, write to API B. Here's a production-ready pattern:

    import json
    import hashlib
    from datetime import datetime, timezone
    
    class APISync:
        """Sync records between two APIs with change detection."""
    
        def __init__(self, source_client, target_client, state_file="sync_state.json"):
            self.source = source_client
            self.target = target_client
            self.state_file = state_file
            self.state = self._load_state()
    
        def _load_state(self):
            try:
                with open(self.state_file) as f:
                    return json.load(f)
            except FileNotFoundError:
                return {"last_sync": None, "synced_hashes": {}}
    
        def _save_state(self):
            with open(self.state_file, "w") as f:
                json.dump(self.state, f, indent=2, default=str)
    
        def _hash_record(self, record):
            """Hash a record to detect changes (md5 is fine here: this is
            change detection, not security)."""
            return hashlib.md5(
                json.dumps(record, sort_keys=True).encode()
            ).hexdigest()
    
        def sync(self, source_url, target_url, transform_fn, id_field="id"):
            """
            Full sync with change detection.
    
            Args:
                source_url: API endpoint to read from
                target_url: API endpoint to write to
                transform_fn: function to transform source → target format
                id_field: unique identifier field name
            """
            stats = {"created": 0, "updated": 0, "skipped": 0, "errors": 0}
    
            # Fetch source records (a single request here; plug in a
            # paginator from section 3 to pull the full dataset)
            source_data = self.source.get(source_url).json()
            records = (
                source_data if isinstance(source_data, list)
                else source_data.get("results", source_data.get("data", []))
            )
    
            for record in records:
                record_id = str(record[id_field])
                record_hash = self._hash_record(record)
    
                # Skip unchanged records
                if self.state["synced_hashes"].get(record_id) == record_hash:
                    stats["skipped"] += 1
                    continue
    
                try:
                    # Transform source format → target format
                    transformed = transform_fn(record)
    
                    # Upsert to target API
                    if record_id in self.state["synced_hashes"]:
                        self.target.put(f"{target_url}/{record_id}", json=transformed)
                        stats["updated"] += 1
                    else:
                        self.target.post(target_url, json=transformed)
                        stats["created"] += 1
    
                    self.state["synced_hashes"][record_id] = record_hash
    
                except Exception as e:
                    print(f"Error syncing {record_id}: {e}")
                    stats["errors"] += 1
    
            self.state["last_sync"] = datetime.now(timezone.utc).isoformat()
            self._save_state()
    
            return stats
    
    # Usage: sync HubSpot contacts → a Notion database.
    # Note: APISync upserts via PUT {target_url}/{id}, which fits REST-style
    # targets. Notion updates pages via PATCH with its own page id, so a
    # production Notion target also needs an id mapping.
    def hubspot_to_notion(contact):
        """Transform a HubSpot contact into Notion's page-create format."""
        props = contact["properties"]
        name = f'{props.get("firstname", "")} {props.get("lastname", "")}'.strip()
        return {
            "parent": {"database_id": "your-db-id"},
            "properties": {
                "Name": {"title": [{"text": {"content": name}}]},
                "Email": {"email": props.get("email") or None},
                "Company": {"rich_text": [{"text": {"content": props.get("company", "")}}]},
                "Deal Stage": {"select": {"name": props.get("dealstage", "new")}},
            },
        }
    
    syncer = APISync(hubspot_client, notion_client)
    result = syncer.sync(
        source_url="https://api.hubapi.com/crm/v3/objects/contacts",
        target_url="https://api.notion.com/v1/pages",
        transform_fn=hubspot_to_notion,
    )
    print(f"Sync complete: {result}")

    5. Process Webhooks Reliably

    Webhooks are the reverse — APIs calling you. Here's a minimal but production-ready webhook receiver:

    from fastapi import FastAPI, Request, HTTPException
    import hmac
    import hashlib
    import os
    
    app = FastAPI()
    
    # Read the secret from the environment rather than hardcoding it
    WEBHOOK_SECRET = os.environ.get("WEBHOOK_SECRET", "your-webhook-secret")
    
    def verify_signature(payload: bytes, signature: str, secret: str) -> bool:
        """Verify webhook signature (works with Stripe, GitHub, etc.)."""
        expected = hmac.new(
            secret.encode(), payload, hashlib.sha256
        ).hexdigest()
        return hmac.compare_digest(f"sha256={expected}", signature)
    
    @app.post("/webhooks/payments")
    async def handle_payment_webhook(request: Request):
        """Process incoming payment webhooks."""
        body = await request.body()
        signature = request.headers.get("X-Signature-256", "")
    
        # Always verify signatures
        if not verify_signature(body, signature, WEBHOOK_SECRET):
            raise HTTPException(status_code=401, detail="Invalid signature")
    
        event = await request.json()
    
        # Route by event type
        handlers = {
            "payment.completed": handle_payment_completed,
            "payment.failed": handle_payment_failed,
            "subscription.cancelled": handle_subscription_cancelled,
        }
    
        handler = handlers.get(event.get("type"))
        if handler:
            await handler(event["data"])
    
        # Always return 200 quickly — process async if needed
        return {"status": "ok"}
    
    async def handle_payment_completed(data):
        """Process a completed payment."""
        print(f"Payment received: ${data['amount']} from {data['customer_email']}")
        # Update your database, send confirmation, etc.
    
    async def handle_payment_failed(data):
        """Handle a failed payment: notify the customer, flag the account."""
        print(f"Payment failed for {data.get('customer_email')}")
    
    async def handle_subscription_cancelled(data):
        """Handle a cancellation: update the CRM, schedule offboarding."""
        print(f"Subscription cancelled: {data.get('id')}")
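
    When a handler does real work (database writes, outbound API calls), don't do it before responding: most senders retry on timeout and you'll process duplicates. One lightweight option is FastAPI's built-in BackgroundTasks, sketched below; heavier workloads usually warrant a proper queue.

    from fastapi import BackgroundTasks

    @app.post("/webhooks/payments-async")
    async def handle_payment_webhook_async(request: Request, background_tasks: BackgroundTasks):
        body = await request.body()
        signature = request.headers.get("X-Signature-256", "")
        if not verify_signature(body, signature, WEBHOOK_SECRET):
            raise HTTPException(status_code=401, detail="Invalid signature")

        event = await request.json()
        # Acknowledge immediately; the task runs after the response is sent
        background_tasks.add_task(process_event, event)
        return {"status": "queued"}

    def process_event(event):
        """Does the slow work outside the request/response cycle."""
        print(f"Processing {event.get('type')} in the background")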

    6. Async API Calls for Speed

    When you need to call 100+ endpoints, synchronous requests are too slow. Use aiohttp for parallel calls:

    import aiohttp
    import asyncio
    
    async def fetch_all(urls, api_key, max_concurrent=10):
        """Fetch multiple URLs concurrently with rate limiting."""
        semaphore = asyncio.Semaphore(max_concurrent)
    
        async def fetch_one(session, url):
            async with semaphore:
                async with session.get(url) as response:
                    if response.status == 429:
                        retry_after = int(response.headers.get("Retry-After", 5))
                        await asyncio.sleep(retry_after)
                        return await fetch_one(session, url)
    
                    response.raise_for_status()
                    return await response.json()
    
        headers = {"Authorization": f"Bearer {api_key}"}
        async with aiohttp.ClientSession(headers=headers) as session:
            tasks = [fetch_one(session, url) for url in urls]
            results = await asyncio.gather(*tasks, return_exceptions=True)
    
        return [r for r in results if not isinstance(r, Exception)]
    
    # Usage: fetch 500 user profiles in parallel
    urls = [f"https://api.example.com/users/{i}" for i in range(500)]
    data = asyncio.run(fetch_all(urls, api_key="sk-xxx", max_concurrent=20))
    print(f"Fetched {len(data)} profiles")

    Pro tip: Set max_concurrent to match the API's rate limit. 20 concurrent calls with a 0.1s average response time works out to roughly 200 requests/second.
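
    If the API enforces requests-per-second rather than concurrent connections, add pacing alongside the semaphore. A minimal asyncio-friendly sketch (call await pacer.wait() inside fetch_one, before the request):

    import time

    class AsyncRateLimiter:
        """Enforce a minimum interval between request starts."""

        def __init__(self, calls_per_second=10):
            self.min_interval = 1.0 / calls_per_second
            self._last_call = 0.0
            self._lock = asyncio.Lock()

        async def wait(self):
            async with self._lock:
                delay = self._last_call + self.min_interval - time.monotonic()
                if delay > 0:
                    await asyncio.sleep(delay)
                self._last_call = time.monotonic()

    pacer = AsyncRateLimiter(calls_per_second=20)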

    7. Error Handling That Actually Works

    Production integrations fail in creative ways. Here's a pattern that handles the common failure modes:

    import time
    import logging
    import requests
    
    logger = logging.getLogger(__name__)
    
    class APIError(Exception):
        def __init__(self, status_code, message, response_body=None):
            self.status_code = status_code
            self.message = message
            self.response_body = response_body
            super().__init__(f"HTTP {status_code}: {message}")
    
    def safe_api_call(client, method, url, max_retries=3, **kwargs):
        """
        Make an API call with comprehensive error handling.
    
        Returns (data, None) on success, (None, error) on failure.
        """
        last_error = None
    
        for attempt in range(max_retries):
            try:
                response = getattr(client, method)(url, timeout=30, **kwargs)
    
                if response.status_code == 204:
                    return None, None  # Success, no content
    
                if response.ok:
                    return response.json(), None
    
                # Client errors (4xx) — don't retry (except 429)
                if 400 <= response.status_code < 500 and response.status_code != 429:
                    error = APIError(
                        response.status_code,
                        response.text[:200],
                        response.text
                    )
                    logger.error(f"Client error: {error}")
                    return None, error
    
                # Rate limit — wait and retry
                if response.status_code == 429:
                    wait = int(response.headers.get("Retry-After", 2 ** attempt))
                    logger.warning(f"Rate limited. Waiting {wait}s (attempt {attempt + 1})")
                    time.sleep(wait)
                    continue
    
                # Server errors (5xx) — retry with backoff
                last_error = APIError(response.status_code, response.text[:200])
                logger.warning(f"Server error: {last_error} (attempt {attempt + 1})")
                time.sleep(2 ** attempt)
    
            except requests.exceptions.Timeout:
                last_error = APIError(0, "Request timed out")
                logger.warning(f"Timeout (attempt {attempt + 1})")
                time.sleep(2 ** attempt)
    
            except requests.exceptions.ConnectionError:
                last_error = APIError(0, "Connection failed")
                logger.warning(f"Connection error (attempt {attempt + 1})")
                time.sleep(2 ** attempt)
    
        return None, last_error
    
    # Usage
    data, error = safe_api_call(client, "get", "https://api.example.com/v1/data")
    if error:
        print(f"Failed: {error}")
    else:
        print(f"Got {len(data)} records")

    Putting It All Together

    Here's a real-world example combining everything: syncing Stripe payments to a Notion database. (The endpoint details are illustrative; check each API's docs for exact pagination and update semantics.)

    # sync_stripe_to_notion.py
    import os
    from datetime import datetime, timezone
    
    # Setup clients
    stripe_client = create_api_client(
        "https://api.stripe.com",
        api_key=os.environ["STRIPE_SECRET_KEY"]
    )
    notion_client = create_api_client(
        "https://api.notion.com",
        api_key=os.environ["NOTION_API_KEY"]
    )
    notion_client.headers["Notion-Version"] = "2022-06-28"
    
    limiter = RateLimiter(calls_per_second=5)
    
    # Fetch all payments from Stripe.
    # Note: Stripe actually paginates with starting_after/has_more rather than
    # a cursor field, so adapt paginate_cursor's parameter names accordingly.
    payments = list(paginate_cursor(
        stripe_client,
        "https://api.stripe.com/v1/charges",
        limiter,
        cursor_field="next_page"
    ))
    
    print(f"Found {len(payments)} payments")
    
    # Transform and sync to Notion
    def stripe_to_notion(charge):
        return {
            "parent": {"database_id": "your-db-id"},
            "properties": {
                "Amount": {"number": charge["amount"] / 100},
                "Currency": {"select": {"name": charge["currency"].upper()}},
                "Status": {"select": {"name": charge["status"]}},
                "Customer": {"email": charge.get("billing_details", {}).get("email", "")},
                "Date": {"date": {"start": datetime.fromtimestamp(
                    charge["created"], tz=timezone.utc
                ).isoformat()}},
            }
        }
    
    syncer = APISync(stripe_client, notion_client)
    result = syncer.sync(
        source_url="https://api.stripe.com/v1/charges",
        target_url="https://api.notion.com/v1/pages",
        transform_fn=stripe_to_notion,
    )
    
    print(f"Sync done: {result}")
    # → Sync done: {'created': 42, 'updated': 3, 'skipped': 155, 'errors': 0}
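
    To turn this into an actual automation rather than a one-off script, run it on a schedule. The two simplest options, sketched below, are a long-running loop or cron:

    # Option 1: long-running worker loop
    import time

    if __name__ == "__main__":
        while True:
            result = syncer.sync(
                source_url="https://api.stripe.com/v1/charges",
                target_url="https://api.notion.com/v1/pages",
                transform_fn=stripe_to_notion,
            )
            print(f"Sync done: {result}")
            time.sleep(15 * 60)  # every 15 minutes

    # Option 2: cron, running the script every 15 minutes:
    #   */15 * * * * /usr/bin/python3 /path/to/sync_stripe_to_notion.py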

    Want 50+ production-ready Python scripts like these?

    The AI Agent Toolkit includes API integrators, web scrapers, data pipelines, and automation workflows — all tested and documented.

    Get the AI Agent Toolkit →

    Key Takeaways

    • Use a shared requests.Session with retries and a default timeout for every integration.
    • Respect rate limits proactively instead of waiting for 429s.
    • Treat pagination as the default; one call almost never returns the full dataset.
    • Track sync state (hashes and timestamps) so re-runs are cheap and idempotent.
    • Verify webhook signatures, return 200 fast, and do heavy work asynchronously.


    Need a custom API integration built for your workflow? I build Python automation tools, scrapers, and data pipelines. Reach out on Telegram →