Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_workflow_input_placeholder():
workflow_input = ExecutionInput()
test_step = Pass(
state_id='StateOne',
parameters={
'ParamA': 'SampleValueA',
'ParamB': workflow_input,
'ParamC': workflow_input['Key01'],
'ParamD': workflow_input['Key02']['Key03'],
'ParamE': workflow_input['Key01']['Key03'],
}
)
expected_repr = {
"Type": "Pass",
"Parameters": {
"ParamA": "SampleValueA",
"ParamB.$": "$$.Execution.Input",
def test_workflow_with_placeholders():
workflow_input = ExecutionInput()
test_step_01 = Pass(
state_id='StateOne',
parameters={
'ParamA': workflow_input['Key02']['Key03'],
'ParamD': workflow_input['Key01']['Key03'],
}
)
test_step_02 = Pass(
state_id='StateTwo',
parameters={
'ParamC': workflow_input["Key05"],
"ParamB": "SampleValueB",
"ParamE": test_step_01.output()["Response"]["Key04"]
}
def test_parallel_state_with_placeholders():
workflow_input = ExecutionInput()
parallel_state = Parallel('ParallelState01')
branch_A = Pass(
'Branch_A',
parameters={
'ParamA': parallel_state.output()['A']["B"],
'ParamB': workflow_input["Key01"]
})
branch_B = Pass(
'Branch_B',
parameters={
'ParamA': "TestValue",
'ParamB': parallel_state.output()["Response"]["Key"]["State"]
})
def test_step_input_order_validation():
workflow_input = ExecutionInput()
test_step_01 = Pass(
state_id='StateOne',
parameters={
'ParamA': workflow_input['Key02']['Key03'],
'ParamD': workflow_input['Key01']['Key03'],
}
)
test_step_02 = Pass(
state_id='StateTwo',
parameters={
'ParamC': workflow_input["Key05"],
"ParamB": "SampleValueB",
"ParamE": test_step_01.output()["Response"]["Key04"]
}
def test_map_state_with_placeholders():
workflow_input = ExecutionInput()
map_state = Map('MapState01')
iterator_state = Pass(
'TrainIterator',
parameters={
'ParamA': map_state.output()['X']["Y"],
'ParamB': workflow_input["Key01"]["Key02"]["Key03"]
})
map_state.attach_iterator(iterator_state)
workflow_definition = Chain([map_state])
expected_repr = {
"StartAt": "MapState01",
"States": {
"MapState01": {
def workflow(client):
execution_input = ExecutionInput()
test_step_01 = Pass(
state_id='StateOne',
parameters={
'ParamA': execution_input['Key02']['Key03'],
'ParamD': execution_input['Key01']['Key03'],
}
)
test_step_02 = Pass(
state_id='StateTwo',
parameters={
'ParamC': execution_input["Key05"],
"ParamB": "SampleValueB",
"ParamE": test_step_01.output()["Response"]["Key04"]
}