Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def to_flyte_idl(self):
    """
    Build the flyteidl ``NotificationList`` message for this object.

    Each entry of ``self.notifications`` is converted through its own
    ``to_flyte_idl()`` and the results are passed, in order, as the
    ``notifications`` field.

    :rtype: flyteidl.admin.execution_pb2.NotificationList
    """
    message_cls = _execution_pb2.NotificationList
    converted = []
    for notification in self.notifications:
        converted.append(notification.to_flyte_idl())
    return message_cls(notifications=converted)
def to_flyte_idl(self):
    """
    Build the flyteidl ``LiteralMapBlob`` message for this object.

    ``values`` is converted with its ``to_flyte_idl()`` only when it is not
    None; otherwise None is passed for that field. ``uri`` is passed through
    unchanged.

    :rtype: flyteidl.admin.execution_pb2.LiteralMapBlob
    """
    message_cls = _execution_pb2.LiteralMapBlob
    if self.values is not None:
        values_idl = self.values.to_flyte_idl()
    else:
        values_idl = None
    return message_cls(values=values_idl, uri=self.uri)
def to_flyte_idl(self):
    """
    Build the flyteidl ``Execution`` message for this object.

    The ``id``, ``closure`` and ``spec`` attributes are each converted with
    their own ``to_flyte_idl()`` and supplied as the fields of the same name.

    :rtype: flyteidl.admin.execution_pb2.Execution
    """
    message_cls = _execution_pb2.Execution
    id_idl = self.id.to_flyte_idl()
    closure_idl = self.closure.to_flyte_idl()
    spec_idl = self.spec.to_flyte_idl()
    return message_cls(id=id_idl, closure=closure_idl, spec=spec_idl)
def to_flyte_idl(self):
    """
    Build the flyteidl ``ExecutionMetadata`` message for this object.

    ``mode``, ``principal`` and ``nesting`` are passed through as-is to the
    fields of the same name.

    :rtype: flyteidl.admin.execution_pb2.ExecutionMetadata
    """
    message_cls = _execution_pb2.ExecutionMetadata
    fields = dict(
        mode=self.mode,
        principal=self.principal,
        nesting=self.nesting,
    )
    return message_cls(**fields)
def to_flyte_idl(self):
    """
    Build the flyteidl ``ExecutionClosure`` message for this object.

    ``error`` and ``outputs`` are converted with their ``to_flyte_idl()``
    only when they are not None. ``started_at`` is written onto the message
    after construction via ``FromDatetime``.

    :rtype: flyteidl.admin.execution_pb2.ExecutionClosure
    """
    message_cls = _execution_pb2.ExecutionClosure

    if self.error is not None:
        error_idl = self.error.to_flyte_idl()
    else:
        error_idl = None

    if self.outputs is not None:
        outputs_idl = self.outputs.to_flyte_idl()
    else:
        outputs_idl = None

    closure_idl = message_cls(
        phase=self.phase,
        error=error_idl,
        outputs=outputs_idl,
    )

    # Bind the setter first, then convert to UTC and drop tzinfo so that
    # FromDatetime receives a naive datetime.
    set_started_at = closure_idl.started_at.FromDatetime
    started_utc = self.started_at.astimezone(_pytz.UTC)
    set_started_at(started_utc.replace(tzinfo=None))
    return closure_idl
def to_flyte_idl(self):
    """
    Build the flyteidl ``ExecutionSpec`` message for this object.

    ``launch_plan``, ``metadata``, ``labels`` and ``annotations`` are always
    converted with their ``to_flyte_idl()``. ``notifications`` is converted
    only when it is truthy; otherwise None is passed. ``disable_all`` is
    passed through unchanged.

    :rtype: flyteidl.admin.execution_pb2.ExecutionSpec
    """
    message_cls = _execution_pb2.ExecutionSpec
    launch_plan_idl = self.launch_plan.to_flyte_idl()
    metadata_idl = self.metadata.to_flyte_idl()

    # Note: this is a truthiness test, not an explicit ``is not None`` check.
    if self.notifications:
        notifications_idl = self.notifications.to_flyte_idl()
    else:
        notifications_idl = None

    disable_all = self.disable_all
    labels_idl = self.labels.to_flyte_idl()
    annotations_idl = self.annotations.to_flyte_idl()

    return message_cls(
        launch_plan=launch_plan_idl,
        metadata=metadata_idl,
        notifications=notifications_idl,
        disable_all=disable_all,
        labels=labels_idl,
        annotations=annotations_idl,
    )
def to_flyte_idl(self):
    """
    Build the ``WorkflowExecutionGetDataResponse`` message for this object.

    ``inputs`` and ``outputs`` are each converted with their own
    ``to_flyte_idl()`` and supplied as the fields of the same name.

    :rtype: _execution_pb2.WorkflowExecutionGetDataResponse
    """
    message_cls = _execution_pb2.WorkflowExecutionGetDataResponse
    inputs_idl = self.inputs.to_flyte_idl()
    outputs_idl = self.outputs.to_flyte_idl()
    return message_cls(inputs=inputs_idl, outputs=outputs_idl)
def get_execution(self, id):
"""
:param flytekit.common.core.identifier.WorkflowExecutionIdentifier id:
:rtype: flytekit.models.execution.Execution
"""
return _execution.Execution.from_flyte_idl(
super(SynchronousFlyteClient, self).get_execution(
_execution_pb2.WorkflowExecutionGetRequest(
id=id.to_flyte_idl()
)
def create_execution(self, project, domain, name, execution_spec):
"""
This will create an execution for the given execution spec.
:param Text project:
:param Text domain:
:param Text name:
:param flytekit.models.execution.ExecutionSpec execution_spec: This is the specification for the execution.
:returns: The unique identifier for the execution.
:rtype: flytekit.models.core.identifier.WorkflowExecutionIdentifier
"""
return _identifier.WorkflowExecutionIdentifier.from_flyte_idl(
super(SynchronousFlyteClient, self).create_execution(
_execution_pb2.ExecutionCreateRequest(
project=project,
domain=domain,
name=name,
spec=execution_spec.to_flyte_idl()
)