Python Machine Learning Pipeline — scikit-learn from Scratch

March 2026 · 25 min read · Python, Machine Learning, scikit-learn, Data Science

Most ML tutorials stop at model.fit(X, y). Real-world machine learning is 80% data preparation, 10% modeling, and 10% deployment. This guide covers the full pipeline — from raw data to a deployed model that handles edge cases, logs predictions, and can be retrained without rewriting code.

The ML Pipeline Overview

# The pipeline we'll build:
# Raw Data → Clean → Features → Split → Train → Evaluate → Tune → Deploy
#
# Each step is modular, testable, and reproducible.

Setup

pip install scikit-learn pandas numpy joblib matplotlib seaborn
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder, LabelEncoder
from sklearn.impute import SimpleImputer
from sklearn.metrics import classification_report, confusion_matrix
import warnings
warnings.filterwarnings('ignore')

Step 1: Load and Explore Data

# We'll use a customer churn dataset as an example
# Works with any tabular classification/regression problem

def load_data(path: str) -> pd.DataFrame:
    """Load and do initial inspection."""
    df = pd.read_csv(path)

    print(f"Shape: {df.shape}")
    print(f"\nData types:\n{df.dtypes}")
    print(f"\nMissing values:\n{df.isnull().sum()[df.isnull().sum() > 0]}")
    print(f"\nTarget distribution:\n{df['churn'].value_counts(normalize=True)}")

    return df


def quick_eda(df: pd.DataFrame):
    """Quick exploratory data analysis."""
    print("=" * 50)
    print("NUMERIC FEATURES")
    print("=" * 50)
    print(df.describe().round(2))

    print("\n" + "=" * 50)
    print("CATEGORICAL FEATURES")
    print("=" * 50)
    for col in df.select_dtypes(include='object').columns:
        print(f"\n{col}: {df[col].nunique()} unique values")
        print(df[col].value_counts().head())


# Usage
# df = load_data("data/customer_churn.csv")
# quick_eda(df)

Step 2: Data Cleaning

def clean_data(df: pd.DataFrame) -> pd.DataFrame:
    """Clean raw data: types, duplicates, obvious errors."""
    df = df.copy()

    # Remove duplicates
    n_dupes = df.duplicated().sum()
    if n_dupes > 0:
        print(f"Removing {n_dupes} duplicates")
        df = df.drop_duplicates()

    # Fix data types
    if 'total_charges' in df.columns:
        df['total_charges'] = pd.to_numeric(df['total_charges'], errors='coerce')

    # Remove obvious errors
    numeric_cols = df.select_dtypes(include=np.number).columns
    for col in numeric_cols:
        n_neg = (df[col] < 0).sum()
        if n_neg > 0 and col not in ['profit', 'balance']:  # some cols can be negative
            print(f"Warning: {n_neg} negative values in '{col}'")

    # Standardize strings
    str_cols = df.select_dtypes(include='object').columns
    for col in str_cols:
        df[col] = df[col].str.strip().str.lower()

    print(f"Clean shape: {df.shape}")
    return df

Step 3: Feature Engineering

def engineer_features(df: pd.DataFrame) -> pd.DataFrame:
    """Create new features from existing data."""
    df = df.copy()

    # Ratio features (often more predictive than raw values)
    if 'monthly_charges' in df.columns and 'tenure' in df.columns:
        df['avg_monthly_spend'] = df['total_charges'] / df['tenure'].clip(lower=1)
        df['charge_tenure_ratio'] = df['monthly_charges'] / df['tenure'].clip(lower=1)

    # Binning continuous features
    if 'tenure' in df.columns:
        df['tenure_group'] = pd.cut(
            df['tenure'],
            bins=[0, 12, 24, 48, 72, float('inf')],
            labels=['0-1yr', '1-2yr', '2-4yr', '4-6yr', '6yr+']
        )

    # Interaction features
    if 'contract' in df.columns and 'monthly_charges' in df.columns:
        df['contract_x_charges'] = (
            df['contract'].map({'month-to-month': 1, 'one year': 2, 'two year': 3})
            * df['monthly_charges']
        )

    # Count of services
    service_cols = [c for c in df.columns if c.startswith('has_') or c.endswith('_service')]
    if service_cols:
        df['n_services'] = df[service_cols].apply(
            lambda row: (row == 'yes').sum(), axis=1
        )

    print(f"Features after engineering: {df.shape[1]}")
    return df
💡 Feature engineering tips:
  • Ratios and differences often outperform raw values
  • Domain knowledge beats automated feature generation
  • Log-transform skewed distributions (np.log1p)
  • Encode cyclical features (day of week, month) with sin/cos

Step 4: Preprocessing Pipeline

This is where scikit-learn shines. Build a Pipeline that handles all preprocessing — no data leakage, reproducible, and works with cross-validation.

def build_preprocessor(
    numeric_features: list[str],
    categorical_features: list[str],
) -> ColumnTransformer:
    """Build a sklearn preprocessing pipeline."""

    # Numeric: impute missing → scale
    numeric_pipeline = Pipeline([
        ('imputer', SimpleImputer(strategy='median')),
        ('scaler', StandardScaler()),
    ])

    # Categorical: impute missing → one-hot encode
    categorical_pipeline = Pipeline([
        ('imputer', SimpleImputer(strategy='constant', fill_value='missing')),
        ('encoder', OneHotEncoder(
            drop='first',           # avoid multicollinearity
            handle_unknown='ignore', # handle unseen categories at predict time
            sparse_output=False,
        )),
    ])

    preprocessor = ColumnTransformer(
        transformers=[
            ('num', numeric_pipeline, numeric_features),
            ('cat', categorical_pipeline, categorical_features),
        ],
        remainder='drop',  # drop columns not in either list
    )

    return preprocessor


# Define feature groups
numeric_features = [
    'tenure', 'monthly_charges', 'total_charges',
    'avg_monthly_spend', 'charge_tenure_ratio', 'n_services',
]

categorical_features = [
    'gender', 'contract', 'payment_method',
    'internet_service', 'tenure_group',
]

preprocessor = build_preprocessor(numeric_features, categorical_features)

Step 5: Model Training

from sklearn.ensemble import (
    RandomForestClassifier,
    GradientBoostingClassifier,
)
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC


def train_and_compare(
    X_train, y_train, X_test, y_test, preprocessor
) -> dict:
    """Train multiple models and compare performance."""

    models = {
        'Logistic Regression': LogisticRegression(
            max_iter=1000, class_weight='balanced'
        ),
        'Random Forest': RandomForestClassifier(
            n_estimators=200, class_weight='balanced', random_state=42, n_jobs=-1
        ),
        'Gradient Boosting': GradientBoostingClassifier(
            n_estimators=200, learning_rate=0.1, max_depth=5, random_state=42
        ),
    }

    results = {}

    for name, model in models.items():
        # Full pipeline: preprocess → train
        pipeline = Pipeline([
            ('preprocessor', preprocessor),
            ('classifier', model),
        ])

        pipeline.fit(X_train, y_train)
        y_pred = pipeline.predict(X_test)

        report = classification_report(y_test, y_pred, output_dict=True)

        results[name] = {
            'pipeline': pipeline,
            'accuracy': report['accuracy'],
            'precision': report['weighted avg']['precision'],
            'recall': report['weighted avg']['recall'],
            'f1': report['weighted avg']['f1-score'],
        }

        print(f"\n{'='*50}")
        print(f"{name}")
        print(f"{'='*50}")
        print(classification_report(y_test, y_pred))

    return results


# --- Usage ---
# Split data
# X = df.drop('churn', axis=1)
# y = df['churn'].map({'yes': 1, 'no': 0})
# X_train, X_test, y_train, y_test = train_test_split(
#     X, y, test_size=0.2, random_state=42, stratify=y
# )
# results = train_and_compare(X_train, y_train, X_test, y_test, preprocessor)

Step 6: Model Evaluation

import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import roc_curve, auc, precision_recall_curve


def evaluate_model(pipeline, X_test, y_test, model_name: str = "Model"):
    """Comprehensive model evaluation with plots."""

    y_pred = pipeline.predict(X_test)
    y_proba = pipeline.predict_proba(X_test)[:, 1]

    # 1. Confusion Matrix
    cm = confusion_matrix(y_test, y_pred)
    fig, axes = plt.subplots(1, 3, figsize=(18, 5))

    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', ax=axes[0])
    axes[0].set_title(f'{model_name} — Confusion Matrix')
    axes[0].set_xlabel('Predicted')
    axes[0].set_ylabel('Actual')

    # 2. ROC Curve
    fpr, tpr, _ = roc_curve(y_test, y_proba)
    roc_auc = auc(fpr, tpr)

    axes[1].plot(fpr, tpr, label=f'AUC = {roc_auc:.3f}')
    axes[1].plot([0, 1], [0, 1], 'k--')
    axes[1].set_title(f'{model_name} — ROC Curve')
    axes[1].set_xlabel('False Positive Rate')
    axes[1].set_ylabel('True Positive Rate')
    axes[1].legend()

    # 3. Precision-Recall Curve
    precision, recall, _ = precision_recall_curve(y_test, y_proba)

    axes[2].plot(recall, precision)
    axes[2].set_title(f'{model_name} — Precision-Recall')
    axes[2].set_xlabel('Recall')
    axes[2].set_ylabel('Precision')

    plt.tight_layout()
    plt.savefig(f'{model_name.lower().replace(" ", "_")}_eval.png', dpi=150)
    plt.show()

    return {'auc': roc_auc, 'confusion_matrix': cm}


def feature_importance(pipeline, feature_names: list[str], top_n: int = 15):
    """Extract and plot feature importance from tree-based models."""
    model = pipeline.named_steps['classifier']

    if hasattr(model, 'feature_importances_'):
        importances = model.feature_importances_
    elif hasattr(model, 'coef_'):
        importances = np.abs(model.coef_[0])
    else:
        print("Model doesn't support feature importance")
        return

    # Get transformed feature names
    preprocessor = pipeline.named_steps['preprocessor']
    try:
        names = preprocessor.get_feature_names_out()
    except AttributeError:
        names = feature_names[:len(importances)]

    feat_imp = pd.DataFrame({
        'feature': names[:len(importances)],
        'importance': importances,
    }).sort_values('importance', ascending=False).head(top_n)

    plt.figure(figsize=(10, 6))
    sns.barplot(data=feat_imp, x='importance', y='feature')
    plt.title(f'Top {top_n} Feature Importances')
    plt.tight_layout()
    plt.savefig('feature_importance.png', dpi=150)
    plt.show()

    return feat_imp

Step 7: Hyperparameter Tuning

from sklearn.model_selection import RandomizedSearchCV, cross_val_score
from scipy import stats


def tune_model(pipeline, X_train, y_train) -> Pipeline:
    """Tune hyperparameters with RandomizedSearchCV."""

    # Parameter distributions for Random Forest
    param_distributions = {
        'classifier__n_estimators': [100, 200, 300, 500],
        'classifier__max_depth': [5, 10, 15, 20, None],
        'classifier__min_samples_split': [2, 5, 10, 20],
        'classifier__min_samples_leaf': [1, 2, 4, 8],
        'classifier__max_features': ['sqrt', 'log2', 0.3, 0.5],
        'classifier__class_weight': ['balanced', 'balanced_subsample'],
    }

    search = RandomizedSearchCV(
        pipeline,
        param_distributions=param_distributions,
        n_iter=50,           # try 50 random combinations
        cv=5,                # 5-fold cross-validation
        scoring='f1',        # optimize for F1 (good for imbalanced)
        n_jobs=-1,           # use all CPU cores
        random_state=42,
        verbose=1,
    )

    search.fit(X_train, y_train)

    print(f"\nBest F1: {search.best_score_:.4f}")
    print(f"Best params: {search.best_params_}")

    return search.best_estimator_


# Cross-validation sanity check
def validate_model(pipeline, X, y, cv: int = 5):
    """Cross-validate to check for overfitting."""
    scores = cross_val_score(pipeline, X, y, cv=cv, scoring='f1', n_jobs=-1)

    print(f"CV F1 scores: {scores.round(4)}")
    print(f"Mean: {scores.mean():.4f} (+/- {scores.std() * 2:.4f})")

    if scores.std() > 0.05:
        print("⚠️ High variance — model may be overfitting")
    else:
        print("✅ Stable performance across folds")

    return scores
🎯 Tuning tips:
  • Use RandomizedSearchCV over GridSearchCV — finds good params faster
  • Pick the right metric: accuracy is misleading for imbalanced data. Use F1, AUC, or precision/recall
  • Always cross-validate — a single train/test split can be lucky
  • For production: use Optuna for Bayesian optimization (smarter search)

Step 8: Save and Load Models

import joblib
from datetime import datetime
from pathlib import Path
import json


class ModelRegistry:
    """Save, load, and track model versions."""

    def __init__(self, registry_dir: str = "models"):
        self.dir = Path(registry_dir)
        self.dir.mkdir(exist_ok=True)

    def save(
        self,
        pipeline: Pipeline,
        name: str,
        metrics: dict,
        features: list[str],
    ) -> str:
        """Save model with metadata."""
        version = datetime.now().strftime("%Y%m%d_%H%M%S")
        model_dir = self.dir / f"{name}_v{version}"
        model_dir.mkdir()

        # Save pipeline
        model_path = model_dir / "model.joblib"
        joblib.dump(pipeline, model_path)

        # Save metadata
        metadata = {
            "name": name,
            "version": version,
            "created_at": datetime.now().isoformat(),
            "metrics": metrics,
            "features": features,
            "sklearn_version": __import__('sklearn').__version__,
            "python_version": __import__('sys').version,
        }
        (model_dir / "metadata.json").write_text(
            json.dumps(metadata, indent=2, default=str)
        )

        print(f"Model saved: {model_dir}")
        return str(model_dir)

    def load(self, name: str, version: str = "latest") -> tuple:
        """Load model and metadata."""
        if version == "latest":
            # Find most recent version
            versions = sorted(self.dir.glob(f"{name}_v*"))
            if not versions:
                raise FileNotFoundError(f"No models found for '{name}'")
            model_dir = versions[-1]
        else:
            model_dir = self.dir / f"{name}_v{version}"

        pipeline = joblib.load(model_dir / "model.joblib")
        metadata = json.loads((model_dir / "metadata.json").read_text())

        print(f"Loaded {metadata['name']} v{metadata['version']}")
        print(f"Metrics: {metadata['metrics']}")

        return pipeline, metadata


# --- Usage ---
# registry = ModelRegistry()
# registry.save(
#     pipeline=best_pipeline,
#     name="churn_predictor",
#     metrics={"f1": 0.82, "auc": 0.91},
#     features=numeric_features + categorical_features,
# )
# pipeline, meta = registry.load("churn_predictor")

Step 9: Prediction Service

Serve your model as a FastAPI endpoint:

# predict_service.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import pandas as pd
import joblib
import logging

logger = logging.getLogger(__name__)

app = FastAPI(title="Churn Prediction API")

# Load model at startup
model = joblib.load("models/churn_predictor_latest/model.joblib")


class CustomerData(BaseModel):
    tenure: int
    monthly_charges: float
    total_charges: float
    gender: str
    contract: str
    payment_method: str
    internet_service: str
    n_services: int = 0


class PredictionResponse(BaseModel):
    churn_probability: float
    prediction: str
    confidence: float


@app.post("/predict", response_model=PredictionResponse)
def predict(customer: CustomerData):
    """Predict churn probability for a customer."""
    try:
        # Convert to DataFrame (sklearn expects tabular input)
        df = pd.DataFrame([customer.model_dump()])

        # Feature engineering (same as training)
        df['avg_monthly_spend'] = df['total_charges'] / df['tenure'].clip(lower=1)
        df['charge_tenure_ratio'] = df['monthly_charges'] / df['tenure'].clip(lower=1)
        df['tenure_group'] = pd.cut(
            df['tenure'],
            bins=[0, 12, 24, 48, 72, float('inf')],
            labels=['0-1yr', '1-2yr', '2-4yr', '4-6yr', '6yr+']
        )

        # Predict
        proba = model.predict_proba(df)[0]
        churn_prob = float(proba[1])
        prediction = "churn" if churn_prob > 0.5 else "stay"
        confidence = float(max(proba))

        logger.info(
            f"Prediction: {prediction} (p={churn_prob:.3f})",
            extra={"customer_tenure": customer.tenure},
        )

        return PredictionResponse(
            churn_probability=round(churn_prob, 4),
            prediction=prediction,
            confidence=round(confidence, 4),
        )

    except Exception as e:
        logger.error(f"Prediction failed: {e}")
        raise HTTPException(500, f"Prediction error: {str(e)}")


@app.get("/health")
def health():
    return {"status": "healthy", "model": "churn_predictor"}
# Run the service
uvicorn predict_service:app --host 0.0.0.0 --port 8000

# Test prediction
curl -X POST http://localhost:8000/predict \
  -H "Content-Type: application/json" \
  -d '{
    "tenure": 6,
    "monthly_charges": 89.5,
    "total_charges": 537.0,
    "gender": "female",
    "contract": "month-to-month",
    "payment_method": "electronic check",
    "internet_service": "fiber optic",
    "n_services": 3
  }'

Putting It All Together

# main.py — Complete ML pipeline
import pandas as pd
from sklearn.model_selection import train_test_split

# 1. Load
df = pd.read_csv("data/customer_churn.csv")

# 2. Clean
df = clean_data(df)

# 3. Feature engineering
df = engineer_features(df)

# 4. Split
X = df.drop('churn', axis=1)
y = df['churn'].map({'yes': 1, 'no': 0})
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)

# 5. Build preprocessor
preprocessor = build_preprocessor(numeric_features, categorical_features)

# 6. Train and compare
results = train_and_compare(X_train, y_train, X_test, y_test, preprocessor)

# 7. Pick best model
best_name = max(results, key=lambda k: results[k]['f1'])
best_pipeline = results[best_name]['pipeline']
print(f"\nBest model: {best_name} (F1: {results[best_name]['f1']:.4f})")

# 8. Tune
best_pipeline = tune_model(best_pipeline, X_train, y_train)

# 9. Final evaluation
evaluate_model(best_pipeline, X_test, y_test, best_name)
validate_model(best_pipeline, X, y)

# 10. Save
registry = ModelRegistry()
registry.save(
    pipeline=best_pipeline,
    name="churn_predictor",
    metrics=results[best_name],
    features=numeric_features + categorical_features,
)

print("\n✅ Pipeline complete!")

Common Pitfalls

🚀 Want production-ready ML pipelines, data tools, and automation scripts?

Get the AI Agent Toolkit →

Related Articles

Need a custom ML pipeline built for your data? I build Python ML systems, data pipelines, and automation tools. Reach out on Telegram →