Amazon SageMaker is a fully managed service that provides every developer and data scientist with the ability to build, train, and deploy machine learning (ML) models quickly. SageMaker removes the heavy lifting from each step of the ML process to make it easier to develop high-quality ML artifacts. AWS Serverless Application Model (AWS SAM) is an open-source framework for building serverless applications. It provides shorthand syntax to express functions, APIs, databases, event source mappings, steps in AWS Step Functions, and more.
Generally, ML workflows orchestrate and automate sequences of ML tasks. A workflow includes data collection, training, testing, human evaluation of the ML model, and deployment of the models for inference.
For continuous integration and continuous delivery (CI/CD) pipelines, AWS recently released Amazon SageMaker Pipelines, the first purpose-built, easy-to-use CI/CD service for ML. Pipelines is a native workflow orchestration tool for building ML pipelines that takes advantage of direct SageMaker integration. For more information, see Building, automating, managing, and scaling ML workflows using Amazon SageMaker Pipelines.
In this post, I show you an extensible way to automate and deploy custom ML models using service integrations between Amazon SageMaker, Step Functions, and AWS SAM using a CI/CD pipeline.
To build this pipeline, you also need to be familiar with the following AWS services:
The solution has two main sections:
The following diagram describes the general overview of the MLOps CI/CD pipeline.
The workflow includes the following steps:
In this first section, you visualize the Step Functions ML workflow easily in Visual Studio Code (VS Code) and deploy it to the AWS environment using AWS SAM. You use some of the new features and service integrations, such as support in AWS SAM for AWS Step Functions, native support in Step Functions for SageMaker integrations, and support in Step Functions to visualize workflows directly in VS Code.
Before getting started, make sure you complete the following prerequisites:
To get started, follow the instructions on GitHub to complete the application setup. Alternatively, you can switch to the terminal and enter the following command:
git clone https://github.com/aws-samples/sam-sf-sagemaker-workflow.git
The directory structure should be as follows:
. sam-sf-sagemaker-workflow
|-- cfn
|---- sam-template.yaml
|-- functions
|---- api_sagemaker_endpoint
|---- create_and_email_accept_reject_links
|---- respond_to_links
|---- update_sagemakerEndpoint_API
|-- scripts
|-- statemachine
|---- mlops.asl.json
The code is organized into subfolders, with the main AWS SAM template residing in cfn/sam-template.yaml.
The Step Functions workflow is defined in statemachine/mlops.asl.json, and the Lambda functions it uses are stored in the functions folder.
To build and deploy the AWS SAM template, run the following bash commands from the root folder:
# Create the S3 buckets if required before running the commands.
S3_BUCKET=bucket-mlops                 # Bucket to store the AWS SAM template
S3_BUCKET_MODEL=ml-models              # Bucket to store ML models
STACK_NAME=sam-sf-sagemaker-workflow   # Name of the AWS SAM stack

# AWS SAM build
sam build -t cfn/sam-template.yaml

# AWS SAM deploy
sam deploy --template-file .aws-sam/build/template.yaml \
  --stack-name ${STACK_NAME} \
  --force-upload \
  --s3-bucket ${S3_BUCKET} \
  --s3-prefix sam \
  --parameter-overrides S3ModelBucket=${S3_BUCKET_MODEL} \
  --capabilities CAPABILITY_IAM
The sam build command builds all the functions and creates the final AWS CloudFormation template. The sam deploy command uploads the necessary files to the S3 bucket and creates or updates the CloudFormation stack that provisions the necessary AWS infrastructure.
When the stack has been created successfully, go to the AWS CloudFormation console. On the Outputs tab, copy the MLOpsStateMachineArn value to use later.
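You can also retrieve this value programmatically. The following is a minimal sketch using boto3, assuming the stack name used in the preceding deploy command (sam-sf-sagemaker-workflow):

import boto3

cfn = boto3.client('cloudformation')

# Read the MLOpsStateMachineArn output from the deployed AWS SAM stack
stack = cfn.describe_stacks(StackName='sam-sf-sagemaker-workflow')['Stacks'][0]
outputs = {o['OutputKey']: o['OutputValue'] for o in stack['Outputs']}
print(outputs['MLOpsStateMachineArn'])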
The following diagram shows the workflow carried out in Step Functions, using VS Code integrations with Step Functions.
The following JSON-based Amazon States Language definition describes the workflow visualized in the preceding diagram.
{
  "Comment": "This state machine starts the machine learning pipeline once the custom model has been uploaded to Amazon ECR. Step Functions expects two parameters: the Git commit ID and the SageMaker ECR custom container URI.",
  "StartAt": "SageMaker Create Training Job",
  "States": {
    "SageMaker Create Training Job": {
      "Type": "Task",
      "Resource": "arn:aws:states:::sagemaker:createTrainingJob.sync",
      "Parameters": {
        "TrainingJobName.$": "$.commitID",
        "ResourceConfig": {
          "InstanceCount": 1,
          "InstanceType": "ml.c4.2xlarge",
          "VolumeSizeInGB": 20
        },
        "HyperParameters": {
          "mode": "batch_skipgram",
          "epochs": "5",
          "min_count": "5",
          "sampling_threshold": "0.0001",
          "learning_rate": "0.025",
          "window_size": "5",
          "vector_dim": "300",
          "negative_samples": "5",
          "batch_size": "11"
        },
        "AlgorithmSpecification": {
          "TrainingImage.$": "$.imageUri",
          "TrainingInputMode": "File"
        },
        "OutputDataConfig": {
          "S3OutputPath": "s3://${S3ModelBucket}/output"
        },
        "StoppingCondition": {
          "MaxRuntimeInSeconds": 100000
        },
        "RoleArn": "${SagemakerRoleArn}",
        "InputDataConfig": [
          {
            "ChannelName": "training",
            "DataSource": {
              "S3DataSource": {
                "S3DataType": "S3Prefix",
                "S3Uri": "s3://${S3ModelBucket}/iris.csv",
                "S3DataDistributionType": "FullyReplicated"
              }
            }
          }
        ]
      },
      "Retry": [
        {
          "ErrorEquals": ["SageMaker.AmazonSageMakerException"],
          "IntervalSeconds": 1,
          "MaxAttempts": 1,
          "BackoffRate": 1.1
        },
        {
          "ErrorEquals": ["SageMaker.ResourceLimitExceededException"],
          "IntervalSeconds": 60,
          "MaxAttempts": 1,
          "BackoffRate": 1
        }
      ],
      "Catch": [
        {
          "ErrorEquals": ["States.ALL"],
          "ResultPath": "$.cause",
          "Next": "FailState"
        }
      ],
      "Next": "SageMaker Create Model"
    },
    "SageMaker Create Model": {
      "Type": "Task",
      "Resource": "arn:aws:states:::sagemaker:createModel",
      "Parameters": {
        "ExecutionRoleArn": "${SagemakerRoleArn}",
        "ModelName.$": "$.TrainingJobName",
        "PrimaryContainer": {
          "ModelDataUrl.$": "$.ModelArtifacts.S3ModelArtifacts",
          "Image.$": "$.AlgorithmSpecification.TrainingImage"
        }
      },
      "ResultPath": "$.taskresult",
      "Next": "SageMaker Create Transform Job",
      "Catch": [
        {
          "ErrorEquals": ["States.ALL"],
          "Next": "FailState"
        }
      ]
    },
    "SageMaker Create Transform Job": {
      "Type": "Task",
      "Resource": "arn:aws:states:::sagemaker:createTransformJob.sync",
      "Parameters": {
        "ModelName.$": "$.TrainingJobName",
        "TransformInput": {
          "SplitType": "Line",
          "CompressionType": "None",
          "ContentType": "text/csv",
          "DataSource": {
            "S3DataSource": {
              "S3DataType": "S3Prefix",
              "S3Uri": "s3://${S3ModelBucket}/iris.csv"
            }
          }
        },
        "TransformOutput": {
          "S3OutputPath.$": "States.Format('s3://${S3ModelBucket}/transform_output/{}/iris.csv', $.TrainingJobName)",
          "AssembleWith": "Line",
          "Accept": "text/csv"
        },
        "DataProcessing": {
          "InputFilter": "$[1:]"
        },
        "TransformResources": {
          "InstanceCount": 1,
          "InstanceType": "ml.m4.xlarge"
        },
        "TransformJobName.$": "$.TrainingJobName"
      },
      "ResultPath": "$.result",
      "Next": "Send Approve/Reject Email Request",
      "Catch": [
        {
          "ErrorEquals": ["States.ALL"],
          "Next": "FailState"
        }
      ]
    },
    "Send Approve/Reject Email Request": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke.waitForTaskToken",
      "Parameters": {
        "FunctionName": "${CreateAndEmailLinkFnName}",
        "Payload": {
          "token.$": "$$.Task.Token",
          "s3_batch_output.$": "$.result.TransformOutput.S3OutputPath"
        }
      },
      "ResultPath": "$.output",
      "Next": "Sagemaker Create Endpoint Config",
      "Catch": [
        {
          "ErrorEquals": ["rejected"],
          "ResultPath": "$.output",
          "Next": "FailState"
        }
      ]
    },
    "Sagemaker Create Endpoint Config": {
      "Type": "Task",
      "Resource": "arn:aws:states:::sagemaker:createEndpointConfig",
      "Parameters": {
        "EndpointConfigName.$": "$.TrainingJobName",
        "ProductionVariants": [
          {
            "InitialInstanceCount": 1,
            "InitialVariantWeight": 1,
            "InstanceType": "ml.t2.medium",
            "ModelName.$": "$.TrainingJobName",
            "VariantName": "AllTraffic"
          }
        ]
      },
      "ResultPath": "$.result",
      "Next": "Sagemaker Create Endpoint",
      "Catch": [
        {
          "ErrorEquals": ["States.ALL"],
          "Next": "FailState"
        }
      ]
    },
    "Sagemaker Create Endpoint": {
      "Type": "Task",
      "Resource": "arn:aws:states:::sagemaker:createEndpoint",
      "Parameters": {
        "EndpointName.$": "$.TrainingJobName",
        "EndpointConfigName.$": "$.TrainingJobName"
      },
      "Next": "Send Email With API Endpoint",
      "Catch": [
        {
          "ErrorEquals": ["States.ALL"],
          "Next": "FailState"
        }
      ]
    },
    "Send Email With API Endpoint": {
      "Type": "Task",
      "Resource": "${UpdateSagemakerEndpointAPI}",
      "Catch": [
        {
          "ErrorEquals": ["States.ALL"],
          "Next": "FailState"
        }
      ],
      "Next": "SuccessState"
    },
    "SuccessState": {
      "Type": "Succeed"
    },
    "FailState": {
      "Type": "Fail"
    }
  }
}
In this section, we discuss the detailed steps involved in creating the SageMaker workflow using Step Functions.
Step Functions uses the commit ID passed by CodePipeline as a unique identifier to create a SageMaker training job. The training job can take a long time to complete; to make Step Functions wait for it, you append .sync to the resource ARN of the SageMaker training job task (arn:aws:states:::sagemaker:createTrainingJob.sync).
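If you want to try the state machine on its own before the CI/CD pipeline is in place, you can start an execution manually. The following is a minimal sketch; the state machine ARN, commit ID, and image URI are hypothetical placeholders, but the input keys commitID and imageUri are the ones referenced in the workflow definition shown earlier:

import json
import boto3

sf = boto3.client('stepfunctions')

# Hypothetical placeholders; CodePipeline supplies real values automatically
state_machine_arn = 'arn:aws:states:us-east-1:123456789012:stateMachine:MLOpsStateMachine'
execution_input = {
    'commitID': 'd236eba5',  # becomes the SageMaker training job name
    'imageUri': '123456789012.dkr.ecr.us-east-1.amazonaws.com/custom-model:d236eba5'
}

response = sf.start_execution(
    stateMachineArn=state_machine_arn,
    input=json.dumps(execution_input)
)
print(response['executionArn'])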
When the training job is complete, Step Functions creates a SageMaker model from the training artifacts stored in the S3 bucket.
Step Functions then uses a batch transform step to evaluate and test the model, based on batch data initially provided by the data scientist in an S3 bucket. When the evaluation step is complete, the output is stored in an S3 bucket.
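If you want to inspect the batch transform results programmatically before approving, a minimal sketch such as the following works; the bucket name and prefix are hypothetical and should match the transform_output/<training job name>/ location configured in the state machine:

import boto3

s3 = boto3.client('s3')

# Hypothetical bucket and training job name (commit ID)
bucket = 'ml-models'
prefix = 'transform_output/d236eba5/'

# List the objects the batch transform job wrote under the output prefix
response = s3.list_objects_v2(Bucket=bucket, Prefix=prefix)
for obj in response.get('Contents', []):
    print(obj['Key'], obj['Size'])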
Step Functions then enters a manual approval stage. To create this state, you use callback URLs. To implement this state in Step Functions, use .waitForTaskToken while calling a Lambda resource and pass a token to the Lambda function.
The Lambda function uses Amazon SNS or Amazon Simple Email Service (Amazon SES) to send an email to the subscribed party. You need to add your email address to the SNS topic to receive the accept/reject email while testing.
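The actual implementation lives in the create_and_email_accept_reject_links function in the repo. The following is only a minimal sketch of the idea, with a hypothetical SNS topic ARN and API URL: it embeds the task token in Accept and Reject links whose query parameters match what the responder function (shown later) expects:

import os
import urllib.parse
import boto3

sns = boto3.client('sns')

# Hypothetical values; in the deployed stack these are wired up by the AWS SAM template
TOPIC_ARN = os.environ.get('TOPIC_ARN', 'arn:aws:sns:us-east-1:123456789012:mlops-approvals')
API_BASE = os.environ.get('RESPOND_API_URL', 'https://XXXX.execute-api.us-east-1.amazonaws.com/v1/respond')

def lambda_handler(event, context):
    # The state machine passes the task token and the batch transform output location
    token = urllib.parse.quote_plus(event['token'])
    s3_batch_output = event['s3_batch_output']

    accept_link = f"{API_BASE}?type=success&token={token}"
    reject_link = f"{API_BASE}?type=fail&token={token}"

    message = (
        f"Batch transform output: {s3_batch_output}\n\n"
        f"Accept: {accept_link}\n"
        f"Reject: {reject_link}\n"
    )

    sns.publish(TopicArn=TOPIC_ARN, Subject='MLOps model approval request', Message=message)
    return {'statusCode': 200}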
You receive an email, as in the following screenshot, with links to the data stored in the S3 bucket. This data has been batch transformed using the custom ML model created in the earlier step by SageMaker. You can choose Accept or Reject based on your findings.
If you choose Reject, Step Functions stops running the workflow. If you’re satisfied with the results, choose Accept, which triggers the API link. This link passes the embedded token and type to the API Gateway or Lambda endpoint as request parameters to progress to the next Step Functions step.
See the following Python code:
import json
import boto3

sf = boto3.client('stepfunctions')

def lambda_handler(event, context):
    # Read the link type and the Step Functions task token from the query string
    type = event.get('queryStringParameters').get('type')
    token = event.get('queryStringParameters').get('token')

    if type == 'success':
        # Resume the workflow at the next state
        sf.send_task_success(taskToken=token, output="{}")
    else:
        # Fail the waiting task, which routes the workflow to FailState
        sf.send_task_failure(taskToken=token)

    return {
        'statusCode': 200,
        'body': json.dumps('Responded to Step Function')
    }
Step Functions then creates the final unique SageMaker endpoint configuration and inference endpoint. You achieve this directly in the state machine using the SageMaker service integration resource values (createEndpointConfig and createEndpoint), as shown in the preceding workflow definition.
When the SageMaker endpoint is ready, an email is sent to the subscriber with a link to the API of the SageMaker inference endpoint.
In this section, you use the CI/CD pipeline to deploy a custom ML model.
The pipeline starts its run as soon as it detects updates to the source code of the custom model. The pipeline downloads the source code from the repository, builds and tags the Docker image, and uploads the Docker image to Amazon ECR. After uploading the Docker image, the pipeline triggers the Step Functions workflow to train and deploy the custom model to SageMaker. Finally, the pipeline sends an email to the specified users with details about the SageMaker inference endpoint.
We use the Scikit Bring Your Own Container example to build a custom container image, and the Iris dataset to train and test the model.
When your Step Functions workflow is ready, build your full pipeline using the code provided in the GitHub repo.
After you download the code from the repo, the directory structure should look like the following:
. codepipeline-ecr-build-sf-execution
|-- cfn
|---- params.json
|---- pipeline-cfn.yaml
|-- container
|---- descision_trees
|---- local_test
|---- .dockerignore
|---- Dockerfile
|-- scripts
In the params.json file in the cfn folder, provide your GitHub token, the repo name, and the ARN of the Step Functions state machine you created earlier.
You now create the necessary services and resources for the CI/CD pipeline. To create the CloudFormation stack, run the following code:
aws cloudformation create-stack \
  --stack-name codepipeline-ecr-build-sf-execution \
  --template-body file://cfn/pipeline-cfn.yaml \
  --parameters file://cfn/params.json \
  --capabilities CAPABILITY_NAMED_IAM
Alternatively, to update the stack, run the following code:
aws cloudformation update-stack \
  --stack-name codepipeline-ecr-build-sf-execution \
  --template-body file://cfn/pipeline-cfn.yaml \
  --parameters file://cfn/params.json \
  --capabilities CAPABILITY_NAMED_IAM
The CloudFormation template deploys a CodePipeline pipeline into your AWS account. The pipeline starts running as soon as code changes are committed to the repo. After the source code is downloaded by the pipeline stage, CodeBuild creates a Docker image and tags it with the commit ID and current timestamp before pushing the image to Amazon ECR. CodePipeline moves to the next stage to trigger a Step Functions step (which you created earlier).
When Step Functions is complete, a final email is generated with a link to the API Gateway URL that references the newly created SageMaker inference endpoint.
To test your workflow, complete the following steps:
When the pipeline reaches its final state, it starts the Step Functions workflow, which sends an email for approval.
When the SageMaker endpoint is ready, you should receive another email with a link to the API inference endpoint.
To test the iris dataset, you can try sending a single data point to the inference endpoint.
INFERENCE_ENDPOINT=https://XXXX.execute-api.us-east-1.amazonaws.com/v1/invokeSagemakerAPI?sagemaker_endpoint=d236eba5-09-03-2020-18-29-15

curl --location --request POST ${INFERENCE_ENDPOINT} \
  --header 'Content-Type: application/json' \
  --data-raw '{ "data": "4.5,1.3,0.3,0.3" }'

{"result": "setosa"}

curl --location --request POST ${INFERENCE_ENDPOINT} \
  --header 'Content-Type: application/json' \
  --data-raw '{ "data": "5.9,3,5.1,1.8" }'

{"result": "virginica"}
By sending different data, we get different sets of inference results back.
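If you prefer Python over curl, the following sketch sends the same request with the standard library; the endpoint URL is the placeholder from the preceding example and should be replaced with the link from your email:

import json
import urllib.request

# Placeholder URL from the preceding curl example
INFERENCE_ENDPOINT = ('https://XXXX.execute-api.us-east-1.amazonaws.com/v1/'
                      'invokeSagemakerAPI?sagemaker_endpoint=d236eba5-09-03-2020-18-29-15')

payload = json.dumps({'data': '4.5,1.3,0.3,0.3'}).encode('utf-8')
request = urllib.request.Request(
    INFERENCE_ENDPOINT,
    data=payload,
    headers={'Content-Type': 'application/json'},
    method='POST'
)

with urllib.request.urlopen(request) as response:
    print(json.loads(response.read()))  # e.g. {"result": "setosa"}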
To avoid ongoing charges, delete the resources created in the previous steps by deleting the CloudFormation stacks. Additionally, on the SageMaker console, delete any unused models, endpoint configurations, and inference endpoints.
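If you prefer to script the cleanup, the following sketch uses boto3; it assumes the stack names used in this post, and the endpoint, endpoint configuration, and model names are hypothetical placeholders (the workflow names these resources after the training job name):

import boto3

cfn = boto3.client('cloudformation')
sm = boto3.client('sagemaker')

# Delete the two CloudFormation stacks created in this post
for stack_name in ['codepipeline-ecr-build-sf-execution', 'sam-sf-sagemaker-workflow']:
    cfn.delete_stack(StackName=stack_name)

# Hypothetical names; replace with the resources created by your workflow run
resource_name = 'd236eba5-09-03-2020-18-29-15'
sm.delete_endpoint(EndpointName=resource_name)
sm.delete_endpoint_config(EndpointConfigName=resource_name)
sm.delete_model(ModelName=resource_name)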
This post demonstrated how to create an ML pipeline for custom SageMaker ML models using some of the latest AWS service integrations.
You can extend this ML pipeline further by adding a layer of authentication and encryption while sending approval links. You can also add more steps to CodePipeline or Step Functions as deemed necessary for your project’s workflow.
The sample files are available in the GitHub repo. To explore related features of SageMaker and further reading, see the following:
Sachin Doshi is a Senior Application Architect on the AWS Professional Services team. He is based in the New York metropolitan area. Sachin helps customers optimize their applications using cloud-native AWS services.