Python Concurrency — Threading, Multiprocessing & When to Use What

March 2026 · 20 min read · Python, Concurrency, Performance

Python gives you three concurrency models: threading, multiprocessing, and asyncio. Each solves different problems. Pick wrong and your "parallel" code runs slower than serial. This guide shows you when to use each, with real code and benchmarks — not toy examples.

For async/await specifically, see our Python Async Programming guide. This article focuses on threading and multiprocessing, plus how all three approaches compare.

The GIL — Why It Matters

The Global Interpreter Lock (GIL) is a mutex that allows only one thread to execute Python bytecode at a time. Threads therefore never speed up pure-Python computation — they only help when your code spends its time waiting, because the GIL is released during blocking I/O. In practice:

💡 Simple rule: Waiting for things (network, disk, APIs)? Use threads or asyncio. Crunching numbers (image processing, ML, compression)? Use multiprocessing.

Threading — For I/O-Bound Work

Basic threading

import threading
import time
import requests


def download_page(url: str) -> int:
    """Fetch *url* and return the HTTP status code."""
    response = requests.get(url, timeout=10)
    return response.status_code


# --- Sequential (slow) ---
urls = ["https://httpbin.org/delay/1"] * 4

start = time.perf_counter()
for u in urls:
    download_page(u)
print(f"Sequential: {time.perf_counter() - start:.1f}s")  # ~4.0s


# --- Threaded (fast) ---
start = time.perf_counter()
# One thread per URL; downloads overlap because the GIL is released during I/O.
threads = [threading.Thread(target=download_page, args=(u,)) for u in urls]
for t in threads:
    t.start()

for t in threads:
    t.join()
print(f"Threaded: {time.perf_counter() - start:.1f}s")  # ~1.0s

ThreadPoolExecutor (preferred)

Don't manage threads manually. concurrent.futures gives you a clean, high-level API:

from concurrent.futures import ThreadPoolExecutor, as_completed
import requests
import time


def fetch(url: str) -> dict:
    """GET *url* and report its status plus the formatted elapsed time."""
    t0 = time.perf_counter()
    response = requests.get(url, timeout=10)
    took = time.perf_counter() - t0
    return {"url": url, "status": response.status_code, "time": f"{took:.2f}s"}


urls = [
    "https://api.github.com",
    "https://httpbin.org/get",
    "https://jsonplaceholder.typicode.com/posts/1",
    "https://api.ipify.org?format=json",
    "https://httpbin.org/delay/2",
]

# Process results as they complete (not in order)
with ThreadPoolExecutor(max_workers=5) as pool:
    pending = {pool.submit(fetch, u): u for u in urls}

    for done in as_completed(pending):
        source_url = pending[done]
        try:
            outcome = done.result()
            print(f"✅ {outcome['url']}: {outcome['status']} ({outcome['time']})")
        except Exception as e:
            print(f"❌ {source_url}: {e}")

Thread-safe data structures

import threading
from queue import Queue
from collections import defaultdict


# --- Thread-safe counter ---
class SafeCounter:
    """An integer counter whose updates are serialized through a mutex."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._count = 0

    def increment(self, n: int = 1) -> None:
        """Atomically add *n* (default 1) to the counter."""
        with self._lock:
            self._count += n

    @property
    def value(self) -> int:
        """Current count, read under the lock for a consistent view."""
        with self._lock:
            return self._count


# --- Producer/Consumer with Queue ---
def producer(queue: Queue, items: list):
    """Push every item onto *queue*, then a None sentinel to signal the end."""
    for element in items:
        queue.put(element)
        print(f"Produced: {element}")
    queue.put(None)  # sentinel


def consumer(queue: Queue, name: str):
    """Drain *queue* until the None sentinel arrives, then re-post it so
    any sibling consumers also shut down."""
    item = queue.get()
    while item is not None:
        print(f"{name} consumed: {item}")
        queue.task_done()
        item = queue.get()
    queue.put(None)  # pass sentinel to other consumers


q = Queue(maxsize=10)  # bounded queue — blocks producer when full
workers = [
    threading.Thread(target=producer, args=(q, list(range(20)))),
    threading.Thread(target=consumer, args=(q, "Worker-A")),
    threading.Thread(target=consumer, args=(q, "Worker-B")),
]

for w in workers:
    w.start()
for w in workers:
    w.join()

Multiprocessing — For CPU-Bound Work

Basic multiprocessing

import multiprocessing
import time
import math


def is_prime(n: int) -> bool:
    """Return True when *n* is prime, using 6k ± 1 trial division."""
    if n < 2:
        return False
    if n in (2, 3):
        return True
    if n % 2 == 0 or n % 3 == 0:
        return False
    # Every prime > 3 has the form 6k ± 1; test candidates up to sqrt(n).
    divisor = 5
    while divisor * divisor <= n:
        if n % divisor == 0 or n % (divisor + 2) == 0:
            return False
        divisor += 6
    return True


def count_primes(start: int, end: int) -> int:
    """Count primes in the half-open range [start, end)."""
    total = 0
    for candidate in range(start, end):
        if is_prime(candidate):
            total += 1
    return total


LIMIT = 2_000_000

if __name__ == "__main__":
    # Guard is required: under the "spawn" start method (default on Windows
    # and macOS) every worker re-imports this module, and unguarded Pool
    # creation would recurse / raise RuntimeError.

    # --- Sequential ---
    start = time.perf_counter()
    total = count_primes(2, LIMIT)
    print(f"Sequential: {total} primes in {time.perf_counter() - start:.2f}s")

    # --- Multiprocessing ---
    start = time.perf_counter()
    num_cores = multiprocessing.cpu_count()
    chunk_size = LIMIT // num_cores
    # The final chunk runs to LIMIT so no tail is lost when LIMIT is not an
    # exact multiple of num_cores (a plain (i+1)*chunk_size upper bound would
    # silently drop up to num_cores - 1 numbers and skew the parallel count).
    ranges = [
        (i * chunk_size, LIMIT if i == num_cores - 1 else (i + 1) * chunk_size)
        for i in range(num_cores)
    ]

    with multiprocessing.Pool(num_cores) as pool:
        results = pool.starmap(count_primes, ranges)
        total = sum(results)

    print(f"Parallel ({num_cores} cores): {total} primes in {time.perf_counter() - start:.2f}s")
⚡ Benchmark example (8-core M2):
Sequential: 148933 primes in 3.21s
Parallel (8 cores): 148933 primes in 0.52s
6.2x speedup — that's real parallelism (not possible with threads due to GIL).

ProcessPoolExecutor (preferred)

from concurrent.futures import ProcessPoolExecutor
import time


def heavy_compute(n: int) -> dict:
    """Simulate CPU-bound work: compute the n-th Fibonacci number iteratively
    and report how many decimal digits it has."""
    prev, curr = 0, 1
    for _ in range(n):
        prev, curr = curr, prev + curr
    return {"n": n, "digits": len(str(prev))}


inputs = [500_000, 600_000, 700_000, 800_000, 900_000, 1_000_000]

if __name__ == "__main__":
    # Guard is required: ProcessPoolExecutor workers re-import this module
    # under the spawn start method; unguarded pool code would re-run in each
    # child and fail.

    # --- Sequential ---
    start = time.perf_counter()
    results_seq = [heavy_compute(n) for n in inputs]
    print(f"Sequential: {time.perf_counter() - start:.2f}s")

    # --- Parallel ---
    start = time.perf_counter()
    with ProcessPoolExecutor() as pool:
        results_par = list(pool.map(heavy_compute, inputs))
    print(f"Parallel: {time.perf_counter() - start:.2f}s")

    for r in results_par:
        print(f"  fib({r['n']}): {r['digits']} digits")

Sharing data between processes

import multiprocessing
from multiprocessing import Value, Array, Manager


# --- Shared memory (fast, limited types) ---
def worker_shared(counter, results, index, value):
    """Worker using shared memory: bump the shared counter under its lock
    and store value * 2 into the shared array slot *index*."""
    with counter.get_lock():  # Value's own lock makes the += process-safe
        counter.value += 1
    results[index] = value * 2


counter = Value('i', 0)      # shared int ('i' = C int), guarded by an internal lock
results = Array('d', 10)      # shared array of 10 C doubles

if __name__ == "__main__":
    # Guard is required: under the spawn start method each child re-imports
    # this module, and unguarded Process.start() calls would spawn endlessly.
    processes = []
    for i in range(10):
        p = multiprocessing.Process(
            target=worker_shared, args=(counter, results, i, float(i))
        )
        p.start()
        processes.append(p)

    for p in processes:
        p.join()

    print(f"Counter: {counter.value}")      # 10
    print(f"Results: {list(results)}")       # [0.0, 2.0, 4.0, ...]


# --- Manager (flexible, slower — supports dicts, lists, Queues) ---
def worker_manager(shared_dict, key, value):
    """Square *value* and record it in the manager-backed dict under *key*."""
    squared = value ** 2
    shared_dict[key] = squared


if __name__ == "__main__":
    # Guard is required: Manager() launches a server process and the workers
    # re-import this module under spawn; unguarded code would re-run in them.
    with Manager() as manager:
        d = manager.dict()
        processes = [
            multiprocessing.Process(target=worker_manager, args=(d, i, i))
            for i in range(8)
        ]
        for p in processes:
            p.start()
        for p in processes:
            p.join()

        print(f"Manager dict: {dict(d)}")  # {0: 0, 1: 1, 2: 4, 3: 9, ...}

Combining Threading + Multiprocessing

Real-world apps often need both. Example: scrape URLs (I/O) then process the data (CPU).

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import requests
import hashlib
import time


def download(url: str) -> bytes:
    """I/O-bound step: fetch *url* and return the raw response body."""
    response = requests.get(url, timeout=10)
    return response.content


def process_content(data: bytes) -> dict:
    """CPU-bound step: hash the payload and count its whitespace-separated words."""
    digest = hashlib.sha256(data).hexdigest()
    # Simulate heavy processing
    text = data.decode("utf-8", errors="ignore")
    return {"hash": digest[:12], "size": len(data), "words": len(text.split())}


urls = [
    "https://jsonplaceholder.typicode.com/posts",
    "https://jsonplaceholder.typicode.com/comments",
    "https://jsonplaceholder.typicode.com/users",
    "https://httpbin.org/get",
]

if __name__ == "__main__":
    # Guard is required: ProcessPoolExecutor workers re-import this module
    # under spawn; without it, the downloads and pool creation would re-run
    # inside every child process.

    # Step 1: Download concurrently (threads — I/O bound)
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=4) as tpool:
        raw_data = list(tpool.map(download, urls))

    # Step 2: Process concurrently (processes — CPU bound)
    with ProcessPoolExecutor() as ppool:
        results = list(ppool.map(process_content, raw_data))

    elapsed = time.perf_counter() - start
    for url, result in zip(urls, results):
        print(f"{url.split('/')[-1]}: {result}")
    print(f"Total: {elapsed:.2f}s")

The Decision Framework

| Workload Type | Best Approach | Why |
| --- | --- | --- |
| API calls, web scraping | asyncio or ThreadPool | I/O-bound, GIL released during network wait |
| File reading/writing | ThreadPool | I/O-bound, simple API |
| Image processing, ML | ProcessPool | CPU-bound, bypasses GIL |
| Number crunching | ProcessPool (or NumPy) | CPU-bound; NumPy releases GIL internally |
| High-concurrency server | asyncio | Thousands of connections, minimal overhead |
| Mixed I/O + CPU | ThreadPool → ProcessPool | Download in threads, process in processes |
| Simple parallel tasks | concurrent.futures | Easiest API, swap Thread/Process with one line |

Quick decision tree

# Is your code waiting or computing?
#
# WAITING (I/O-bound)
# ├── Many connections (1000+)? → asyncio
# ├── Few connections, simple? → ThreadPoolExecutor
# └── Legacy sync library? → ThreadPoolExecutor
#
# COMPUTING (CPU-bound)
# ├── NumPy/pandas ops? → Already parallel (release GIL)
# ├── Pure Python math? → ProcessPoolExecutor
# └── Need shared state? → multiprocessing.Manager or Queue

Common Pitfalls

1. Race conditions

import threading

# ❌ BAD — race condition
counter = 0
def increment():
    global counter
    for _ in range(100_000):
        # `counter += 1` is read-modify-write: separate load, add and store
        # steps. Another thread can run between them and overwrite our update.
        counter += 1  # NOT atomic!

threads = [threading.Thread(target=increment) for _ in range(4)]
for t in threads: t.start()
for t in threads: t.join()
# Lost updates mean the printed value is typically below 4 * 100_000.
print(f"Expected: 400000, Got: {counter}")  # Usually less!


# ✅ GOOD — use a lock
counter = 0
lock = threading.Lock()
def safe_increment():
    """Add 100 000 to the global counter, one locked increment at a time."""
    global counter
    for _ in range(100_000):
        with lock:  # lock makes the read-modify-write indivisible
            counter += 1

threads = [threading.Thread(target=safe_increment) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(f"Expected: 400000, Got: {counter}")  # Always 400000

2. Deadlocks

# ❌ BAD — potential deadlock (circular lock dependency)
lock_a = threading.Lock()
lock_b = threading.Lock()

def worker1():
    """Acquires lock_a first, then lock_b — the opposite order of worker2."""
    with lock_a:
        time.sleep(0.01)
        with lock_b:  # waits for worker2 to release lock_b
            print("Worker 1 done")

def worker2():
    """Acquires lock_b first, then lock_a — mirror image of worker1."""
    with lock_b:
        time.sleep(0.01)
        with lock_a:  # waits for worker1 to release lock_a → DEADLOCK
            print("Worker 2 done")


# ✅ GOOD — always acquire locks in the same order
def worker1_fixed():
    with lock_a, lock_b:  # equivalent to nesting, one consistent order
        print("Worker 1 done")

def worker2_fixed():
    with lock_a, lock_b:  # same order as worker1
        print("Worker 2 done")

3. Pickling errors in multiprocessing

# ❌ BAD — lambdas and local functions can't be pickled
from concurrent.futures import ProcessPoolExecutor


# ✅ GOOD — use module-level functions
def square(x):
    """Return x squared. Module-level, so worker processes can import it."""
    return x ** 2


# Always guard multiprocessing entry points: under the default "spawn" start
# method (Windows/macOS) every worker re-imports this module, and unguarded
# executor code would run again inside each child.
if __name__ == "__main__":
    with ProcessPoolExecutor() as pool:
        # ❌ This fails: lambda isn't picklable
        # results = list(pool.map(lambda x: x**2, range(10)))

        results = list(pool.map(square, range(10)))
        print(results)  # [0, 1, 4, 9, 16, ...]

4. Process startup overhead

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
import time

# Small tasks — process overhead dominates
def tiny_task(x):
    """Trivial work: the task itself costs essentially nothing."""
    return x + 1

data = list(range(100))

if __name__ == "__main__":
    # Guard is required for ProcessPoolExecutor: spawn-mode workers
    # re-import this module and would otherwise re-run the benchmark.
    start = time.perf_counter()
    with ProcessPoolExecutor(max_workers=4) as pool:
        list(pool.map(tiny_task, data))
    print(f"ProcessPool: {time.perf_counter() - start:.3f}s")  # ~0.5s (overhead!)

    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=4) as pool:
        list(pool.map(tiny_task, data))
    print(f"ThreadPool: {time.perf_counter() - start:.3f}s")  # ~0.002s

    start = time.perf_counter()
    list(map(tiny_task, data))
    print(f"Sequential: {time.perf_counter() - start:.6f}s")  # ~0.00001s

# Lesson: only use ProcessPool when tasks take >10ms each

Production Patterns

Graceful shutdown

import signal
import threading
import time  # was missing: worker() below calls time.sleep
from concurrent.futures import ThreadPoolExecutor

# Shared flag: set by the signal handler, polled by workers.
shutdown_event = threading.Event()

def signal_handler(signum, frame):
    """Flip the shared event so workers can wind down cooperatively."""
    print("\nShutdown requested...")
    shutdown_event.set()

signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)


def worker(task_id: int):
    """Worker that checks for shutdown between work steps."""
    for step in range(10):
        if shutdown_event.is_set():
            print(f"Task {task_id}: stopping at step {step}")
            return
        time.sleep(0.5)  # simulate work
    print(f"Task {task_id}: complete")


if __name__ == "__main__":  # don't launch the pool when imported as a module
    with ThreadPoolExecutor(max_workers=3) as pool:
        futures = [pool.submit(worker, i) for i in range(5)]
        # Pool waits for running tasks, cancels pending ones on shutdown

Rate-limited thread pool

import threading
import time
from concurrent.futures import ThreadPoolExecutor


class RateLimiter:
    """Pace calls so at most `calls_per_second` happen per second.

    This is a fixed-interval (pacing) limiter: calls are spaced evenly and
    no bursts are allowed. Safe to share across threads.
    """

    def __init__(self, calls_per_second: float):
        self._interval = 1.0 / calls_per_second
        self._lock = threading.Lock()
        self._last_call = 0.0

    def acquire(self):
        """Block until the next call slot is available."""
        with self._lock:
            earliest = self._last_call + self._interval
            delay = earliest - time.monotonic()
            if delay > 0:
                time.sleep(delay)
            self._last_call = time.monotonic()


limiter = RateLimiter(calls_per_second=5)  # max 5 req/s

def rate_limited_fetch(url: str) -> int:
    """Wait for a rate-limit slot, then GET *url* and return the status code."""
    limiter.acquire()
    response = requests.get(url, timeout=10)
    return response.status_code


with ThreadPoolExecutor(max_workers=10) as pool:
    # 10 threads but only 5 requests/second
    results = list(pool.map(rate_limited_fetch, urls))

Progress tracking

from concurrent.futures import ThreadPoolExecutor, as_completed
import time


def process_item(item: int) -> dict:
    """Pretend to do 100 ms of work and return the item doubled."""
    time.sleep(0.1)  # simulate work
    return {"item": item, "result": item * 2}


items = list(range(50))
results = []

with ThreadPoolExecutor(max_workers=8) as pool:
    futures = {pool.submit(process_item, i): i for i in items}
    done_count = 0

    for future in as_completed(futures):
        done_count += 1
        results.append(future.result())

        # Progress bar
        pct = done_count / len(items) * 100
        filled = int(pct // 2)
        bar = "█" * filled + "░" * (50 - filled)
        print(f"\r[{bar}] {pct:.0f}% ({done_count}/{len(items)})", end="", flush=True)

print(f"\nDone! Processed {len(results)} items.")

🚀 Production-ready concurrency patterns, API integrators, and automation scripts

Get the AI Agent Toolkit →

Related Articles

Need high-performance concurrent Python code? I build parallel data pipelines, scrapers, and API integrations. Reach out on Telegram →