Amazon SageMaker Pipelines allows data scientists and machine learning (ML) engineers to automate training workflows, which helps you create a repeatable process to orchestrate model development steps for rapid experimentation and model retraining. You can automate the entire model build workflow, including data preparation, feature engineering, model training, model tuning, and model validation, and catalog it in the model registry. You can configure pipelines to run automatically at regular intervals or when certain events are triggered, or you can run them manually as needed.
In this post, we highlight some of the enhancements to the Amazon SageMaker SDK and introduce new features of Amazon SageMaker Pipelines that make it easier for ML practitioners to build and train ML models.
Pipelines continues to innovate its developer experience, and with these recent releases, you can now use the service in a more customized way:
In this post, we walk you through a workflow using a sample dataset, with a focus on model building and deployment, to demonstrate how to implement the new Pipelines features. By the end, you should have enough information to use these newer features and simplify your ML workloads.
Pipelines offers the following new features:
In this solution, your entry point is the Amazon SageMaker Studio integrated development environment (IDE) for rapid experimentation. Studio offers an environment to manage the end-to-end Pipelines experience. With Studio, you can bypass the AWS Management Console for your entire workflow management. For more information on managing Pipelines from within Studio, refer to View, Track, and Execute SageMaker Pipelines in SageMaker Studio.
The following diagram illustrates the high-level architecture of the ML workflow with the different steps to train and generate inferences using the new features.
The pipeline includes the following steps:
To follow along with this post, you need an AWS account with a Studio domain.
Pipelines is integrated directly with SageMaker entities and resources, so you don’t need to interact with any other AWS services. You also don’t need to manage any resources because it’s a fully managed service, which means that it creates and manages resources for you. For more information on the various SageMaker components that are available both as standalone Python APIs and as integrated components of Studio, see the SageMaker product page.
Before getting started, install SageMaker SDK version >= 2.104.0 and xlrd >=1.0.0 within the Studio notebook using the following code snippet:
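import sys

# Install the required package versions into the notebook kernel
!{sys.executable} -m pip install "sagemaker>=2.104.0"
!{sys.executable} -m pip install "xlrd>=1.0.0"

import sagemaker

# Confirm the installed SDK version
print(sagemaker.__version__)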
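Version strings don't compare correctly as plain text (for example, "2.99.0" sorts after "2.104.0" alphabetically), so a numeric comparison is safer when checking the printed version against the minimum. The following is a minimal sketch; parse_version and meets_minimum are hypothetical helpers, not part of the SageMaker SDK, and they assume purely numeric dotted versions.

```python
def parse_version(version: str) -> tuple:
    """Split a dotted numeric version such as '2.104.0' into a tuple of ints."""
    return tuple(int(part) for part in version.split("."))


def meets_minimum(installed: str, minimum: str) -> bool:
    """Return True when the installed version is at least the minimum."""
    return parse_version(installed) >= parse_version(minimum)


print(meets_minimum("2.104.0", "2.104.0"))  # True
print(meets_minimum("2.99.0", "2.104.0"))   # False: 99 < 104 when compared as numbers
```

In the notebook, you could pass sagemaker.__version__ as the first argument, provided the installed version string contains only numeric components.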
For this post, you use the following components:
A SageMaker pipeline is a series of interconnected steps defined by a JSON pipeline definition. It encodes a pipeline using a directed acyclic graph (DAG). The DAG gives information on the requirements for and relationships between each step of the pipeline, and its structure is determined by the data dependencies between steps. These dependencies are created when the properties of a step’s output are passed as the input to another step.
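To make the idea of a DAG derived from data dependencies concrete, the following sketch orders a few steps using only the Python standard library. It is a simplified illustration, not how SageMaker implements it; the step names and the dependencies dictionary are made up for the example.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Each key is a step; its value is the set of steps whose outputs it consumes.
# These names are illustrative stand-ins, not SageMaker objects.
dependencies = {
    "process": set(),
    "train": {"process"},
    "evaluate": {"train", "process"},
    "condition": {"evaluate"},
}


def execution_order(deps):
    """Return one valid run order in which every step follows its inputs."""
    return list(TopologicalSorter(deps).static_order())


order = execution_order(dependencies)
print(order)  # ['process', 'train', 'evaluate', 'condition']
```

Because each step here depends on the one before it, only one order is valid. In a graph with independent branches, several orders would be valid.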
The following diagram illustrates the different steps in the SageMaker pipeline (for a churn prediction use case) where the connections between the steps are inferred by SageMaker based on the inputs and outputs defined by the step definitions.
The next sections walk through creating each step of the pipeline and running the entire pipeline once created.
Let’s start with the project structure:
To follow along with this post, you need to download and save the sample dataset under the data folder within the project home directory, which saves the file in Amazon Elastic File System (Amazon EFS) within the Studio environment.
Now you’re ready to build the pipeline components.
Create a Studio notebook called sagemaker-pipelines-project.ipynb within the project home directory. Enter the following code block in a cell, and run the cell to set up SageMaker and S3 client objects, create PipelineSession, and set up the S3 bucket location using the default bucket that comes with a SageMaker session:
import boto3
import pandas as pd
import sagemaker
from sagemaker.workflow.pipeline_context import PipelineSession

# Note: despite its name, s3_client is a boto3 S3 resource; it is used for uploads later
s3_client = boto3.resource("s3")
pipeline_name = "ChurnModelPipeline"

sagemaker_session = sagemaker.session.Session()
region = sagemaker_session.boto_region_name
role = sagemaker.get_execution_role()
pipeline_session = PipelineSession()

default_bucket = sagemaker_session.default_bucket()
model_package_group_name = "ChurnModelPackageGroup"
Pipelines supports parameterization, which allows you to specify input parameters at runtime without changing your pipeline code. You can use the classes available in the sagemaker.workflow.parameters module, such as ParameterInteger, ParameterFloat, and ParameterString, to specify pipeline parameters of various data types. Run the following code to set up multiple input parameters:
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterFloat,
)

# Plain Python values (fixed when the pipeline is defined)
auc_score_threshold = 0.75
base_job_prefix = "churn-example"
model_package_group_name = "churn-job-model-packages"
batch_data = "s3://{}/data/batch/batch.csv".format(default_bucket)

# Pipeline parameters (can be overridden for each run)
processing_instance_count = ParameterInteger(
    name="ProcessingInstanceCount",
    default_value=1
)
processing_instance_type = ParameterString(
    name="ProcessingInstanceType",
    default_value="ml.m5.xlarge"
)
training_instance_type = ParameterString(
    name="TrainingInstanceType",
    default_value="ml.m5.xlarge"
)
input_data = ParameterString(
    name="InputData",
    default_value="s3://{}/data/storedata_total.csv".format(default_bucket),
)
model_approval_status = ParameterString(
    name="ModelApprovalStatus",
    default_value="PendingManualApproval"
)
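Conceptually, each pipeline parameter carries a default value that an individual run can override by name. The following toy model shows that behavior in plain Python; resolve_parameters is a hypothetical helper, not an SDK function, and rejecting unknown names is a choice made for this sketch. With the SDK itself, overrides are passed as a dictionary through the parameters argument of pipeline.start.

```python
# Default values that mirror some of the parameters defined above
defaults = {
    "ProcessingInstanceCount": 1,
    "ProcessingInstanceType": "ml.m5.xlarge",
    "TrainingInstanceType": "ml.m5.xlarge",
    "ModelApprovalStatus": "PendingManualApproval",
}


def resolve_parameters(defaults, overrides=None):
    """Merge per-run overrides into the defaults without mutating either dict."""
    overrides = overrides or {}
    unknown = set(overrides) - set(defaults)
    if unknown:
        # Sketch-only behavior: fail loudly on names that were never defined
        raise KeyError(f"Unknown pipeline parameters: {sorted(unknown)}")
    return {**defaults, **overrides}


run_values = resolve_parameters(defaults, {"ProcessingInstanceCount": 2})
print(run_values["ProcessingInstanceCount"])  # 2
print(run_values["TrainingInstanceType"])     # ml.m5.xlarge
```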
Generate the batch dataset, which you use later in the batch transform step:
def preprocess_batch_data(file_path):
    df = pd.read_csv(file_path)
    # Convert to datetime columns; unparseable values become NaT
    df["firstorder"] = pd.to_datetime(df["firstorder"], errors="coerce")
    df["lastorder"] = pd.to_datetime(df["lastorder"], errors="coerce")
    # Drop rows with null values
    df = df.dropna()
    # Days between the last order and the first order
    df["first_last_days_diff"] = (df["lastorder"] - df["firstorder"]).dt.days
    # Days between when the customer record was created and the first order
    df["created"] = pd.to_datetime(df["created"])
    df["created_first_days_diff"] = (df["created"] - df["firstorder"]).dt.days
    # Drop columns that are no longer needed
    df.drop(["custid", "created", "firstorder", "lastorder"], axis=1, inplace=True)
    # Apply one-hot encoding to the favday and city columns
    df = pd.get_dummies(df, prefix=["favday", "city"], columns=["favday", "city"])
    return df


# Convert the store data file into CSV format.
# index=False keeps the row index from being written as an extra column.
store_data = pd.read_excel("data/storedata_total.xlsx")
store_data.to_csv("data/storedata_total.csv", index=False)

# Preprocess the batch data and save a 20% sample (without labels) in the data folder.
# A separate name (batch_df) keeps the batch_data S3 URI defined earlier intact.
batch_df = preprocess_batch_data("data/storedata_total.csv")
batch_df.pop("retained")
batch_sample = batch_df.sample(frac=0.2)
batch_sample.to_csv("data/batch.csv", header=False, index=False)
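The two date-difference features can be illustrated for a single record without pandas. The following is a simplified sketch under stated assumptions: dates are ISO-formatted strings (the dataset's actual format may differ), order_features and parse_date are hypothetical helpers, and a record with any unparseable date is dropped, which loosely mirrors errors="coerce" followed by dropna().

```python
from datetime import datetime


def parse_date(value):
    """Return a datetime, or None when the value cannot be parsed."""
    try:
        return datetime.fromisoformat(value)
    except (TypeError, ValueError):
        return None


def order_features(record):
    """Compute the two day-difference features, or None to drop the record."""
    first = parse_date(record.get("firstorder"))
    last = parse_date(record.get("lastorder"))
    created = parse_date(record.get("created"))
    if first is None or last is None or created is None:
        return None
    return {
        "first_last_days_diff": (last - first).days,
        "created_first_days_diff": (created - first).days,
    }


# Made-up record for illustration only
record = {"firstorder": "2013-08-11", "lastorder": "2013-08-21", "created": "2013-08-01"}
print(order_features(record))
# {'first_last_days_diff': 10, 'created_first_days_diff': -10}
```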
Upload the datasets to Amazon S3:
s3_client.Bucket(default_bucket).upload_file("data/batch.csv", "data/batch/batch.csv")
s3_client.Bucket(default_bucket).upload_file("data/storedata_total.csv", "data/storedata_total.csv")
In this step, you prepare a Python script to do feature engineering, one hot encoding, and curate the training, validation, and test splits to be used for model building. Run the following code to build your processing script:
%%writefile pipelines/customerchurn/preprocess.py

import numpy as np
import pandas as pd

if __name__ == "__main__":
    base_dir = "/opt/ml/processing"

    # Read the raw data
    df = pd.read_csv(f"{base_dir}/input/storedata_total.csv")

    # Convert the created column to datetime
    df["created"] = pd.to_datetime(df["created"])

    # Convert firstorder and lastorder to datetime; unparseable values become NaT
    df["firstorder"] = pd.to_datetime(df["firstorder"], errors="coerce")
    df["lastorder"] = pd.to_datetime(df["lastorder"], errors="coerce")

    # Drop rows with null values
    df = df.dropna()

    # Days between the last order and the first order
    df["first_last_days_diff"] = (df["lastorder"] - df["firstorder"]).dt.days

    # Days between when the customer record was created and the first order
    df["created_first_days_diff"] = (df["created"] - df["firstorder"]).dt.days

    # Drop columns that are no longer needed
    df.drop(["custid", "created", "firstorder", "lastorder"], axis=1, inplace=True)

    # Apply one-hot encoding to the favday and city columns
    df = pd.get_dummies(df, prefix=["favday", "city"], columns=["favday", "city"])

    # Move the label into the first column, then shuffle the rows
    y = df.pop("retained")
    X_pre = df
    y_pre = y.to_numpy().reshape(len(y), 1)
    X = np.concatenate((y_pre, X_pre), axis=1)
    np.random.shuffle(X)

    # Split into train (70%), validation (15%), and test (15%) datasets
    train, validation, test = np.split(X, [int(.7 * len(X)), int(.85 * len(X))])

    train = pd.DataFrame(train)
    test = pd.DataFrame(test)
    validation = pd.DataFrame(validation)

    # Convert the label column to integer
    train[0] = train[0].astype(int)
    test[0] = test[0].astype(int)
    validation[0] = validation[0].astype(int)

    # Save the DataFrames as CSV files
    train.to_csv(f"{base_dir}/train/train.csv", header=False, index=False)
    validation.to_csv(f"{base_dir}/validation/validation.csv", header=False, index=False)
    test.to_csv(f"{base_dir}/test/test.csv", header=False, index=False)
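The np.split call in the script cuts the shuffled rows at 70% and 85% of their length, which yields a 70/15/15 split. The following sketch reproduces that slicing with plain lists so the boundaries are easy to inspect; split_rows is a hypothetical helper written for this illustration.

```python
def split_rows(rows, cuts=(0.7, 0.85)):
    """Slice rows at the given fractional cut points into three parts."""
    first_cut = int(cuts[0] * len(rows))
    second_cut = int(cuts[1] * len(rows))
    return rows[:first_cut], rows[first_cut:second_cut], rows[second_cut:]


rows = list(range(100))  # stand-in for 100 already-shuffled rows
train, validation, test = split_rows(rows)
print(len(train), len(validation), len(test))  # 70 15 15
```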
Next, run the following code block to instantiate the processor and the Pipelines step that runs the processing script. Because the processing script uses pandas, you use an SKLearnProcessor. The processor's run method takes the script location, the input S3 locations for the raw datasets, and the output S3 locations for the processed datasets. Because the processor uses a PipelineSession, run doesn't start a job; it returns the arguments that you pass to ProcessingStep as step_args.
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

# Upload the processing script to S3
s3_client.Bucket(default_bucket).upload_file(
    "pipelines/customerchurn/preprocess.py", "input/code/preprocess.py"
)

# Define the processing step for feature engineering
framework_version = "1.0-1"
sklearn_processor = SKLearnProcessor(
    framework_version=framework_version,
    instance_type="ml.m5.xlarge",
    instance_count=processing_instance_count,
    base_job_name="sklearn-churn-process",
    role=role,
    sagemaker_session=pipeline_session,
)
processor_args = sklearn_processor.run(
    inputs=[
        ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),
    ],
    outputs=[
        ProcessingOutput(
            output_name="train",
            source="/opt/ml/processing/train",
            destination=f"s3://{default_bucket}/output/train",
        ),
        ProcessingOutput(
            output_name="validation",
            source="/opt/ml/processing/validation",
            destination=f"s3://{default_bucket}/output/validation",
        ),
        ProcessingOutput(
            output_name="test",
            source="/opt/ml/processing/test",
            destination=f"s3://{default_bucket}/output/test",
        ),
    ],
    code=f"s3://{default_bucket}/input/code/preprocess.py",
)
step_process = ProcessingStep(name="ChurnModelProcess", step_args=processor_args)
Set up model training using a SageMaker XGBoost estimator and the Pipelines TrainingStep class:
from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep

model_path = f"s3://{default_bucket}/output"
image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    region=region,
    version="1.0-1",
    py_version="py3",
    instance_type="ml.m5.xlarge",
)
xgb_train = Estimator(
    image_uri=image_uri,
    instance_type=training_instance_type,
    instance_count=1,
    output_path=model_path,
    role=role,
    sagemaker_session=pipeline_session,
)
# Note: "reg:linear" is a deprecated alias of "reg:squarederror" in XGBoost
xgb_train.set_hyperparameters(
    objective="reg:linear",
    num_round=50,
    max_depth=5,
    eta=0.2,
    gamma=4,
    min_child_weight=6,
    subsample=0.7,
)

# The training inputs reference the outputs of the processing step,
# which is how Pipelines learns that training depends on processing
train_args = xgb_train.fit(
    inputs={
        "train": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                "train"
            ].S3Output.S3Uri,
            content_type="text/csv",
        ),
        "validation": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                "validation"
            ].S3Output.S3Uri,
            content_type="text/csv",
        ),
    },
)
step_train = TrainingStep(
    name="ChurnModelTrain",
    step_args=train_args,
)
Run the following code block to create the script that evaluates the trained model. The script computes the AUC score on the test dataset and writes it to an evaluation report, which the condition step later compares against the specified threshold.
%%writefile pipelines/customerchurn/evaluate.py

import json
import pathlib
import pickle
import tarfile

import pandas as pd
import xgboost
from sklearn.metrics import roc_curve, auc

if __name__ == "__main__":
    # Extract the model artifact produced by the training step
    model_path = "/opt/ml/processing/model/model.tar.gz"
    with tarfile.open(model_path) as tar:
        tar.extractall(path=".")
    with open("xgboost-model", "rb") as model_file:
        model = pickle.load(model_file)

    # Read the test data used to evaluate the model; the label is the first column
    test_path = "/opt/ml/processing/test/test.csv"
    df = pd.read_csv(test_path, header=None)
    y_test = df.iloc[:, 0].to_numpy()
    df.drop(df.columns[0], axis=1, inplace=True)
    X_test = xgboost.DMatrix(df.values)

    # Run predictions
    predictions = model.predict(X_test)

    # Evaluate predictions
    fpr, tpr, thresholds = roc_curve(y_test, predictions)
    auc_score = auc(fpr, tpr)
    report_dict = {
        "classification_metrics": {
            "auc_score": {
                "value": auc_score,
            },
        },
    }

    # Save the evaluation report
    output_dir = "/opt/ml/processing/evaluation"
    pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)
    evaluation_path = f"{output_dir}/evaluation.json"
    with open(evaluation_path, "w") as f:
        f.write(json.dumps(report_dict))
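As background on the metric, the area under the ROC curve equals the probability that a randomly chosen positive example receives a higher score than a randomly chosen negative one, with ties counted as half. The following sketch computes it that way in plain Python. It is an illustration of the metric, not the scikit-learn implementation that the script uses, and the labels and scores are made up.

```python
def roc_auc(labels, scores):
    """AUC as the fraction of (positive, negative) pairs ranked correctly."""
    positives = [s for y, s in zip(labels, scores) if y == 1]
    negatives = [s for y, s in zip(labels, scores) if y == 0]
    if not positives or not negatives:
        raise ValueError("AUC needs at least one positive and one negative label")
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5  # a tie counts as half a correctly ranked pair
    return wins / (len(positives) * len(negatives))


print(roc_auc([0, 0, 1, 1], [0.1, 0.4, 0.35, 0.8]))  # 0.75
```

The pairwise loop is quadratic in the number of rows, so it is suitable only for small illustrations like this one.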
Next, run the following code block to instantiate the processor and the Pipelines step that runs the evaluation script. Because the evaluation script uses the XGBoost package, you use a ScriptProcessor along with the XGBoost image. The inputs are the model artifact from the training step and the test dataset from the processing step, and the output is the S3 location of the evaluation report. A PropertyFile exposes the report so that later steps can read values from it.
from sagemaker.processing import ProcessingInput, ProcessingOutput, ScriptProcessor
from sagemaker.workflow.properties import PropertyFile
from sagemaker.workflow.steps import ProcessingStep

# Upload the evaluation script to S3
s3_client.Bucket(default_bucket).upload_file(
    "pipelines/customerchurn/evaluate.py", "input/code/evaluate.py"
)

# Define the model evaluation step to evaluate the trained model
script_eval = ScriptProcessor(
    image_uri=image_uri,
    command=["python3"],
    instance_type=processing_instance_type,
    instance_count=1,
    base_job_name="script-churn-eval",
    role=role,
    sagemaker_session=pipeline_session,
)
eval_args = script_eval.run(
    inputs=[
        ProcessingInput(
            source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model",
        ),
        ProcessingInput(
            source=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
            destination="/opt/ml/processing/test",
        ),
    ],
    outputs=[
        ProcessingOutput(
            output_name="evaluation",
            source="/opt/ml/processing/evaluation",
            destination=f"s3://{default_bucket}/output/evaluation",
        ),
    ],
    code=f"s3://{default_bucket}/input/code/evaluate.py",
)

# Expose evaluation.json so that the condition step can read the AUC score
evaluation_report = PropertyFile(
    name="ChurnEvaluationReport", output_name="evaluation", path="evaluation.json"
)
step_eval = ProcessingStep(
    name="ChurnEvalModel",
    step_args=eval_args,
    property_files=[evaluation_report],
)
Run the following code block to create a SageMaker model using the Pipelines model step. This step uses the output of the training step to package the model for deployment. Note that in this example, the instance type and accelerator type are passed as literal values rather than through a pipeline parameter.
from sagemaker import Model
from sagemaker.workflow.model_step import ModelStep

# Step to create the model from the training output
model = Model(
    image_uri=image_uri,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=pipeline_session,
    role=role,
)
step_create_model = ModelStep(
    name="ChurnCreateModel",
    step_args=model.create(instance_type="ml.m5.large", accelerator_type="ml.eia1.medium"),
)
Run the following code block to run batch transformation using the trained model with the batch input created in the first step:
from sagemaker.transformer import Transformer
from sagemaker.workflow.steps import TransformStep

transformer = Transformer(
    model_name=step_create_model.properties.ModelName,
    instance_type="ml.m5.xlarge",
    instance_count=1,
    output_path=f"s3://{default_bucket}/ChurnTransform",
    sagemaker_session=pipeline_session,
)

# batch_data must be the S3 URI string of the uploaded batch file
step_transform = TransformStep(
    name="ChurnTransform",
    step_args=transformer.transform(
        data=batch_data,
        content_type="text/csv",
    ),
)
The following code registers the model within the SageMaker model registry using the Pipelines model step:
from sagemaker.model_metrics import MetricsSource, ModelMetrics

model = Model(
    image_uri=image_uri,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=pipeline_session,
    role=role,
)

# Attach the evaluation report to the registered model as its model statistics
model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri="{}/evaluation.json".format(
            step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
        ),
        content_type="application/json",
    )
)
register_args = model.register(
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics,
)
step_register = ModelStep(name="ChurnRegisterModel", step_args=register_args)
The following code defines the Pipelines fail step to stop the pipeline run with an error message if the AUC score doesn’t meet the defined threshold:
from sagemaker.workflow.fail_step import FailStep
from sagemaker.workflow.functions import Join

# This step runs only when the AUC score does not exceed the threshold
step_fail = FailStep(
    name="ChurnAUCScoreFail",
    error_message=Join(
        on=" ",
        values=["Execution failed because the AUC score did not exceed", auc_score_threshold],
    ),
)
The following code defines a condition step that checks the AUC score. If the score exceeds the threshold, the pipeline registers the model in the model registry, creates a model, and runs a batch transformation; otherwise, it stops the pipeline run in a failed state:
from sagemaker.workflow.conditions import ConditionGreaterThan
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet

# True when the AUC score in the evaluation report is greater than the threshold
cond_gt = ConditionGreaterThan(
    left=JsonGet(
        step_name=step_eval.name,
        property_file=evaluation_report,
        json_path="classification_metrics.auc_score.value",
    ),
    right=auc_score_threshold,
)
step_cond = ConditionStep(
    name="CheckAUCScoreChurnEvaluation",
    conditions=[cond_gt],
    if_steps=[step_register, step_create_model, step_transform],
    else_steps=[step_fail],
)
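The branching logic can be pictured in plain Python: the dotted json_path selects one value from the evaluation report, and a strict greater-than comparison picks the branch. The following is a conceptual sketch only; get_by_path and choose_branch are hypothetical helpers, and the 0.82 score is a made-up value rather than a result from this pipeline.

```python
import json

# Made-up report with the same structure that evaluate.py writes
report_json = '{"classification_metrics": {"auc_score": {"value": 0.82}}}'


def get_by_path(document, dotted_path):
    """Walk nested dictionaries using a dot-separated path."""
    value = document
    for key in dotted_path.split("."):
        value = value[key]
    return value


def choose_branch(report, threshold):
    """Return which branch a strict greater-than condition would select."""
    score = get_by_path(report, "classification_metrics.auc_score.value")
    return "if_steps" if score > threshold else "else_steps"


report = json.loads(report_json)
print(choose_branch(report, 0.75))  # if_steps
```

Because the comparison is strict, a score exactly equal to the threshold selects the else branch.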
After defining all of the component steps, you can assemble them into a Pipeline object. You don’t need to specify the order of the steps because Pipelines automatically infers the sequence from the dependencies between them.
import json
from sagemaker.workflow.pipeline import Pipeline

# Only pipeline parameter objects belong in the parameters list.
# batch_data and auc_score_threshold are plain Python values, so they are not listed.
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_count,
        processing_instance_type,
        training_instance_type,
        model_approval_status,
        input_data,
    ],
    steps=[step_process, step_train, step_eval, step_cond],
)

# Inspect the generated JSON pipeline definition
definition = json.loads(pipeline.definition())
print(definition)
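Printing the whole definition produces a large dictionary, so it can help to summarize it. The following sketch lists step names and types from a hand-written, heavily abbreviated stand-in for a definition. The field layout shown here is an assumption for illustration, a real definition carries much more detail, and summarize_steps is a hypothetical helper.

```python
import json

# Hand-written, abbreviated stand-in for a pipeline definition (assumed layout)
definition_json = """
{
  "Parameters": [
    {"Name": "ProcessingInstanceCount", "Type": "Integer", "DefaultValue": 1}
  ],
  "Steps": [
    {"Name": "ChurnModelProcess", "Type": "Processing"},
    {"Name": "ChurnModelTrain", "Type": "Training"},
    {"Name": "ChurnEvalModel", "Type": "Processing"},
    {"Name": "CheckAUCScoreChurnEvaluation", "Type": "Condition"}
  ]
}
"""


def summarize_steps(definition):
    """Return (name, type) pairs for the top-level steps of a definition."""
    return [(step["Name"], step["Type"]) for step in definition.get("Steps", [])]


sample_definition = json.loads(definition_json)
for name, step_type in summarize_steps(sample_definition):
    print(f"{step_type:<12}{name}")
```

In the notebook, you could pass the definition dictionary created above to the same helper, assuming it follows this layout.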
Run the following code in a cell in your notebook. If the pipeline already exists, the code updates the pipeline. If the pipeline doesn’t exist, it creates a new one.
# Create a new pipeline or update the existing one
pipeline.upsert(role_arn=role)

# Start a pipeline execution
execution = pipeline.start()
In this post, we introduced some of the new features now available with Pipelines along with other built-in SageMaker features and the XGBoost algorithm to develop, iterate, and deploy a model for churn prediction. The solution can be extended with additional data sources to implement your own ML workflow. For more details on the steps available in the Pipelines workflow, refer to Amazon SageMaker Model Building Pipeline and SageMaker Workflows. The AWS SageMaker Examples GitHub repo has more examples around various use cases using Pipelines.
Jerry Peng is a software development engineer with AWS SageMaker. He focuses on building end-to-end, large-scale MLOps systems, from training to model monitoring in production. He is also passionate about bringing the concept of MLOps to a broader audience.
Dewen Qi is a Software Development Engineer in AWS. She currently focuses on developing and improving SageMaker Pipelines. Outside of work, she enjoys practicing the cello.
Gayatri Ghanakota is a Sr. Machine Learning Engineer with AWS Professional Services. She is passionate about developing, deploying, and explaining AI/ML solutions across various domains. Prior to this role, she led multiple initiatives as a data scientist and ML engineer with top global firms in the financial and retail space. She holds a master’s degree in Computer Science, specializing in Data Science, from the University of Colorado, Boulder.
Rupinder Grewal is a Sr. AI/ML Specialist Solutions Architect with AWS. He currently focuses on model serving and MLOps on SageMaker. Prior to this role, he worked as a Machine Learning Engineer building and hosting models. Outside of work, he enjoys playing tennis and biking on mountain trails.
Ray Li is a Sr. Data Scientist with AWS Professional Services. His specialty focuses on building and operationalizing AI/ML solutions for customers of varying sizes, ranging from startups to enterprise organizations. Outside of work, Ray enjoys fitness and traveling.