Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
}
)
test_step_03 = Pass(
state_id='StateThree',
parameters={
'ParamG': "SampleValueG",
"ParamF": workflow_input["Key06"],
"ParamH": "SampleValueH"
}
)
workflow_definition = Chain([test_step_01, test_step_03, test_step_02])
with pytest.raises(ValueError):
result = Graph(workflow_definition).to_dict()
"States": {
"TrainIterator": {
"Parameters": {
"ParamA.$": "$['X']['Y']",
"ParamB.$": "$$.Execution.Input['Key01']['Key02']['Key03']"
},
"Type": "Pass",
"End": True
}
}
}
}
}
}
result = Graph(workflow_definition).to_dict()
assert result == expected_repr
first_state = Task('FirstState', resource='arn:aws:lambda:us-east-1:1234567890:function:FirstState')
retry = Chain([Pass('Retry'), Pass('Cleanup'), first_state])
choice_state = Choice('Is Completed?')
choice_state.add_choice(
ChoiceRule.BooleanEquals(choice_state.output()["Completed"], True),
Succeed('Complete')
)
choice_state.add_choice(
ChoiceRule.BooleanEquals(choice_state.output()["Completed"], False),
retry
)
first_state.next(choice_state)
result = Graph(first_state).to_dict()
expected_repr = {
"StartAt": "FirstState",
"States": {
"FirstState": {
"Resource": "arn:aws:lambda:us-east-1:1234567890:function:FirstState",
"Type": "Task",
"Next": "Is Completed?"
},
"Is Completed?": {
"Type": "Choice",
"Choices": [
{
"Variable": "$['Completed']",
"BooleanEquals": True,
"Next": "Complete"
"ParamE": test_step_01.output()["Response"]["Key04"]
}
)
test_step_03 = Pass(
state_id='StateThree',
parameters={
'ParamG': "SampleValueG",
"ParamF": workflow_input["Key06"],
"ParamH": "SampleValueH"
}
)
workflow_definition = Chain([test_step_01, test_step_02, test_step_03])
result = Graph(workflow_definition).to_dict()
expected_workflow_repr = {
"StartAt": "StateOne",
"States": {
"StateOne": {
"Type": "Pass",
"Parameters": {
"ParamA.$": "$$.Execution.Input['Key02']['Key03']",
"ParamD.$": "$$.Execution.Input['Key01']['Key03']"
},
"Next": "StateTwo"
},
"StateTwo": {
"Type": "Pass",
"Parameters": {
"ParamC.$": "$$.Execution.Input['Key05']",
'ParamB': parallel_state.output()["Response"]["Key"]["State"]
})
branch_C = Pass(
'Branch_C',
parameters={
'ParamA': parallel_state.output()['A']["B"].get("C", float),
'ParamB': "HelloWorld"
})
parallel_state.add_branch(branch_A)
parallel_state.add_branch(branch_B)
parallel_state.add_branch(branch_C)
workflow_definition = Chain([parallel_state])
result = Graph(workflow_definition).to_dict()
expected_repr = {
"StartAt": "ParallelState01",
"States": {
"ParallelState01": {
"Type": "Parallel",
"End": True,
"Branches": [
{
"StartAt": "Branch_A",
"States": {
"Branch_A": {
"Parameters": {
"ParamA.$": "$['A']['B']",
"ParamB.$": "$$.Execution.Input['Key01']"
},
Returns:
str: The Amazon Resource Name (ARN) of the workflow that was updated. Errors raised by the Step Functions client call are not caught here and propagate to the caller.
"""
if definition is None and role is None:
raise MissingRequiredParameter("A new definition and/or role must be provided to update an existing workflow.")
if self.state_machine_arn is None:
raise WorkflowNotFound("Local workflow instance does not point to an existing workflow on AWS StepFunctions. Please consider using Workflow.create(...) to create a new workflow, or Workflow.attach(...) to attach the instance to an existing workflow on AWS Step Functions.")
if definition:
if isinstance(definition, Graph):
self.definition = definition
else:
self.definition = Graph(
definition,
timeout_seconds=self.timeout_seconds,
comment=self.comment,
version=self.version
)
if role:
self.role = role
response = self.client.update_state_machine(
stateMachineArn=self.state_machine_arn,
definition=self.definition.to_json(pretty=self.format_json),
roleArn=self.role
)
logger.info("Workflow updated successfully on AWS Step Functions. All execute() calls will use the updated definition and role within a few seconds. ")
return self.state_machine_arn
Args:
definition (State or Chain, optional): The `Amazon States Language <https://states-language.net/spec.html>`_ definition to update the workflow with. (default: None)
role (str, optional): The Amazon Resource Name (ARN) of the IAM role to use for creating, managing, and running the workflow. (default: None)
Returns:
str: The Amazon Resource Name (ARN) of the workflow that was updated.
"""
if definition is None and role is None:
raise MissingRequiredParameter("A new definition and/or role must be provided to update an existing workflow.")
if self.state_machine_arn is None:
raise WorkflowNotFound("Local workflow instance does not point to an existing workflow on AWS StepFunctions. Please consider using Workflow.create(...) to create a new workflow, or Workflow.attach(...) to attach the instance to an existing workflow on AWS Step Functions.")
if definition:
if isinstance(definition, Graph):
self.definition = definition
else:
self.definition = Graph(
definition,
timeout_seconds=self.timeout_seconds,
comment=self.comment,
version=self.version
)
if role:
self.role = role
response = self.client.update_state_machine(
stateMachineArn=self.state_machine_arn,
definition=self.definition.to_json(pretty=self.format_json),
roleArn=self.role
definition (State or Chain): The `Amazon States Language <https://states-language.net/spec.html>`_ definition of the workflow.
role (str): The Amazon Resource Name (ARN) of the IAM role to use for creating, managing, and running the workflow.
tags (list): Tags to be added when creating a workflow. Tags are key-value pairs that can be associated with Step Functions workflows and activities. (default: [])
execution_input (ExecutionInput, optional): Placeholder collection that defines the placeholder variables for the workflow execution. \
This is also used to validate inputs provided when executing the workflow. (default: None)
timeout_seconds (int, optional): The maximum number of seconds an execution of the workflow can run. If it runs longer than the specified time, the workflow run fails with a `States.Timeout` Error Name. (default: None)
comment (str, optional): A human-readable description of the workflow. (default: None)
version (str, optional): The version of the Amazon States Language used in the workflow. (default: None)
state_machine_arn (str, optional): The Amazon Resource Name (ARN) of the workflow. (default: None)
format_json (bool, optional): Boolean flag set to `True` if workflow definition and execution inputs should be prettified for this workflow. `False`, otherwise. (default: True)
client (SFN.Client, optional): boto3 client to use for creating, managing, and running the workflow on Step Functions. If not provided, a default boto3 client for Step Functions will be automatically created and used. (default: None)
"""
self.timeout_seconds = timeout_seconds
self.comment = comment
self.version = version
if isinstance(definition, Graph):
self.definition = definition
else:
self.definition = Graph(
definition,
timeout_seconds=self.timeout_seconds,
comment=self.comment,
version=self.version
)
self.name = name
self.role = role
self.tags = tags
self.workflow_input = execution_input
if client:
self.client = client
else:
execution_input (ExecutionInput, optional): Placeholder collection that defines the placeholder variables for the workflow execution. \
This is also used to validate inputs provided when executing the workflow. (default: None)
timeout_seconds (int, optional): The maximum number of seconds an execution of the workflow can run. If it runs longer than the specified time, the workflow run fails with a `States.Timeout` Error Name. (default: None)
comment (str, optional): A human-readable description of the workflow. (default: None)
version (str, optional): The version of the Amazon States Language used in the workflow. (default: None)
state_machine_arn (str, optional): The Amazon Resource Name (ARN) of the workflow. (default: None)
format_json (bool, optional): Boolean flag set to `True` if workflow definition and execution inputs should be prettified for this workflow. `False`, otherwise. (default: True)
client (SFN.Client, optional): boto3 client to use for creating, managing, and running the workflow on Step Functions. If not provided, a default boto3 client for Step Functions will be automatically created and used. (default: None)
"""
self.timeout_seconds = timeout_seconds
self.comment = comment
self.version = version
if isinstance(definition, Graph):
self.definition = definition
else:
self.definition = Graph(
definition,
timeout_seconds=self.timeout_seconds,
comment=self.comment,
version=self.version
)
self.name = name
self.role = role
self.tags = tags
self.workflow_input = execution_input
if client:
self.client = client
else:
self.client = boto3.client('stepfunctions')
append_user_agent_to_client(self.client)