Build a Data Pipeline in Python — ETL from Scratch

By Kristy · March 2026 · 14 min read

Table of Contents

Why Build Your Own Pipeline?
Pipeline Architecture
Step 1: Extract
Step 2: Transform
Step 3: Load
Step 4: Orchestrate
Step 5: Error Handling & Retries
Step 6: Logging & Monitoring
Step 7: Scheduling & Automation
Complete Pipeline Example

Why Build Your Own Pipeline?

Tools like Airflow, Prefect, and Dagster are great — but sometimes you need a lightweight pipeline that runs anywhere without infrastructure overhead. A well-structured Python script can handle millions of records, retry on failure, and run on a $5 VPS or a cron job.

This guide builds a production-ready ETL pipeline that:

- Extracts from CSV files, JSON APIs, and SQLite databases
- Transforms records with composable, streaming functions
- Loads to CSV, JSONL, and SQLite destinations
- Retries failures with exponential backoff and parks bad records in a dead-letter queue
- Logs, reports metrics, and runs itself on a schedule

Pipeline Architecture

Every pipeline follows the same pattern:

# The universal pipeline pattern
#
#  [Source A] ──┐
#  [Source B] ──┼──→ [Extract] → [Transform] → [Load] → [Destination]
#  [Source C] ──┘         ↓           ↓           ↓
#                    [Error Log] [Validation] [Metrics]

from dataclasses import dataclass, field
from datetime import datetime
from typing import Iterator, Any
from pathlib import Path
import json


@dataclass
class PipelineResult:
    """Track what happened during a pipeline run."""
    started_at: datetime = field(default_factory=datetime.now)
    records_extracted: int = 0
    records_transformed: int = 0
    records_loaded: int = 0
    records_failed: int = 0
    errors: list[str] = field(default_factory=list)
    duration_seconds: float = 0.0

    @property
    def success_rate(self) -> float:
        total = self.records_transformed + self.records_failed
        return (self.records_transformed / total * 100) if total else 0

    def summary(self) -> str:
        return (
            f"Pipeline completed in {self.duration_seconds:.1f}s | "
            f"Extracted: {self.records_extracted} | "
            f"Transformed: {self.records_transformed} | "
            f"Loaded: {self.records_loaded} | "
            f"Failed: {self.records_failed} | "
            f"Success: {self.success_rate:.1f}%"
        )

Step 1: Extract — Pull Data from Anywhere

Extractors are generators that yield raw records. This keeps memory usage constant regardless of data size:

import csv
import sqlite3
from urllib.request import urlopen, Request
from typing import Iterator


def extract_csv(path: str, encoding: str = "utf-8") -> Iterator[dict]:
    """Extract records from a CSV file."""
    with open(path, "r", encoding=encoding) as f:
        reader = csv.DictReader(f)
        for row in reader:
            yield dict(row)  # Convert OrderedDict to regular dict


def extract_json_api(
    url: str,
    headers: dict | None = None,
    params: dict | None = None,
    paginate_key: str | None = None,
    next_url_key: str | None = None,
) -> Iterator[dict]:
    """Extract records from a JSON API with pagination support."""
    import json
    from urllib.parse import urlencode

    current_url = url
    if params:
        current_url += "?" + urlencode(params)

    while current_url:
        req = Request(current_url, headers=headers or {})
        with urlopen(req, timeout=30) as resp:
            data = json.loads(resp.read())

        # Handle different API response shapes
        if isinstance(data, list):
            yield from data
            break
        elif paginate_key and paginate_key in data:
            yield from data[paginate_key]
            current_url = data.get(next_url_key) if next_url_key else None
        else:
            yield data
            break


def extract_sqlite(
    db_path: str,
    query: str,
    params: tuple = (),
) -> Iterator[dict]:
    """Extract records from a SQLite database."""
    conn = sqlite3.connect(db_path)
    conn.row_factory = sqlite3.Row
    try:
        for row in conn.execute(query, params):
            yield dict(row)
    finally:
        conn.close()  # close even if the consumer stops iterating early


def extract_multiple(*extractors) -> Iterator[dict]:
    """Merge multiple extractors into one stream."""
    for extractor in extractors:
        yield from extractor


# Usage examples:
# records = extract_csv("data/users.csv")
# records = extract_json_api("https://api.example.com/users", paginate_key="results", next_url_key="next")
# records = extract_sqlite("app.db", "SELECT * FROM users WHERE active = ?", (1,))

💡 Why generators? A generator processes one record at a time. You can ETL a 10GB file with 50MB of RAM. If you use list() to collect everything, you lose this advantage.
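
To make the contrast concrete, here is a minimal sketch (big_orders.csv is a hypothetical large file): the loop streams one row at a time, while list() pulls the entire file into memory before anything else runs.

# Streaming: memory stays flat no matter how large the file is
for record in extract_csv("data/big_orders.csv"):  # hypothetical large file
    ...  # handle one record, then let it go out of scope

# Materializing: every row is held in RAM at once; avoid this for large inputs
all_records = list(extract_csv("data/big_orders.csv"))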

Step 2: Transform — Clean, Validate, Enrich

Transformations are composable functions. Chain them together:

from datetime import datetime
from typing import Iterator
import hashlib
import json
import re


def transform_clean_strings(record: dict) -> dict:
    """Strip whitespace and normalize empty strings to None."""
    return {
        k: (v.strip() or None) if isinstance(v, str) else v
        for k, v in record.items()
    }


def transform_normalize_email(record: dict) -> dict:
    """Lowercase and validate email addresses."""
    email = record.get("email", "")
    if email:
        email = email.lower().strip()
        if not re.match(r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$", email):
            record["_email_valid"] = False
        else:
            record["email"] = email
            record["_email_valid"] = True
    return record


def transform_add_timestamps(record: dict) -> dict:
    """Add processing metadata."""
    record["_processed_at"] = datetime.now().isoformat()
    record["_record_hash"] = hashlib.md5(
        json.dumps(record, sort_keys=True, default=str).encode()
    ).hexdigest()[:12]
    return record


def transform_parse_dates(record: dict, date_fields: list[str]) -> dict:
    """Parse date strings into ISO format."""
    for field_name in date_fields:
        value = record.get(field_name)
        if not value:
            continue
        for fmt in ("%Y-%m-%d", "%m/%d/%Y", "%d.%m.%Y", "%Y-%m-%dT%H:%M:%S"):
            try:
                parsed = datetime.strptime(str(value), fmt)
                record[field_name] = parsed.isoformat()
                break
            except ValueError:
                continue
    return record


def transform_deduplicate(
    records: Iterator[dict],
    key_field: str,
) -> Iterator[dict]:
    """Remove duplicate records based on a key field."""
    seen = set()
    for record in records:
        key = record.get(key_field)
        if key and key not in seen:
            seen.add(key)
            yield record


def transform_filter(
    records: Iterator[dict],
    condition: callable,
) -> Iterator[dict]:
    """Filter records by a condition function."""
    for record in records:
        if condition(record):
            yield record


# Composable pipeline:
def apply_transforms(record: dict) -> dict:
    """Apply all record-level transforms in order."""
    record = transform_clean_strings(record)
    record = transform_normalize_email(record)
    record = transform_add_timestamps(record)
    record = transform_parse_dates(record, ["created_at", "updated_at"])
    return record
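
Because each transform either maps a single record or wraps the whole stream, you can chain them lazily before any loader runs. A minimal sketch, assuming a users.csv with email and created_at columns:

# Compose per-record and stream-level transforms lazily (hypothetical users.csv)
records = extract_csv("data/users.csv")
records = (apply_transforms(r) for r in records)                       # per-record steps
records = transform_filter(records, lambda r: r.get("_email_valid"))   # keep valid emails
records = transform_deduplicate(records, key_field="email")            # drop repeats
# Nothing executes until a loader consumes this stream.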

Step 3: Load — Store Results

Loaders accept an iterator of records and write them to a destination:

import csv
import json
import sqlite3
from pathlib import Path


def load_csv(records: Iterator[dict], path: str, mode: str = "w") -> int:
    """Load records to a CSV file. Returns count of records written."""
    count = 0
    fieldnames = None

    with open(path, mode, newline="", encoding="utf-8") as f:
        writer = None
        for record in records:
            if writer is None:
                fieldnames = list(record.keys())
                writer = csv.DictWriter(f, fieldnames=fieldnames)
                if mode == "w":
                    writer.writeheader()
            writer.writerow({k: record.get(k, "") for k in fieldnames})
            count += 1

    return count


def load_jsonl(records: Iterator[dict], path: str, mode: str = "w") -> int:
    """Load records to a JSONL (JSON Lines) file."""
    count = 0
    with open(path, mode, encoding="utf-8") as f:
        for record in records:
            f.write(json.dumps(record, default=str, ensure_ascii=False) + "\n")
            count += 1
    return count


def load_sqlite(
    records: Iterator[dict],
    db_path: str,
    table: str,
    if_exists: str = "append",  # "append" | "replace"
) -> int:
    """Load records to a SQLite table. Auto-creates table from first record."""
    conn = sqlite3.connect(db_path)
    count = 0
    columns = None

    for record in records:
        if columns is None:
            columns = list(record.keys())
            col_defs = ", ".join(f'"{c}" TEXT' for c in columns)

            if if_exists == "replace":
                conn.execute(f'DROP TABLE IF EXISTS "{table}"')

            conn.execute(
                f'CREATE TABLE IF NOT EXISTS "{table}" ({col_defs})'
            )

        placeholders = ", ".join("?" for _ in columns)
        col_list = ", ".join(f'"{c}"' for c in columns)
        values = [
            None if record.get(c) is None else str(record.get(c))
            for c in columns
        ]
        conn.execute(
            f'INSERT INTO "{table}" ({col_list}) VALUES ({placeholders})',
            values,
        )
        count += 1

        # Commit in batches for performance
        if count % 1000 == 0:
            conn.commit()

    conn.commit()
    conn.close()
    return count


def load_batched(
    records: Iterator[dict],
    batch_size: int,
    loader: callable,
) -> int:
    """Process records in batches for bulk operations."""
    batch = []
    total = 0

    for record in records:
        batch.append(record)
        if len(batch) >= batch_size:
            total += loader(iter(batch))
            batch = []

    if batch:
        total += loader(iter(batch))

    return total
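
load_batched works with any loader that accepts an iterator. A short sketch, where the output path and table name are placeholders and records is any stream from Steps 1 and 2:

# Flush to SQLite in 5,000-record chunks (paths are placeholders)
total = load_batched(
    records,
    batch_size=5000,
    loader=lambda batch: load_sqlite(batch, "output/users.db", "users"),
)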

Step 4: Orchestrate — Wire It Together

The pipeline class ties extract, transform, and load together:

import time
import logging

logger = logging.getLogger(__name__)


class Pipeline:
    """A composable ETL pipeline."""

    def __init__(self, name: str):
        self.name = name
        self.extractors: list[callable] = []
        self.transforms: list[callable] = []
        self.loaders: list[callable] = []
        self.result = PipelineResult()

    def extract_from(self, *extractors):
        """Add data sources."""
        self.extractors.extend(extractors)
        return self  # Enable chaining

    def transform_with(self, *transforms):
        """Add transformation functions."""
        self.transforms.extend(transforms)
        return self

    def load_to(self, *loaders):
        """Add destinations."""
        self.loaders.extend(loaders)
        return self

    def run(self) -> PipelineResult:
        """Execute the pipeline."""
        start = time.time()
        self.result = PipelineResult()  # fresh counters for each run
        logger.info(f"Pipeline '{self.name}' starting...")

        # Extract
        raw_records = extract_multiple(
            *(ext() if callable(ext) else ext for ext in self.extractors)
        )

        # Transform
        transformed = self._apply_transforms(raw_records)

        # Collect for multiple loaders
        records_buffer = list(transformed)
        self.result.records_transformed = len(records_buffer)

        # Load to all destinations
        for loader in self.loaders:
            try:
                count = loader(iter(records_buffer))
                self.result.records_loaded += count
                logger.info(f"Loaded {count} records to {loader.__name__}")
            except Exception as e:
                logger.error(f"Loader failed: {e}")
                self.result.errors.append(f"Load error: {e}")

        self.result.duration_seconds = time.time() - start
        logger.info(self.result.summary())
        return self.result

    def _apply_transforms(self, records: Iterator[dict]) -> Iterator[dict]:
        """Apply all transforms to the record stream."""
        for record in records:
            self.result.records_extracted += 1
            try:
                for transform in self.transforms:
                    record = transform(record)
                    if record is None:
                        break  # Transform filtered out this record
                if record is not None:
                    yield record
            except Exception as e:
                self.result.records_failed += 1
                self.result.errors.append(
                    f"Transform error on record {self.result.records_extracted}: {e}"
                )
                logger.warning(f"Record failed transform: {e}")


# Usage:
# result = (
#     Pipeline("daily_users")
#     .extract_from(lambda: extract_json_api("https://api.example.com/users"))
#     .transform_with(transform_clean_strings, transform_normalize_email)
#     .load_to(lambda r: load_csv(r, "output/users.csv"))
#     .run()
# )

Step 5: Error Handling & Retries

Production pipelines need retry logic and dead-letter queues:

import time
import functools


def retry(max_attempts: int = 3, delay: float = 1.0, backoff: float = 2.0):
    """Decorator that retries a function on failure with exponential backoff."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            last_exception = None
            current_delay = delay

            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    last_exception = e
                    if attempt < max_attempts:
                        logger.warning(
                            f"{func.__name__} attempt {attempt}/{max_attempts} "
                            f"failed: {e}. Retrying in {current_delay:.1f}s..."
                        )
                        time.sleep(current_delay)
                        current_delay *= backoff
                    else:
                        logger.error(
                            f"{func.__name__} failed after {max_attempts} attempts: {e}"
                        )
            raise last_exception
        return wrapper
    return decorator


class DeadLetterQueue:
    """Store failed records for later inspection and reprocessing."""

    def __init__(self, path: str = "dead_letters.jsonl"):
        self.path = Path(path)
        self.count = 0

    def put(self, record: dict, error: str):
        """Add a failed record to the queue."""
        entry = {
            "record": record,
            "error": str(error),
            "timestamp": datetime.now().isoformat(),
        }
        with open(self.path, "a") as f:
            f.write(json.dumps(entry, default=str) + "\n")
        self.count += 1

    def reprocess(self, transform_fn: callable) -> Iterator[dict]:
        """Yield records from the DLQ for reprocessing."""
        if not self.path.exists():
            return

        with open(self.path) as f:
            for line in f:
                entry = json.loads(line)
                try:
                    result = transform_fn(entry["record"])
                    if result:
                        yield result
                except Exception:
                    continue  # Still failing, leave it

    def clear(self):
        """Empty the dead letter queue."""
        if self.path.exists():
            self.path.unlink()
        self.count = 0


# Apply retry to extractors:
@retry(max_attempts=3, delay=2.0)
def extract_api_with_retry(url: str) -> list[dict]:
    """Fetch API data with automatic retry."""
    req = Request(url)
    with urlopen(req, timeout=30) as resp:
        return json.loads(resp.read())
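
One way to wire the dead-letter queue into the transform step, sketched with the helpers defined earlier (the file path is an assumption):

dlq = DeadLetterQueue("output/dead_letters.jsonl")

def transform_stream(records: Iterator[dict]) -> Iterator[dict]:
    """Apply transforms, parking failed records in the DLQ instead of crashing."""
    for record in records:
        try:
            yield apply_transforms(record)
        except Exception as e:
            dlq.put(record, str(e))

# Later, retry whatever landed in the queue:
# recovered = list(dlq.reprocess(apply_transforms))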

Step 6: Logging & Monitoring

Structured logging makes debugging easy:

import logging
import sys


def setup_logging(
    level: str = "INFO",
    log_file: str | None = None,
    json_format: bool = False,
) -> logging.Logger:
    """Configure logging for the pipeline."""
    logger = logging.getLogger("pipeline")
    logger.setLevel(getattr(logging, level.upper()))

    # Console handler
    console = logging.StreamHandler(sys.stdout)
    if json_format:
        fmt = '{"time":"%(asctime)s","level":"%(levelname)s","msg":"%(message)s"}'
    else:
        fmt = "%(asctime)s [%(levelname)s] %(message)s"
    console.setFormatter(logging.Formatter(fmt, datefmt="%Y-%m-%d %H:%M:%S"))
    logger.addHandler(console)

    # File handler (optional)
    if log_file:
        file_handler = logging.FileHandler(log_file)
        file_handler.setFormatter(
            logging.Formatter(fmt, datefmt="%Y-%m-%d %H:%M:%S")
        )
        logger.addHandler(file_handler)

    return logger


# Monitoring hook — call at the end of each run
def send_metrics(result: PipelineResult, webhook_url: str | None = None):
    """Send pipeline metrics to a monitoring endpoint."""
    metrics = {
        "pipeline_duration_seconds": result.duration_seconds,
        "records_extracted": result.records_extracted,
        "records_loaded": result.records_loaded,
        "records_failed": result.records_failed,
        "success_rate": result.success_rate,
        "error_count": len(result.errors),
        "timestamp": datetime.now().isoformat(),
    }

    logger.info(f"Metrics: {json.dumps(metrics)}")

    if webhook_url:
        req = Request(
            webhook_url,
            data=json.dumps(metrics).encode(),
            headers={"Content-Type": "application/json"},
            method="POST",
        )
        try:
            urlopen(req, timeout=10)
        except Exception as e:
            logger.warning(f"Failed to send metrics: {e}")

Step 7: Scheduling & Automation

Run your pipeline on a schedule. Cron and systemd timers need no extra dependencies; for a single long-running process, the third-party schedule package works too:

import argparse
import sys
import time as time_module

import schedule  # third-party: pip install schedule


def create_cli():
    """Create CLI for the pipeline."""
    parser = argparse.ArgumentParser(description="Data Pipeline CLI")
    parser.add_argument(
        "action",
        choices=["run", "schedule", "backfill", "status"],
        help="Pipeline action",
    )
    parser.add_argument("--interval", type=int, default=60, help="Minutes between runs")
    parser.add_argument("--source", required=False, help="Data source path/URL")
    parser.add_argument("--output", default="output/", help="Output directory")
    parser.add_argument("--log-level", default="INFO", choices=["DEBUG", "INFO", "WARNING"])
    parser.add_argument("--dry-run", action="store_true", help="Show what would be done")
    return parser


def main():
    # run_pipeline() and show_pipeline_status() are your project-specific entry points
    parser = create_cli()
    args = parser.parse_args()

    setup_logging(level=args.log_level)

    if args.action == "run":
        result = run_pipeline(args)
        print(result.summary())
        sys.exit(0 if result.records_failed == 0 else 1)

    elif args.action == "schedule":
        logger.info(f"Scheduling pipeline every {args.interval} minutes")
        schedule.every(args.interval).minutes.do(run_pipeline, args)
        while True:
            schedule.run_pending()
            time_module.sleep(1)

    elif args.action == "backfill":
        logger.info("Running backfill...")
        # Process historical data
        result = run_pipeline(args, backfill=True)
        print(result.summary())

    elif args.action == "status":
        show_pipeline_status(args.output)


# Crontab alternative (no external deps):
# */30 * * * * cd /path/to/pipeline && python pipeline.py run --source data/feed.csv

# Systemd timer (Linux):
# [Timer]
# OnCalendar=*:0/30
# Persistent=true

# launchd (macOS):
# RunAtLoad: true
# StartInterval: 1800
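
If a run can outlast its cron interval, a lock file keeps two runs from overlapping. A minimal sketch using only the standard library (Unix only; the lock path is an assumption):

import fcntl
import sys

def acquire_run_lock(lock_path: str = "/tmp/pipeline.lock"):
    """Exit early if another pipeline run already holds the lock."""
    lock_file = open(lock_path, "w")
    try:
        fcntl.flock(lock_file, fcntl.LOCK_EX | fcntl.LOCK_NB)  # non-blocking exclusive lock
    except BlockingIOError:
        print("Another run is still in progress; exiting.")
        sys.exit(0)
    return lock_file  # keep the handle open for the lifetime of the run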

Complete Pipeline Example

Here's a real-world pipeline that syncs user data from an API to a local database:

#!/usr/bin/env python3
"""
User Data Pipeline — Sync users from API to SQLite.

Usage:
    python pipeline.py              # run once
    python pipeline.py --dry-run    # show what would happen without running

For schedule, backfill, and status commands, wire this module into the CLI from Step 7.
"""

import csv
import hashlib
import json
import logging
import sqlite3
import sys
import time
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Iterator
from urllib.request import urlopen, Request

logger = logging.getLogger("pipeline")


@dataclass
class PipelineResult:
    started_at: datetime = field(default_factory=datetime.now)
    extracted: int = 0
    transformed: int = 0
    loaded: int = 0
    failed: int = 0
    errors: list = field(default_factory=list)
    duration: float = 0.0

    def summary(self) -> str:
        rate = (self.transformed / (self.transformed + self.failed) * 100
                if (self.transformed + self.failed) else 0)
        return (f"Done in {self.duration:.1f}s — "
                f"E:{self.extracted} T:{self.transformed} "
                f"L:{self.loaded} F:{self.failed} ({rate:.0f}% ok)")


# ── Extract ──────────────────────────────────────

def extract_api(base_url: str, per_page: int = 100) -> Iterator[dict]:
    """Paginated API extraction."""
    page = 1
    while True:
        url = f"{base_url}?page={page}&per_page={per_page}"
        try:
            with urlopen(Request(url), timeout=30) as resp:
                data = json.loads(resp.read())
        except Exception as e:
            logger.error(f"API error page {page}: {e}")
            break

        items = data if isinstance(data, list) else data.get("data", [])
        if not items:
            break

        yield from items
        page += 1


def extract_csv_file(path: str) -> Iterator[dict]:
    with open(path, encoding="utf-8") as f:
        yield from csv.DictReader(f)


# ── Transform ────────────────────────────────────

def clean(record: dict) -> dict:
    """Clean and validate a user record."""
    # Normalize strings
    for k, v in record.items():
        if isinstance(v, str):
            record[k] = v.strip() or None

    # Validate email
    email = record.get("email", "") or ""
    record["email"] = email.lower()
    record["email_valid"] = bool(
        email and "@" in email and "." in email.split("@")[-1]
    )

    # Generate stable ID for dedup
    key = f"{record.get('email', '')}:{record.get('name', '')}"
    record["record_hash"] = hashlib.sha256(key.encode()).hexdigest()[:16]
    record["synced_at"] = datetime.now().isoformat()

    return record


def deduplicate(records: Iterator[dict]) -> Iterator[dict]:
    """Remove duplicates by record_hash."""
    seen = set()
    for r in records:
        h = r.get("record_hash")
        if h and h not in seen:
            seen.add(h)
            yield r


# ── Load ─────────────────────────────────────────

def load_sqlite(records: Iterator[dict], db_path: str, table: str) -> int:
    conn = sqlite3.connect(db_path)
    count = 0
    cols = None

    for record in records:
        if cols is None:
            cols = list(record.keys())
            col_sql = ", ".join(f'"{c}" TEXT' for c in cols)
            conn.execute(f'CREATE TABLE IF NOT EXISTS "{table}" ({col_sql})')

        placeholders = ",".join("?" * len(cols))
        values = [str(record.get(c, "")) if record.get(c) is not None else None
                  for c in cols]
        conn.execute(f'INSERT INTO "{table}" VALUES ({placeholders})', values)
        count += 1

        if count % 500 == 0:
            conn.commit()

    conn.commit()
    conn.close()
    return count


def load_jsonl(records: Iterator[dict], path: str) -> int:
    count = 0
    with open(path, "w") as f:
        for r in records:
            f.write(json.dumps(r, default=str) + "\n")
            count += 1
    return count


# ── Pipeline ─────────────────────────────────────

def run(source_url: str = "https://api.example.com/users",
        output_dir: str = "output") -> PipelineResult:
    """Run the full ETL pipeline."""
    result = PipelineResult()
    start = time.time()

    Path(output_dir).mkdir(parents=True, exist_ok=True)
    db_path = f"{output_dir}/users.db"
    jsonl_path = f"{output_dir}/users.jsonl"

    try:
        # Extract
        raw = extract_api(source_url)

        # Transform
        transformed = []
        for record in raw:
            result.extracted += 1
            try:
                cleaned = clean(record)
                transformed.append(cleaned)
            except Exception as e:
                result.failed += 1
                result.errors.append(str(e))

        # Deduplicate
        unique = list(deduplicate(iter(transformed)))
        result.transformed = len(unique)

        # Load to both destinations
        result.loaded += load_sqlite(iter(unique), db_path, "users")
        load_jsonl(iter(unique), jsonl_path)  # backup copy

        logger.info(f"Loaded to {db_path} and {jsonl_path}")

    except Exception as e:
        logger.error(f"Pipeline failed: {e}")
        result.errors.append(str(e))

    result.duration = time.time() - start
    logger.info(result.summary())
    return result


if __name__ == "__main__":
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(levelname)s] %(message)s",
        datefmt="%H:%M:%S",
    )

    if "--dry-run" in sys.argv:
        print("Dry run — would extract from API, transform, load to SQLite + JSONL")
        sys.exit(0)

    result = run()
    print(result.summary())
    sys.exit(0 if not result.errors else 1)

🚀 Want 50+ Production-Ready Python Scripts?

Data pipelines, scrapers, bots, automation workflows, AI integrations — tested and ready to deploy.

Get the Full Toolkit — $19