Python Async Programming — asyncio, Tasks & Concurrency
If your Python code spends most of its time waiting — for API responses, database queries, file I/O, or network calls — async programming can make it 5-50x faster without threads or multiprocessing. This guide covers everything from basic coroutines to production-grade async patterns.
Sync vs Async — Why It Matters
Consider fetching 100 URLs. Synchronously, each request blocks until complete:
# Synchronous — ~100 seconds for 100 URLs (1s each)
import requests
urls = ["https://httpbin.org/delay/1" for _ in range(100)]
for url in urls:
resp = requests.get(url) # Blocks for ~1 second
print(resp.status_code)
With async, all 100 requests run concurrently:
# Async — ~2 seconds for 100 URLs
import asyncio
import aiohttp
async def fetch(session, url):
async with session.get(url) as resp:
return resp.status
async def main():
async with aiohttp.ClientSession() as session:
tasks = [fetch(session, url) for url in urls]
results = await asyncio.gather(*tasks)
print(results)
asyncio.run(main())
Same work, ~50x faster. The key insight: while one request waits for a server response, Python can start and manage other requests.
Core Concepts
Coroutines
A coroutine is a function defined with async def. It doesn't run when called — it returns a coroutine object that must be awaited.
import asyncio
async def greet(name: str) -> str:
"""A coroutine — pauses at await, resumes when ready."""
await asyncio.sleep(1) # Non-blocking sleep
return f"Hello, {name}!"
# Wrong — returns coroutine object, doesn't execute
result = greet("World") # <coroutine object greet at 0x...>
# Right — run the coroutine
result = asyncio.run(greet("World")) # "Hello, World!"
The Event Loop
The event loop is the scheduler. It runs coroutines, handles I/O callbacks, and manages tasks. Think of it as a single-threaded task manager that switches between coroutines at every await point.
# asyncio.run() creates and manages the event loop
asyncio.run(main()) # Python 3.7+
# Manual loop (rarely needed; calling asyncio.get_event_loop() with no running loop is deprecated)
loop = asyncio.new_event_loop()
loop.run_until_complete(main())
loop.close()
# Get the running loop (inside async code)
loop = asyncio.get_running_loop()
await — The Yield Point
Every await is a potential context switch. The event loop can run other coroutines while the current one waits:
async def worker(name: str, delay: float):
print(f"{name}: starting")
await asyncio.sleep(delay) # Yields control here
print(f"{name}: done after {delay}s")
async def main():
# These run concurrently, not sequentially!
await asyncio.gather(
worker("A", 2),
worker("B", 1),
worker("C", 3),
)
# Output order: A start, B start, C start, B done (1s), A done (2s), C done (3s)
# Total time: ~3s, not 6s
asyncio.run(main())
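For contrast, awaiting the same coroutines one after another gives the event loop nothing to overlap: each await finishes before the next one starts. A minimal sketch reusing worker from above:
async def main_sequential():
    await worker("A", 2)
    await worker("B", 1)
    await worker("C", 3)
    # Total time: ~6s. Concurrency only happens when coroutines are
    # scheduled together (gather, TaskGroup, or create_task).
asyncio.run(main_sequential())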
Tasks — Fire and Manage
Creating tasks
async def main():
    # Create a task: it's scheduled on the event loop right away and starts at the next await
task = asyncio.create_task(worker("background", 5))
# Do other work while task runs
print("Doing other stuff...")
await asyncio.sleep(1)
# Wait for the task to complete
result = await task
print(f"Task result: {result}")
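Tasks can also be cancelled. A minimal sketch of cancelling a long-running background task and handling the resulting asyncio.CancelledError, reusing worker from above:
async def cancel_example():
    task = asyncio.create_task(worker("background", 60))
    await asyncio.sleep(1)   # let it get started
    task.cancel()            # raises CancelledError inside the task at its next await
    try:
        await task
    except asyncio.CancelledError:
        print("Background task was cancelled")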
Task groups (Python 3.11+)
async def fetch_url(url: str) -> dict:
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
return {"url": url, "status": resp.status}
async def main():
urls = [
"https://httpbin.org/get",
"https://httpbin.org/ip",
"https://httpbin.org/headers",
]
# TaskGroup: structured concurrency — all tasks complete or all cancel
async with asyncio.TaskGroup() as tg:
tasks = [tg.create_task(fetch_url(url)) for url in urls]
# All tasks are done here
results = [t.result() for t in tasks]
for r in results:
print(f"{r['url']}: {r['status']}")
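If any task in the group fails, the TaskGroup cancels the remaining tasks and re-raises the failures as an ExceptionGroup, which you can match with except* (also Python 3.11+). A minimal sketch reusing fetch_url; the failing URL is made up:
async def main_with_failure():
    try:
        async with asyncio.TaskGroup() as tg:
            tg.create_task(fetch_url("https://httpbin.org/get"))
            tg.create_task(fetch_url("https://does-not-exist.invalid"))  # will fail
    except* aiohttp.ClientError as eg:
        # Failures arrive bundled in an ExceptionGroup; unfinished tasks were cancelled
        for exc in eg.exceptions:
            print(f"Request failed: {exc!r}")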
gather — Run Multiple Coroutines
async def main():
# Run concurrently, collect all results
results = await asyncio.gather(
fetch_url("https://api.github.com"),
fetch_url("https://httpbin.org/get"),
fetch_url("https://jsonplaceholder.typicode.com/posts/1"),
)
# return_exceptions=True: don't crash on individual failures
results = await asyncio.gather(
fetch_url("https://valid.com"),
fetch_url("https://will-fail.invalid"),
return_exceptions=True, # Failed tasks return Exception objects
)
for r in results:
if isinstance(r, Exception):
print(f"Failed: {r}")
else:
print(f"OK: {r}")
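Without return_exceptions, the first exception propagates out of gather() immediately, but the other awaitables are not cancelled; they keep running in the background. Wrap the call if you want to handle that case explicitly (a sketch reusing fetch_url):
async def main_default_gather():
    try:
        results = await asyncio.gather(
            fetch_url("https://httpbin.org/get"),
            fetch_url("https://will-fail.invalid"),
        )
    except aiohttp.ClientError as e:
        # The healthy fetch is still running at this point
        print(f"One fetch failed: {e}")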
Controlling Concurrency
Semaphores — Rate Limiting
Without limits, 10,000 concurrent requests will overwhelm servers and your own machine. Semaphores cap concurrency:
async def fetch_with_limit(sem: asyncio.Semaphore, session, url: str):
async with sem: # Only N coroutines enter at a time
async with session.get(url) as resp:
return await resp.json()
async def main():
sem = asyncio.Semaphore(20) # Max 20 concurrent requests
urls = [f"https://jsonplaceholder.typicode.com/posts/{i}" for i in range(1, 101)]
async with aiohttp.ClientSession() as session:
tasks = [fetch_with_limit(sem, session, url) for url in urls]
results = await asyncio.gather(*tasks)
print(f"Fetched {len(results)} posts")
asyncio.run(main())
Queue-based worker pattern
async def worker(name: str, queue: asyncio.Queue, results: list):
"""Process items from queue until sentinel (None)."""
while True:
item = await queue.get()
if item is None:
break
try:
result = await process_item(item)
results.append(result)
except Exception as e:
print(f"Worker {name}: error processing {item}: {e}")
finally:
queue.task_done()
async def process_item(item: dict) -> dict:
"""Simulate processing with async I/O."""
await asyncio.sleep(0.1) # Simulated API call
return {"id": item["id"], "processed": True}
async def main():
queue = asyncio.Queue(maxsize=100)
results = []
num_workers = 10
# Start workers
workers = [
asyncio.create_task(worker(f"w-{i}", queue, results))
for i in range(num_workers)
]
# Feed items
items = [{"id": i, "data": f"item-{i}"} for i in range(200)]
for item in items:
await queue.put(item)
# Send stop signals
for _ in range(num_workers):
await queue.put(None)
# Wait for all workers to finish
await asyncio.gather(*workers)
print(f"Processed {len(results)} items")
asyncio.run(main())
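An alternative to sentinel values: because each worker calls queue.task_done(), the producer can await queue.join() until every item has been processed, then cancel the now-idle workers. A sketch of main() rewritten this way (worker and process_item unchanged):
async def main_with_join():
    queue = asyncio.Queue(maxsize=100)
    results = []
    num_workers = 10
    workers = [
        asyncio.create_task(worker(f"w-{i}", queue, results))
        for i in range(num_workers)
    ]
    for i in range(200):
        await queue.put({"id": i, "data": f"item-{i}"})
    await queue.join()   # returns once task_done() has run for every item
    for w in workers:
        w.cancel()       # workers are blocked in queue.get(); cancel them
    await asyncio.gather(*workers, return_exceptions=True)
    print(f"Processed {len(results)} items")
asyncio.run(main_with_join())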
Async I/O Patterns
HTTP with aiohttp
import aiohttp
import asyncio
async def fetch_json(session: aiohttp.ClientSession, url: str) -> dict:
"""Fetch JSON with timeout and error handling."""
try:
async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as resp:
resp.raise_for_status()
return await resp.json()
except aiohttp.ClientError as e:
return {"error": str(e), "url": url}
async def post_data(session: aiohttp.ClientSession, url: str, data: dict) -> dict:
async with session.post(url, json=data) as resp:
return await resp.json()
async def main():
# Reuse one session for connection pooling
connector = aiohttp.TCPConnector(limit=30) # Connection pool
async with aiohttp.ClientSession(connector=connector) as session:
# Parallel fetches
urls = [
"https://jsonplaceholder.typicode.com/posts/1",
"https://jsonplaceholder.typicode.com/users/1",
"https://jsonplaceholder.typicode.com/todos/1",
]
results = await asyncio.gather(*[fetch_json(session, u) for u in urls])
for r in results:
print(r.get("title", r.get("name", r.get("error"))))
asyncio.run(main())
File I/O with aiofiles
import aiofiles
import asyncio
import json
async def read_json(path: str) -> dict:
async with aiofiles.open(path, "r") as f:
content = await f.read()
return json.loads(content)
async def write_results(path: str, data: list):
async with aiofiles.open(path, "w") as f:
await f.write(json.dumps(data, indent=2))
async def process_files(input_dir: str, output_path: str):
"""Read multiple JSON files concurrently, process, write output."""
from pathlib import Path
files = list(Path(input_dir).glob("*.json"))
tasks = [read_json(str(f)) for f in files]
records = await asyncio.gather(*tasks)
# Process
processed = [{"file": f.name, "keys": len(r)} for f, r in zip(files, records)]
await write_results(output_path, processed)
Database with asyncpg / aiosqlite
import asyncpg
import asyncio
async def main():
# Connection pool — essential for production
pool = await asyncpg.create_pool(
"postgresql://user:pass@localhost/mydb",
min_size=5,
max_size=20,
)
# Query
async with pool.acquire() as conn:
rows = await conn.fetch("SELECT id, name FROM users WHERE active = $1", True)
for row in rows:
print(f"User {row['id']}: {row['name']}")
# Transaction
async with pool.acquire() as conn:
async with conn.transaction():
await conn.execute(
"INSERT INTO logs (message) VALUES ($1)", "async insert"
)
await conn.execute(
"UPDATE counters SET value = value + 1 WHERE name = $1", "inserts"
)
await pool.close()
asyncio.run(main())
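For SQLite, aiosqlite wraps the standard sqlite3 module with the same async interface. A minimal sketch (the example.db path and users table are placeholders):
import aiosqlite
import asyncio
async def main():
    async with aiosqlite.connect("example.db") as db:
        await db.execute(
            "CREATE TABLE IF NOT EXISTS users (id INTEGER PRIMARY KEY, name TEXT)"
        )
        await db.execute("INSERT INTO users (name) VALUES (?)", ("alice",))
        await db.commit()
        # Cursors work as async context managers and async iterators
        async with db.execute("SELECT id, name FROM users") as cursor:
            async for row in cursor:
                print(f"User {row[0]}: {row[1]}")
asyncio.run(main())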
Async Generators and Iterators
async def paginated_fetch(session, base_url: str, per_page: int = 100):
"""Async generator: yields pages of results."""
page = 1
while True:
url = f"{base_url}?page={page}&per_page={per_page}"
async with session.get(url) as resp:
data = await resp.json()
if not data:
break
yield data
page += 1
async def main():
async with aiohttp.ClientSession() as session:
all_items = []
async for page_items in paginated_fetch(session, "https://api.example.com/items"):
all_items.extend(page_items)
print(f"Fetched {len(all_items)} items so far...")
print(f"Total: {len(all_items)}")
Async context managers
from contextlib import asynccontextmanager
@asynccontextmanager
async def managed_session():
"""Custom async context manager for HTTP sessions."""
session = aiohttp.ClientSession(
timeout=aiohttp.ClientTimeout(total=30),
headers={"User-Agent": "MyBot/1.0"},
)
try:
yield session
finally:
await session.close()
async def main():
async with managed_session() as session:
async with session.get("https://httpbin.org/get") as resp:
print(await resp.json())
Error Handling
Timeouts
async def fetch_with_timeout(url: str, timeout_seconds: float = 5.0):
"""Cancel if too slow."""
try:
async with asyncio.timeout(timeout_seconds): # Python 3.11+
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
return await resp.json()
except TimeoutError:
print(f"Timeout fetching {url}")
return None
# For Python 3.10 and below:
async def fetch_with_timeout_legacy(url: str, timeout_seconds: float = 5.0):
try:
return await asyncio.wait_for(
            _fetch(url),  # _fetch: any coroutine that performs the actual request
timeout=timeout_seconds,
)
except asyncio.TimeoutError:
return None
Graceful shutdown
import signal
async def shutdown(loop, sig=None):
    """Cancel outstanding tasks and stop the loop (sig avoids shadowing the signal module)."""
    if sig:
        print(f"Received exit signal {sig.name}")
tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
print(f"Cancelling {len(tasks)} outstanding tasks")
for task in tasks:
task.cancel()
await asyncio.gather(*tasks, return_exceptions=True)
loop.stop()
def main():
loop = asyncio.new_event_loop()
# Handle SIGTERM/SIGINT
for sig in (signal.SIGTERM, signal.SIGINT):
loop.add_signal_handler(
sig,
            lambda s=sig: asyncio.create_task(shutdown(loop, sig=s)),
)
try:
        loop.run_until_complete(run_server())  # run_server(): your application's main coroutine
finally:
loop.close()
Retry with exponential backoff
import random
async def retry_async(
coro_func,
*args,
max_retries: int = 3,
base_delay: float = 1.0,
max_delay: float = 30.0,
**kwargs,
):
"""Retry an async function with exponential backoff + jitter."""
for attempt in range(max_retries + 1):
try:
return await coro_func(*args, **kwargs)
except Exception as e:
if attempt == max_retries:
raise
delay = min(base_delay * (2 ** attempt), max_delay)
jitter = random.uniform(0, delay * 0.5)
print(f"Attempt {attempt + 1} failed: {e}. Retrying in {delay + jitter:.1f}s")
await asyncio.sleep(delay + jitter)
# Usage (inside a coroutine, with an open aiohttp session)
result = await retry_async(fetch_json, session, "https://flaky-api.com/data")
Threading + Async — Mixing Worlds
Some libraries are synchronous (database drivers, file operations). Use run_in_executor to run blocking code without freezing the event loop:
import asyncio
from concurrent.futures import ThreadPoolExecutor
executor = ThreadPoolExecutor(max_workers=4)
def blocking_io(path: str) -> str:
"""Synchronous function — would block the event loop."""
with open(path) as f:
return f.read()
def cpu_heavy(data: str) -> int:
"""CPU-bound work — also blocks."""
return sum(ord(c) for c in data)
async def main():
loop = asyncio.get_running_loop()
# Run blocking I/O in thread pool
content = await loop.run_in_executor(executor, blocking_io, "large_file.txt")
    # CPU work in a thread pool stays GIL-bound (no real parallelism), but it keeps the loop responsive
result = await loop.run_in_executor(executor, cpu_heavy, content)
# Or use ProcessPoolExecutor for true CPU parallelism
from concurrent.futures import ProcessPoolExecutor
with ProcessPoolExecutor() as pool:
result = await loop.run_in_executor(pool, cpu_heavy, content)
asyncio.run(main())
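On Python 3.9+, asyncio.to_thread() is a shorthand that runs a sync function in the default thread pool without managing an executor yourself. A small sketch using blocking_io from above:
async def read_file_async(path: str) -> str:
    # Equivalent to run_in_executor with the default ThreadPoolExecutor
    return await asyncio.to_thread(blocking_io, path)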
Real-World Example: Async Scraper
import asyncio
import aiohttp
import aiofiles
import json
from dataclasses import dataclass, asdict
from datetime import datetime, timezone
@dataclass
class ScrapedItem:
url: str
title: str
status: int
scraped_at: str
async def scrape_page(
sem: asyncio.Semaphore,
session: aiohttp.ClientSession,
url: str,
) -> ScrapedItem | None:
"""Scrape a single page with rate limiting."""
async with sem:
try:
async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as resp:
text = await resp.text()
# Extract title (simplified)
title_start = text.find("<title>")
title_end = text.find("</title>")
                title = text[title_start + 7:title_end] if title_start != -1 and title_end != -1 else "No title"
return ScrapedItem(
url=url,
title=title.strip(),
status=resp.status,
scraped_at=datetime.now(timezone.utc).isoformat(),
)
except Exception as e:
print(f"Error scraping {url}: {e}")
return None
async def main():
urls = [
"https://httpbin.org/html",
"https://example.com",
"https://jsonplaceholder.typicode.com",
]
sem = asyncio.Semaphore(10) # Max 10 concurrent
connector = aiohttp.TCPConnector(limit=20)
async with aiohttp.ClientSession(connector=connector) as session:
tasks = [scrape_page(sem, session, url) for url in urls]
results = await asyncio.gather(*tasks)
# Filter out failures
items = [asdict(r) for r in results if r is not None]
# Save results
async with aiofiles.open("scraped.json", "w") as f:
await f.write(json.dumps(items, indent=2))
print(f"Scraped {len(items)}/{len(urls)} pages")
asyncio.run(main())
When NOT to Use Async
- CPU-bound work — asyncio won't help with number crunching. Use multiprocessing or ProcessPoolExecutor
- Simple scripts — if you make 3 API calls sequentially, sync is fine and simpler
- Libraries without async support — wrapping everything in run_in_executor adds complexity with little benefit
- Debugging — async stack traces are harder to read; start sync, go async when needed
Concurrency Comparison
| Approach | Best For | Overhead | Scaling |
|---|---|---|---|
| asyncio | I/O-bound (network, files) | Very low | Thousands of connections |
| threading | I/O-bound + legacy libs | Medium (GIL) | Dozens of threads |
| multiprocessing | CPU-bound | High (process spawn) | Number of CPU cores |
| concurrent.futures | Mixed workloads | Medium | Configurable pool |
Best Practices
- Always use semaphores for external API calls — unbounded concurrency will get you rate-limited or banned
- Reuse sessions — create one aiohttp.ClientSession and share it across coroutines
- Prefer TaskGroup over gather (Python 3.11+) — better error handling and cancellation
- Handle timeouts explicitly — asyncio.timeout() prevents hung coroutines
- Don't block the event loop — use run_in_executor for any sync I/O or CPU work
- Use structured concurrency — every task should have an owner that awaits it
- Test with pytest-asyncio — mark async test functions with @pytest.mark.asyncio (see the sketch after this list)
- Profile with debug mode — asyncio.run(main(), debug=True) logs any callback that blocks the loop longer than loop.slow_callback_duration (0.1s by default)
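A minimal async test, assuming pytest and the pytest-asyncio plugin are installed (file and function names are just examples):
# test_async_example.py
import asyncio
import pytest
async def double(x: int) -> int:
    await asyncio.sleep(0)  # stand-in for real async I/O
    return x * 2
@pytest.mark.asyncio
async def test_double():
    assert await double(21) == 42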
Related Articles
- Build a REST API with FastAPI — FastAPI is built on async Python
- Automate API Integrations with Python — sync and async patterns for API work
- WebSockets in Python — Real-Time Apps — async WebSocket connections
- Web Scraping with Python — scraping at scale with async
- Build a Data Pipeline in Python — ETL with async data sources
- Python Testing Guide — testing async code with pytest-asyncio
Need a high-performance async system built for your project? I build Python APIs, scrapers, and data pipelines. Reach out on Telegram →