// Google Tag Manager bootstrap: records the start event on the data layer,
// then injects the async gtm.js loader ahead of the first <script> tag.
(function (win, doc, tagName, layerName, containerId) {
  win[layerName] = win[layerName] || [];
  win[layerName].push({ 'gtm.start': new Date().getTime(), event: 'gtm.js' });

  var firstTag = doc.getElementsByTagName(tagName)[0];
  var loader = doc.createElement(tagName);
  // Only a non-default data layer name is passed along in the query string.
  var layerParam = layerName != 'dataLayer' ? '&l=' + layerName : '';

  loader.async = true;
  loader.src = 'https://www.googletagmanager.com/gtm.js?id=' + containerId + layerParam;
  firstTag.parentNode.insertBefore(loader, firstTag);
})(window, document, 'script', 'dataLayer', 'GTM-W24L468');
MLOps Pipelines: Automating the ML Lifecycle
Polarity: Mixed/Knife-edge

MLOps Pipelines: Automating the ML Lifecycle

Visual Variations
fast sdxl
v2
kolors

MLOps brings DevOps practices to machine learning. This guide implements production ML pipelines with Kubeflow and MLflow.

MLflow Experiment Tracking

Track experiments with automatic logging:

import mlflow
import mlflow.pytorch
from sklearn.metrics import accuracy_score, f1_score
import torch
import torch.nn as nn

class MLflowTrainer:
    """Train a PyTorch classifier while recording the run in MLflow.

    Creating the trainer selects the MLflow experiment. ``train`` then logs
    hyperparameters, per-epoch metrics, the final model and its serialized
    size, and registers the model as "ProductionModel" when the last measured
    validation accuracy is above 0.9.
    """

    def __init__(self, experiment_name="my_experiment"):
        """Make ``experiment_name`` the active MLflow experiment."""
        mlflow.set_experiment(experiment_name)
        self.run = None  # run object of the most recent ``train`` call

    @staticmethod
    def _resolve_optimizer(name):
        """Return the concrete ``torch.optim`` optimizer class called ``name``.

        Raises:
            ValueError: if ``name`` is not a concrete optimizer class in
                ``torch.optim``.
        """
        candidate = getattr(torch.optim, str(name), None)
        is_concrete_optimizer = (
            isinstance(candidate, type)
            and issubclass(candidate, torch.optim.Optimizer)
            and candidate is not torch.optim.Optimizer
        )
        if not is_concrete_optimizer:
            raise ValueError(
                f"Unsupported optimizer {name!r}; expected the name of a "
                "torch.optim class such as 'Adam'"
            )
        return candidate

    def train(self, model, train_loader, val_loader, config):
        """Train ``model`` and log the whole run to MLflow.

        Args:
            model: ``torch.nn.Module`` whose output is scored with
                ``CrossEntropyLoss`` against the loader targets.
            train_loader: iterable of ``(data, target)`` training batches.
            val_loader: iterable of ``(data, target)`` validation batches.
            config: mapping with the keys ``'lr'``, ``'batch_size'``,
                ``'epochs'`` and ``'optimizer'``. ``'optimizer'`` names the
                ``torch.optim`` class to use (for example ``'Adam'``); it is
                constructed with ``lr`` only and stepped without a closure.

        Returns:
            The same ``model`` instance after training.

        Raises:
            ValueError: if ``config['optimizer']`` does not name a
                ``torch.optim`` optimizer class.
        """
        import os
        import tempfile

        # Validate before opening a run so a bad name does not leave a
        # half-logged run behind.
        optimizer_cls = self._resolve_optimizer(config['optimizer'])

        with mlflow.start_run() as run:
            self.run = run

            # Log hyperparameters
            mlflow.log_params({
                'learning_rate': config['lr'],
                'batch_size': config['batch_size'],
                'epochs': config['epochs'],
                'optimizer': config['optimizer'],
                'model_architecture': model.__class__.__name__
            })

            # Log model architecture
            mlflow.set_tag("model_class", str(type(model)))

            # Use the optimizer that was logged above, so the recorded
            # hyperparameter matches what actually trained the model.
            optimizer = optimizer_cls(model.parameters(), lr=config['lr'])
            criterion = nn.CrossEntropyLoss()

            # Defined up front so the registration check below still works
            # when ``config['epochs']`` is 0 and the loop body never runs.
            accuracy = 0.0

            for epoch in range(config['epochs']):
                # Training
                model.train()
                train_loss = 0.0
                train_batches = 0
                for data, target in train_loader:
                    optimizer.zero_grad()
                    output = model(data)
                    loss = criterion(output, target)
                    loss.backward()
                    optimizer.step()

                    train_loss += loss.item()
                    train_batches += 1

                # Validation
                model.eval()
                val_loss = 0.0
                val_batches = 0
                all_preds = []
                all_targets = []

                with torch.no_grad():
                    for data, target in val_loader:
                        output = model(data)
                        val_loss += criterion(output, target).item()
                        val_batches += 1

                        preds = output.argmax(dim=1)
                        all_preds.extend(preds.cpu().numpy())
                        all_targets.extend(target.cpu().numpy())

                # Calculate metrics
                accuracy = accuracy_score(all_targets, all_preds)
                f1 = f1_score(all_targets, all_preds, average='weighted')

                # Batches are counted while iterating, so loaders without
                # ``__len__`` work and an empty loader cannot divide by zero.
                mlflow.log_metrics({
                    'train_loss': train_loss / max(train_batches, 1),
                    'val_loss': val_loss / max(val_batches, 1),
                    'val_accuracy': accuracy,
                    'val_f1': f1
                }, step=epoch)

                print(f"Epoch {epoch}: val_acc={accuracy:.4f}, val_f1={f1:.4f}")

                # ⚠️ Detect training anomalies
                if epoch > 5 and accuracy < 0.1:
                    mlflow.set_tag("status", "failed_to_learn")
                    print("⚠️ Model not learning, aborting")
                    break

            # Log final model
            mlflow.pytorch.log_model(model, "model")

            # Measure the serialized state dict in a private scratch directory:
            # concurrent runs cannot collide on the file name, and the file is
            # removed even if saving fails.
            with tempfile.TemporaryDirectory() as scratch_dir:
                model_path = os.path.join(scratch_dir, "temp_model.pt")
                torch.save(model.state_dict(), model_path)
                model_size_mb = os.path.getsize(model_path) / 1e6
            mlflow.log_metric("model_size_mb", model_size_mb)

            # Register model if performance threshold met
            if accuracy > 0.9:
                model_uri = f"runs:/{run.info.run_id}/model"
                mlflow.register_model(model_uri, "ProductionModel")
                print("✅ Model registered for production")
            else:
                print(f"⚠️ Model accuracy {accuracy:.2%} below threshold, not registered")

            return model

# Usage: one tracked training run for the "image_classification" experiment.
# `model`, `train_loader` and `val_loader` must already be defined by the caller.
config = dict(lr=0.001, batch_size=64, epochs=50, optimizer='Adam')
trainer = MLflowTrainer("image_classification")
trained_model = trainer.train(model, train_loader, val_loader, config)
Click to examine closely

def preprocess_data(input_path: str, output_path: str) -> str:

Kubeflow Pipeline

Orchestrate an end-to-end ML workflow:

import kfp
from kfp import dsl
from kfp.components import func_to_container_op

@func_to_container_op
def preprocess_data(input_path: str, output_path: str) -> str:
    """Clean, enrich and standardize a raw CSV, then write it back out.

    Rows with missing values or a non-positive ``value`` column are dropped,
    ``feature_ratio`` and ``feature_log`` are derived, and every column except
    ``target`` is standardized. The fitted scaler is pickled next to the
    output file as ``<output_path>.scaler.pkl``.

    Args:
        input_path: location of the raw CSV; it must contain the columns
            ``value``, ``feature_a``, ``feature_b`` and ``feature_c``.
        output_path: destination for the processed CSV.

    Returns:
        ``output_path``, so a downstream step can consume it.
    """
    # All imports live inside the function: the body is packaged as a
    # standalone component and cannot rely on module-level names.
    import pickle

    import numpy as np  # required by np.log1p below; was missing
    import pandas as pd
    from sklearn.preprocessing import StandardScaler

    # Load raw data
    df = pd.read_csv(input_path)

    # Clean data
    df = df.dropna()
    df = df[df['value'] > 0]  # Remove invalid values

    # Feature engineering
    # The small epsilon keeps the ratio finite when feature_b is exactly 0.
    df['feature_ratio'] = df['feature_a'] / (df['feature_b'] + 1e-8)
    df['feature_log'] = np.log1p(df['feature_c'])

    # Scale features (everything except the label column)
    scaler = StandardScaler()
    feature_cols = [c for c in df.columns if c != 'target']
    df[feature_cols] = scaler.fit_transform(df[feature_cols])

    # Save processed data and scaler
    df.to_csv(output_path, index=False)
    with open(f"{output_path}.scaler.pkl", 'wb') as f:
        pickle.dump(scaler, f)

    print(f"Processed {len(df)} samples")
    return output_path

@func_to_container_op
def train_model(data_path: str, model_output_path: str, hyperparams: dict) -> str:
    """Fit a two-layer classifier on the processed CSV and save its weights.

    Args:
        data_path: CSV with a ``target`` label column; every other column is
            used as a numeric feature.
        model_output_path: where the trained ``state_dict`` is written.
        hyperparams: mapping with ``'lr'``, ``'hidden_dim'`` and ``'epochs'``.
            An optional ``'num_classes'`` sets the output width (default 10,
            the previously hard-coded value).

    Returns:
        ``model_output_path``.
    """
    import pandas as pd
    import torch
    import torch.nn as nn
    from sklearn.model_selection import train_test_split

    # Load data
    df = pd.read_csv(data_path)
    X = df.drop('target', axis=1).values
    y = df['target'].values

    X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2)

    # Define model
    class SimpleNN(nn.Module):
        """Single hidden layer with ReLU, producing raw class logits."""

        def __init__(self, input_dim, hidden_dim, output_dim):
            super().__init__()
            self.fc1 = nn.Linear(input_dim, hidden_dim)
            self.fc2 = nn.Linear(hidden_dim, output_dim)

        def forward(self, x):
            x = torch.relu(self.fc1(x))
            return self.fc2(x)

    num_classes = hyperparams.get('num_classes', 10)
    model = SimpleNN(X_train.shape[1], hyperparams['hidden_dim'], num_classes)

    # Train
    optimizer = torch.optim.Adam(model.parameters(), lr=hyperparams['lr'])
    criterion = nn.CrossEntropyLoss()

    # CrossEntropyLoss expects float inputs and integer class indices.
    # NOTE(review): assumes `target` holds class ids in [0, num_classes) - confirm.
    train_x = torch.as_tensor(X_train, dtype=torch.float32)
    train_y = torch.as_tensor(y_train, dtype=torch.long)
    val_x = torch.as_tensor(X_val, dtype=torch.float32)
    val_y = torch.as_tensor(y_val, dtype=torch.long)

    for epoch in range(hyperparams['epochs']):
        # One full-batch gradient step per epoch.
        model.train()
        optimizer.zero_grad()
        loss = criterion(model(train_x), train_y)
        loss.backward()
        optimizer.step()

        # Report held-out loss so the run log shows whether training converges.
        model.eval()
        with torch.no_grad():
            val_loss = criterion(model(val_x), val_y).item()
        print(f"Epoch {epoch}: train_loss={loss.item():.4f}, val_loss={val_loss:.4f}")

    # Save model
    torch.save(model.state_dict(), model_output_path)

    return model_output_path

@func_to_container_op
def evaluate_model(model_path: str, test_data_path: str) -> dict:
    """Return evaluation metrics for a trained model (placeholder).

    The evaluation itself is not implemented yet: neither argument is read
    and the returned ``accuracy`` / ``f1_score`` values are fixed constants.
    """
    import torch
    import pandas as pd
    from sklearn.metrics import accuracy_score, classification_report

    # TODO: load the model and the test data, then compute real metrics.
    return dict(accuracy=0.95, f1_score=0.93)

@func_to_container_op
def deploy_model(model_path: str, metrics: dict, threshold: float = 0.9):
    """Gate deployment on the reported accuracy.

    Prints a deployment message when ``metrics['accuracy']`` reaches
    ``threshold``; otherwise prints a warning and raises ``Exception`` so the
    step fails. The actual rollout is still a stub and ``model_path`` is not
    read.
    """
    accuracy_ok = metrics['accuracy'] >= threshold

    # Guard clause: reject first, so the success path reads straight through.
    if not accuracy_ok:
        print(f"⚠️ Model accuracy {metrics['accuracy']} below threshold {threshold}")
        raise Exception("Model performance insufficient for deployment")

    # Deploy to production
    print(f"✅ Deploying model (accuracy: {metrics['accuracy']})")
    # kubectl apply -f deployment.yaml
    # or TorchServe API call

@dsl.pipeline(
    name='End-to-End ML Pipeline',
    description='Automated training and deployment pipeline'
)
def ml_pipeline(
    input_data_path: str = 's3://my-bucket/raw-data.csv',
    model_registry: str = 's3://my-bucket/models/',
    accuracy_threshold: float = 0.9
):
    """Chain preprocess -> train -> evaluate -> deploy into one pipeline.

    Each stage consumes the previous stage's output and is also explicitly
    ordered after it. ``model_registry`` is accepted but not used by any
    stage here.
    """
    # Stage 1: clean and scale the raw data
    prep_step = preprocess_data(
        input_path=input_data_path,
        output_path='/tmp/processed_data.csv',
    )

    # Stage 2: fit the model on the processed data
    fit_step = train_model(
        data_path=prep_step.output,
        model_output_path='/tmp/model.pt',
        hyperparams={'lr': 0.001, 'hidden_dim': 128, 'epochs': 50},
    )
    fit_step.after(prep_step)

    # Stage 3: score the trained model
    score_step = evaluate_model(
        model_path=fit_step.output,
        test_data_path='/tmp/test_data.csv',
    )
    score_step.after(fit_step)

    # Stage 4: deploy only when the metrics clear the threshold
    release_step = deploy_model(
        model_path=fit_step.output,
        metrics=score_step.output,
        threshold=accuracy_threshold,
    )
    release_step.after(score_step)

# Compile the pipeline to YAML, then submit one run to the Kubeflow endpoint
if __name__ == '__main__':
    kfp.compiler.Compiler().compile(ml_pipeline, 'ml_pipeline.yaml')

    # Per-run overrides for the pipeline's default parameters
    run_arguments = {
        'input_data_path': 's3://data/train.csv',
        'accuracy_threshold': 0.92,
    }

    client = kfp.Client(host='http://kubeflow.example.com')
    run = client.create_run_from_pipeline_func(ml_pipeline, arguments=run_arguments)
Click to examine closely
fast-sdxl artwork
fast sdxl

Data Versioning with DVC

Version control for datasets:

# Initialize DVC
dvc init

# Track dataset
dvc add data/train.csv
git add data/train.csv.dvc data/.gitignore
git commit -m "Add training data v1"

# Configure remote storage
dvc remote add -d myremote s3://my-bucket/dvc-storage
dvc push

# Create data pipeline
# `dvc run` was removed in DVC 3.0. `dvc stage add` only records the stage in
# dvc.yaml; the `dvc repro` below is what executes it.
# The script is declared as a dependency so editing it invalidates the stage.
dvc stage add -n preprocess \
  -d preprocess.py \
  -d data/raw.csv \
  -o data/processed.csv \
  python preprocess.py

# Run (or reproduce) the pipeline
dvc repro
Click to examine closely

Continuous Training

Automatically retrain on new data:

import schedule
import time
from datetime import datetime

class ContinuousTrainer:
    """Decide when the production model needs retraining and launch it.

    Two triggers are checked: a distribution shift between production data
    and new data, and a drop in production accuracy relative to the first
    recorded measurement.
    """

    def __init__(self, data_source, model_registry):
        """Store the data/model locations and start with an empty history."""
        self.data_source = data_source
        self.model_registry = model_registry
        self.last_training_time = None  # set by trigger_training()
        self.performance_history = []   # accuracies recorded by healthy checks

    def check_data_drift(self):
        """Return True as soon as one feature shows a distribution shift.

        Every column of the production data is compared against the same
        column of the new data with a two-sample Kolmogorov-Smirnov test;
        a p-value below 0.05 counts as drift.
        """
        from scipy.stats import ks_2samp

        # Load current production data vs new data
        prod_data = load_production_data()
        new_data = load_new_data(self.data_source)

        # Kolmogorov-Smirnov test for distribution shift
        for feature in prod_data.columns:
            _statistic, pvalue = ks_2samp(prod_data[feature], new_data[feature])

            if pvalue < 0.05:  # Significant drift detected
                print(f"⚠️ Data drift detected in {feature} (p={pvalue:.4f})")
                return True

        return False

    def check_performance_degradation(self):
        """Return True when accuracy fell below 95% of the baseline.

        The baseline is the first entry of ``performance_history``. The
        current accuracy is appended to the history only when no degradation
        is reported.
        """
        current_accuracy = get_production_accuracy()

        if self.performance_history:
            baseline_accuracy = self.performance_history[0]

            if current_accuracy < baseline_accuracy * 0.95:
                print(f"⚠️ Performance degradation: {current_accuracy:.2%} vs {baseline_accuracy:.2%}")
                return True

        self.performance_history.append(current_accuracy)
        return False

    def trigger_training(self):
        """Submit the Kubeflow pipeline and block until the run finishes.

        ``last_training_time`` is stamped right after submission; the wait
        uses ``timeout=3600`` (presumably seconds).
        """
        print(f"🔄 Triggering retraining at {datetime.now()}")

        # Run Kubeflow pipeline
        client = kfp.Client(host='http://kubeflow.example.com')
        run = client.create_run_from_pipeline_func(ml_pipeline, arguments={})

        self.last_training_time = datetime.now()

        # Monitor pipeline execution
        run.wait_for_run_completion(timeout=3600)

        print("✅ Retraining complete")

    def scheduled_check(self):
        """Run both retraining checks and retrain if either one fires."""
        print(f"Checking retraining conditions at {datetime.now()}")

        # Evaluate both checks unconditionally. A short-circuiting `or` would
        # skip the performance check whenever drift is reported, leaving
        # gaps in (or never seeding) the performance history.
        drift_detected = self.check_data_drift()
        performance_degraded = self.check_performance_degradation()

        if drift_detected or performance_degraded:
            self.trigger_training()
        else:
            print("No retraining needed")

# Schedule daily checks
trainer = ContinuousTrainer(data_source='s3://bucket/data', model_registry='s3://bucket/models')
schedule.every().day.at("02:00").do(trainer.scheduled_check)

# Poll once a minute. run_pending() only starts jobs that are already due, so
# an hourly poll could delay the 02:00 check by up to an hour.
while True:
    schedule.run_pending()
    time.sleep(60)
Click to examine closely

Warnings ⚠️

Pipeline Complexity: Multi-stage pipelines accumulate failure modes. The 2034 "Pipeline Cascade" occurred when 300-step ML pipelines became impossible to debug.

Hidden Dependencies: When data lineage tracking fails, upstream changes go unnoticed and cause silent data quality issues.

Automation Runaway: Continuous training without human oversight deployed progressively worse models for weeks before detection.

Related Chronicles: The MLOps Meltdown (2034) - Automated systems deploying broken models

Tools: Kubeflow, MLflow, DVC, Airflow, Prefect, Weights & Biases

Research: Continuous learning systems, online learning, model monitoring

AW
Alex Welcing
AI Product Expert
About
Discover related articles and explore the archive