Tools like Airflow, Prefect, and Dagster are great — but sometimes you need a lightweight pipeline that runs anywhere without infrastructure overhead. A well-structured Python script can handle millions of records, retry on failure, and run on a $5 VPS or from a simple cron job.
This guide builds a production-ready ETL pipeline in plain Python: streaming extractors for CSVs, JSON APIs, and SQLite; composable transforms with validation; loaders for multiple destinations; retries with a dead-letter queue; and structured logging, metrics, and a small CLI for scheduling.
Every pipeline follows the same pattern:
# The universal pipeline pattern
#
# [Source A] ──┐
# [Source B] ──┼──→ [Extract] → [Transform] → [Load] → [Destination]
# [Source C] ──┘        ↓            ↓          ↓
#                  [Error Log] [Validation] [Metrics]
from dataclasses import dataclass, field
from datetime import datetime
from typing import Iterator, Any
from pathlib import Path
import json
@dataclass
class PipelineResult:
"""Track what happened during a pipeline run."""
started_at: datetime = field(default_factory=datetime.now)
records_extracted: int = 0
records_transformed: int = 0
records_loaded: int = 0
records_failed: int = 0
errors: list[str] = field(default_factory=list)
duration_seconds: float = 0.0
@property
def success_rate(self) -> float:
total = self.records_transformed + self.records_failed
return (self.records_transformed / total * 100) if total else 0
def summary(self) -> str:
return (
f"Pipeline completed in {self.duration_seconds:.1f}s | "
f"Extracted: {self.records_extracted} | "
f"Transformed: {self.records_transformed} | "
f"Loaded: {self.records_loaded} | "
f"Failed: {self.records_failed} | "
f"Success: {self.success_rate:.1f}%"
)
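For example, a finished run might report something like this (numbers are illustrative):
result = PipelineResult(
    records_extracted=100,
    records_transformed=98,
    records_loaded=98,
    records_failed=2,
    duration_seconds=3.2,
)
print(result.summary())
# Pipeline completed in 3.2s | Extracted: 100 | Transformed: 98 | Loaded: 98 | Failed: 2 | Success: 98.0%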
Extractors are generators that yield raw records. This keeps memory usage constant regardless of data size:
import csv
import sqlite3
from urllib.request import urlopen, Request
from typing import Iterator
def extract_csv(path: str, encoding: str = "utf-8") -> Iterator[dict]:
"""Extract records from a CSV file."""
with open(path, "r", encoding=encoding) as f:
reader = csv.DictReader(f)
for row in reader:
yield dict(row) # Convert OrderedDict to regular dict
def extract_json_api(
url: str,
headers: dict | None = None,
params: dict | None = None,
paginate_key: str | None = None,
next_url_key: str | None = None,
) -> Iterator[dict]:
"""Extract records from a JSON API with pagination support."""
import json
from urllib.parse import urlencode
current_url = url
if params:
current_url += "?" + urlencode(params)
while current_url:
req = Request(current_url, headers=headers or {})
with urlopen(req, timeout=30) as resp:
data = json.loads(resp.read())
# Handle different API response shapes
if isinstance(data, list):
yield from data
break
elif paginate_key and paginate_key in data:
yield from data[paginate_key]
current_url = data.get(next_url_key) if next_url_key else None
else:
yield data
break
def extract_sqlite(
db_path: str,
query: str,
params: tuple = (),
) -> Iterator[dict]:
"""Extract records from a SQLite database."""
    conn = sqlite3.connect(db_path)
    conn.row_factory = sqlite3.Row
    try:
        cursor = conn.execute(query, params)
        for row in cursor:
            yield dict(row)
    finally:
        conn.close()  # runs even if the caller stops iterating early
def extract_multiple(*extractors) -> Iterator[dict]:
"""Merge multiple extractors into one stream."""
for extractor in extractors:
yield from extractor
# Usage examples:
# records = extract_csv("data/users.csv")
# records = extract_json_api("https://api.example.com/users", paginate_key="results", next_url_key="next")
# records = extract_sqlite("app.db", "SELECT * FROM users WHERE active = ?", (1,))
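Because extractors are plain generators, they compose lazily; this sketch (file name and field are hypothetical) filters a large CSV while holding only one row in memory at a time:
records = extract_csv("data/large_events.csv")                # nothing is read yet
active = (r for r in records if r.get("status") == "active")  # still lazy
print(sum(1 for _ in active))                                 # rows stream through one at a time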
Transformations are composable functions. Chain them together:
from datetime import datetime
import re
import hashlib
def transform_clean_strings(record: dict) -> dict:
    """Strip whitespace and normalize empty strings to None."""
    return {
        k: ((v.strip() or None) if isinstance(v, str) else v)
        for k, v in record.items()
    }
def transform_normalize_email(record: dict) -> dict:
"""Lowercase and validate email addresses."""
email = record.get("email", "")
if email:
email = email.lower().strip()
if not re.match(r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$", email):
record["_email_valid"] = False
else:
record["email"] = email
record["_email_valid"] = True
return record
def transform_add_timestamps(record: dict) -> dict:
"""Add processing metadata."""
record["_processed_at"] = datetime.now().isoformat()
record["_record_hash"] = hashlib.md5(
json.dumps(record, sort_keys=True, default=str).encode()
).hexdigest()[:12]
return record
def transform_parse_dates(record: dict, date_fields: list[str]) -> dict:
"""Parse date strings into ISO format."""
for field_name in date_fields:
value = record.get(field_name)
if not value:
continue
for fmt in ("%Y-%m-%d", "%m/%d/%Y", "%d.%m.%Y", "%Y-%m-%dT%H:%M:%S"):
try:
parsed = datetime.strptime(str(value), fmt)
record[field_name] = parsed.isoformat()
break
except ValueError:
continue
return record
def transform_deduplicate(
records: Iterator[dict],
key_field: str,
) -> Iterator[dict]:
"""Remove duplicate records based on a key field."""
seen = set()
for record in records:
key = record.get(key_field)
if key and key not in seen:
seen.add(key)
yield record
def transform_filter(
records: Iterator[dict],
condition: callable,
) -> Iterator[dict]:
"""Filter records by a condition function."""
for record in records:
if condition(record):
yield record
# Composable pipeline:
def apply_transforms(record: dict) -> dict:
"""Apply all record-level transforms in order."""
record = transform_clean_strings(record)
record = transform_normalize_email(record)
record = transform_add_timestamps(record)
record = transform_parse_dates(record, ["created_at", "updated_at"])
return record
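Record-level transforms (one dict in, one dict out) and stream-level transforms (iterator in, iterator out) chain together; a minimal sketch, assuming a users CSV with an email column:
raw = extract_csv("data/users.csv")
cleaned = (apply_transforms(r) for r in raw)
unique = transform_deduplicate(cleaned, key_field="email")
valid = transform_filter(unique, lambda r: r.get("_email_valid", True))
# `valid` is still a lazy iterator: nothing runs until a loader consumes it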
Loaders accept an iterator of records and write them to a destination:
import csv
import json
import sqlite3
from pathlib import Path
def load_csv(records: Iterator[dict], path: str, mode: str = "w") -> int:
"""Load records to a CSV file. Returns count of records written."""
count = 0
fieldnames = None
with open(path, mode, newline="", encoding="utf-8") as f:
writer = None
for record in records:
if writer is None:
fieldnames = list(record.keys())
writer = csv.DictWriter(f, fieldnames=fieldnames)
if mode == "w":
writer.writeheader()
writer.writerow({k: record.get(k, "") for k in fieldnames})
count += 1
return count
def load_jsonl(records: Iterator[dict], path: str, mode: str = "w") -> int:
"""Load records to a JSONL (JSON Lines) file."""
count = 0
with open(path, mode, encoding="utf-8") as f:
for record in records:
f.write(json.dumps(record, default=str, ensure_ascii=False) + "\n")
count += 1
return count
def load_sqlite(
records: Iterator[dict],
db_path: str,
table: str,
if_exists: str = "append", # "append" | "replace"
) -> int:
"""Load records to a SQLite table. Auto-creates table from first record."""
conn = sqlite3.connect(db_path)
count = 0
columns = None
for record in records:
if columns is None:
columns = list(record.keys())
col_defs = ", ".join(f'"{c}" TEXT' for c in columns)
if if_exists == "replace":
conn.execute(f'DROP TABLE IF EXISTS "{table}"')
conn.execute(
f'CREATE TABLE IF NOT EXISTS "{table}" ({col_defs})'
)
placeholders = ", ".join("?" for _ in columns)
values = [str(record.get(c, "")) for c in columns]
conn.execute(
f'INSERT INTO "{table}" ({", ".join(f"{c!r}" for c in columns)}) '
f"VALUES ({placeholders})",
values,
)
count += 1
# Commit in batches for performance
if count % 1000 == 0:
conn.commit()
conn.commit()
conn.close()
return count
def load_batched(
records: Iterator[dict],
batch_size: int,
loader: callable,
) -> int:
"""Process records in batches for bulk operations."""
batch = []
total = 0
for record in records:
batch.append(record)
if len(batch) >= batch_size:
total += loader(iter(batch))
batch = []
if batch:
total += loader(iter(batch))
return total
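load_batched wraps any loader behind a lambda; a sketch that commits SQLite inserts in chunks of 5,000 (paths and table name are illustrative):
records = extract_csv("data/users.csv")
total = load_batched(
    records,
    batch_size=5000,
    loader=lambda batch: load_sqlite(batch, "output/users.db", "users"),
)
print(f"Loaded {total} records")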
The pipeline class ties extract, transform, and load together:
import time
import logging
logger = logging.getLogger(__name__)
class Pipeline:
"""A composable ETL pipeline."""
def __init__(self, name: str):
self.name = name
self.extractors: list[callable] = []
self.transforms: list[callable] = []
self.loaders: list[callable] = []
self.result = PipelineResult()
def extract_from(self, *extractors):
"""Add data sources."""
self.extractors.extend(extractors)
return self # Enable chaining
def transform_with(self, *transforms):
"""Add transformation functions."""
self.transforms.extend(transforms)
return self
def load_to(self, *loaders):
"""Add destinations."""
self.loaders.extend(loaders)
return self
def run(self) -> PipelineResult:
"""Execute the pipeline."""
start = time.time()
logger.info(f"Pipeline '{self.name}' starting...")
# Extract
raw_records = extract_multiple(
*(ext() if callable(ext) else ext for ext in self.extractors)
)
# Transform
transformed = self._apply_transforms(raw_records)
# Collect for multiple loaders
records_buffer = list(transformed)
self.result.records_transformed = len(records_buffer)
# Load to all destinations
for loader in self.loaders:
try:
count = loader(iter(records_buffer))
self.result.records_loaded += count
logger.info(f"Loaded {count} records to {loader.__name__}")
except Exception as e:
logger.error(f"Loader failed: {e}")
self.result.errors.append(f"Load error: {e}")
self.result.duration_seconds = time.time() - start
logger.info(self.result.summary())
return self.result
def _apply_transforms(self, records: Iterator[dict]) -> Iterator[dict]:
"""Apply all transforms to the record stream."""
for record in records:
self.result.records_extracted += 1
try:
for transform in self.transforms:
record = transform(record)
if record is None:
break # Transform filtered out this record
if record is not None:
yield record
except Exception as e:
self.result.records_failed += 1
self.result.errors.append(
f"Transform error on record {self.result.records_extracted}: {e}"
)
logger.warning(f"Record failed transform: {e}")
# Usage:
# result = (
# Pipeline("daily_users")
# .extract_from(lambda: extract_json_api("https://api.example.com/users"))
# .transform_with(transform_clean_strings, transform_normalize_email)
# .load_to(lambda r: load_csv(r, "output/users.csv"))
# .run()
# )
Production pipelines need retry logic and dead-letter queues:
import time
import functools
def retry(max_attempts: int = 3, delay: float = 1.0, backoff: float = 2.0):
"""Decorator that retries a function on failure with exponential backoff."""
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
last_exception = None
current_delay = delay
for attempt in range(1, max_attempts + 1):
try:
return func(*args, **kwargs)
except Exception as e:
last_exception = e
if attempt < max_attempts:
logger.warning(
f"{func.__name__} attempt {attempt}/{max_attempts} "
f"failed: {e}. Retrying in {current_delay:.1f}s..."
)
time.sleep(current_delay)
current_delay *= backoff
else:
logger.error(
f"{func.__name__} failed after {max_attempts} attempts: {e}"
)
raise last_exception
return wrapper
return decorator
class DeadLetterQueue:
"""Store failed records for later inspection and reprocessing."""
def __init__(self, path: str = "dead_letters.jsonl"):
self.path = Path(path)
self.count = 0
def put(self, record: dict, error: str):
"""Add a failed record to the queue."""
entry = {
"record": record,
"error": str(error),
"timestamp": datetime.now().isoformat(),
}
with open(self.path, "a") as f:
f.write(json.dumps(entry, default=str) + "\n")
self.count += 1
def reprocess(self, transform_fn: callable) -> Iterator[dict]:
"""Yield records from the DLQ for reprocessing."""
if not self.path.exists():
return
with open(self.path) as f:
for line in f:
entry = json.loads(line)
try:
result = transform_fn(entry["record"])
if result:
yield result
except Exception:
continue # Still failing, leave it
def clear(self):
"""Empty the dead letter queue."""
if self.path.exists():
self.path.unlink()
self.count = 0
# Apply retry to extractors:
@retry(max_attempts=3, delay=2.0)
def extract_api_with_retry(url: str) -> list[dict]:
"""Fetch API data with automatic retry."""
req = Request(url)
with urlopen(req, timeout=30) as resp:
return json.loads(resp.read())
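The dead-letter queue slots into the transform step; a minimal sketch, assuming the apply_transforms helper from earlier (the DLQ path is illustrative):
dlq = DeadLetterQueue("failed_records.jsonl")
def transform_or_quarantine(records: Iterator[dict]) -> Iterator[dict]:
    """Yield transformed records; divert failures to the DLQ instead of crashing."""
    for record in records:
        try:
            yield apply_transforms(record)
        except Exception as e:
            dlq.put(record, str(e))
# Later, after fixing the transform, retry whatever is still queued:
# recovered = list(dlq.reprocess(apply_transforms))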
Structured logging makes debugging easy:
import logging
import sys
def setup_logging(
level: str = "INFO",
log_file: str | None = None,
json_format: bool = False,
) -> logging.Logger:
"""Configure logging for the pipeline."""
logger = logging.getLogger("pipeline")
logger.setLevel(getattr(logging, level.upper()))
# Console handler
console = logging.StreamHandler(sys.stdout)
if json_format:
fmt = '{"time":"%(asctime)s","level":"%(levelname)s","msg":"%(message)s"}'
else:
fmt = "%(asctime)s [%(levelname)s] %(message)s"
console.setFormatter(logging.Formatter(fmt, datefmt="%Y-%m-%d %H:%M:%S"))
logger.addHandler(console)
# File handler (optional)
if log_file:
file_handler = logging.FileHandler(log_file)
file_handler.setFormatter(
logging.Formatter(fmt, datefmt="%Y-%m-%d %H:%M:%S")
)
logger.addHandler(file_handler)
return logger
# Monitoring hook — call at the end of each run
def send_metrics(result: PipelineResult, webhook_url: str | None = None):
"""Send pipeline metrics to a monitoring endpoint."""
metrics = {
"pipeline_duration_seconds": result.duration_seconds,
"records_extracted": result.records_extracted,
"records_loaded": result.records_loaded,
"records_failed": result.records_failed,
"success_rate": result.success_rate,
"error_count": len(result.errors),
"timestamp": datetime.now().isoformat(),
}
logger.info(f"Metrics: {json.dumps(metrics)}")
if webhook_url:
req = Request(
webhook_url,
data=json.dumps(metrics).encode(),
headers={"Content-Type": "application/json"},
method="POST",
)
try:
urlopen(req, timeout=10)
except Exception as e:
logger.warning(f"Failed to send metrics: {e}")
Run your pipeline on a schedule without an orchestrator. A small CLI plus the lightweight schedule package (or plain cron) is all you need:
import argparse
import sys
import time as time_module
import schedule  # third-party: pip install schedule
def create_cli():
"""Create CLI for the pipeline."""
parser = argparse.ArgumentParser(description="Data Pipeline CLI")
parser.add_argument(
"action",
choices=["run", "schedule", "backfill", "status"],
help="Pipeline action",
)
parser.add_argument("--interval", type=int, default=60, help="Minutes between runs")
parser.add_argument("--source", required=False, help="Data source path/URL")
parser.add_argument("--output", default="output/", help="Output directory")
parser.add_argument("--log-level", default="INFO", choices=["DEBUG", "INFO", "WARNING"])
parser.add_argument("--dry-run", action="store_true", help="Show what would be done")
return parser
def main():
parser = create_cli()
args = parser.parse_args()
setup_logging(level=args.log_level)
if args.action == "run":
result = run_pipeline(args)
print(result.summary())
sys.exit(0 if result.records_failed == 0 else 1)
elif args.action == "schedule":
logger.info(f"Scheduling pipeline every {args.interval} minutes")
schedule.every(args.interval).minutes.do(run_pipeline, args)
while True:
schedule.run_pending()
time_module.sleep(1)
elif args.action == "backfill":
logger.info("Running backfill...")
# Process historical data
result = run_pipeline(args, backfill=True)
print(result.summary())
elif args.action == "status":
show_pipeline_status(args.output)
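main() assumes run_pipeline and show_pipeline_status helpers; a minimal sketch of run_pipeline built on the Pipeline class above (default source and output layout are illustrative):
def run_pipeline(args, backfill: bool = False) -> PipelineResult:
    """Build and execute the pipeline from parsed CLI arguments."""
    source = args.source or "data/users.csv"
    Path(args.output).mkdir(parents=True, exist_ok=True)
    if args.dry_run:
        logger.info(f"[dry-run] would process {source} into {args.output}")
        return PipelineResult()
    return (
        Pipeline("backfill" if backfill else "scheduled_run")
        .extract_from(lambda: extract_csv(source))
        .transform_with(apply_transforms)
        .load_to(lambda r: load_jsonl(r, str(Path(args.output) / "records.jsonl")))
        .run()
    )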
# Crontab alternative (no external deps):
# */30 * * * * cd /path/to/pipeline && python pipeline.py run --source data/feed.csv
# Systemd timer (Linux):
# [Timer]
# OnCalendar=*:0/30
# Persistent=true
# launchd (macOS):
# RunAtLoad: true
# StartInterval: 1800
Here's a real-world pipeline that syncs user data from an API to a local database:
#!/usr/bin/env python3
"""
User Data Pipeline — Sync users from API to SQLite.
Usage:
python pipeline.py run
python pipeline.py run --dry-run
python pipeline.py schedule --interval 30
python pipeline.py backfill --source data/historical_users.csv
"""
import csv
import hashlib
import json
import logging
import sqlite3
import sys
import time
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Iterator
from urllib.request import urlopen, Request
logger = logging.getLogger("pipeline")
@dataclass
class PipelineResult:
started_at: datetime = field(default_factory=datetime.now)
extracted: int = 0
transformed: int = 0
loaded: int = 0
failed: int = 0
errors: list = field(default_factory=list)
duration: float = 0.0
def summary(self) -> str:
rate = (self.transformed / (self.transformed + self.failed) * 100
if (self.transformed + self.failed) else 0)
return (f"Done in {self.duration:.1f}s — "
f"E:{self.extracted} T:{self.transformed} "
f"L:{self.loaded} F:{self.failed} ({rate:.0f}% ok)")
# ── Extract ──────────────────────────────────────
def extract_api(base_url: str, per_page: int = 100) -> Iterator[dict]:
"""Paginated API extraction."""
page = 1
while True:
url = f"{base_url}?page={page}&per_page={per_page}"
try:
with urlopen(Request(url), timeout=30) as resp:
data = json.loads(resp.read())
except Exception as e:
logger.error(f"API error page {page}: {e}")
break
items = data if isinstance(data, list) else data.get("data", [])
if not items:
break
yield from items
page += 1
def extract_csv_file(path: str) -> Iterator[dict]:
with open(path, encoding="utf-8") as f:
yield from csv.DictReader(f)
# ── Transform ────────────────────────────────────
def clean(record: dict) -> dict:
"""Clean and validate a user record."""
# Normalize strings
for k, v in record.items():
if isinstance(v, str):
record[k] = v.strip() or None
# Validate email
email = record.get("email", "") or ""
record["email"] = email.lower()
record["email_valid"] = bool(
email and "@" in email and "." in email.split("@")[-1]
)
# Generate stable ID for dedup
key = f"{record.get('email', '')}:{record.get('name', '')}"
record["record_hash"] = hashlib.sha256(key.encode()).hexdigest()[:16]
record["synced_at"] = datetime.now().isoformat()
return record
def deduplicate(records: Iterator[dict]) -> Iterator[dict]:
"""Remove duplicates by record_hash."""
seen = set()
for r in records:
h = r.get("record_hash")
if h and h not in seen:
seen.add(h)
yield r
# ── Load ─────────────────────────────────────────
def load_sqlite(records: Iterator[dict], db_path: str, table: str) -> int:
conn = sqlite3.connect(db_path)
count = 0
cols = None
for record in records:
if cols is None:
cols = list(record.keys())
col_sql = ", ".join(f'"{c}" TEXT' for c in cols)
conn.execute(f'CREATE TABLE IF NOT EXISTS "{table}" ({col_sql})')
placeholders = ",".join("?" * len(cols))
values = [str(record.get(c, "")) if record.get(c) is not None else None
for c in cols]
conn.execute(f'INSERT INTO "{table}" VALUES ({placeholders})', values)
count += 1
if count % 500 == 0:
conn.commit()
conn.commit()
conn.close()
return count
def load_jsonl(records: Iterator[dict], path: str) -> int:
count = 0
with open(path, "w") as f:
for r in records:
f.write(json.dumps(r, default=str) + "\n")
count += 1
return count
# ── Pipeline ─────────────────────────────────────
def run(source_url: str = "https://api.example.com/users",
output_dir: str = "output") -> PipelineResult:
"""Run the full ETL pipeline."""
result = PipelineResult()
start = time.time()
Path(output_dir).mkdir(parents=True, exist_ok=True)
db_path = f"{output_dir}/users.db"
jsonl_path = f"{output_dir}/users.jsonl"
try:
# Extract
raw = extract_api(source_url)
# Transform
transformed = []
for record in raw:
result.extracted += 1
try:
cleaned = clean(record)
transformed.append(cleaned)
except Exception as e:
result.failed += 1
result.errors.append(str(e))
# Deduplicate
unique = list(deduplicate(iter(transformed)))
result.transformed = len(unique)
# Load to both destinations
result.loaded += load_sqlite(iter(unique), db_path, "users")
load_jsonl(iter(unique), jsonl_path) # backup copy
logger.info(f"Loaded to {db_path} and {jsonl_path}")
except Exception as e:
logger.error(f"Pipeline failed: {e}")
result.errors.append(str(e))
result.duration = time.time() - start
logger.info(result.summary())
return result
if __name__ == "__main__":
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
datefmt="%H:%M:%S",
)
    if "--dry-run" in sys.argv:  # matches both "pipeline.py --dry-run" and "pipeline.py run --dry-run"
print("Dry run — would extract from API, transform, load to SQLite + JSONL")
sys.exit(0)
result = run()
print(result.summary())
sys.exit(0 if not result.errors else 1)
Data pipelines, scrapers, bots, automation workflows, AI integrations — tested and ready to deploy.
Get the Full Toolkit — $19