Automating ML Workflows with SageMaker Pipelines: A Step-by-Step Guide

Introduction to SageMaker Pipelines

Amazon SageMaker Pipelines streamlines machine learning (ML) workflows by providing a purpose-built CI/CD service for automating data processing, model training, evaluation, and deployment. Compared with ad-hoc scripting, SageMaker Pipelines offers:

  • Reproducibility: Track every pipeline execution with lineage metadata
  • Scalability: Each step runs on managed infrastructure that you size per job, so compute scales with your workload
  • Integration: Native compatibility with SageMaker services like Model Registry and Clarify

In this tutorial, we'll build a fraud detection pipeline using XGBoost, complete with code explanations for every step.

Prerequisites

Before starting, ensure you have:

  1. An AWS account with SageMaker access
  2. A SageMaker Studio domain in the US East (N. Virginia) Region
  3. Basic Python and pandas knowledge

Step 1: Configuring SageMaker Studio

First, create a SageMaker Studio domain (or reuse an existing one) from the AWS console and launch Studio. All of the code in the following steps runs in a Studio notebook.
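
If you prefer to script this setup, the domain can also be created programmatically with boto3. The following is a minimal sketch; the domain name, role ARN, VPC, and subnet IDs are placeholders you must replace with values from your account:

  import boto3

  sm_client = boto3.client("sagemaker", region_name="us-east-1")

  # All identifiers below are placeholders; substitute values from your account
  response = sm_client.create_domain(
      DomainName="fraud-detection-domain",
      AuthMode="IAM",
      DefaultUserSettings={"ExecutionRole": "arn:aws:iam::<account-id>:role/<sagemaker-execution-role>"},
      VpcId="vpc-xxxxxxxx",
      SubnetIds=["subnet-xxxxxxxx"]
  )
  print(response["DomainArn"])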

Step 2: Initialize Pipeline Parameters

Create a new notebook with a Python 3 kernel and import the dependencies:

  import boto3
  import sagemaker
  from sagemaker.workflow.parameters import (
      ParameterString,
      ParameterInteger
  )
  from sagemaker.sklearn.processing import SKLearnProcessor

  # Initialize session and S3 paths
  session = sagemaker.Session()
  write_bucket = session.default_bucket()
  raw_data_path = ParameterString(
      name="RawData",
      default_value="s3://sagemaker-sample-files/datasets/tabular/claims.csv"
  )
  processed_data_path = ParameterString(
      name="ProcessedData",
      default_value=f"s3://{write_bucket}/processed/"
  )
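
ParameterInteger, imported above, works the same way for numeric settings such as instance counts. A hypothetical example (if you use it, remember to add it to the Pipeline's parameters list in Step 6):

  # Hypothetical parameter: lets callers override the processing instance count per run
  processing_instance_count = ParameterInteger(
      name="ProcessingInstanceCount",
      default_value=1
  )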

Step 3: Data Processing Step

Create a processing job to clean and split data:

  from sagemaker.processing import ProcessingInput, ProcessingOutput
  from sagemaker.workflow.steps import ProcessingStep

  processor = SKLearnProcessor(
      framework_version="1.0-1",
      instance_type="ml.c5.xlarge",
      instance_count=1,
      base_job_name="fraud-process",
      role=sagemaker.get_execution_role()
  )

  process_step = ProcessingStep(
      name="ProcessData",
      processor=processor,
      inputs=[ProcessingInput(source=raw_data_path, destination="/opt/ml/processing/input")],
      outputs=[
          ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
          ProcessingOutput(output_name="test", source="/opt/ml/processing/test")
      ],
      code="preprocessing.py"  # Custom feature-engineering script (sketch below)
  )

Key parameters:

  • framework_version: Scikit-learn version
  • outputs: Automatic upload to S3 after processing
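
The processing step expects preprocessing.py to sit next to your notebook. Its contents depend on your dataset; the following is a minimal sketch, assuming a "fraud" label column and an 80/20 split (the column handling is illustrative, not taken from the sample dataset):

  # preprocessing.py -- illustrative sketch; adapt the column handling to your data
  import os
  import pandas as pd
  from sklearn.model_selection import train_test_split

  if __name__ == "__main__":
      input_dir = "/opt/ml/processing/input"
      df = pd.read_csv(os.path.join(input_dir, "claims.csv"))

      # Example cleanup; real feature engineering goes here
      df = df.dropna()

      # Built-in XGBoost expects the label in the first column and no header row
      label = "fraud"  # assumed label column
      df = pd.concat([df[label], df.drop(columns=[label])], axis=1)

      train, test = train_test_split(df, test_size=0.2, random_state=42)
      train.to_csv("/opt/ml/processing/train/train.csv", index=False, header=False)
      test.to_csv("/opt/ml/processing/test/test.csv", index=False, header=False)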

Step 4: Model Training Step

Configure XGBoost training using processed data:

  from sagemaker.estimator import Estimator
  from sagemaker.inputs import TrainingInput
  from sagemaker.workflow.steps import TrainingStep

  xgboost_image = sagemaker.image_uris.retrieve(
      "xgboost", region=session.boto_region_name, version="1.3-1"
  )

  estimator = Estimator(
      image_uri=xgboost_image,
      instance_type="ml.m4.xlarge",
      instance_count=1,
      output_path=f"s3://{write_bucket}/models/",
      role=sagemaker.get_execution_role()
  )

  # Built-in XGBoost requires at least num_round; objective set for binary fraud labels
  estimator.set_hyperparameters(objective="binary:logistic", num_round=100)

  train_step = TrainingStep(
      name="TrainModel",
      estimator=estimator,
      inputs={
          "train": TrainingInput(
              s3_data=process_step.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
              content_type="text/csv"
          )
      }
  )

Step 5: Bias Checks with SageMaker Clarify

Use SageMaker Clarify to run a data bias check on the test split produced by the processing step:

  from sagemaker.workflow.check_job_config import CheckJobConfig
  from sagemaker.workflow.clarify_check_step import ClarifyCheckStep, DataBiasCheckConfig
  from sagemaker.clarify import BiasConfig, DataConfig

  data_config = DataConfig(
      s3_data_input_path=process_step.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
      s3_output_path=f"s3://{write_bucket}/clarify-output/",  # Clarify report location
      label="fraud",
      headers=["age", "claim_amount", "fraud"],  # Your dataset columns
      dataset_type="text/csv"
  )

  bias_config = BiasConfig(
      label_values_or_threshold=[1],  # Fraud=1
      facet_name="age",
      facet_values_or_threshold=[30]
  )

  clarify_step = ClarifyCheckStep(
      name="BiasCheck",
      clarify_check_config=DataBiasCheckConfig(
          data_config=data_config,
          data_bias_config=bias_config
      ),
      check_job_config=CheckJobConfig(
          role=sagemaker.get_execution_role(),
          instance_type="ml.m4.xlarge",
          instance_count=1
      ),
      skip_check=False
  )

Step 6: Assembling the Pipeline

  from sagemaker.workflow.pipeline import Pipeline

  pipeline = Pipeline(
      name="FraudDetectionPipeline",
      parameters=[raw_data_path, processed_data_path],
      steps=[process_step, train_step, clarify_step],
      sagemaker_session=session
  )

  pipeline.upsert(role_arn=sagemaker.get_execution_role())
  execution = pipeline.start()

Execution Flow:

  1. Data processing runs first; the training and bias-check steps then start automatically because both consume its outputs
  2. View progress in SageMaker Studio's pipeline visualization, or check it programmatically as shown below
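
You can also inspect a run from the notebook using the execution handle returned by pipeline.start(); a minimal sketch:

  # Optionally override parameters for a specific run
  execution = pipeline.start(parameters={"RawData": raw_data_path.default_value})

  # Inspect overall status and per-step progress
  print(execution.describe()["PipelineExecutionStatus"])
  for step in execution.list_steps():
      print(step["StepName"], step["StepStatus"])

  # Block until the run finishes (raises if a step fails)
  execution.wait()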

Step 7: Deployment & Monitoring

Endpoint deployment is not a built-in pipeline step, so the pipeline itself only creates the SageMaker model from the training output; the endpoint is created afterwards (see the sketch below):

  from sagemaker.workflow.model_step import ModelStep
  from sagemaker.workflow.pipeline_context import PipelineSession
  from sagemaker.model import Model

  # ModelStep expects step arguments from model.create() or model.register(),
  # which requires building the model with a PipelineSession
  pipeline_session = PipelineSession()

  model = Model(
      image_uri=xgboost_image,
      model_data=train_step.properties.ModelArtifacts.S3ModelArtifacts,
      sagemaker_session=pipeline_session,
      role=sagemaker.get_execution_role()
  )

  create_model_step = ModelStep(
      name="FraudDetectionModel",
      step_args=model.create(instance_type="ml.m4.xlarge")
  )

Remember to add create_model_step to the Pipeline's steps list and call pipeline.upsert() again before starting a new execution.
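
Once the pipeline run has finished, the trained artifact can be deployed to a real-time endpoint. A minimal sketch, where the model artifact path is a placeholder you must replace with the training job's actual output location:

  from sagemaker.model import Model

  trained_model = Model(
      image_uri=xgboost_image,
      model_data="s3://<bucket>/models/<training-job-name>/output/model.tar.gz",  # placeholder
      role=sagemaker.get_execution_role(),
      sagemaker_session=sagemaker.Session()
  )

  trained_model.deploy(
      initial_instance_count=1,
      instance_type="ml.m4.xlarge",
      endpoint_name="fraud-detection-endpoint"
  )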

Add model monitoring using SageMaker Model Monitor:

  from sagemaker.model_monitor import DefaultModelMonitor
  from sagemaker.model_monitor.dataset_format import DatasetFormat

  monitor = DefaultModelMonitor(
      role=sagemaker.get_execution_role(),
      instance_count=1,
      instance_type="ml.m4.xlarge",
      volume_size_in_gb=20,
      max_runtime_in_seconds=3600
  )

  # Pipeline properties only resolve inside a pipeline execution, so pass the
  # resolved S3 URI of the test split produced by the ProcessData step here
  monitor.suggest_baseline(
      baseline_dataset="s3://<bucket>/<prefix>/test.csv",  # placeholder
      dataset_format=DatasetFormat.csv(header=True)
  )
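
Once a baseline exists, a recurring monitoring schedule can be attached to the endpoint. A sketch using the hourly cron helper; the schedule name and report prefix are assumptions carried over from the examples above:

  from sagemaker.model_monitor import CronExpressionGenerator

  # Attach an hourly data-quality monitoring schedule to the deployed endpoint
  monitor.create_monitoring_schedule(
      monitor_schedule_name="fraud-detection-monitoring",
      endpoint_input="fraud-detection-endpoint",
      output_s3_uri=f"s3://{write_bucket}/monitoring-reports/",
      statistics=monitor.baseline_statistics(),
      constraints=monitor.suggested_constraints(),
      schedule_cron_expression=CronExpressionGenerator.hourly()
  )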

Conclusion

This tutorial demonstrated how to:

  1. Configure SageMaker Studio for pipeline development
  2. Process data at scale using SKLearnProcessor
  3. Train models with built-in XGBoost containers
  4. Implement bias checks with SageMaker Clarify
  5. Deploy and monitor production endpoints

For advanced implementations, explore:

  • Custom containers for preprocessing
  • CI/CD integration with GitHub Actions
  • Multi-model pipelines
