Amazon SageMaker Pipelines streamlines machine learning (ML) workflows by providing a purpose-built CI/CD service for automating data processing, model training, evaluation, and deployment. Unlike ad hoc scripting, each stage is defined as a managed, repeatable pipeline step whose inputs and outputs are tracked automatically.
In this tutorial, we'll build a fraud detection pipeline using XGBoost, complete with code explanations for every step.
Before starting, ensure you have an AWS account and an IAM execution role with SageMaker and S3 permissions.
First, create a SageMaker Studio domain. Then create a new notebook with a Python 3 kernel and import the dependencies:
import boto3
import sagemaker
from sagemaker.workflow.parameters import (
    ParameterString,
    ParameterInteger
)
from sagemaker.sklearn.processing import SKLearnProcessor
# Initialize session and S3 paths
session = sagemaker.Session()
write_bucket = session.default_bucket()
raw_data_path = ParameterString(name="RawData", default_value="s3://sagemaker-sample-files/datasets/tabular/claims.csv")
processed_data_path = ParameterString(name="ProcessedData", default_value=f"s3://{write_bucket}/processed/")
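ParameterInteger is imported above but not yet used; as a sketch, it could expose the training instance count as a runtime-tunable value. The parameter name below is illustrative, and it would also need to be added to the pipeline's parameters list and referenced where needed:

# Hypothetical parameter: override the training instance count per run, e.g.
# pipeline.start(parameters={"TrainInstanceCount": 2}).
train_instance_count = ParameterInteger(name="TrainInstanceCount", default_value=1)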
Create a processing job to clean and split data:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

processor = SKLearnProcessor(
    framework_version="1.0-1",
    instance_type="ml.c5.xlarge",
    instance_count=1,
    base_job_name="fraud-process",
    role=sagemaker.get_execution_role()
)

process_step = ProcessingStep(
    name="ProcessData",
    processor=processor,
    inputs=[ProcessingInput(source=raw_data_path, destination="/opt/ml/processing/input")],
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test")
    ],
    code="preprocessing.py"  # Custom script for feature engineering
)
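The step references a preprocessing.py script that you supply. A minimal sketch of such a script, assuming the claims data has a fraud label column and that the training CSVs follow the built-in XGBoost convention (label first, no header row):

# preprocessing.py -- illustrative sketch; adapt cleaning and column handling
# to your actual claims dataset.
import pandas as pd
from sklearn.model_selection import train_test_split

df = pd.read_csv("/opt/ml/processing/input/claims.csv").dropna()

# Built-in XGBoost expects the label in the first column and no header row.
df = df[["fraud"] + [c for c in df.columns if c != "fraud"]]

train, test = train_test_split(df, test_size=0.2, random_state=42)
train.to_csv("/opt/ml/processing/train/train.csv", index=False, header=False)
test.to_csv("/opt/ml/processing/test/test.csv", index=False, header=False)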
Key parameters:
- framework_version: the scikit-learn version used by the processing container
- outputs: results are automatically uploaded to S3 after processing

Configure XGBoost training using the processed data:
from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep

xgboost_image = sagemaker.image_uris.retrieve("xgboost", region=session.boto_region_name, version="1.3-1")

estimator = Estimator(
    image_uri=xgboost_image,
    instance_type="ml.m4.xlarge",
    instance_count=1,
    output_path=f"s3://{write_bucket}/models/",
    role=sagemaker.get_execution_role()
)
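The estimator above has no hyperparameters set; the built-in XGBoost algorithm needs at least an objective and num_round, so a minimal, illustrative configuration for binary fraud classification might be:

# Illustrative values -- tune for your own dataset.
estimator.set_hyperparameters(
    objective="binary:logistic",
    num_round=100,
    max_depth=5,
    eta=0.2,
    eval_metric="auc"
)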
train_step = TrainingStep(
    name="TrainModel",
    estimator=estimator,
    inputs={
        "train": TrainingInput(
            s3_data=process_step.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
            content_type="text/csv"
        )
    }
)
Next, add a SageMaker Clarify bias check on the processed test split:

from sagemaker.workflow.clarify_check_step import ClarifyCheckStep, DataBiasCheckConfig
from sagemaker.workflow.check_job_config import CheckJobConfig
from sagemaker.clarify import BiasConfig, DataConfig

data_config = DataConfig(
    s3_data_input_path=process_step.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
    s3_output_path=f"s3://{write_bucket}/clarify/",
    label="fraud",
    headers=["age", "claim_amount", "fraud"],  # Your dataset columns
    dataset_type="text/csv"
)

bias_config = BiasConfig(
    label_values_or_threshold=[1],  # Fraud=1
    facet_name="age",
    facet_values_or_threshold=[30]
)

clarify_step = ClarifyCheckStep(
    name="BiasCheck",
    clarify_check_config=DataBiasCheckConfig(
        data_config=data_config,
        data_bias_config=bias_config
    ),
    check_job_config=CheckJobConfig(
        role=sagemaker.get_execution_role(),
        instance_type="ml.m4.xlarge",
        instance_count=1
    )
)
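Step order is inferred from data dependencies, so TrainModel and BiasCheck will run in parallel once ProcessData finishes. If you prefer the bias check to wait for training, an explicit dependency can be added (optional):

# Optional: run the bias check only after the training step succeeds.
clarify_step.add_depends_on([train_step])

With the processing, training, and bias-check steps defined, combine them into a pipeline: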
from sagemaker.workflow.pipeline import Pipeline
pipeline = Pipeline(
    name="FraudDetectionPipeline",
    parameters=[raw_data_path, processed_data_path],
    steps=[process_step, train_step, clarify_step],
    sagemaker_session=session
)
pipeline.upsert(role_arn=sagemaker.get_execution_role())
execution = pipeline.start()
Execution Flow: SageMaker derives step order from data dependencies, so ProcessData runs first, and TrainModel and BiasCheck start once its outputs are available.
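start() also accepts per-run parameter overrides, and the returned execution object can be polled from the notebook; the S3 path below is a placeholder:

# Override the RawData parameter for this run, then watch progress.
execution = pipeline.start(
    parameters={"RawData": "s3://your-bucket/claims/claims.csv"}  # placeholder
)
execution.wait()               # block until the run completes (or fails)
print(execution.list_steps())  # per-step status and failure reasons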
After training, package the model artifacts as a SageMaker model. ModelStep captures the model.create() call rather than executing it, so the model must be built with a PipelineSession; model.deploy() runs immediately and cannot be used inside a pipeline, so the endpoint (for example, fraud-detection-endpoint) is created after the pipeline completes.

from sagemaker.workflow.model_step import ModelStep
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.model import Model

pipeline_session = PipelineSession()

model = Model(
    image_uri=xgboost_image,
    model_data=train_step.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=pipeline_session,
    role=sagemaker.get_execution_role()
)

create_model_step = ModelStep(
    name="CreateFraudModel",
    step_args=model.create(instance_type="ml.m4.xlarge")
)

Remember to add create_model_step to the pipeline's steps list and call upsert() again so it becomes part of the workflow.
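Once the created model has been deployed to an endpoint (for example, fraud-detection-endpoint), it can be invoked through the SageMaker runtime; the feature values below are purely illustrative:

import boto3

runtime = boto3.client("sagemaker-runtime")

# Send one CSV record; column order must match the training features (no label).
response = runtime.invoke_endpoint(
    EndpointName="fraud-detection-endpoint",
    ContentType="text/csv",
    Body="45,1250.00"  # illustrative age, claim_amount values
)
print(response["Body"].read().decode())  # fraud score predicted by XGBoost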
Add model monitoring using SageMaker Model Monitor:
from sagemaker.model_monitor import DefaultModelMonitor
from sagemaker.model_monitor.dataset_format import DatasetFormat

monitor = DefaultModelMonitor(
    role=sagemaker.get_execution_role(),
    instance_count=1,
    instance_type="ml.m4.xlarge",
    volume_size_in_gb=20,
    max_runtime_in_seconds=3600
)

# suggest_baseline() runs immediately rather than as a pipeline step, so point
# it at the concrete S3 location of your processed test split.
monitor.suggest_baseline(
    baseline_dataset=f"s3://{write_bucket}/processed/test/test.csv",  # adjust to your test split path
    dataset_format=DatasetFormat.csv(header=True),
    output_s3_uri=f"s3://{write_bucket}/monitoring/baseline/"
)
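With a baseline in place, the monitor can be attached to a live endpoint on a recurring schedule. A minimal sketch, assuming the fraud-detection-endpoint from earlier and a hypothetical schedule name:

from sagemaker.model_monitor import CronExpressionGenerator

# Hourly data-quality checks against the suggested baseline.
monitor.create_monitoring_schedule(
    monitor_schedule_name="fraud-data-quality",   # hypothetical name
    endpoint_input="fraud-detection-endpoint",
    output_s3_uri=f"s3://{write_bucket}/monitoring/reports/",
    statistics=monitor.baseline_statistics(),
    constraints=monitor.suggested_constraints(),
    schedule_cron_expression=CronExpressionGenerator.hourly(),
    enable_cloudwatch_metrics=True
)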
This tutorial demonstrated how to:
- Define pipeline parameters for the raw and processed data locations
- Build processing, training, and Clarify bias-check steps with the SageMaker SDK
- Assemble the steps into a pipeline, upsert it, and track an execution
- Package the trained model and establish a monitoring baseline with Model Monitor

For advanced implementations, explore features such as conditional steps, step caching, and the SageMaker model registry in the SageMaker Pipelines documentation.