Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def create_workflow_and_check_definition(workflow_graph, workflow_name, sfn_client, sfn_role_arn):
    """Create a workflow and verify the definition reported back for it.

    Builds a ``Workflow`` from the given arguments, creates it, then calls
    ``describe_state_machine`` on the client for the returned ARN and
    compares the described definition with the local one.

    Args:
        workflow_graph: Passed to ``Workflow`` as ``definition``.
        workflow_name: Passed to ``Workflow`` as ``name``.
        sfn_client: Passed to ``Workflow`` as ``client``; also used directly
            to call ``describe_state_machine``.
        sfn_role_arn: Passed to ``Workflow`` as ``role``.

    Returns:
        The ``Workflow`` object that was created.

    Raises:
        AssertionError: If ``workflow.definition.to_dict()`` differs from the
            JSON-decoded ``definition`` field of the describe response.
    """
    # Create workflow
    workflow = Workflow(
        name=workflow_name,
        definition=workflow_graph,
        role=sfn_role_arn,
        client=sfn_client,
    )
    state_machine_arn = workflow.create()

    # Check workflow definition: the describe response carries the definition
    # as a JSON string, so decode it before comparing with the local dict form.
    state_machine_desc = sfn_client.describe_state_machine(stateMachineArn=state_machine_arn)
    assert workflow.definition.to_dict() == json.loads(state_machine_desc.get('definition'))
    return workflow
"ParamE": test_step_01.output()["Response"]["Key04"]
}
)
test_step_03 = Pass(
state_id='StateThree',
parameters={
'ParamG': "SampleValueG",
"ParamF": execution_input["Key06"],
"ParamH": "SampleValueH",
"ParamI": test_step_02.output()
}
)
workflow_definition = Chain([test_step_01, test_step_02, test_step_03])
workflow = Workflow(
name='TestWorkflow',
definition=workflow_definition,
role='testRoleArn',
execution_input=execution_input,
client=client
)
return workflow
"""
self.preprocessor = preprocessor
self.estimator = estimator
self.inputs = inputs
self.s3_bucket = s3_bucket
for key in self.__class__.__allowed_kwargs:
setattr(self, key, kwargs.pop(key, None))
if not self.pipeline_name:
self.pipeline_name = 'inference-pipeline-{date}'.format(date=self._generate_timestamp())
self.definition = self.build_workflow_definition()
self.input_template = self._extract_input_template(self.definition)
workflow = Workflow(name=self.pipeline_name, definition=self.definition, role=role, format_json=True, client=client)
super(InferencePipeline, self).__init__(s3_bucket=s3_bucket, workflow=workflow, role=role, client=client)