def test_get_expected_model_with_framework_estimator(tensorflow_estimator):
    training_step = TrainingStep('Training',
        estimator=tensorflow_estimator,
        data={'train': 's3://sagemaker/train'},
        job_name='tensorflow-job',
        mini_batch_size=1024
    )
    expected_model = training_step.get_expected_model()
    expected_model.entry_point = 'tf_train.py'
    model_step = ModelStep('Create model', model=expected_model, model_name='tf-model')
    assert model_step.to_dict() == {
        'Type': 'Task',
        'Parameters': {
            'ExecutionRoleArn': EXECUTION_ROLE,
            'ModelName': 'tf-model',
            'PrimaryContainer': {
                'Environment': {
                    'SAGEMAKER_PROGRAM': 'tf_train.py',
                    'SAGEMAKER_SUBMIT_DIRECTORY': 's3://sagemaker/tensorflow-job/source/sourcedir.tar.gz',
                    'SAGEMAKER_ENABLE_CLOUDWATCH_METRICS': 'false',
                    'SAGEMAKER_CONTAINER_LOG_LEVEL': '20',
                    'SAGEMAKER_REGION': 'us-east-1',
                },
                'Image': expected_model.image,
                'ModelDataUrl.$': "$['ModelArtifacts']['S3ModelArtifacts']"
            }
        },
        'Resource': 'arn:aws:states:::sagemaker:createModel',
        'End': True
    }
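# The unit test above relies on a `tensorflow_estimator` pytest fixture that is not shown in
# this listing. Below is a minimal, illustrative sketch of such a fixture; the constructor
# values (framework_version, py_version, instance settings) are assumptions using SageMaker
# Python SDK v2-style parameter names, and an older SDK would use train_instance_count /
# train_instance_type instead.
import pytest
from sagemaker.tensorflow import TensorFlow

@pytest.fixture
def tensorflow_estimator(sagemaker_session):   # sagemaker_session: a (mocked) session fixture
    return TensorFlow(
        entry_point='tf_train.py',
        role=EXECUTION_ROLE,                   # same execution-role constant the tests assert on
        framework_version='2.3.0',             # assumed version
        py_version='py37',                     # assumed Python version
        instance_count=1,                      # assumed instance settings
        instance_type='ml.p2.xlarge',
        output_path='s3://sagemaker/models',
        sagemaker_session=sagemaker_session
    )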
def test_transform_step(trained_estimator, sfn_client, sfn_role_arn):
    # Create transformer from previously created estimator
    job_name = generate_job_name()
    pca_transformer = trained_estimator.transformer(instance_count=INSTANCE_COUNT, instance_type=INSTANCE_TYPE)

    # Create a model step to save the model
    model_step = ModelStep('create_model_step', model=trained_estimator.create_model(), model_name=job_name)

    # Upload data for transformation to S3
    data_path = os.path.join(DATA_DIR, "one_p_mnist")
    transform_input_path = os.path.join(data_path, "transform_input.csv")
    transform_input_key_prefix = "integ-test-data/one_p_mnist/transform"
    transform_input = pca_transformer.sagemaker_session.upload_data(
        path=transform_input_path, key_prefix=transform_input_key_prefix
    )

    # Build workflow definition
    transform_step = TransformStep(
        'create_transform_job_step',
        pca_transformer,
        job_name=job_name,
        model_name=job_name,
        data=transform_input,
        content_type="text/csv"
    )
    workflow_graph = Chain([model_step, transform_step])

    with timeout(minutes=DEFAULT_TIMEOUT_MINUTES):
        # Create workflow and check definition (the remaining arguments follow the same
        # pattern as test_model_step below; the workflow name here is illustrative)
        workflow = create_workflow_and_check_definition(
            workflow_graph=workflow_graph,
            workflow_name=unique_name_from_base("integ-test-transform-step-workflow"),
            sfn_client=sfn_client,
            sfn_role_arn=sfn_role_arn
        )
def test_model_step(trained_estimator, sfn_client, sagemaker_session, sfn_role_arn):
    # Build workflow definition
    model_name = generate_job_name()
    model_step = ModelStep('create_model_step', model=trained_estimator.create_model(), model_name=model_name)
    workflow_graph = Chain([model_step])

    with timeout(minutes=DEFAULT_TIMEOUT_MINUTES):
        # Create workflow and check definition
        workflow = create_workflow_and_check_definition(
            workflow_graph=workflow_graph,
            workflow_name=unique_name_from_base("integ-test-model-step-workflow"),
            sfn_client=sfn_client,
            sfn_role_arn=sfn_role_arn
        )

        # Execute workflow
        execution = workflow.execute()
        execution_output = execution.get_output(wait=True)

        # Check workflow output (a representative assertion; CreateModel returns the model ARN)
        assert execution_output.get('ModelArn') is not None
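        # Integration tests normally tear down what they create once the output check passes.
        # The cleanup below is an illustrative sketch (not part of the original snippet): it
        # assumes the helper returned a stepfunctions Workflow object and deletes the model
        # through the sagemaker_session fixture already passed to the test.
        sagemaker_session.sagemaker_client.delete_model(ModelName=model_name)
        workflow.delete()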
def test_get_expected_model(pca_estimator):
    training_step = TrainingStep('Training', estimator=pca_estimator, job_name='TrainingJob')
    expected_model = training_step.get_expected_model()
    model_step = ModelStep('Create model', model=expected_model, model_name='pca-model')
    assert model_step.to_dict() == {
        'Type': 'Task',
        'Parameters': {
            'ExecutionRoleArn': EXECUTION_ROLE,
            'ModelName': 'pca-model',
            'PrimaryContainer': {
                'Environment': {},
                'Image': expected_model.image,
                'ModelDataUrl.$': "$['ModelArtifacts']['S3ModelArtifacts']"
            }
        },
        'Resource': 'arn:aws:states:::sagemaker:createModel',
        'End': True
    }
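# Beyond the unit-test assertion above, get_expected_model() is typically used to wire a
# TrainingStep's output artifacts directly into a ModelStep inside a real state machine.
# The sketch below is illustrative only: the helper function, workflow name and role ARN
# parameter are assumptions layered on top of the snippets in this listing, not part of them.
from stepfunctions.steps import Chain, ModelStep, TrainingStep
from stepfunctions.workflow import Workflow

def build_pca_workflow(pca_estimator, workflow_role_arn):
    # Chain the training step into the model step produced by get_expected_model(), so the
    # CreateModel call picks up the S3 model artifacts from the training job at runtime.
    training_step = TrainingStep('Training', estimator=pca_estimator, job_name='TrainingJob')
    model_step = ModelStep('Create model', model=training_step.get_expected_model(), model_name='pca-model')

    workflow = Workflow(
        name='pca-train-and-create-model',       # illustrative name
        definition=Chain([training_step, model_step]),
        role=workflow_role_arn                   # Step Functions (not SageMaker) execution role ARN
    )
    workflow.create()                            # registers the state machine
    return workflow.execute()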
def test_training_step_creation_with_model(pca_estimator):
    training_step = TrainingStep('Training', estimator=pca_estimator, job_name='TrainingJob')
    model_step = ModelStep('Training - Save Model',
                           training_step.get_expected_model(model_name=training_step.output()['TrainingJobName']))
    training_step.next(model_step)
    assert training_step.to_dict() == {
        'Type': 'Task',
        'Parameters': {
            'AlgorithmSpecification': {
                'TrainingImage': PCA_IMAGE,
                'TrainingInputMode': 'File'
            },
            'OutputDataConfig': {
                'S3OutputPath': 's3://sagemaker/models'
            },
            'StoppingCondition': {
                'MaxRuntimeInSeconds': 86400
            },
            'ResourceConfig': {
                'InstanceCount': 1,
def test_model_step_creation(pca_model):
    step = ModelStep('Create model', model=pca_model, model_name='pca-model', tags=DEFAULT_TAGS)
    assert step.to_dict() == {
        'Type': 'Task',
        'Parameters': {
            'ExecutionRoleArn': EXECUTION_ROLE,
            'ModelName': 'pca-model',
            'PrimaryContainer': {
                'Environment': {},
                'Image': pca_model.image,
                'ModelDataUrl': pca_model.model_data
            },
            'Tags': DEFAULT_TAGS_LIST
        },
        'Resource': 'arn:aws:states:::sagemaker:createModel',
        'End': True
    }
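# The test above assumes module-level DEFAULT_TAGS / DEFAULT_TAGS_LIST constants that are not
# shown in this listing. A minimal sketch of what they could look like (the actual values in
# the test module may differ): ModelStep accepts tags as a plain dict, and to_dict() renders
# them in the Key/Value list form that the SageMaker CreateModel API expects.
DEFAULT_TAGS = {'Purpose': 'unittests'}
DEFAULT_TAGS_LIST = [{'Key': 'Purpose', 'Value': 'unittests'}]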