Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def demo_task_for_promote(wf_params, a, b, c):
b.set(a + 1)
c.set(a + 2)
@workflow_class()
class OneTaskWFForPromote(object):
wf_input = Input(Types.Integer, required=True)
my_task_node = demo_task_for_promote(a=wf_input)
wf_output_b = Output(my_task_node.outputs.b, sdk_type=Integer)
wf_output_c = Output(my_task_node.outputs.c, sdk_type=Integer)
:rtype: flytekit.models.core.workflow.WorkflowTemplate
"""
workflow_template_pb = _workflow_pb2.WorkflowTemplate()
# So that tests that use this work when run from any directory
basepath = _path.dirname(__file__)
filepath = _path.abspath(_path.join(basepath, "resources/protos", "OneTaskWFForPromote.pb"))
with open(filepath, "rb") as fh:
workflow_template_pb.ParseFromString(fh.read())
wt = _workflow_model.WorkflowTemplate.from_flyte_idl(workflow_template_pb)
return wt
def to_flyte_idl(self):
"""
:rtype: flyteidl.core.workflow_pb2.WorkflowTemplate
"""
return _core_workflow.WorkflowTemplate(
id=self.id.to_flyte_idl(),
metadata=self.metadata.to_flyte_idl(),
metadata_defaults=self.metadata_defaults.to_flyte_idl(),
interface=self.interface.to_flyte_idl(),
nodes=[n.to_flyte_idl() for n in self.nodes],
outputs=[o.to_flyte_idl() for o in self.outputs],
failure_node=self.failure_node.to_flyte_idl() if self.failure_node is not None else None
)