
MLOps Pipelines: Automating the ML Lifecycle
MLOps brings DevOps practices to machine learning. This guide walks through building production ML pipelines with Kubeflow and MLflow.
MLflow Experiment Tracking
Track experiments with automatic logging:
import os

import mlflow
import mlflow.pytorch
import torch
import torch.nn as nn
from sklearn.metrics import accuracy_score, f1_score

class MLflowTrainer:
    """Training with comprehensive experiment tracking"""

    def __init__(self, experiment_name="my_experiment"):
        mlflow.set_experiment(experiment_name)
        self.run = None

    def train(self, model, train_loader, val_loader, config):
        """Train with automatic logging"""
        with mlflow.start_run() as run:
            self.run = run

            # Log hyperparameters
            mlflow.log_params({
                'learning_rate': config['lr'],
                'batch_size': config['batch_size'],
                'epochs': config['epochs'],
                'optimizer': config['optimizer'],
                'model_architecture': model.__class__.__name__
            })

            # Log model architecture
            mlflow.set_tag("model_class", str(type(model)))

            # Training loop
            optimizer = torch.optim.Adam(model.parameters(), lr=config['lr'])
            criterion = nn.CrossEntropyLoss()

            for epoch in range(config['epochs']):
                # Training
                model.train()
                train_loss = 0
                for data, target in train_loader:
                    optimizer.zero_grad()
                    output = model(data)
                    loss = criterion(output, target)
                    loss.backward()
                    optimizer.step()
                    train_loss += loss.item()

                # Validation
                model.eval()
                val_loss = 0
                all_preds = []
                all_targets = []
                with torch.no_grad():
                    for data, target in val_loader:
                        output = model(data)
                        val_loss += criterion(output, target).item()
                        preds = output.argmax(dim=1)
                        all_preds.extend(preds.cpu().numpy())
                        all_targets.extend(target.cpu().numpy())

                # Calculate metrics
                accuracy = accuracy_score(all_targets, all_preds)
                f1 = f1_score(all_targets, all_preds, average='weighted')

                # Log metrics
                mlflow.log_metrics({
                    'train_loss': train_loss / len(train_loader),
                    'val_loss': val_loss / len(val_loader),
                    'val_accuracy': accuracy,
                    'val_f1': f1
                }, step=epoch)

                print(f"Epoch {epoch}: val_acc={accuracy:.4f}, val_f1={f1:.4f}")

                # ⚠️ Detect training anomalies
                if epoch > 5 and accuracy < 0.1:
                    mlflow.set_tag("status", "failed_to_learn")
                    print("⚠️ Model not learning, aborting")
                    break

            # Log final model
            mlflow.pytorch.log_model(model, "model")

            # Log model size
            model_path = "temp_model.pt"
            torch.save(model.state_dict(), model_path)
            model_size_mb = os.path.getsize(model_path) / 1e6
            mlflow.log_metric("model_size_mb", model_size_mb)
            os.remove(model_path)

            # Register model if the last validation accuracy meets the threshold
            if accuracy > 0.9:
                model_uri = f"runs:/{run.info.run_id}/model"
                mlflow.register_model(model_uri, "ProductionModel")
                print("✅ Model registered for production")
            else:
                print(f"⚠️ Model accuracy {accuracy:.2%} below threshold, not registered")

        return model
# Usage (assumes model, train_loader, val_loader are already defined)
trainer = MLflowTrainer("image_classification")
config = {
    'lr': 0.001,
    'batch_size': 64,
    'epochs': 50,
    'optimizer': 'Adam'
}
trained_model = trainer.train(model, train_loader, val_loader, config)
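Once a run registers a model, downstream serving code can pull it from the registry by name instead of by run ID or file path. A minimal sketch, assuming version 1 of the "ProductionModel" name registered above exists:

import mlflow.pytorch

# Load version 1 of the registered model from the MLflow Model Registry
model = mlflow.pytorch.load_model("models:/ProductionModel/1")
model.eval()  # inference mode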
Kubeflow Pipeline
Orchestrate end-to-end ML workflow:
import kfp
from kfp import dsl
from kfp.components import func_to_container_op

@func_to_container_op
def preprocess_data(input_path: str, output_path: str) -> str:
    """Data preprocessing component"""
    import pickle

    import numpy as np
    import pandas as pd
    from sklearn.preprocessing import StandardScaler

    # Load raw data
    df = pd.read_csv(input_path)

    # Clean data
    df = df.dropna()
    df = df[df['value'] > 0]  # Remove invalid values

    # Feature engineering
    df['feature_ratio'] = df['feature_a'] / (df['feature_b'] + 1e-8)
    df['feature_log'] = np.log1p(df['feature_c'])

    # Scale features
    scaler = StandardScaler()
    feature_cols = [c for c in df.columns if c != 'target']
    df[feature_cols] = scaler.fit_transform(df[feature_cols])

    # Save processed data and scaler
    df.to_csv(output_path, index=False)
    with open(f"{output_path}.scaler.pkl", 'wb') as f:
        pickle.dump(scaler, f)

    print(f"Processed {len(df)} samples")
    return output_path
@func_to_container_op
def train_model(data_path: str, model_output_path: str, hyperparams: dict) -> str:
    """Training component"""
    import pandas as pd
    import torch
    import torch.nn as nn
    from sklearn.model_selection import train_test_split

    # Load data
    df = pd.read_csv(data_path)
    X = df.drop('target', axis=1).values
    y = df['target'].values
    X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2)

    # Define model
    class SimpleNN(nn.Module):
        def __init__(self, input_dim, hidden_dim, output_dim):
            super().__init__()
            self.fc1 = nn.Linear(input_dim, hidden_dim)
            self.fc2 = nn.Linear(hidden_dim, output_dim)

        def forward(self, x):
            x = torch.relu(self.fc1(x))
            return self.fc2(x)

    model = SimpleNN(X_train.shape[1], hyperparams['hidden_dim'], 10)

    # Train (minimal full-batch loop; a real component would batch and validate)
    X_t = torch.tensor(X_train, dtype=torch.float32)
    y_t = torch.tensor(y_train, dtype=torch.long)
    optimizer = torch.optim.Adam(model.parameters(), lr=hyperparams['lr'])
    criterion = nn.CrossEntropyLoss()
    for epoch in range(hyperparams['epochs']):
        optimizer.zero_grad()
        loss = criterion(model(X_t), y_t)
        loss.backward()
        optimizer.step()

    # Save model
    torch.save(model.state_dict(), model_output_path)
    return model_output_path
@func_to_container_op
def evaluate_model(model_path: str, test_data_path: str) -> dict:
    """Evaluation component (stub: metrics below are placeholders)"""
    import pandas as pd
    import torch
    from sklearn.metrics import accuracy_score, classification_report

    # Load model and test data, run inference...
    # ... evaluation code
    metrics = {
        'accuracy': 0.95,
        'f1_score': 0.93
    }
    return metrics
@func_to_container_op
def deploy_model(model_path: str, metrics: dict, threshold: float = 0.9):
    """Conditional deployment based on metrics"""
    if metrics['accuracy'] >= threshold:
        # Deploy to production, e.g.:
        # kubectl apply -f deployment.yaml
        # or a TorchServe API call
        print(f"✅ Deploying model (accuracy: {metrics['accuracy']})")
    else:
        print(f"⚠️ Model accuracy {metrics['accuracy']} below threshold {threshold}")
        raise Exception("Model performance insufficient for deployment")
@dsl.pipeline(
    name='End-to-End ML Pipeline',
    description='Automated training and deployment pipeline'
)
def ml_pipeline(
    input_data_path: str = 's3://my-bucket/raw-data.csv',
    model_registry: str = 's3://my-bucket/models/',
    accuracy_threshold: float = 0.9
):
    """Complete ML pipeline"""
    # Step 1: Preprocess
    preprocess_task = preprocess_data(
        input_path=input_data_path,
        output_path='/tmp/processed_data.csv'
    )

    # Step 2: Train
    hyperparams = {'lr': 0.001, 'hidden_dim': 128, 'epochs': 50}
    train_task = train_model(
        data_path=preprocess_task.output,
        model_output_path='/tmp/model.pt',
        hyperparams=hyperparams
    )
    train_task.after(preprocess_task)

    # Step 3: Evaluate
    eval_task = evaluate_model(
        model_path=train_task.output,
        test_data_path='/tmp/test_data.csv'
    )
    eval_task.after(train_task)

    # Step 4: Deploy (conditional)
    deploy_task = deploy_model(
        model_path=train_task.output,
        metrics=eval_task.output,
        threshold=accuracy_threshold
    )
    deploy_task.after(eval_task)
# Compile and run pipeline
if __name__ == '__main__':
    kfp.compiler.Compiler().compile(ml_pipeline, 'ml_pipeline.yaml')

    client = kfp.Client(host='http://kubeflow.example.com')
    run = client.create_run_from_pipeline_func(
        ml_pipeline,
        arguments={
            'input_data_path': 's3://data/train.csv',
            'accuracy_threshold': 0.92
        }
    )
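As written, deploy_model enforces the accuracy gate by raising inside the container. KFP can also express the gate in the pipeline graph itself with dsl.Condition. A hedged sketch, assuming evaluate_model is reworked to expose accuracy as a named scalar output (e.g. a NamedTuple return) so it is addressable from the pipeline:

# Sketch: graph-level gating instead of raising inside the component.
# Assumes evaluate_model returns NamedTuple('Outputs', [('accuracy', float)])
# so the scalar appears as eval_task.outputs['accuracy'].
with dsl.Condition(eval_task.outputs['accuracy'] >= accuracy_threshold):
    # deploy_model no longer needs to re-check the threshold itself
    deploy_model(
        model_path=train_task.output,
        metrics=eval_task.output,
        threshold=accuracy_threshold
    )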
Data Versioning with DVC
Version control for datasets:
# Initialize DVC
dvc init

# Track dataset
dvc add data/train.csv
git add data/train.csv.dvc data/.gitignore
git commit -m "Add training data v1"

# Configure remote storage
dvc remote add -d myremote s3://my-bucket/dvc-storage
dvc push

# Create data pipeline stage
dvc run -n preprocess \
    -d data/raw.csv \
    -o data/processed.csv \
    python preprocess.py

# Reproduce pipeline
dvc repro
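Once a dataset is tracked and pushed, training code can pin an exact revision through DVC's Python API instead of relying on whatever happens to be on disk. A minimal sketch, assuming the repository tagged this dataset state as v1:

import pandas as pd
import dvc.api

# Stream the dataset exactly as it existed at git revision/tag "v1"
with dvc.api.open('data/train.csv', rev='v1') as f:
    df = pd.read_csv(f)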
Continuous Training
Automatically retrain on new data:
import time
from datetime import datetime

import kfp
import schedule

class ContinuousTrainer:
    """Automated retraining system"""

    def __init__(self, data_source, model_registry):
        self.data_source = data_source
        self.model_registry = model_registry
        self.last_training_time = None
        self.performance_history = []

    def check_data_drift(self):
        """Detect if retraining is needed"""
        from scipy.stats import ks_2samp

        # Load current production data vs new data
        # (load_production_data / load_new_data are assumed project helpers)
        prod_data = load_production_data()
        new_data = load_new_data(self.data_source)

        # Kolmogorov-Smirnov test for distribution shift, per feature
        for feature in prod_data.columns:
            statistic, pvalue = ks_2samp(prod_data[feature], new_data[feature])
            if pvalue < 0.05:  # Significant drift detected
                print(f"⚠️ Data drift detected in {feature} (p={pvalue:.4f})")
                return True
        return False

    def check_performance_degradation(self):
        """Monitor production model performance"""
        # get_production_accuracy is an assumed monitoring helper
        current_accuracy = get_production_accuracy()

        if len(self.performance_history) > 0:
            baseline_accuracy = self.performance_history[0]
            if current_accuracy < baseline_accuracy * 0.95:
                print(f"⚠️ Performance degradation: {current_accuracy:.2%} vs {baseline_accuracy:.2%}")
                return True

        self.performance_history.append(current_accuracy)
        return False

    def trigger_training(self):
        """Execute training pipeline"""
        print(f"🔄 Triggering retraining at {datetime.now()}")

        # Run the Kubeflow pipeline defined above
        client = kfp.Client(host='http://kubeflow.example.com')
        run = client.create_run_from_pipeline_func(ml_pipeline, arguments={})
        self.last_training_time = datetime.now()

        # Monitor pipeline execution
        run.wait_for_run_completion(timeout=3600)
        print("✅ Retraining complete")

    def scheduled_check(self):
        """Periodic check for retraining triggers"""
        print(f"Checking retraining conditions at {datetime.now()}")
        if self.check_data_drift() or self.check_performance_degradation():
            self.trigger_training()
        else:
            print("No retraining needed")

# Schedule daily checks
trainer = ContinuousTrainer(data_source='s3://bucket/data', model_registry='s3://bucket/models')
schedule.every().day.at("02:00").do(trainer.scheduled_check)

while True:
    schedule.run_pending()
    time.sleep(60)  # poll every minute so the 02:00 job fires on time
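One caveat with the drift check above: running an independent KS test per feature at p < 0.05 produces false alarms as the feature count grows. A common alternative is the Population Stability Index with a fixed cutoff (0.2 is a conventional threshold for significant shift). A minimal sketch (the function name is ours) that could replace the per-feature test inside check_data_drift:

import numpy as np

def population_stability_index(expected, actual, bins=10):
    """PSI between a baseline sample and a new sample of one feature.
    Rule of thumb: < 0.1 stable, 0.1-0.2 moderate shift, > 0.2 significant shift."""
    # Bin edges come from the baseline so both samples are binned identically
    edges = np.histogram_bin_edges(expected, bins=bins)
    expected_pct = np.histogram(expected, bins=edges)[0] / len(expected)
    actual_pct = np.histogram(actual, bins=edges)[0] / len(actual)
    # Clip to avoid log(0) for empty bins
    expected_pct = np.clip(expected_pct, 1e-6, None)
    actual_pct = np.clip(actual_pct, 1e-6, None)
    return float(np.sum((actual_pct - expected_pct) * np.log(actual_pct / expected_pct)))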
Warnings ⚠️
Pipeline Complexity: Multi-stage pipelines accumulate failure modes. The 2034 "Pipeline Cascade" occurred when 300-step ML pipelines became impossible to debug.
Hidden Dependencies: When data lineage tracking fails, upstream schema or distribution changes propagate as silent data quality issues.
Automation Runaway: Continuous training without human oversight deployed progressively worse models for weeks before detection.
Related Chronicles: The MLOps Meltdown (2034) - Automated systems deploying broken models
Tools: Kubeflow, MLflow, DVC, Airflow, Prefect, Weights & Biases
Research: Continuous learning systems, online learning, model monitoring