Python Machine Learning Pipeline — scikit-learn from Scratch
Most ML tutorials stop at model.fit(X, y). Real-world machine learning is 80% data preparation, 10% modeling, and 10% deployment. This guide covers the full pipeline — from raw data to a deployed model that handles edge cases, logs predictions, and can be retrained without rewriting code.
The ML Pipeline Overview
# The pipeline we'll build:
# Raw Data → Clean → Features → Split → Train → Evaluate → Tune → Deploy
#
# Each step is modular, testable, and reproducible.
Setup
pip install scikit-learn pandas numpy joblib matplotlib seaborn
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.metrics import classification_report, confusion_matrix
import warnings
warnings.filterwarnings('ignore')  # keeps tutorial output clean; re-enable warnings when debugging
Step 1: Load and Explore Data
# We'll use a customer churn dataset as an example
# Works with any tabular classification/regression problem
def load_data(path: str, target: str = "churn") -> pd.DataFrame:
    """Load the data and print an initial inspection."""
    df = pd.read_csv(path)
    print(f"Shape: {df.shape}")
    print(f"\nData types:\n{df.dtypes}")
    print(f"\nMissing values:\n{df.isnull().sum()[df.isnull().sum() > 0]}")
    if target in df.columns:
        print(f"\nTarget distribution:\n{df[target].value_counts(normalize=True)}")
    return df
def quick_eda(df: pd.DataFrame):
"""Quick exploratory data analysis."""
print("=" * 50)
print("NUMERIC FEATURES")
print("=" * 50)
print(df.describe().round(2))
print("\n" + "=" * 50)
print("CATEGORICAL FEATURES")
print("=" * 50)
for col in df.select_dtypes(include='object').columns:
print(f"\n{col}: {df[col].nunique()} unique values")
print(df[col].value_counts().head())
# Usage
# df = load_data("data/customer_churn.csv")
# quick_eda(df)
Step 2: Data Cleaning
def clean_data(df: pd.DataFrame) -> pd.DataFrame:
"""Clean raw data: types, duplicates, obvious errors."""
df = df.copy()
# Remove duplicates
n_dupes = df.duplicated().sum()
if n_dupes > 0:
print(f"Removing {n_dupes} duplicates")
df = df.drop_duplicates()
# Fix data types
if 'total_charges' in df.columns:
df['total_charges'] = pd.to_numeric(df['total_charges'], errors='coerce')
    # Flag obvious errors (warn about unexpected negatives; decide case-by-case whether to drop)
numeric_cols = df.select_dtypes(include=np.number).columns
for col in numeric_cols:
n_neg = (df[col] < 0).sum()
if n_neg > 0 and col not in ['profit', 'balance']: # some cols can be negative
print(f"Warning: {n_neg} negative values in '{col}'")
# Standardize strings
str_cols = df.select_dtypes(include='object').columns
for col in str_cols:
df[col] = df[col].str.strip().str.lower()
print(f"Clean shape: {df.shape}")
return df
Step 3: Feature Engineering
def engineer_features(df: pd.DataFrame) -> pd.DataFrame:
"""Create new features from existing data."""
df = df.copy()
    # Ratio features (often more predictive than raw values)
    if {'monthly_charges', 'tenure', 'total_charges'}.issubset(df.columns):
        df['avg_monthly_spend'] = df['total_charges'] / df['tenure'].clip(lower=1)
        df['charge_tenure_ratio'] = df['monthly_charges'] / df['tenure'].clip(lower=1)
# Binning continuous features
if 'tenure' in df.columns:
        df['tenure_group'] = pd.cut(
            df['tenure'],
            bins=[0, 12, 24, 48, 72, float('inf')],
            labels=['0-1yr', '1-2yr', '2-4yr', '4-6yr', '6yr+'],
            include_lowest=True,  # keep tenure == 0 in the first bin
        )
# Interaction features
if 'contract' in df.columns and 'monthly_charges' in df.columns:
df['contract_x_charges'] = (
df['contract'].map({'month-to-month': 1, 'one year': 2, 'two year': 3})
* df['monthly_charges']
)
# Count of services
service_cols = [c for c in df.columns if c.startswith('has_') or c.endswith('_service')]
    if service_cols:
        df['n_services'] = (df[service_cols] == 'yes').sum(axis=1)  # vectorized count of 'yes' flags
print(f"Features after engineering: {df.shape[1]}")
return df
💡 Feature engineering tips:
- Ratios and differences often outperform raw values
- Domain knowledge beats automated feature generation
- Log-transform skewed distributions (np.log1p)
- Encode cyclical features (day of week, month) with sin/cos (see the sketch below)
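A minimal sketch of those last two tips. The column names (signup_amount, signup_month) are made up for illustration; substitute your own skewed or cyclical columns:
import numpy as np
import pandas as pd

def add_extra_transforms(df: pd.DataFrame) -> pd.DataFrame:
    """Hypothetical example: log-transform a skewed column, sin/cos-encode a cyclical one."""
    df = df.copy()
    # log1p = log(1 + x): compresses right-skewed values and is safe at zero
    if 'signup_amount' in df.columns:
        df['signup_amount_log'] = np.log1p(df['signup_amount'])
    # sin/cos encoding puts month 12 next to month 1,
    # a neighborhood a raw 1-12 integer cannot express
    if 'signup_month' in df.columns:
        df['month_sin'] = np.sin(2 * np.pi * df['signup_month'] / 12)
        df['month_cos'] = np.cos(2 * np.pi * df['signup_month'] / 12)
    return df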
Step 4: Preprocessing Pipeline
This is where scikit-learn shines. Build a Pipeline that handles all preprocessing: it prevents data leakage, keeps every step reproducible, and plugs straight into cross-validation.
def build_preprocessor(
numeric_features: list[str],
categorical_features: list[str],
) -> ColumnTransformer:
"""Build a sklearn preprocessing pipeline."""
# Numeric: impute missing → scale
numeric_pipeline = Pipeline([
('imputer', SimpleImputer(strategy='median')),
('scaler', StandardScaler()),
])
# Categorical: impute missing → one-hot encode
categorical_pipeline = Pipeline([
('imputer', SimpleImputer(strategy='constant', fill_value='missing')),
        ('encoder', OneHotEncoder(
            drop='first',             # avoid multicollinearity in linear models
            handle_unknown='ignore',  # unseen categories at predict time encode as all zeros
            sparse_output=False,      # requires scikit-learn >= 1.2
        )),
])
preprocessor = ColumnTransformer(
transformers=[
('num', numeric_pipeline, numeric_features),
('cat', categorical_pipeline, categorical_features),
],
remainder='drop', # drop columns not in either list
)
return preprocessor
# Define feature groups
numeric_features = [
'tenure', 'monthly_charges', 'total_charges',
'avg_monthly_spend', 'charge_tenure_ratio', 'n_services',
]
categorical_features = [
'gender', 'contract', 'payment_method',
'internet_service', 'tenure_group',
]
preprocessor = build_preprocessor(numeric_features, categorical_features)
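A quick sanity check that the preprocessor behaves as promised: fit it on training data only, then apply the learned parameters to the test set. A sketch, assuming the X_train/X_test split created in Step 5:
# Fit imputers, scalers, and encoders on the training set ONLY
X_train_t = preprocessor.fit_transform(X_train)
# Reuse those learned parameters on the test set: transform, never fit
X_test_t = preprocessor.transform(X_test)
print(X_train_t.shape, X_test_t.shape)
# One-hot encoding expands columns; inspect the generated names
print(preprocessor.get_feature_names_out()[:10])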
Step 5: Model Training
from sklearn.ensemble import (
    RandomForestClassifier,
    GradientBoostingClassifier,
)
from sklearn.linear_model import LogisticRegression
def train_and_compare(
X_train, y_train, X_test, y_test, preprocessor
) -> dict:
"""Train multiple models and compare performance."""
models = {
'Logistic Regression': LogisticRegression(
max_iter=1000, class_weight='balanced'
),
'Random Forest': RandomForestClassifier(
n_estimators=200, class_weight='balanced', random_state=42, n_jobs=-1
),
'Gradient Boosting': GradientBoostingClassifier(
n_estimators=200, learning_rate=0.1, max_depth=5, random_state=42
),
}
results = {}
for name, model in models.items():
# Full pipeline: preprocess → train
pipeline = Pipeline([
('preprocessor', preprocessor),
('classifier', model),
])
pipeline.fit(X_train, y_train)
y_pred = pipeline.predict(X_test)
report = classification_report(y_test, y_pred, output_dict=True)
results[name] = {
'pipeline': pipeline,
'accuracy': report['accuracy'],
'precision': report['weighted avg']['precision'],
'recall': report['weighted avg']['recall'],
'f1': report['weighted avg']['f1-score'],
}
print(f"\n{'='*50}")
print(f"{name}")
print(f"{'='*50}")
print(classification_report(y_test, y_pred))
return results
# --- Usage ---
# Split data
# X = df.drop('churn', axis=1)
# y = df['churn'].map({'yes': 1, 'no': 0})
# X_train, X_test, y_train, y_test = train_test_split(
# X, y, test_size=0.2, random_state=42, stratify=y
# )
# results = train_and_compare(X_train, y_train, X_test, y_test, preprocessor)
Step 6: Model Evaluation
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import roc_curve, auc, precision_recall_curve
def evaluate_model(pipeline, X_test, y_test, model_name: str = "Model"):
"""Comprehensive model evaluation with plots."""
y_pred = pipeline.predict(X_test)
y_proba = pipeline.predict_proba(X_test)[:, 1]
# 1. Confusion Matrix
cm = confusion_matrix(y_test, y_pred)
fig, axes = plt.subplots(1, 3, figsize=(18, 5))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', ax=axes[0])
axes[0].set_title(f'{model_name} — Confusion Matrix')
axes[0].set_xlabel('Predicted')
axes[0].set_ylabel('Actual')
# 2. ROC Curve
fpr, tpr, _ = roc_curve(y_test, y_proba)
roc_auc = auc(fpr, tpr)
axes[1].plot(fpr, tpr, label=f'AUC = {roc_auc:.3f}')
axes[1].plot([0, 1], [0, 1], 'k--')
axes[1].set_title(f'{model_name} — ROC Curve')
axes[1].set_xlabel('False Positive Rate')
axes[1].set_ylabel('True Positive Rate')
axes[1].legend()
# 3. Precision-Recall Curve
precision, recall, _ = precision_recall_curve(y_test, y_proba)
axes[2].plot(recall, precision)
axes[2].set_title(f'{model_name} — Precision-Recall')
axes[2].set_xlabel('Recall')
axes[2].set_ylabel('Precision')
plt.tight_layout()
plt.savefig(f'{model_name.lower().replace(" ", "_")}_eval.png', dpi=150)
plt.show()
return {'auc': roc_auc, 'confusion_matrix': cm}
def feature_importance(pipeline, feature_names: list[str], top_n: int = 15):
"""Extract and plot feature importance from tree-based models."""
model = pipeline.named_steps['classifier']
if hasattr(model, 'feature_importances_'):
importances = model.feature_importances_
elif hasattr(model, 'coef_'):
importances = np.abs(model.coef_[0])
else:
print("Model doesn't support feature importance")
return
# Get transformed feature names
preprocessor = pipeline.named_steps['preprocessor']
try:
names = preprocessor.get_feature_names_out()
except AttributeError:
names = feature_names[:len(importances)]
feat_imp = pd.DataFrame({
'feature': names[:len(importances)],
'importance': importances,
}).sort_values('importance', ascending=False).head(top_n)
plt.figure(figsize=(10, 6))
sns.barplot(data=feat_imp, x='importance', y='feature')
plt.title(f'Top {top_n} Feature Importances')
plt.tight_layout()
plt.savefig('feature_importance.png', dpi=150)
plt.show()
return feat_imp
Step 7: Hyperparameter Tuning
from sklearn.model_selection import RandomizedSearchCV, cross_val_score
def tune_model(pipeline, X_train, y_train) -> Pipeline:
    """Tune hyperparameters with RandomizedSearchCV.

    The search space below assumes the pipeline's classifier step is a
    RandomForestClassifier; swap the distributions if another model won.
    """
    # Parameter distributions for Random Forest
param_distributions = {
'classifier__n_estimators': [100, 200, 300, 500],
'classifier__max_depth': [5, 10, 15, 20, None],
'classifier__min_samples_split': [2, 5, 10, 20],
'classifier__min_samples_leaf': [1, 2, 4, 8],
'classifier__max_features': ['sqrt', 'log2', 0.3, 0.5],
'classifier__class_weight': ['balanced', 'balanced_subsample'],
}
search = RandomizedSearchCV(
pipeline,
param_distributions=param_distributions,
n_iter=50, # try 50 random combinations
cv=5, # 5-fold cross-validation
scoring='f1', # optimize for F1 (good for imbalanced)
n_jobs=-1, # use all CPU cores
random_state=42,
verbose=1,
)
search.fit(X_train, y_train)
print(f"\nBest F1: {search.best_score_:.4f}")
print(f"Best params: {search.best_params_}")
return search.best_estimator_
# Cross-validation sanity check
def validate_model(pipeline, X, y, cv: int = 5):
"""Cross-validate to check for overfitting."""
scores = cross_val_score(pipeline, X, y, cv=cv, scoring='f1', n_jobs=-1)
print(f"CV F1 scores: {scores.round(4)}")
print(f"Mean: {scores.mean():.4f} (+/- {scores.std() * 2:.4f})")
    if scores.std() > 0.05:
        print("⚠️ High variance across folds: performance is unstable, check for overfitting or too little data")
else:
print("✅ Stable performance across folds")
return scores
🎯 Tuning tips:
- Use RandomizedSearchCV over GridSearchCV — finds good params faster
- Pick the right metric: accuracy is misleading for imbalanced data. Use F1, AUC, or precision/recall
- Always cross-validate — a single train/test split can be lucky
- For production: use Optuna for Bayesian optimization (smarter search); a minimal sketch follows this list
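Here is what the Optuna route might look like. A minimal sketch, assuming pip install optuna plus the preprocessor from Step 4 and the train split from Step 5; the search space loosely mirrors the RandomizedSearchCV one above:
import optuna
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import cross_val_score
from sklearn.pipeline import Pipeline

def objective(trial):
    # Optuna samples each hyperparameter, learning from past trials
    model = RandomForestClassifier(
        n_estimators=trial.suggest_int('n_estimators', 100, 500),
        max_depth=trial.suggest_int('max_depth', 5, 20),
        min_samples_leaf=trial.suggest_int('min_samples_leaf', 1, 8),
        class_weight='balanced',
        random_state=42,
        n_jobs=-1,
    )
    pipe = Pipeline([('preprocessor', preprocessor), ('classifier', model)])
    return cross_val_score(pipe, X_train, y_train, cv=5, scoring='f1').mean()

study = optuna.create_study(direction='maximize')
study.optimize(objective, n_trials=50)
print(f"Best F1: {study.best_value:.4f}")
print(f"Best params: {study.best_params}")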
Step 8: Save and Load Models
import joblib
import json
import sklearn
import sys
from datetime import datetime
from pathlib import Path
class ModelRegistry:
"""Save, load, and track model versions."""
    def __init__(self, registry_dir: str = "models"):
        self.dir = Path(registry_dir)
        self.dir.mkdir(parents=True, exist_ok=True)
def save(
self,
pipeline: Pipeline,
name: str,
metrics: dict,
features: list[str],
) -> str:
"""Save model with metadata."""
version = datetime.now().strftime("%Y%m%d_%H%M%S")
model_dir = self.dir / f"{name}_v{version}"
model_dir.mkdir()
# Save pipeline
model_path = model_dir / "model.joblib"
joblib.dump(pipeline, model_path)
# Save metadata
metadata = {
"name": name,
"version": version,
"created_at": datetime.now().isoformat(),
"metrics": metrics,
"features": features,
"sklearn_version": __import__('sklearn').__version__,
"python_version": __import__('sys').version,
}
(model_dir / "metadata.json").write_text(
json.dumps(metadata, indent=2, default=str)
)
print(f"Model saved: {model_dir}")
return str(model_dir)
def load(self, name: str, version: str = "latest") -> tuple:
"""Load model and metadata."""
if version == "latest":
# Find most recent version
versions = sorted(self.dir.glob(f"{name}_v*"))
if not versions:
raise FileNotFoundError(f"No models found for '{name}'")
model_dir = versions[-1]
else:
model_dir = self.dir / f"{name}_v{version}"
pipeline = joblib.load(model_dir / "model.joblib")
metadata = json.loads((model_dir / "metadata.json").read_text())
print(f"Loaded {metadata['name']} v{metadata['version']}")
print(f"Metrics: {metadata['metrics']}")
return pipeline, metadata
# --- Usage ---
# registry = ModelRegistry()
# registry.save(
# pipeline=best_pipeline,
# name="churn_predictor",
# metrics={"f1": 0.82, "auc": 0.91},
# features=numeric_features + categorical_features,
# )
# pipeline, meta = registry.load("churn_predictor")
Step 9: Prediction Service
Serve your model as a FastAPI endpoint:
# predict_service.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import pandas as pd
import joblib
import logging
logger = logging.getLogger(__name__)
app = FastAPI(title="Churn Prediction API")
# Load the most recent model version at startup (the registry saves
# timestamped directories like models/churn_predictor_v20240101_120000)
from pathlib import Path
model_dir = sorted(Path("models").glob("churn_predictor_v*"))[-1]
model = joblib.load(model_dir / "model.joblib")
class CustomerData(BaseModel):
tenure: int
monthly_charges: float
total_charges: float
gender: str
contract: str
payment_method: str
internet_service: str
n_services: int = 0
class PredictionResponse(BaseModel):
churn_probability: float
prediction: str
confidence: float
@app.post("/predict", response_model=PredictionResponse)
def predict(customer: CustomerData):
"""Predict churn probability for a customer."""
    try:
        # Convert to DataFrame (sklearn expects tabular input)
        df = pd.DataFrame([customer.model_dump()])
        # Normalize strings the same way clean_data did at training time,
        # so categories match what the OneHotEncoder saw
        for col in df.select_dtypes(include='object').columns:
            df[col] = df[col].str.strip().str.lower()
# Feature engineering (same as training)
df['avg_monthly_spend'] = df['total_charges'] / df['tenure'].clip(lower=1)
df['charge_tenure_ratio'] = df['monthly_charges'] / df['tenure'].clip(lower=1)
        df['tenure_group'] = pd.cut(
            df['tenure'],
            bins=[0, 12, 24, 48, 72, float('inf')],
            labels=['0-1yr', '1-2yr', '2-4yr', '4-6yr', '6yr+'],
            include_lowest=True,  # match training: tenure == 0 goes in the first bin
        )
# Predict
proba = model.predict_proba(df)[0]
churn_prob = float(proba[1])
prediction = "churn" if churn_prob > 0.5 else "stay"
confidence = float(max(proba))
logger.info(
f"Prediction: {prediction} (p={churn_prob:.3f})",
extra={"customer_tenure": customer.tenure},
)
return PredictionResponse(
churn_probability=round(churn_prob, 4),
prediction=prediction,
confidence=round(confidence, 4),
)
except Exception as e:
logger.error(f"Prediction failed: {e}")
raise HTTPException(500, f"Prediction error: {str(e)}")
@app.get("/health")
def health():
return {"status": "healthy", "model": "churn_predictor"}
# Run the service
uvicorn predict_service:app --host 0.0.0.0 --port 8000
# Test prediction
curl -X POST http://localhost:8000/predict \
-H "Content-Type: application/json" \
-d '{
"tenure": 6,
"monthly_charges": 89.5,
"total_charges": 537.0,
"gender": "female",
"contract": "month-to-month",
"payment_method": "electronic check",
"internet_service": "fiber optic",
"n_services": 3
}'
Putting It All Together
# main.py — Complete ML pipeline (assumes the functions from Steps 1-8 are in scope)
import pandas as pd
from sklearn.model_selection import train_test_split
# 1. Load
df = pd.read_csv("data/customer_churn.csv")
# 2. Clean
df = clean_data(df)
# 3. Feature engineering
df = engineer_features(df)
# 4. Split
X = df.drop('churn', axis=1)
y = df['churn'].map({'yes': 1, 'no': 0})
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42, stratify=y
)
# 5. Build preprocessor
preprocessor = build_preprocessor(numeric_features, categorical_features)
# 6. Train and compare
results = train_and_compare(X_train, y_train, X_test, y_test, preprocessor)
# 7. Pick best model
best_name = max(results, key=lambda k: results[k]['f1'])
best_pipeline = results[best_name]['pipeline']
print(f"\nBest model: {best_name} (F1: {results[best_name]['f1']:.4f})")
# 8. Tune (tune_model's search space assumes the Random Forest won; adjust if not)
best_pipeline = tune_model(best_pipeline, X_train, y_train)
# 9. Final evaluation
evaluate_model(best_pipeline, X_test, y_test, best_name)
validate_model(best_pipeline, X_train, y_train)  # CV on training data keeps the test set untouched
# 10. Save
registry = ModelRegistry()
registry.save(
pipeline=best_pipeline,
name="churn_predictor",
    metrics={k: v for k, v in results[best_name].items() if k != 'pipeline'},  # JSON-serializable only
features=numeric_features + categorical_features,
)
print("\n✅ Pipeline complete!")
Common Pitfalls
- Data leakage — fitting preprocessor on test data. Use Pipeline to prevent this
- Class imbalance — use class_weight='balanced', SMOTE, or adjust the decision threshold (see the sketch after this list)
- Feature scaling — tree models don't need it, but linear models and SVM do. Pipeline handles it
- Overfitting — if train accuracy >> test accuracy, reduce model complexity or add regularization
- Missing values at inference — your pipeline must handle them (we use SimpleImputer)
- New categories — unseen categories at predict time crash OneHotEncoder. Use handle_unknown='ignore'
- Model drift — monitor prediction distributions; retrain when they shift
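On the decision-threshold point: the default 0.5 cutoff is rarely optimal for imbalanced classes. A minimal sketch, assuming the fitted best_pipeline and test split from the full-pipeline script above (in practice, pick the threshold on a separate validation set, not the test set):
import numpy as np
from sklearn.metrics import precision_recall_curve

# Predicted probability of the positive (churn) class
y_proba = best_pipeline.predict_proba(X_test)[:, 1]
precision, recall, thresholds = precision_recall_curve(y_test, y_proba)
# F1 at each candidate threshold (the last precision/recall pair has no threshold)
f1_scores = 2 * precision[:-1] * recall[:-1] / (precision[:-1] + recall[:-1] + 1e-12)
best_t = thresholds[np.argmax(f1_scores)]
print(f"Best threshold: {best_t:.3f} (F1 = {f1_scores.max():.3f})")
# Use the tuned cutoff instead of the default 0.5
y_pred_tuned = (y_proba >= best_t).astype(int)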
Related Articles
- Build a Data Pipeline in Python — ETL from Scratch
- Data Visualization with Python — Matplotlib, Seaborn & Plotly
- Build a REST API with FastAPI — serve your model as an API
- Dockerize Python Apps — containerize your ML pipeline
- Python Testing Guide — test your data processing and models
- Build a RAG Pipeline in Python — AI/ML beyond classical ML
Need a custom ML pipeline built for your data? I build Python ML systems, data pipelines, and automation tools. Reach out on Telegram →