Launched at AWS re:Invent 2020, Amazon SageMaker Pipelines is the first purpose-built, easy-to-use continuous integration and continuous delivery (CI/CD) service for machine learning (ML). With Pipelines, you can create, automate, and manage end-to-end ML workflows at scale.
You can extend your pipelines to include steps for tasks performed outside of Amazon SageMaker by taking advantage of custom callback steps. This feature lets you include tasks that are performed by other AWS services, third-party tools, or processes run outside AWS. Before the launch of this feature, steps within a pipeline were limited to the supported native SageMaker steps. With this launch, you can use the new CallbackStep to generate a token and add a message to an Amazon Simple Queue Service (Amazon SQS) queue. The message on the SQS queue triggers a task outside of the currently supported native steps. When that task is complete, you call the new SendPipelineExecutionStepSuccess API with the generated token to signal that the callback step and its corresponding task are finished and the pipeline run can continue.
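To make this flow concrete, the following sketch shows roughly what a worker consuming the callback message does once its external task finishes. The message shape and output parameter name mirror those used later in this post; the queue URL is a placeholder, and in our solution a Lambda function subscribed to the queue plays this role rather than polling the queue directly.

import json
import boto3

sqs = boto3.client('sqs')
sagemaker = boto3.client('sagemaker')

# The callback step puts a message like this on the queue:
# {"token": "<callback token>", "arguments": {"input_location": "s3://...", "output_location": "s3://..."}}
response = sqs.receive_message(QueueUrl='<your-callback-queue-url>', MaxNumberOfMessages=1)
for message in response.get('Messages', []):
    payload = json.loads(message['Body'])
    token = payload['token']

    # ... run the external task here ...

    # Report success (or call send_pipeline_execution_step_failure) using the token
    sagemaker.send_pipeline_execution_step_success(
        CallbackToken=token,
        OutputParameters=[{'Name': 's3_data_out', 'Value': payload['arguments']['output_location']}]
    )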
In this post, we demonstrate how to use CallbackStep to perform data preprocessing using AWS Glue. We use an Apache Spark job to prepare NYC taxi data for ML training. The raw data has one row per taxi trip, and shows information like the trip duration, number of passengers, and trip cost. To train an anomaly detection model, we want to transform the raw data into a count of the number of passengers that took taxi rides over 30-minute intervals.
Although we could run this specific Spark job in SageMaker Processing, we use AWS Glue for this post. In some cases, we may need capabilities that Amazon EMR or AWS Glue offer, like support for Hive queries or integration with the AWS Glue metadata catalog, so we demonstrate how to invoke AWS Glue from the pipeline.
The pipeline step that launches the AWS Glue job sends a message to an SQS queue. The message contains the callback token we need to send success or failure information back to the pipeline; the pipeline only moves on to the next step after it receives that signal. When handling this message, we need a handler that can launch the AWS Glue job and reliably check its status until the job completes. We have to keep in mind that a Spark job can easily take longer than 15 minutes (the maximum duration of a single AWS Lambda function invocation), and the Spark job itself can fail for a number of reasons. That last point is worth emphasizing: in most Apache Spark runtimes, the job code runs in transient containers under the control of a coordinator like Apache YARN. We can't add custom code to YARN, so we need something outside the job to check for completion.
We can accomplish this task several ways. For this post, we use the simplest (though likely not the most efficient) technique: a long-running Amazon ECS task on AWS Fargate that starts the AWS Glue job and polls its status until it completes. We build out the solution as shown in the following diagram.
The solution is one example of how to use the new CallbackStep to extend your pipeline to steps outside SageMaker (such as AWS Glue). You can apply the same general steps and architectural guidance to extend pipelines to other custom processes or tasks. In our solution, the pipeline runs the following tasks:
Data preprocessing – The callback step sends a message to the SQS queue, which triggers an AWS Glue job that aggregates the raw taxi data into 30-minute passenger counts
Model training – A SageMaker training job trains a Random Cut Forest anomaly detection model on the prepared data
Model creation – A SageMaker model is created from the training job's artifacts
Batch transform – A batch transform job uses the model to score the prepared data for anomalies
These pipeline steps are just examples; you can modify the pipeline to meet your use case, such as adding steps to register the model in the SageMaker Model Registry.
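For example, if you want to register the trained model, you could append a registration step along the following lines. This is a minimal sketch, not part of the pipeline built in this post: the model package group name is a placeholder, rcf and step_train are defined later in this post, and newer SDK versions offer ModelStep as an alternative to RegisterModel.

from sagemaker.workflow.step_collections import RegisterModel

# Hypothetical registration step; rcf (estimator) and step_train are created later in this post
step_register = RegisterModel(
    name="RegisterTaxiModel",
    estimator=rcf,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.m5.large"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name="TaxiAnomalyModels",  # placeholder model package group
    approval_status="PendingManualApproval",
)
# Add step_register to the steps list when you construct the Pipeline object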
In the next sections, we discuss how to set up this solution.
For the preceding pipeline, you need the prerequisites outlined in this section. The detailed setup of each of these prerequisites is available in the supporting notebook.
To run the provided notebook, you need the following:
Your pipeline uses the following services:
Amazon Simple Queue Service (Amazon SQS) – Queues the callback messages sent by the pipeline
AWS Lambda – Processes the queue messages and launches the Fargate task
Amazon Elastic Container Service (Amazon ECS) on AWS Fargate – Runs the long-lived task that starts and monitors the AWS Glue job
AWS Glue – Runs the Apache Spark data preprocessing job
Amazon SageMaker – Runs the pipeline and the training, model creation, and batch transform steps
Amazon Simple Storage Service (Amazon S3) – Stores the raw data, prepared data, and model artifacts
import boto3

sqs_client = boto3.client('sqs')
queue_url = ''
queue_name = 'pipeline_callbacks_glue_prep'
try:
    response = sqs_client.create_queue(QueueName=queue_name)
    # Capture the queue URL for use later in the CallbackStep
    queue_url = response['QueueUrl']
except:
    print(f"Failed to create queue")
%%writefile queue_handler.py
import json
import boto3
import os
import traceback

ecs = boto3.client('ecs')
sagemaker = boto3.client('sagemaker')

def handler(event, context):
    print(f"Got event: {json.dumps(event)}")

    cluster_arn = os.environ["cluster_arn"]
    task_arn = os.environ["task_arn"]
    task_subnets = os.environ["task_subnets"]
    task_sgs = os.environ["task_sgs"]
    glue_job_name = os.environ["glue_job_name"]
    print(f"Cluster ARN: {cluster_arn}")
    print(f"Task ARN: {task_arn}")
    print(f"Task Subnets: {task_subnets}")
    print(f"Task SG: {task_sgs}")
    print(f"Glue job name: {glue_job_name}")

    for record in event['Records']:
        payload = json.loads(record["body"])
        print(f"Processing record {payload}")

        token = payload["token"]
        print(f"Got token {token}")

        try:
            input_data_s3_uri = payload["arguments"]["input_location"]
            output_data_s3_uri = payload["arguments"]["output_location"]
            print(f"Got input_data_s3_uri {input_data_s3_uri}")
            print(f"Got output_data_s3_uri {output_data_s3_uri}")

            # Launch the Fargate task that starts and monitors the AWS Glue job
            response = ecs.run_task(
                cluster=cluster_arn,
                count=1,
                launchType='FARGATE',
                taskDefinition=task_arn,
                networkConfiguration={
                    'awsvpcConfiguration': {
                        'subnets': task_subnets.split(','),
                        'securityGroups': task_sgs.split(','),
                        'assignPublicIp': 'ENABLED'
                    }
                },
                overrides={
                    'containerOverrides': [
                        {
                            'name': 'FargateTask',
                            'environment': [
                                {'name': 'inputLocation', 'value': input_data_s3_uri},
                                {'name': 'outputLocation', 'value': output_data_s3_uri},
                                {'name': 'token', 'value': token},
                                {'name': 'glue_job_name', 'value': glue_job_name}
                            ]
                        }
                    ]
                }
            )
            if 'failures' in response and len(response['failures']) > 0:
                f = response['failures'][0]
                print(f"Failed to launch task for token {token}: {f['reason']}")
                sagemaker.send_pipeline_execution_step_failure(
                    CallbackToken=token,
                    FailureReason=f['reason']
                )
            else:
                print(f"Launched task {response['tasks'][0]['taskArn']}")
        except Exception as e:
            trc = traceback.format_exc()
            print(f"Error handling record: {str(e)}\n{trc}")
            sagemaker.send_pipeline_execution_step_failure(
                CallbackToken=token,
                FailureReason=str(e)
            )
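The handler expects the ECS cluster, task definition, networking, and AWS Glue job details as environment variables, and the next snippet attaches the function to the SQS queue by name. The notebook creates the function itself; a rough sketch of that call, with the IAM role ARN, resource identifiers, and runtime version as placeholder assumptions, might look like the following.

import zipfile
import boto3

lambda_client = boto3.client('lambda')

# Package the handler written above
with zipfile.ZipFile('queue_handler.zip', 'w') as z:
    z.write('queue_handler.py')
with open('queue_handler.zip', 'rb') as f:
    zipped_code = f.read()

# lambda_role_arn, cluster_arn, task_arn, task_subnets, task_sgs, and glue_job_name
# are placeholders for values created elsewhere in the notebook
lambda_client.create_function(
    FunctionName='SMPipelineQueueHandler',
    Runtime='python3.8',
    Role=lambda_role_arn,
    Handler='queue_handler.handler',
    Code={'ZipFile': zipped_code},
    Timeout=120,
    Environment={
        'Variables': {
            'cluster_arn': cluster_arn,
            'task_arn': task_arn,
            'task_subnets': task_subnets,   # comma-separated subnet IDs
            'task_sgs': task_sgs,           # comma-separated security group IDs
            'glue_job_name': glue_job_name
        }
    }
)

With the function in place, the following call connects it to the SQS queue as an event source: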
lambda_client.create_event_source_mapping(
    EventSourceArn=f'arn:aws:sqs:{region}:{account}:{queue_name}',
    FunctionName='SMPipelineQueueHandler',
    Enabled=True,
    BatchSize=10
)
import boto3

ecs = boto3.client('ecs')
response = ecs.create_cluster(clusterName='FargateTaskRunner')
import boto3
import os
import sys
import traceback
import time

# Read the task inputs passed in as container environment variable overrides
if 'inputLocation' in os.environ:
    input_uri = os.environ['inputLocation']
else:
    print("inputLocation not found in environment")
    sys.exit(1)
if 'outputLocation' in os.environ:
    output_uri = os.environ['outputLocation']
else:
    print("outputLocation not found in environment")
    sys.exit(1)
if 'token' in os.environ:
    token = os.environ['token']
else:
    print("token not found in environment")
    sys.exit(1)
if 'glue_job_name' in os.environ:
    glue_job_name = os.environ['glue_job_name']
else:
    print("glue_job_name not found in environment")
    sys.exit(1)

print(f"Processing from {input_uri} to {output_uri} using callback token {token}")

sagemaker = boto3.client('sagemaker')
glue = boto3.client('glue')

poll_interval = 60

try:
    t1 = time.time()
    response = glue.start_job_run(
        JobName=glue_job_name,
        Arguments={
            '--output_uri': output_uri,
            '--input_uri': input_uri
        }
    )
    job_run_id = response['JobRunId']
    print(f"Starting job {job_run_id}")

    # Poll the AWS Glue job until it reaches a terminal state
    job_status = 'STARTING'
    job_error = ''
    while job_status in ['STARTING', 'RUNNING', 'STOPPING']:
        time.sleep(poll_interval)
        response = glue.get_job_run(
            JobName=glue_job_name,
            RunId=job_run_id,
            PredecessorsIncluded=False
        )
        job_status = response['JobRun']['JobRunState']
        if 'ErrorMessage' in response['JobRun']:
            job_error = response['JobRun']['ErrorMessage']
        print(f"Job is in state {job_status}")

    t2 = time.time()
    total_time = (t2 - t1) / 60.0

    # Report the result back to the pipeline using the callback token
    if job_status == 'SUCCEEDED':
        print("Job succeeded")
        sagemaker.send_pipeline_execution_step_success(
            CallbackToken=token,
            OutputParameters=[
                {'Name': 'minutes', 'Value': str(total_time)},
                {'Name': 's3_data_out', 'Value': str(output_uri)}
            ]
        )
    else:
        print(f"Job failed: {job_error}")
        sagemaker.send_pipeline_execution_step_failure(
            CallbackToken=token,
            FailureReason=job_error
        )
except Exception as e:
    trc = traceback.format_exc()
    print(f"Error running ETL job: {str(e)}\n{trc}")
    sagemaker.send_pipeline_execution_step_failure(
        CallbackToken=token,
        FailureReason=str(e)
    )
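The Lambda handler launches this script as a Fargate task, so the notebook also registers an ECS task definition whose container runs it. The sketch below shows roughly what that registration looks like, assuming the script has been packaged into a container image pushed to Amazon ECR; the image URI, role ARNs, and family name are placeholder assumptions, and the container name must match the FargateTask name the Lambda handler overrides.

import boto3

ecs = boto3.client('ecs')

# task_image_uri, task_execution_role_arn, and task_role_arn are placeholders
response = ecs.register_task_definition(
    family='FargateTaskDef',
    networkMode='awsvpc',
    requiresCompatibilities=['FARGATE'],
    cpu='512',
    memory='1024',
    executionRoleArn=task_execution_role_arn,
    taskRoleArn=task_role_arn,
    containerDefinitions=[
        {
            'name': 'FargateTask',    # must match the containerOverrides name in the Lambda handler
            'image': task_image_uri,  # placeholder: ECR image that runs the task script
            'essential': True
        }
    ]
)
task_arn = response['taskDefinition']['taskDefinitionArn']

The AWS Glue job that the Fargate task starts runs the following PySpark script, which aggregates passenger counts into 30-minute windows: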
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.types import IntegerType
from pyspark.sql import functions as F

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME', 'input_uri', 'output_uri'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Load the raw taxi trip data and aggregate passenger counts into 30-minute windows
df = spark.read.format("csv").option("header", "true").load("{0}*.csv".format(args['input_uri']))
df = df.withColumn("Passengers", df["passenger_count"].cast(IntegerType()))
df = df.withColumn(
    'pickup_time',
    F.to_timestamp(
        F.unix_timestamp('tpep_pickup_datetime', 'yyyy-MM-dd HH:mm:ss').cast('timestamp')))
dfW = df.groupBy(F.window("pickup_time", "30 minutes")).agg(F.sum("Passengers").alias("passenger"))

dfOut = dfW.drop('window')
dfOut.repartition(1).write.option("timestampFormat", "yyyy-MM-dd HH:mm:ss").csv(args['output_uri'])

job.commit()
glue = boto3.client('glue')

response = glue.create_job(
    Name='GlueDataPrepForPipeline',
    Description='Prepare data for SageMaker training',
    Role=glue_role_arn,
    ExecutionProperty={
        'MaxConcurrentRuns': 1
    },
    Command={
        'Name': 'glueetl',
        'ScriptLocation': glue_script_location,
    },
    MaxRetries=0,
    Timeout=60,
    MaxCapacity=10.0,
    GlueVersion='2.0'
)
glue_job_name = response['Name']
After these prerequisites are in place, including the necessary IAM permissions outlined in the example notebook, we’re ready to configure and run the pipeline.
To build out the pipeline, we use the preceding prerequisites in a callback step that performs the data preprocessing. We then combine that step with native SageMaker steps for model training and deployment to create an end-to-end pipeline.
To configure the pipeline, complete the following steps:
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
)

input_data = ParameterString(
    name="InputData",
    default_value=f"s3://{default_bucket}/{taxi_prefix}/"
)
id_out = ParameterString(
    name="IdOut",
    default_value="taxiout" + str(timestamp)
)
output_data = ParameterString(
    name="OutputData",
    default_value=f"s3://{default_bucket}/{taxi_prefix}_output/"
)
training_instance_count = ParameterInteger(
    name="TrainingInstanceCount",
    default_value=1
)
training_instance_type = ParameterString(
    name="TrainingInstanceType",
    default_value="ml.c5.xlarge"
)
This step uses the SQS queue created in the prerequisites, along with arguments that are used by the tasks in this step. These arguments include the Amazon S3 locations of the input (raw taxi data) and the output training data. The step also defines its outputs, which in this case include the callback output and the Amazon S3 location of the training data. The outputs become the inputs to the next step in the pipeline. See the following code:
from sagemaker.workflow.callback_step import CallbackStep, CallbackOutput, CallbackOutputTypeEnum

callback1_output = CallbackOutput(output_name="s3_data_out", output_type=CallbackOutputTypeEnum.String)
step_callback_data = CallbackStep(
    name="GluePrepCallbackStep",
    sqs_queue_url=queue_url,
    inputs={
        "input_location": f"s3://{default_bucket}/{taxi_prefix}/",
        "output_location": f"s3://{default_bucket}/{taxi_prefix}_{id_out}/"
    },
    outputs=[
        callback1_output
    ],
)
We first need to configure the estimator, and then we configure the actual pipeline step. This step takes the output of the previous step, the Amazon S3 location of the training data created by AWS Glue, as its input to train the model. See the following code:
containers = {
    'us-west-2': '174872318107.dkr.ecr.us-west-2.amazonaws.com/randomcutforest:latest',
    'us-east-1': '382416733822.dkr.ecr.us-east-1.amazonaws.com/randomcutforest:latest',
    'us-east-2': '404615174143.dkr.ecr.us-east-2.amazonaws.com/randomcutforest:latest',
    'eu-west-1': '438346466558.dkr.ecr.eu-west-1.amazonaws.com/randomcutforest:latest'
}
region_name = boto3.Session().region_name
container = containers[region_name]

model_prefix = 'model'

session = sagemaker.Session()

rcf = sagemaker.estimator.Estimator(
    container,
    sagemaker.get_execution_role(),
    output_path='s3://{}/{}/output'.format(default_bucket, model_prefix),
    instance_count=training_instance_count,
    instance_type=training_instance_type,
    sagemaker_session=session)

rcf.set_hyperparameters(
    num_samples_per_tree=200,
    num_trees=50,
    feature_dim=1)

from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep

step_train = TrainingStep(
    name="TrainModel",
    estimator=rcf,
    inputs={
        "train": TrainingInput(
            # s3_data = output of the previous callback step
            s3_data=step_callback_data.properties.Outputs['s3_data_out'],
            content_type="text/csv;label_size=0",
            distribution='ShardedByS3Key'
        ),
    },
)
from sagemaker.model import Model
from sagemaker import get_execution_role

role = get_execution_role()
image_uri = sagemaker.image_uris.retrieve("randomcutforest", region)

model = Model(
    image_uri=image_uri,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=sagemaker_session,
    role=role,
)

from sagemaker.inputs import CreateModelInput
from sagemaker.workflow.steps import CreateModelStep

inputs = CreateModelInput(
    instance_type="ml.m5.large",
)
create_model = CreateModelStep(
    name="TaxiModel",
    model=model,
    inputs=inputs,
)
This step loads the trained model and processes the prediction request data stored in Amazon S3, then outputs the results (anomaly scores in this case) to the specified Amazon S3 location. See the following code:
base_uri = step_callback_data.properties.Outputs['s3_data_out']
output_prefix = 'batch-out'

from sagemaker.transformer import Transformer

transformer = Transformer(
    model_name=create_model.properties.ModelName,
    instance_type="ml.m5.xlarge",
    assemble_with="Line",
    accept='text/csv',
    instance_count=1,
    output_path=f"s3://{default_bucket}/{output_prefix}/",
)

from sagemaker.inputs import TransformInput
from sagemaker.workflow.steps import TransformStep

batch_data = step_callback_data.properties.Outputs['s3_data_out']
step_transform = TransformStep(
    name="TaxiTransform",
    transformer=transformer,
    inputs=TransformInput(
        data=batch_data,
        content_type="text/csv",
        split_type="Line",
        input_filter="$[0]",
        join_source='Input',
        output_filter='$[0,-1]'
    )
)
You’re now ready to create and run the pipeline. To do this, complete the following steps:
from sagemaker.workflow.pipeline import Pipeline

pipeline_name = f"GluePipeline-{id_out}"
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        input_data,
        training_instance_type,
        training_instance_count,
        id_out,
    ],
    steps=[step_callback_data, step_train, create_model, step_transform],
)
from sagemaker import get_execution_role

pipeline.upsert(role_arn=get_execution_role())
execution = pipeline.start()
You can monitor your pipeline using the SageMaker SDK (for example, with execution.list_steps()) or on the SageMaker Studio console, as shown in the following screenshot.
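For example, from the notebook:

# Inspect the pipeline run
execution.describe()      # overall status of the pipeline run
execution.list_steps()    # per-step status, including the callback step

# Optionally block until the run finishes
execution.wait()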
You can follow the same pattern to integrate any long-running tasks or jobs with Pipelines. This may include running AWS Batch jobs, Amazon EMR job flows, or Amazon ECS or Fargate tasks.
You can also implement an email approval step for your models as part of your ML pipeline.
In this pattern, a CallbackStep runs after the model evaluation step and sends an email containing the model metrics and approve or reject links to a user. The workflow progresses to the next step after the user approves the task to proceed.
You can implement this pattern using a Lambda function and Amazon Simple Notification Service (Amazon SNS).
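A minimal sketch of the Lambda handler for this approval pattern might look like the following. It assumes an SNS topic and an approval endpoint (for example, an Amazon API Gateway route that calls SendPipelineExecutionStepSuccess or SendPipelineExecutionStepFailure with the stored token) already exist; the topic ARN and URLs are placeholders.

import json
import boto3

sns = boto3.client('sns')

def handler(event, context):
    # Triggered by the SQS queue attached to the approval CallbackStep
    for record in event['Records']:
        payload = json.loads(record['body'])
        token = payload['token']
        metrics = payload.get('arguments', {})

        # Placeholder topic ARN and approval/rejection URLs
        sns.publish(
            TopicArn='arn:aws:sns:us-east-1:111122223333:model-approval',
            Subject='Model approval requested',
            Message=(
                f"Model metrics: {json.dumps(metrics)}\n"
                f"Approve: https://example.com/approve?token={token}\n"
                f"Reject: https://example.com/reject?token={token}"
            )
        )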
In this post, we showed you an example of how to use CallbackStep in Pipelines to extend your pipelines to integrate an AWS Glue job for data preprocessing. You can follow the same process to integrate any task or job outside of SageMaker. You can walk through the full solution explained in the example notebook.
Shelbee Eigenbrode is a Principal AI and Machine Learning Specialist Solutions Architect at Amazon Web Services (AWS). She holds 6 AWS certifications and has been in technology for 23 years spanning multiple industries, technologies, and roles. She is currently focusing on combining her DevOps and ML background to deliver and manage ML workloads at scale. With over 35 patents granted across various technology domains, she has a passion for continuous innovation and using data to drive business outcomes. Shelbee co-founded the Denver chapter of Women in Big Data.
Sofian Hamiti is an AI/ML specialist Solutions Architect at AWS. He helps customers across industries accelerate their AI/ML journey by helping them build and operationalize end-to-end machine learning solutions.
Randy DeFauw is a principal solutions architect at Amazon Web Services. He works with AWS customers to provide guidance and technical assistance on database projects, helping them improve the value of their solutions when using AWS.
Payton Staub is a senior engineer with Amazon SageMaker. His current focus includes model building pipelines, experiment management, image management and other tools to help customers productionize and automate machine learning at scale.