How to use the stepfunctions.steps.Pass function in stepfunctions

To help you get started, we’ve selected a few stepfunctions examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github aws / aws-step-functions-data-science-sdk-python / tests / unit / test_placeholders_with_steps.py View on Github external
def test_step_input_order_validation():
    workflow_input = ExecutionInput()

    test_step_01 = Pass(
        state_id='StateOne',
        parameters={
            'ParamA': workflow_input['Key02']['Key03'],
            'ParamD': workflow_input['Key01']['Key03'],
        }
    )

    test_step_02 = Pass(
        state_id='StateTwo',
        parameters={
            'ParamC': workflow_input["Key05"],
            "ParamB": "SampleValueB",
            "ParamE": test_step_01.output()["Response"]["Key04"]
        }
    )
github aws / aws-step-functions-data-science-sdk-python / tests / unit / test_placeholders_with_steps.py View on Github external
def test_workflow_with_placeholders():
    workflow_input = ExecutionInput()

    test_step_01 = Pass(
        state_id='StateOne',
        parameters={
            'ParamA': workflow_input['Key02']['Key03'],
            'ParamD': workflow_input['Key01']['Key03'],
        }
    )

    test_step_02 = Pass(
        state_id='StateTwo',
        parameters={
            'ParamC': workflow_input["Key05"],
            "ParamB": "SampleValueB",
            "ParamE": test_step_01.output()["Response"]["Key04"]
        }
    )
github aws / aws-step-functions-data-science-sdk-python / tests / unit / test_placeholders_with_steps.py View on Github external
parameters={
            'ParamA': workflow_input['Key02']['Key03'],
            'ParamD': workflow_input['Key01']['Key03'],
        }
    )

    test_step_02 = Pass(
        state_id='StateTwo',
        parameters={
            'ParamC': workflow_input["Key05"],
            "ParamB": "SampleValueB",
            "ParamE": test_step_01.output()["Response"]["Key04"]
        }
    )

    test_step_03 = Pass(
        state_id='StateThree',
        parameters={
            'ParamG': "SampleValueG",
            "ParamF": workflow_input["Key06"],
            "ParamH": "SampleValueH"
        }
    )

    workflow_definition = Chain([test_step_01, test_step_03, test_step_02])

    with pytest.raises(ValueError):
        result = Graph(workflow_definition).to_dict()
github aws / aws-step-functions-data-science-sdk-python / tests / unit / test_steps.py View on Github external
def test_catch_fail_for_unsupported_state():
    s1 = Pass('Step - One')

    with pytest.raises(ValueError):
        s1.add_retry(Retry(error_equals=['ErrorA', 'ErrorB'], interval_seconds=1, max_attempts=2, backoff_rate=2))
github aws / aws-step-functions-data-science-sdk-python / tests / unit / test_placeholders_with_workflows.py View on Github external
def workflow(client):
    execution_input = ExecutionInput()

    test_step_01 = Pass(
        state_id='StateOne',
        parameters={
            'ParamA': execution_input['Key02']['Key03'],
            'ParamD': execution_input['Key01']['Key03'],
        }
    )

    test_step_02 = Pass(
        state_id='StateTwo',
        parameters={
            'ParamC': execution_input["Key05"],
            "ParamB": "SampleValueB",
            "ParamE": test_step_01.output()["Response"]["Key04"]
        }
    )

    test_step_03 = Pass(
        state_id='StateThree',
        parameters={
            'ParamG': "SampleValueG",
            "ParamF": execution_input["Key06"],
            "ParamH": "SampleValueH",
            "ParamI": test_step_02.output()
        }
github aws / aws-step-functions-data-science-sdk-python / tests / unit / test_steps.py View on Github external
def test_choice_state_creation():
    choice_state = Choice('Choice', input_path='$.Input')
    choice_state.add_choice(ChoiceRule.StringEquals("$.StringVariable1", "ABC"), Pass("End State 1"))
    choice_state.add_choice(ChoiceRule.StringLessThanEquals("$.StringVariable2", "ABC"), Pass("End State 2"))
    choice_state.default_choice(Pass('End State 3'))
    assert choice_state.state_id == 'Choice'
    assert len(choice_state.choices) == 2
    assert choice_state.default.state_id == 'End State 3'
    assert choice_state.to_dict() == {
        'Type': 'Choice',
        'InputPath': '$.Input',
        'Choices': [
            {
                'Variable': '$.StringVariable1',
                'StringEquals': 'ABC',
                'Next': 'End State 1'
            },
            {
                'Variable': '$.StringVariable2',
                'StringLessThanEquals': 'ABC',
                'Next': 'End State 2'
github aws / aws-step-functions-data-science-sdk-python / tests / unit / test_placeholders_with_steps.py View on Github external
def test_step_input_order_validation():
    workflow_input = ExecutionInput()

    test_step_01 = Pass(
        state_id='StateOne',
        parameters={
            'ParamA': workflow_input['Key02']['Key03'],
            'ParamD': workflow_input['Key01']['Key03'],
        }
    )

    test_step_02 = Pass(
        state_id='StateTwo',
        parameters={
            'ParamC': workflow_input["Key05"],
            "ParamB": "SampleValueB",
            "ParamE": test_step_01.output()["Response"]["Key04"]
        }
    )

    test_step_03 = Pass(
        state_id='StateThree',
        parameters={
            'ParamG': "SampleValueG",
            "ParamF": workflow_input["Key06"],
            "ParamH": "SampleValueH"
        }
    )
github aws / aws-step-functions-data-science-sdk-python / tests / unit / test_steps.py View on Github external
def test_parallel_state_creation():
    parallel_state = Parallel('Parallel')
    parallel_state.add_branch(Pass('Branch 1'))
    parallel_state.add_branch(Pass('Branch 2'))
    parallel_state.add_branch(Pass('Branch 3'))
    assert parallel_state.type == 'Parallel'
    assert len(parallel_state.branches) == 3
    assert parallel_state.to_dict() == {
        'Type': 'Parallel',
        'Branches': [
            {
                'StartAt': 'Branch 1',
                'States': {
                    'Branch 1': {
                        'Type': 'Pass',
                        'End': True
                    }
                }
            },
            {
                'StartAt': 'Branch 2',
github aws / aws-step-functions-data-science-sdk-python / tests / unit / test_steps.py View on Github external
def test_pass_state_creation():
    pass_state = Pass('Pass', result='Pass')
    assert pass_state.state_id == 'Pass'
    assert pass_state.to_dict() == {
        'Type': 'Pass',
        'Result': 'Pass',
        'End': True
    }
github aws / aws-step-functions-data-science-sdk-python / tests / unit / test_placeholders_with_steps.py View on Github external
branch_A = Pass(
        'Branch_A',
        parameters={
            'ParamA': parallel_state.output()['A']["B"],
            'ParamB': workflow_input["Key01"]
    })

    branch_B = Pass(
        'Branch_B',
        parameters={
            'ParamA': "TestValue",
            'ParamB': parallel_state.output()["Response"]["Key"]["State"]
    })

    branch_C = Pass(
        'Branch_C',
        parameters={
            'ParamA': parallel_state.output()['A']["B"].get("C", float),
            'ParamB': "HelloWorld"
    })

    parallel_state.add_branch(branch_A)
    parallel_state.add_branch(branch_B)
    parallel_state.add_branch(branch_C)

    workflow_definition = Chain([parallel_state])
    result = Graph(workflow_definition).to_dict()

    expected_repr = {
        "StartAt": "ParallelState01",
        "States": {