Python Concurrency — Threading, Multiprocessing & When to Use What
Python gives you three concurrency models: threading, multiprocessing, and asyncio. Each solves different problems. Pick the wrong one and your "parallel" code runs slower than serial code. This guide shows you when to use each, with real code and benchmarks — not toy examples.
For async/await specifically, see our Python Async Programming guide. This article focuses on threading and multiprocessing, plus how all three approaches compare.
The GIL — Why It Matters
The Global Interpreter Lock (GIL) is a mutex that protects Python objects from concurrent access. In practice:
- Threads CAN'T run Python bytecode in parallel — only one thread executes Python bytecode at any given moment
- Threads CAN run I/O in parallel — the GIL is released during file reads, network calls, time.sleep()
- Processes bypass the GIL entirely — each process has its own interpreter and memory space
- Python 3.13+ introduced experimental free-threaded builds (no GIL), but it's opt-in and not yet mainstream
Threading — For I/O-Bound Work
Basic threading
import threading
import time
import requests
def download_page(url: str) -> int:
    """Fetch *url* and return the HTTP status code.

    Blocks for the whole request, but the GIL is released while waiting
    on the network, so many of these can overlap across threads.
    """
    response = requests.get(url, timeout=10)
    return response.status_code
# --- Sequential baseline (slow) ---
urls = [
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/1",
]

start = time.perf_counter()
for url in urls:
    download_page(url)
print(f"Sequential: {time.perf_counter() - start:.1f}s")  # ~4.0s

# --- Threaded version (fast): one thread per URL, waits overlap ---
start = time.perf_counter()
threads = [threading.Thread(target=download_page, args=(u,)) for u in urls]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(f"Threaded: {time.perf_counter() - start:.1f}s")  # ~1.0s
ThreadPoolExecutor (preferred)
Don't manage threads manually. concurrent.futures gives you a clean, high-level API:
from concurrent.futures import ThreadPoolExecutor, as_completed
import requests
import time
def fetch(url: str) -> dict:
    """GET *url* and report its status plus wall-clock time taken."""
    t0 = time.perf_counter()
    resp = requests.get(url, timeout=10)
    return {
        "url": url,
        "status": resp.status_code,
        "time": f"{time.perf_counter() - t0:.2f}s",
    }
urls = [
    "https://api.github.com",
    "https://httpbin.org/get",
    "https://jsonplaceholder.typicode.com/posts/1",
    "https://api.ipify.org?format=json",
    "https://httpbin.org/delay/2",
]

# Process results in completion order, not submission order.
with ThreadPoolExecutor(max_workers=5) as pool:
    future_to_url = {pool.submit(fetch, url): url for url in urls}
    for future in as_completed(future_to_url):
        url = future_to_url[future]
        try:
            result = future.result()
        except Exception as e:
            print(f"❌ {url}: {e}")
        else:
            print(f"✅ {result['url']}: {result['status']} ({result['time']})")
Thread-safe data structures
import threading
from queue import Queue
from collections import defaultdict
# --- Thread-safe counter ---
class SafeCounter:
    """An integer counter whose updates are serialized by a mutex."""

    def __init__(self):
        self._count = 0
        self._lock = threading.Lock()

    def increment(self, n: int = 1):
        """Atomically add *n* (default 1) to the counter."""
        with self._lock:
            self._count += n

    @property
    def value(self) -> int:
        """Current count, read under the lock for a consistent view."""
        with self._lock:
            return self._count
# --- Producer/Consumer with Queue ---
def producer(queue: Queue, items: list):
    """Feed every element of *items* into the queue, then a None sentinel."""
    for item in items:
        queue.put(item)
        print(f"Produced: {item}")
    queue.put(None)  # sentinel
def consumer(queue: Queue, name: str):
    """Drain items from *queue* until the None sentinel arrives.

    The sentinel is re-queued so sibling consumers also shut down.
    Every get() is balanced with a task_done() — including the sentinel —
    so that Queue.join() can never hang on the unfinished-task count.
    """
    while True:
        item = queue.get()
        if item is None:
            queue.put(None)  # pass sentinel to other consumers
            # BUG FIX: acknowledge the sentinel we just consumed; without
            # this, queue.join() would block forever on its unfinished count.
            queue.task_done()
            break
        print(f"{name} consumed: {item}")
        queue.task_done()
q = Queue(maxsize=10)  # bounded: producer blocks once 10 items are waiting

workers = [
    threading.Thread(target=producer, args=(q, list(range(20)))),
    threading.Thread(target=consumer, args=(q, "Worker-A")),
    threading.Thread(target=consumer, args=(q, "Worker-B")),
]
for t in workers:
    t.start()
for t in workers:
    t.join()
Multiprocessing — For CPU-Bound Work
Basic multiprocessing
import multiprocessing
import time
import math
def is_prime(n: int) -> bool:
    """Trial-division primality test with the 6k±1 optimization.

    Every prime > 3 has the form 6k±1, so after ruling out multiples
    of 2 and 3 we only probe candidate divisors 5, 7, 11, 13, ...
    """
    if n in (2, 3):
        return True
    if n < 2 or n % 2 == 0 or n % 3 == 0:
        return False
    divisor = 5
    while divisor * divisor <= n:
        if n % divisor == 0 or n % (divisor + 2) == 0:
            return False
        divisor += 6
    return True
def count_primes(start: int, end: int) -> int:
    """Count primes in the half-open range [start, end)."""
    total = 0
    for candidate in range(start, end):
        if is_prime(candidate):
            total += 1
    return total
LIMIT = 2_000_000


def _prime_benchmark() -> None:
    """Compare sequential vs. multiprocessing prime counting up to LIMIT."""
    # --- Sequential ---
    start = time.perf_counter()
    total = count_primes(2, LIMIT)
    print(f"Sequential: {total} primes in {time.perf_counter() - start:.2f}s")

    # --- Multiprocessing: split [0, LIMIT) into one chunk per core ---
    start = time.perf_counter()
    num_cores = multiprocessing.cpu_count()
    chunk_size = LIMIT // num_cores
    # BUG FIX: the last chunk must extend to LIMIT; with a plain
    # (i + 1) * chunk_size the trailing LIMIT % num_cores numbers were
    # silently skipped whenever LIMIT isn't a multiple of the core count.
    ranges = [
        (i * chunk_size, LIMIT if i == num_cores - 1 else (i + 1) * chunk_size)
        for i in range(num_cores)
    ]
    with multiprocessing.Pool(num_cores) as pool:
        results = pool.starmap(count_primes, ranges)
    total = sum(results)
    print(f"Parallel ({num_cores} cores): {total} primes in {time.perf_counter() - start:.2f}s")


# BUG FIX: the guard is required for multiprocessing on spawn-based
# platforms (Windows, macOS default) — worker processes re-import this
# module and must not re-run the benchmark.
if __name__ == "__main__":
    _prime_benchmark()
Sequential: 148933 primes in 3.21s
Parallel (8 cores): 148933 primes in 0.52s
6.2x speedup — that's real parallelism (not possible with threads due to GIL).
ProcessPoolExecutor (preferred)
from concurrent.futures import ProcessPoolExecutor
import time
def heavy_compute(n: int) -> dict:
    """CPU-bound demo: run *n* iterations of the Fibonacci recurrence.

    Returns the input and the decimal digit count of fib(n).
    """
    prev, curr = 0, 1
    for _ in range(n):
        prev, curr = curr, prev + curr
    return {"n": n, "digits": len(str(prev))}
inputs = [500_000, 600_000, 700_000, 800_000, 900_000, 1_000_000]

# BUG FIX: ProcessPoolExecutor re-imports this module inside each worker
# on spawn-based platforms (Windows, macOS default); without the guard the
# module-level pool would be re-created recursively and crash.
if __name__ == "__main__":
    # --- Sequential ---
    start = time.perf_counter()
    results_seq = [heavy_compute(n) for n in inputs]
    print(f"Sequential: {time.perf_counter() - start:.2f}s")

    # --- Parallel ---
    start = time.perf_counter()
    with ProcessPoolExecutor() as pool:
        results_par = list(pool.map(heavy_compute, inputs))
    print(f"Parallel: {time.perf_counter() - start:.2f}s")

    for r in results_par:
        print(f" fib({r['n']}): {r['digits']} digits")
Sharing data between processes
import multiprocessing
from multiprocessing import Value, Array, Manager
# --- Shared memory (fast, limited types) ---
def worker_shared(counter, results, index, value):
    """Store value*2 into the shared array and bump the shared counter.

    `counter` is a multiprocessing.Value, `results` a multiprocessing.Array;
    the Value's own lock guards the increment across processes.
    """
    results[index] = value * 2
    with counter.get_lock():
        counter.value += 1
# BUG FIX: the guard is mandatory — multiprocessing re-imports this module
# in every child on spawn-based platforms (Windows, macOS default), so
# module-level Process spawning would fork-bomb / crash without it.
if __name__ == "__main__":
    counter = Value('i', 0)   # shared int, lives in shared memory
    results = Array('d', 10)  # shared array of C doubles

    processes = []
    for i in range(10):
        p = multiprocessing.Process(
            target=worker_shared, args=(counter, results, i, float(i))
        )
        p.start()
        processes.append(p)
    for p in processes:
        p.join()

    print(f"Counter: {counter.value}")  # 10
    print(f"Results: {list(results)}")  # [0.0, 2.0, 4.0, ...]
# --- Manager (flexible, slower — supports dicts, lists, Queues) ---
def worker_manager(shared_dict, key, value):
    """Record value² under *key* in a Manager-backed (or plain) mapping."""
    shared_dict[key] = value ** 2
# BUG FIX: Manager() starts a server process; on spawn-based platforms the
# children re-import this module, so the setup must not run at import time.
if __name__ == "__main__":
    with Manager() as manager:
        d = manager.dict()
        processes = [
            multiprocessing.Process(target=worker_manager, args=(d, i, i))
            for i in range(8)
        ]
        for p in processes:
            p.start()
        for p in processes:
            p.join()
        print(f"Manager dict: {dict(d)}")  # {0: 0, 1: 1, 2: 4, 3: 9, ...}
Combining Threading + Multiprocessing
Real-world apps often need both. Example: scrape URLs (I/O) then process the data (CPU).
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import requests
import hashlib
import time
def download(url: str) -> bytes:
    """I/O-bound step: fetch *url* and return the raw response body."""
    return requests.get(url, timeout=10).content
def process_content(data: bytes) -> dict:
    """CPU-bound step: fingerprint and summarize a downloaded payload.

    Returns a truncated SHA-256 hex digest, the byte size, and a
    whitespace-split word count of the (lossily decoded) text.
    """
    digest = hashlib.sha256(data).hexdigest()
    text = data.decode("utf-8", errors="ignore")
    return {"hash": digest[:12], "size": len(data), "words": len(text.split())}
urls = [
    "https://jsonplaceholder.typicode.com/posts",
    "https://jsonplaceholder.typicode.com/comments",
    "https://jsonplaceholder.typicode.com/users",
    "https://httpbin.org/get",
]

# BUG FIX: the ProcessPoolExecutor below must not run when worker processes
# re-import this module on spawn-based platforms (Windows, macOS default).
if __name__ == "__main__":
    # Step 1: Download concurrently (threads — I/O bound)
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=4) as tpool:
        raw_data = list(tpool.map(download, urls))

    # Step 2: Process concurrently (processes — CPU bound)
    with ProcessPoolExecutor() as ppool:
        results = list(ppool.map(process_content, raw_data))
    elapsed = time.perf_counter() - start

    for url, result in zip(urls, results):
        print(f"{url.split('/')[-1]}: {result}")
    print(f"Total: {elapsed:.2f}s")
The Decision Framework
| Workload Type | Best Approach | Why |
|---|---|---|
| API calls, web scraping | asyncio or ThreadPool | I/O-bound, GIL released during network wait |
| File reading/writing | ThreadPool | I/O-bound, simple API |
| Image processing, ML | ProcessPool | CPU-bound, bypasses GIL |
| Number crunching | ProcessPool (or NumPy) | CPU-bound; NumPy releases GIL internally |
| High-concurrency server | asyncio | Thousands of connections, minimal overhead |
| Mixed I/O + CPU | ThreadPool → ProcessPool | Download in threads, process in processes |
| Simple parallel tasks | concurrent.futures | Easiest API, swap Thread/Process with one line |
Quick decision tree
# Is your code waiting or computing?
#
# WAITING (I/O-bound)
# ├── Many connections (1000+)? → asyncio
# ├── Few connections, simple? → ThreadPoolExecutor
# └── Legacy sync library? → ThreadPoolExecutor
#
# COMPUTING (CPU-bound)
# ├── NumPy/pandas ops? → Already parallel (release GIL)
# ├── Pure Python math? → ProcessPoolExecutor
# └── Need shared state? → multiprocessing.Manager or Queue
Common Pitfalls
1. Race conditions
import threading
# ❌ BAD — race condition
counter = 0

def increment():
    """Bump the shared counter 100k times WITHOUT any synchronization."""
    global counter
    for _ in range(100_000):
        # `counter += 1` is read/add/store in separate bytecodes; a thread
        # switch between them loses updates, so the total comes up short.
        counter += 1

threads = [threading.Thread(target=increment) for _ in range(4)]
for t in threads: t.start()
for t in threads: t.join()
print(f"Expected: 400000, Got: {counter}")  # Usually less!
# ✅ GOOD — use a lock
counter = 0
lock = threading.Lock()

def safe_increment():
    """Bump the shared counter 100k times, one locked step at a time."""
    global counter
    for _ in range(100_000):
        with lock:
            counter += 1

workers = [threading.Thread(target=safe_increment) for _ in range(4)]
for w in workers:
    w.start()
for w in workers:
    w.join()
print(f"Expected: 400000, Got: {counter}")  # Always 400000
2. Deadlocks
# ❌ BAD — potential deadlock (circular lock dependency)
lock_a = threading.Lock()
lock_b = threading.Lock()

def worker1():
    """Takes lock_a then lock_b — the opposite order from worker2."""
    with lock_a:
        time.sleep(0.01)
        with lock_b:  # waits for worker2 to release lock_b
            print("Worker 1 done")

def worker2():
    """Takes lock_b then lock_a — the opposite order from worker1."""
    with lock_b:
        time.sleep(0.01)
        with lock_a:  # waits for worker1 to release lock_a → DEADLOCK
            print("Worker 2 done")

# ✅ GOOD — one canonical lock order makes a circular wait impossible.
def worker1_fixed():
    with lock_a, lock_b:
        print("Worker 1 done")

def worker2_fixed():
    with lock_a, lock_b:  # same order as worker1
        print("Worker 2 done")
3. Pickling errors in multiprocessing
# ❌ BAD — lambdas and locally-defined functions can't be pickled, so they
# can't be shipped to worker processes:
#
#     with ProcessPoolExecutor() as pool:
#         results = list(pool.map(lambda x: x**2, range(10)))  # PicklingError
#
# ✅ GOOD — use a module-level function (picklable by its qualified name).
from concurrent.futures import ProcessPoolExecutor

def square(x):
    """Return x squared (module-level, so it pickles cleanly)."""
    return x ** 2

# BUG FIX: guard keeps spawn-based platforms from re-running the pool when
# worker processes import this module. (The original also created an empty
# throwaway ProcessPoolExecutor at import time; now shown as a comment.)
if __name__ == "__main__":
    with ProcessPoolExecutor() as pool:
        results = list(pool.map(square, range(10)))
    print(results)  # [0, 1, 4, 9, 16, ...]
4. Process startup overhead
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
import time

# Small tasks — process dispatch overhead dominates the actual work
def tiny_task(x):
    """Trivial work — far cheaper than the cost of dispatching it."""
    return x + 1

# BUG FIX: guard required — on spawn-based platforms each worker process
# re-imports this module, so an unguarded pool would re-create itself.
if __name__ == "__main__":
    data = list(range(100))

    start = time.perf_counter()
    with ProcessPoolExecutor(max_workers=4) as pool:
        list(pool.map(tiny_task, data))
    print(f"ProcessPool: {time.perf_counter() - start:.3f}s")  # ~0.5s (overhead!)

    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=4) as pool:
        list(pool.map(tiny_task, data))
    print(f"ThreadPool: {time.perf_counter() - start:.3f}s")  # ~0.002s

    start = time.perf_counter()
    list(map(tiny_task, data))
    print(f"Sequential: {time.perf_counter() - start:.6f}s")  # ~0.00001s

    # Lesson: only use ProcessPool when tasks take >10ms each
Production Patterns
Graceful shutdown
import signal
import threading
from concurrent.futures import ThreadPoolExecutor
shutdown_event = threading.Event()

def signal_handler(signum, frame):
    """Flip the shared event so workers can wind down cooperatively."""
    print("\nShutdown requested...")
    shutdown_event.set()

# Translate Ctrl-C and `kill` into the cooperative shutdown flag.
for sig in (signal.SIGINT, signal.SIGTERM):
    signal.signal(sig, signal_handler)
def worker(task_id: int):
    """Run up to 10 work steps, bailing out early on shutdown request."""
    for step in range(10):
        if shutdown_event.is_set():
            print(f"Task {task_id}: stopping at step {step}")
            return
        time.sleep(0.5)  # simulate work
    print(f"Task {task_id}: complete")
with ThreadPoolExecutor(max_workers=3) as pool:
    # Five tasks on three workers; each worker polls shutdown_event and
    # exits its loop early once a signal flips it.
    futures = [pool.submit(worker, i) for i in range(5)]
    # Pool waits for running tasks, cancels pending ones on shutdown
Rate-limited thread pool
import threading
import time
from concurrent.futures import ThreadPoolExecutor
class RateLimiter:
    """Throttle callers to at most `calls_per_second` across all threads.

    The lock does double duty: it protects `_last_call` and forces
    waiting callers to queue up one at a time, so sleeps serialize.
    """

    def __init__(self, calls_per_second: float):
        self._interval = 1.0 / calls_per_second
        self._lock = threading.Lock()
        self._last_call = 0.0

    def acquire(self):
        """Block until this caller is allowed to proceed."""
        with self._lock:
            remaining = self._interval - (time.monotonic() - self._last_call)
            if remaining > 0:
                time.sleep(remaining)
            self._last_call = time.monotonic()
limiter = RateLimiter(calls_per_second=5)  # max 5 req/s

def rate_limited_fetch(url: str) -> int:
    """Fetch *url*, but never faster than the shared limiter allows."""
    limiter.acquire()
    return requests.get(url, timeout=10).status_code

with ThreadPoolExecutor(max_workers=10) as pool:
    # 10 threads, but the shared limiter caps throughput at 5 requests/second.
    results = list(pool.map(rate_limited_fetch, urls))
Progress tracking
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
def process_item(item: int) -> dict:
    """Pretend to do 100 ms of work, then return the doubled item."""
    time.sleep(0.1)  # simulate work
    return {"item": item, "result": item * 2}
items = list(range(50))
results = []
with ThreadPoolExecutor(max_workers=8) as pool:
    futures = {pool.submit(process_item, i): i for i in items}
    for done_count, future in enumerate(as_completed(futures), start=1):
        results.append(future.result())
        # Redraw a 50-character progress bar in place on each completion.
        pct = done_count / len(items) * 100
        filled = int(pct // 2)
        bar = "█" * filled + "░" * (50 - filled)
        print(f"\r[{bar}] {pct:.0f}% ({done_count}/{len(items)})", end="", flush=True)
print(f"\nDone! Processed {len(results)} items.")
🚀 Production-ready concurrency patterns, API integrations, and automation scripts
Related Articles
- Python Async Programming — asyncio, Tasks & Concurrency — deep dive into async/await
- Build a Data Pipeline in Python — parallel ETL with thread/process pools
- Web Scraping with Python — concurrent scraping patterns
- Automate API Integrations — threaded API calls with rate limiting
- Task Scheduling & Cron Jobs — schedule concurrent workers
- Python Logging & Monitoring — thread-safe logging for concurrent apps
Need high-performance concurrent Python code? I build parallel data pipelines, scrapers, and API integrations. Reach out on Telegram →