Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
'ParamA': "TestValue",
'ParamB': parallel_state.output()["Response"]["Key"]["State"]
})
branch_C = Pass(
'Branch_C',
parameters={
'ParamA': parallel_state.output()['A']["B"].get("C", float),
'ParamB': "HelloWorld"
})
parallel_state.add_branch(branch_A)
parallel_state.add_branch(branch_B)
parallel_state.add_branch(branch_C)
workflow_definition = Chain([parallel_state])
result = Graph(workflow_definition).to_dict()
expected_repr = {
"StartAt": "ParallelState01",
"States": {
"ParallelState01": {
"Type": "Parallel",
"End": True,
"Branches": [
{
"StartAt": "Branch_A",
"States": {
"Branch_A": {
"Parameters": {
"ParamA.$": "$['A']['B']",
"ParamB.$": "$$.Execution.Input['Key01']"
"epochs": IntegerParameter(1, 2),
"init_method": CategoricalParameter(["kmeans++", "random"]),
}
tuner = HyperparameterTuner(
estimator=kmeans,
objective_metric_name="test:msd",
hyperparameter_ranges=hyperparameter_ranges,
objective_type="Minimize",
max_jobs=2,
max_parallel_jobs=2,
)
# Build workflow definition
tuning_step = TuningStep('Tuning', tuner=tuner, job_name=job_name, data=record_set_for_hyperparameter_tuning)
workflow_graph = Chain([tuning_step])
with timeout(minutes=DEFAULT_TIMEOUT_MINUTES):
# Create workflow and check definition
workflow = create_workflow_and_check_definition(
workflow_graph=workflow_graph,
workflow_name=unique_name_from_base("integ-test-tuning-step-workflow"),
sfn_client=sfn_client,
sfn_role_arn=sfn_role_arn
)
# Execute workflow
execution = workflow.execute()
execution_output = execution.get_output(wait=True)
# Check workflow output
assert execution_output.get("HyperParameterTuningJobStatus") == "Completed"
def test_model_step(trained_estimator, sfn_client, sagemaker_session, sfn_role_arn):
# Build workflow definition
model_name = generate_job_name()
model_step = ModelStep('create_model_step', model=trained_estimator.create_model(), model_name=model_name)
workflow_graph = Chain([model_step])
with timeout(minutes=DEFAULT_TIMEOUT_MINUTES):
# Create workflow and check definition
workflow = create_workflow_and_check_definition(
workflow_graph=workflow_graph,
workflow_name=unique_name_from_base("integ-test-model-step-workflow"),
sfn_client=sfn_client,
sfn_role_arn=sfn_role_arn
)
# Execute workflow
execution = workflow.execute()
execution_output = execution.get_output(wait=True)
# Check workflow output
assert execution_output.get("ModelArn") is not None
'ParamC': workflow_input["Key05"],
"ParamB": "SampleValueB",
"ParamE": test_step_01.output()["Response"]["Key04"]
}
)
test_step_03 = Pass(
state_id='StateThree',
parameters={
'ParamG': "SampleValueG",
"ParamF": workflow_input["Key06"],
"ParamH": "SampleValueH"
}
)
workflow_definition = Chain([test_step_01, test_step_02, test_step_03])
result = Graph(workflow_definition).to_dict()
expected_workflow_repr = {
"StartAt": "StateOne",
"States": {
"StateOne": {
"Type": "Pass",
"Parameters": {
"ParamA.$": "$$.Execution.Input['Key02']['Key03']",
"ParamD.$": "$$.Execution.Input['Key01']['Key03']"
},
"Next": "StateTwo"
},
"StateTwo": {
"Type": "Pass",
def test_chaining_steps():
s1 = Pass('Step - One')
s2 = Pass('Step - Two')
s3 = Pass('Step - Three')
Chain([s1, s2])
assert s1.next_step == s2
assert s2.next_step is None
chain1 = Chain([s2, s3])
assert s2.next_step == s3
chain2 = Chain([s1, s3])
assert s1.next_step == s3
assert s2.next_step == s1.next_step
with pytest.raises(DuplicateStatesInChain):
chain2.append(s3)
with pytest.raises(DuplicateStatesInChain):
chain3 = Chain([chain1, chain2])
s1.next(s2)
chain3 = Chain([s3, s1])
assert chain3.steps == [s3, s1]
assert s3.next_step == s1
assert s1.next_step == s2
assert s2.next_step == s3
Chain([Chain([s3]), Chain([s1])])
def test_create_endpoint_step(trained_estimator, record_set_fixture, sfn_client, sagemaker_session, sfn_role_arn):
# Setup: Create model and endpoint config for trained estimator in SageMaker
model = trained_estimator.create_model()
model._create_sagemaker_model(instance_type=INSTANCE_TYPE)
endpoint_config = model.sagemaker_session.create_endpoint_config(
name = model.name,
model_name = model.name,
initial_instance_count=INSTANCE_COUNT,
instance_type=INSTANCE_TYPE
)
# End of Setup
# Build workflow definition
endpoint_name = unique_name_from_base("integ-test-endpoint")
endpoint_step = EndpointStep('create_endpoint_step', endpoint_name=endpoint_name, endpoint_config_name=model.name)
workflow_graph = Chain([endpoint_step])
with timeout(minutes=DEFAULT_TIMEOUT_MINUTES):
# Create workflow and check definition
workflow = create_workflow_and_check_definition(
workflow_graph=workflow_graph,
workflow_name=unique_name_from_base("integ-test-create-endpoint-step-workflow"),
sfn_client=sfn_client,
sfn_role_arn=sfn_role_arn
)
# Execute workflow
execution = workflow.execute()
execution_output = execution.get_output(wait=True)
# Check workflow output
endpoint_arn = execution_output.get("EndpointArn")
pca_transformer = trained_estimator.transformer(instance_count=INSTANCE_COUNT, instance_type=INSTANCE_TYPE)
# Create a model step to save the model
model_step = ModelStep('create_model_step', model=trained_estimator.create_model(), model_name=job_name)
# Upload data for transformation to S3
data_path = os.path.join(DATA_DIR, "one_p_mnist")
transform_input_path = os.path.join(data_path, "transform_input.csv")
transform_input_key_prefix = "integ-test-data/one_p_mnist/transform"
transform_input = pca_transformer.sagemaker_session.upload_data(
path=transform_input_path, key_prefix=transform_input_key_prefix
)
# Build workflow definition
transform_step = TransformStep('create_transform_job_step', pca_transformer, job_name=job_name, model_name=job_name, data=transform_input, content_type="text/csv")
workflow_graph = Chain([model_step, transform_step])
with timeout(minutes=DEFAULT_TIMEOUT_MINUTES):
# Create workflow and check definition
workflow = create_workflow_and_check_definition(
workflow_graph=workflow_graph,
workflow_name=unique_name_from_base("integ-test-transform-step-workflow"),
sfn_client=sfn_client,
sfn_role_arn=sfn_role_arn
)
# Execute workflow
execution = workflow.execute()
execution_output = execution.get_output(wait=True)
# Check workflow output
assert execution_output.get("TransformJobStatus") == "Completed"
def test_nested_chain_is_now_allowed():
chain = Chain([Chain([Pass('S1')])])
"ParamB": "SampleValueB",
"ParamE": test_step_01.output()["Response"]["Key04"]
}
)
test_step_03 = Pass(
state_id='StateThree',
parameters={
'ParamG': "SampleValueG",
"ParamF": execution_input["Key06"],
"ParamH": "SampleValueH",
"ParamI": test_step_02.output()
}
)
workflow_definition = Chain([test_step_01, test_step_02, test_step_03])
workflow = Workflow(
name='TestWorkflow',
definition=workflow_definition,
role='testRoleArn',
execution_input=execution_input,
client=client
)
return workflow