Python Async Programming — asyncio, Tasks & Concurrency

March 2026 · 22 min read · Python, asyncio, Concurrency

If your Python code spends most of its time waiting (for API responses, database queries, file I/O, or network calls), async programming can often make it 5-50x faster without threads or multiprocessing. This guide walks from basic coroutines to production-grade async patterns.

Sync vs Async — Why It Matters

Consider fetching 100 URLs. Synchronously, each request blocks until complete:

# Synchronous — ~100 seconds for 100 URLs (1s each)
import requests

urls = ["https://httpbin.org/delay/1"] * 100

for url in urls:
    resp = requests.get(url)  # Blocks for ~1 second
    print(resp.status_code)

With async, all 100 requests run concurrently:

# Async — ~2 seconds for 100 URLs
import asyncio
import aiohttp

async def fetch(session, url):
    async with session.get(url) as resp:
        return resp.status

async def main():
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        print(results)

asyncio.run(main())

Same work, ~50x faster. The key insight: while one request waits for a server response, Python can start and manage other requests.
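
The same effect can be reproduced without any network access. The sketch below is a stand-in for the benchmark above, not a replacement for it: `fake_request` is a hypothetical coroutine that simulates a request with `asyncio.sleep`, and the delay is shortened to 50 ms so the comparison runs quickly.

```python
import asyncio
import time


async def fake_request(delay: float = 0.05) -> int:
    """Hypothetical stand-in for an HTTP call: waits, then returns a status code."""
    await asyncio.sleep(delay)
    return 200


async def run_sequential(n: int) -> float:
    """Await each request before starting the next; returns elapsed seconds."""
    start = time.perf_counter()
    for _ in range(n):
        await fake_request()
    return time.perf_counter() - start


async def run_concurrent(n: int) -> float:
    """Start all requests at once and wait for them together."""
    start = time.perf_counter()
    await asyncio.gather(*(fake_request() for _ in range(n)))
    return time.perf_counter() - start


async def compare(n: int = 20) -> tuple[float, float]:
    """Return (sequential_seconds, concurrent_seconds) for n simulated requests."""
    return await run_sequential(n), await run_concurrent(n)


if __name__ == "__main__":
    sequential, concurrent = asyncio.run(compare())
    print(f"sequential: {sequential:.2f}s, concurrent: {concurrent:.2f}s")
```

The sequential time grows with the number of requests, while the concurrent time stays close to the duration of a single request. Real speedups depend on the server, the network, and any connection limits.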

Core Concepts

Coroutines

A coroutine is a function defined with async def. It doesn't run when called — it returns a coroutine object that must be awaited.

import asyncio

async def greet(name: str) -> str:
    """A coroutine — pauses at await, resumes when ready."""
    await asyncio.sleep(1)  # Non-blocking sleep
    return f"Hello, {name}!"

# Wrong — returns coroutine object, doesn't execute
result = greet("World")  # <coroutine object greet at 0x...>

# Right — run the coroutine
result = asyncio.run(greet("World"))  # "Hello, World!"

The Event Loop

The event loop is the scheduler. It runs coroutines, handles I/O callbacks, and manages tasks. Think of it as a single-threaded task manager that switches between coroutines at every await point.

# asyncio.run() creates and manages the event loop
asyncio.run(main())  # Python 3.7+

# Manual loop (rarely needed); calling get_event_loop() outside a running loop is deprecated
loop = asyncio.new_event_loop()
loop.run_until_complete(main())
loop.close()

# Get the running loop (inside async code)
loop = asyncio.get_running_loop()

await — The Yield Point

Every await is a potential context switch. The event loop can run other coroutines while the current one waits:

async def worker(name: str, delay: float):
    print(f"{name}: starting")
    await asyncio.sleep(delay)  # Yields control here
    print(f"{name}: done after {delay}s")

async def main():
    # These run concurrently, not sequentially!
    await asyncio.gather(
        worker("A", 2),
        worker("B", 1),
        worker("C", 3),
    )
    # Output order: A start, B start, C start, B done (1s), A done (2s), C done (3s)
    # Total time: ~3s, not 6s

asyncio.run(main())
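
The flip side is that code without an await never yields. As a rough illustration, the sketch below runs two hypothetical coroutines, `polite` and `rude`, and records the order of events: one waits with `asyncio.sleep`, the other with the blocking `time.sleep`.

```python
import asyncio
import time


async def polite(log: list[str], name: str) -> None:
    log.append(f"{name} start")
    await asyncio.sleep(0.01)  # Yields: other coroutines may run here
    log.append(f"{name} end")


async def rude(log: list[str], name: str) -> None:
    log.append(f"{name} start")
    time.sleep(0.01)  # Blocks the whole loop: no await, so no switch
    log.append(f"{name} end")


async def run_pair(coro_func) -> list[str]:
    """Run two copies of coro_func concurrently and return the event log."""
    log: list[str] = []
    await asyncio.gather(coro_func(log, "A"), coro_func(log, "B"))
    return log


if __name__ == "__main__":
    print(asyncio.run(run_pair(polite)))  # ['A start', 'B start', 'A end', 'B end']
    print(asyncio.run(run_pair(rude)))    # ['A start', 'A end', 'B start', 'B end']
```

With `polite`, the two coroutines interleave. With `rude`, B cannot even start until A has finished, because nothing handed control back to the event loop.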

Tasks — Fire and Manage

Creating tasks

async def main():
    # Create a task: it is scheduled now and starts running at the next await
    task = asyncio.create_task(worker("background", 5))

    # Do other work while task runs
    print("Doing other stuff...")
    await asyncio.sleep(1)

    # Wait for the task to complete
    result = await task  # worker() has no return statement, so this is None
    print(f"Task result: {result}")
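
It helps to see exactly when a task begins executing. In this minimal sketch (the `background` and `demo` names are made up for illustration), the task body does not run at the `create_task` call itself, only once the creating coroutine reaches an await.

```python
import asyncio


async def background(log: list[str]) -> str:
    log.append("task: running")
    await asyncio.sleep(0)
    return "done"


async def demo() -> list[str]:
    log: list[str] = []
    task = asyncio.create_task(background(log))  # Scheduled, not yet started
    log.append("main: task created")
    await asyncio.sleep(0)  # First await: the loop now gets to run the task
    log.append("main: after first await")
    result = await task
    log.append(f"main: task returned {result!r}")
    return log


if __name__ == "__main__":
    for line in asyncio.run(demo()):
        print(line)
```

The log shows "main: task created" before "task: running". Also keep a reference to every task you create (as `task` does here): the event loop only holds weak references, so an unreferenced task can be garbage collected before it finishes.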

Task groups (Python 3.11+)

async def fetch_url(url: str) -> dict:
    async with aiohttp.ClientSession() as session:  # Per-call session keeps the example short; reuse one in real code
        async with session.get(url) as resp:
            return {"url": url, "status": resp.status}


async def main():
    urls = [
        "https://httpbin.org/get",
        "https://httpbin.org/ip",
        "https://httpbin.org/headers",
    ]

    # TaskGroup: structured concurrency — all tasks complete or all cancel
    async with asyncio.TaskGroup() as tg:
        tasks = [tg.create_task(fetch_url(url)) for url in urls]

    # All tasks are done here
    results = [t.result() for t in tasks]
    for r in results:
        print(f"{r['url']}: {r['status']}")

💡 Tip: TaskGroup (Python 3.11+) is preferred over gather for new code. If any task raises an exception, all others are cancelled, which prevents orphaned tasks.

gather — Run Multiple Coroutines

async def main():
    # Run concurrently, collect all results
    results = await asyncio.gather(
        fetch_url("https://api.github.com"),
        fetch_url("https://httpbin.org/get"),
        fetch_url("https://jsonplaceholder.typicode.com/posts/1"),
    )

    # return_exceptions=True: don't crash on individual failures
    results = await asyncio.gather(
        fetch_url("https://valid.com"),
        fetch_url("https://will-fail.invalid"),
        return_exceptions=True,  # Failed tasks return Exception objects
    )
    for r in results:
        if isinstance(r, Exception):
            print(f"Failed: {r}")
        else:
            print(f"OK: {r}")
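
One detail worth knowing: gather returns results in the order the awaitables were passed in, not the order they finished. A minimal sketch with a hypothetical `delayed` coroutine shows the difference.

```python
import asyncio


async def delayed(value: str, delay: float, finished: list[str]) -> str:
    await asyncio.sleep(delay)
    finished.append(value)  # Records the actual completion order
    return value


async def demo() -> tuple[list[str], list[str]]:
    finished: list[str] = []
    results = await asyncio.gather(
        delayed("slow", 0.09, finished),
        delayed("fast", 0.01, finished),
        delayed("medium", 0.05, finished),
    )
    return results, finished


if __name__ == "__main__":
    results, finished = asyncio.run(demo())
    print("gather order:    ", results)
    print("completion order:", finished)
```

This makes it safe to zip the results back onto the list of inputs, even when the requests complete out of order.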

Controlling Concurrency

Semaphores — Rate Limiting

Without limits, 10,000 concurrent requests will overwhelm servers and your own machine. Semaphores cap concurrency:

async def fetch_with_limit(sem: asyncio.Semaphore, session, url: str):
    async with sem:  # Only N coroutines enter at a time
        async with session.get(url) as resp:
            return await resp.json()


async def main():
    sem = asyncio.Semaphore(20)  # Max 20 concurrent requests
    urls = [f"https://jsonplaceholder.typicode.com/posts/{i}" for i in range(1, 101)]

    async with aiohttp.ClientSession() as session:
        tasks = [fetch_with_limit(sem, session, url) for url in urls]
        results = await asyncio.gather(*tasks)

    print(f"Fetched {len(results)} posts")

asyncio.run(main())
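
You can check that a semaphore really caps concurrency by counting how many jobs are in flight at once. The sketch below uses a hypothetical `ConcurrencyProbe` helper and a simulated request instead of real HTTP calls.

```python
import asyncio


class ConcurrencyProbe:
    """Hypothetical helper that tracks how many jobs are in flight at once."""

    def __init__(self) -> None:
        self.active = 0
        self.peak = 0

    async def job(self, sem: asyncio.Semaphore) -> None:
        async with sem:
            self.active += 1
            self.peak = max(self.peak, self.active)
            await asyncio.sleep(0.01)  # Simulated request
            self.active -= 1


async def measure_peak(limit: int, jobs: int) -> int:
    """Run `jobs` simulated requests and return the highest concurrency seen."""
    sem = asyncio.Semaphore(limit)
    probe = ConcurrencyProbe()
    await asyncio.gather(*(probe.job(sem) for _ in range(jobs)))
    return probe.peak


if __name__ == "__main__":
    print(asyncio.run(measure_peak(limit=5, jobs=50)))
```

Note that a semaphore limits how many requests are active at the same moment, not how many are sent per second. If an API enforces a requests-per-second quota, you would need an additional delay or token-bucket mechanism on top.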

Queue-based worker pattern

async def worker(name: str, queue: asyncio.Queue, results: list):
    """Process items from queue until sentinel (None)."""
    while True:
        item = await queue.get()
        if item is None:
            queue.task_done()  # Balance the get() for the sentinel too
            break
        try:
            result = await process_item(item)
            results.append(result)
        except Exception as e:
            print(f"Worker {name}: error processing {item}: {e}")
        finally:
            queue.task_done()


async def process_item(item: dict) -> dict:
    """Simulate processing with async I/O."""
    await asyncio.sleep(0.1)  # Simulated API call
    return {"id": item["id"], "processed": True}


async def main():
    queue = asyncio.Queue(maxsize=100)
    results = []
    num_workers = 10

    # Start workers
    workers = [
        asyncio.create_task(worker(f"w-{i}", queue, results))
        for i in range(num_workers)
    ]

    # Feed items
    items = [{"id": i, "data": f"item-{i}"} for i in range(200)]
    for item in items:
        await queue.put(item)

    # Send stop signals
    for _ in range(num_workers):
        await queue.put(None)

    # Wait for all workers to finish
    await asyncio.gather(*workers)
    print(f"Processed {len(results)} items")

asyncio.run(main())
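
An alternative to sentinel values is to wait on `queue.join()` and then cancel the idle workers. The following is a sketch of that variant, with the processing step replaced by a short simulated delay; `run_pool` is a name chosen for this example.

```python
import asyncio


async def worker(queue: asyncio.Queue, results: list[int]) -> None:
    while True:
        item = await queue.get()
        try:
            await asyncio.sleep(0.001)  # Simulated I/O
            results.append(item * 2)
        finally:
            queue.task_done()  # Always balance get() with task_done()


async def run_pool(items: list[int], num_workers: int = 3) -> list[int]:
    queue: asyncio.Queue = asyncio.Queue()
    results: list[int] = []
    workers = [
        asyncio.create_task(worker(queue, results)) for _ in range(num_workers)
    ]

    for item in items:
        queue.put_nowait(item)

    await queue.join()  # Returns once every item has been marked done

    for w in workers:
        w.cancel()  # Workers are idle in queue.get(), so cancel them
    await asyncio.gather(*workers, return_exceptions=True)
    return results


if __name__ == "__main__":
    print(sorted(asyncio.run(run_pool(list(range(10))))))
```

The trade-off: sentinels let each worker exit on its own, while `join()` plus cancellation avoids having to enqueue one stop signal per worker.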

Async I/O Patterns

HTTP with aiohttp

import aiohttp
import asyncio

async def fetch_json(session: aiohttp.ClientSession, url: str) -> dict:
    """Fetch JSON with timeout and error handling."""
    try:
        async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as resp:
            resp.raise_for_status()
            return await resp.json()
    except aiohttp.ClientError as e:
        return {"error": str(e), "url": url}


async def post_data(session: aiohttp.ClientSession, url: str, data: dict) -> dict:
    async with session.post(url, json=data) as resp:
        return await resp.json()


async def main():
    # Reuse one session for connection pooling
    connector = aiohttp.TCPConnector(limit=30)  # Connection pool
    async with aiohttp.ClientSession(connector=connector) as session:
        # Parallel fetches
        urls = [
            "https://jsonplaceholder.typicode.com/posts/1",
            "https://jsonplaceholder.typicode.com/users/1",
            "https://jsonplaceholder.typicode.com/todos/1",
        ]
        results = await asyncio.gather(*[fetch_json(session, u) for u in urls])
        for r in results:
            print(r.get("title", r.get("name", r.get("error"))))

asyncio.run(main())

File I/O with aiofiles

import aiofiles
import asyncio
import json

async def read_json(path: str) -> dict:
    async with aiofiles.open(path, "r") as f:
        content = await f.read()
        return json.loads(content)


async def write_results(path: str, data: list):
    async with aiofiles.open(path, "w") as f:
        await f.write(json.dumps(data, indent=2))


async def process_files(input_dir: str, output_path: str):
    """Read multiple JSON files concurrently, process, write output."""
    from pathlib import Path

    files = list(Path(input_dir).glob("*.json"))
    tasks = [read_json(str(f)) for f in files]
    records = await asyncio.gather(*tasks)

    # Process
    processed = [{"file": f.name, "keys": len(r)} for f, r in zip(files, records)]
    await write_results(output_path, processed)

Database with asyncpg / aiosqlite

import asyncpg
import asyncio


async def main():
    # Connection pool — essential for production
    pool = await asyncpg.create_pool(
        "postgresql://user:pass@localhost/mydb",
        min_size=5,
        max_size=20,
    )

    # Query
    async with pool.acquire() as conn:
        rows = await conn.fetch("SELECT id, name FROM users WHERE active = $1", True)
        for row in rows:
            print(f"User {row['id']}: {row['name']}")

    # Transaction
    async with pool.acquire() as conn:
        async with conn.transaction():
            await conn.execute(
                "INSERT INTO logs (message) VALUES ($1)", "async insert"
            )
            await conn.execute(
                "UPDATE counters SET value = value + 1 WHERE name = $1", "inserts"
            )

    await pool.close()

asyncio.run(main())

Async Generators and Iterators

async def paginated_fetch(session, base_url: str, per_page: int = 100):
    """Async generator: yields pages of results."""
    page = 1
    while True:
        url = f"{base_url}?page={page}&per_page={per_page}"
        async with session.get(url) as resp:
            data = await resp.json()
            if not data:
                break
            yield data
            page += 1


async def main():
    async with aiohttp.ClientSession() as session:
        all_items = []
        async for page_items in paginated_fetch(session, "https://api.example.com/items"):
            all_items.extend(page_items)
            print(f"Fetched {len(all_items)} items so far...")

        print(f"Total: {len(all_items)}")
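
The same pagination pattern can be tried without a real API. In this sketch, `FAKE_DB` and `fetch_page` are hypothetical stand-ins for a remote endpoint; the generator logic mirrors the one above.

```python
import asyncio
from collections.abc import AsyncIterator

FAKE_DB = list(range(1, 8))  # Hypothetical data source with 7 records


async def fetch_page(page: int, per_page: int) -> list[int]:
    """Stand-in for an HTTP request that returns one page of records."""
    await asyncio.sleep(0)  # Simulated I/O
    start = (page - 1) * per_page
    return FAKE_DB[start:start + per_page]


async def paginate(per_page: int = 3) -> AsyncIterator[list[int]]:
    page = 1
    while True:
        data = await fetch_page(page, per_page)
        if not data:
            break  # Empty page: nothing left to fetch
        yield data
        page += 1


async def collect() -> list[list[int]]:
    # Async comprehensions work anywhere "async for" does
    return [page async for page in paginate()]


if __name__ == "__main__":
    print(asyncio.run(collect()))  # [[1, 2, 3], [4, 5, 6], [7]]
```

Because the generator fetches lazily, a consumer that stops early (for example with `break`) never requests the remaining pages.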

Async context managers

from contextlib import asynccontextmanager

@asynccontextmanager
async def managed_session():
    """Custom async context manager for HTTP sessions."""
    session = aiohttp.ClientSession(
        timeout=aiohttp.ClientTimeout(total=30),
        headers={"User-Agent": "MyBot/1.0"},
    )
    try:
        yield session
    finally:
        await session.close()


async def main():
    async with managed_session() as session:
        async with session.get("https://httpbin.org/get") as resp:
            print(await resp.json())
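
The try/finally around the yield is what guarantees cleanup, even when the body of the `async with` block raises. A minimal sketch with a hypothetical `managed_resource` shows the order of events.

```python
import asyncio
from contextlib import asynccontextmanager


@asynccontextmanager
async def managed_resource(events: list[str]):
    events.append("open")
    await asyncio.sleep(0)  # Simulated async setup
    try:
        yield "resource"
    finally:
        await asyncio.sleep(0)  # Simulated async teardown
        events.append("close")


async def demo() -> list[str]:
    events: list[str] = []
    try:
        async with managed_resource(events) as res:
            events.append(f"using {res}")
            raise RuntimeError("failure inside the block")
    except RuntimeError:
        events.append("error handled")
    return events


if __name__ == "__main__":
    print(asyncio.run(demo()))
    # ['open', 'using resource', 'close', 'error handled']
```

The "close" event is recorded before the exception reaches the caller, so the resource is already released by the time the error is handled.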

Error Handling

Timeouts

async def fetch_with_timeout(url: str, timeout_seconds: float = 5.0):
    """Cancel if too slow."""
    try:
        async with asyncio.timeout(timeout_seconds):  # Python 3.11+
            async with aiohttp.ClientSession() as session:
                async with session.get(url) as resp:
                    return await resp.json()
    except TimeoutError:
        print(f"Timeout fetching {url}")
        return None

# For Python 3.10 and below:
async def fetch_with_timeout_legacy(url: str, timeout_seconds: float = 5.0):
    try:
        return await asyncio.wait_for(
            _fetch(url),  # _fetch: your own coroutine that performs the request
            timeout=timeout_seconds,
        )
    except asyncio.TimeoutError:
        return None
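
A timeout does more than stop waiting: the coroutine being awaited is cancelled. The sketch below, with a hypothetical `slow_operation`, makes that visible using `asyncio.wait_for`, which works on old and new Python versions alike.

```python
import asyncio
from typing import Optional


async def slow_operation(state: dict) -> str:
    try:
        await asyncio.sleep(10)
        return "finished"
    except asyncio.CancelledError:
        state["cancelled"] = True  # wait_for cancels the awaited coroutine
        raise


async def demo() -> tuple[Optional[str], bool]:
    state = {"cancelled": False}
    try:
        result: Optional[str] = await asyncio.wait_for(
            slow_operation(state), timeout=0.01
        )
    except asyncio.TimeoutError:  # Alias of the builtin TimeoutError on 3.11+
        result = None
    return result, state["cancelled"]


if __name__ == "__main__":
    print(asyncio.run(demo()))  # (None, True)
```

This matters for cleanup: any try/finally or context manager inside the timed-out coroutine still runs, so connections and files opened there get closed.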

Graceful shutdown

import asyncio
import signal

async def shutdown(loop, sig=None):
    """Cancel outstanding tasks, then stop the loop."""
    if sig:
        print(f"Received exit signal {sig.name}")

    tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
    print(f"Cancelling {len(tasks)} outstanding tasks")

    for task in tasks:
        task.cancel()

    await asyncio.gather(*tasks, return_exceptions=True)
    loop.stop()


def main():
    loop = asyncio.new_event_loop()

    # Handle SIGTERM/SIGINT (add_signal_handler is Unix-only)
    for sig in (signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(
            sig,
            lambda s=sig: asyncio.create_task(shutdown(loop, sig=s)),
        )

    try:
        # run_server() is your application's main coroutine
        loop.create_task(run_server())
        loop.run_forever()  # Returns once shutdown() calls loop.stop()
    finally:
        loop.close()
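
Shutdown by cancellation only works well if the tasks themselves clean up when cancelled. Here is a minimal sketch of a cancellation-aware task; `connection_handler` is a hypothetical name, and the long sleep stands in for real work.

```python
import asyncio


async def connection_handler(name: str, log: list[str]) -> None:
    log.append(f"{name}: opened")
    try:
        await asyncio.sleep(3600)  # Simulated long-lived work
    except asyncio.CancelledError:
        log.append(f"{name}: cancelled")
        raise  # Re-raise so the task is marked as cancelled
    finally:
        log.append(f"{name}: closed")  # Cleanup runs on every exit path


async def demo() -> tuple[list[str], bool]:
    log: list[str] = []
    task = asyncio.create_task(connection_handler("conn-1", log))
    await asyncio.sleep(0)  # Let the handler start
    task.cancel()
    await asyncio.gather(task, return_exceptions=True)
    return log, task.cancelled()


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Swallowing `CancelledError` without re-raising is a common source of tasks that refuse to stop, so re-raise it unless you have a specific reason not to.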

Retry with exponential backoff

import random

async def retry_async(
    coro_func,
    *args,
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
    **kwargs,
):
    """Retry an async function with exponential backoff + jitter."""
    for attempt in range(max_retries + 1):
        try:
            return await coro_func(*args, **kwargs)
        except Exception as e:
            if attempt == max_retries:
                raise
            delay = min(base_delay * (2 ** attempt), max_delay)
            jitter = random.uniform(0, delay * 0.5)
            print(f"Attempt {attempt + 1} failed: {e}. Retrying in {delay + jitter:.1f}s")
            await asyncio.sleep(delay + jitter)


# Usage (inside a coroutine, with an open aiohttp session)
result = await retry_async(fetch_json, session, "https://flaky-api.com/data")
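
To get a feel for the waiting times involved, the base delay schedule can be computed on its own. `backoff_delays` is a helper written for this illustration; it applies the same formula as `retry_async` but leaves out the random jitter, which adds up to 50% on top of each delay.

```python
def backoff_delays(
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
) -> list[float]:
    """Base delay before each retry: doubles per attempt, capped at max_delay."""
    return [
        min(base_delay * (2 ** attempt), max_delay)
        for attempt in range(max_retries)
    ]


if __name__ == "__main__":
    print(backoff_delays())               # [1.0, 2.0, 4.0]
    print(backoff_delays(max_retries=7))  # [1.0, 2.0, 4.0, 8.0, 16.0, 30.0, 30.0]
```

With the defaults, a call that fails every time waits at least 7 seconds in total before the final exception is raised, and up to 10.5 seconds with maximum jitter.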

Threading + Async — Mixing Worlds

Some libraries are synchronous (database drivers, file operations). Use run_in_executor to run blocking code without freezing the event loop:

import asyncio
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=4)

def blocking_io(path: str) -> str:
    """Synchronous function — would block the event loop."""
    with open(path) as f:
        return f.read()

def cpu_heavy(data: str) -> int:
    """CPU-bound work — also blocks."""
    return sum(ord(c) for c in data)


async def main():
    loop = asyncio.get_running_loop()

    # Run blocking I/O in thread pool
    content = await loop.run_in_executor(executor, blocking_io, "large_file.txt")

    # CPU work in a thread keeps the loop responsive, but the GIL prevents a real speedup
    result = await loop.run_in_executor(executor, cpu_heavy, content)

    # Or use ProcessPoolExecutor for true CPU parallelism
    from concurrent.futures import ProcessPoolExecutor
    with ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, cpu_heavy, content)

if __name__ == "__main__":  # Guard needed for ProcessPoolExecutor on spawn-based platforms
    asyncio.run(main())

🔑 Rule of thumb: Use asyncio for I/O-bound work (network, files, DB). Use ProcessPoolExecutor for CPU-bound work (data crunching, image processing). Use ThreadPoolExecutor for blocking I/O libraries that don't have async versions.
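
On Python 3.9+, `asyncio.to_thread` is a shorter way to push a blocking call onto the default thread pool. The sketch below uses a hypothetical `blocking_call` and a heartbeat task to show that the event loop keeps running while the thread is busy.

```python
import asyncio
import time


def blocking_call(seconds: float) -> str:
    time.sleep(seconds)  # Stand-in for a synchronous library call
    return "blocking done"


async def heartbeat(ticks: list[float], interval: float, count: int) -> None:
    for _ in range(count):
        await asyncio.sleep(interval)
        ticks.append(time.perf_counter())


async def demo() -> tuple[str, int]:
    ticks: list[float] = []
    beat = asyncio.create_task(heartbeat(ticks, 0.01, 5))
    # The blocking call runs in a worker thread; the loop stays free
    result = await asyncio.to_thread(blocking_call, 0.2)
    ticks_during_call = len(ticks)
    await beat
    return result, ticks_during_call


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The heartbeat ticks while `blocking_call` is still sleeping. Calling `blocking_call(0.2)` directly inside the coroutine would instead freeze the heartbeat for the full 200 ms.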

Real-World Example: Async Scraper

import asyncio
import aiohttp
import aiofiles
import json
from dataclasses import dataclass, asdict
from datetime import datetime, timezone


@dataclass
class ScrapedItem:
    url: str
    title: str
    status: int
    scraped_at: str


async def scrape_page(
    sem: asyncio.Semaphore,
    session: aiohttp.ClientSession,
    url: str,
) -> ScrapedItem | None:
    """Scrape a single page with rate limiting."""
    async with sem:
        try:
            async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as resp:
                text = await resp.text()
                # Extract title (simplified)
                title_start = text.find("<title>")
                title_end = text.find("</title>", title_start)
                found = title_start != -1 and title_end != -1  # find() returns -1 when missing
                title = text[title_start + 7:title_end] if found else "No title"

                return ScrapedItem(
                    url=url,
                    title=title.strip(),
                    status=resp.status,
                    scraped_at=datetime.now(timezone.utc).isoformat(),
                )
        except Exception as e:
            print(f"Error scraping {url}: {e}")
            return None


async def main():
    urls = [
        "https://httpbin.org/html",
        "https://example.com",
        "https://jsonplaceholder.typicode.com",
    ]

    sem = asyncio.Semaphore(10)  # Max 10 concurrent
    connector = aiohttp.TCPConnector(limit=20)

    async with aiohttp.ClientSession(connector=connector) as session:
        tasks = [scrape_page(sem, session, url) for url in urls]
        results = await asyncio.gather(*tasks)

    # Filter out failures
    items = [asdict(r) for r in results if r is not None]

    # Save results
    async with aiofiles.open("scraped.json", "w") as f:
        await f.write(json.dumps(items, indent=2))

    print(f"Scraped {len(items)}/{len(urls)} pages")

asyncio.run(main())
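
The string-search title extraction in the scraper is deliberately simplified: it misses tags written in upper case or with attributes. If that matters, one option is the standard library's `html.parser`. The `TitleParser` class and `extract_title` function below are a sketch written for this article, not part of the scraper above.

```python
from html.parser import HTMLParser


class TitleParser(HTMLParser):
    """Collects the text of the first <title> element."""

    def __init__(self) -> None:
        super().__init__()
        self._in_title = False
        self._done = False
        self._parts: list[str] = []

    def handle_starttag(self, tag, attrs):
        # HTMLParser lower-cases tag names before calling this
        if tag == "title" and not self._done:
            self._in_title = True

    def handle_endtag(self, tag):
        if tag == "title" and self._in_title:
            self._in_title = False
            self._done = True

    def handle_data(self, data):
        if self._in_title:
            self._parts.append(data)

    @property
    def title(self) -> str:
        return "".join(self._parts).strip() or "No title"


def extract_title(html: str) -> str:
    parser = TitleParser()
    parser.feed(html)
    parser.close()  # Flush any buffered text
    return parser.title


if __name__ == "__main__":
    print(extract_title('<html><head><TITLE lang="en"> Hello </TITLE></head></html>'))
```

Parsing is synchronous CPU work, but for pages of typical size it is short enough to run directly inside the coroutine. For heavier parsing, a dedicated HTML library is the more common choice.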

When NOT to Use Async

Concurrency Comparison

Approach            | Best For                   | Overhead             | Scaling
asyncio             | I/O-bound (network, files) | Very low             | Thousands of connections
threading           | I/O-bound + legacy libs    | Medium (GIL)         | Dozens of threads
multiprocessing     | CPU-bound                  | High (process spawn) | Number of CPU cores
concurrent.futures  | Mixed workloads            | Medium               | Configurable pool

Best Practices
