Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@_exception_scopes.system_entry_point
def add_inputs(self, inputs):
    """
    Register additional input variables on this task's interface.

    May be invoked repeatedly. The call is expected to fail (presumably via ``_validate_inputs``) when a name
    has already been added as an input, collides with an output name, or does not exist as an arg name in the
    wrapped function.

    :param dict[Text, flytekit.models.interface.Variable] inputs: names and variables
    """
    # Validate before mutating: if validation raises, the interface is left untouched.
    self._validate_inputs(inputs)
    declared_inputs = self.interface.inputs
    declared_inputs.update(inputs)
@_exception_scopes.system_entry_point
def execute(self, context, inputs):
"""
Executes hive batch task's user code and produces futures file as well as all sub-task inputs.pb files.
:param flytekit.engines.common.EngineContext context:
:param flytekit.models.literals.LiteralMap inputs:
:rtype: dict[Text, flytekit.models.common.FlyteIdlEntity]
:returns: This function must return a dictionary mapping 'filenames' to Flyte Interface Entities. These
entities will be used by the engine to pass data from node to node, populate metadata, etc. Each
engine will have different behavior. For instance, the Flyte engine will upload the entities to a remote
working directory (with the names provided), which will in turn allow Flyte Propeller to push along the
workflow, whereas the local engine will merely feed the outputs directly into the next node.
"""
# Delegate to the helper that builds the dynamic job spec from the context and inputs.
spec = self._produce_dynamic_job_spec(context, inputs)
# NOTE(review): presumably accumulates the filename -> entity mapping described above; the remainder of the
# body is not visible in this excerpt -- confirm against the full source.
generated_files = {}
@_exception_scopes.system_entry_point
def __call__(self, *args, **input_map):
"""
Binds the supplied keyword inputs against this task's interface and builds a node from them.
:param list[T] args: Do not specify. Only kwargs are supported for this function.
:param dict[str, T] input_map: Map of inputs. Can be statically defined or OutputReference links.
:rtype: flytekit.common.nodes.SdkNode
:raises: FlyteAssertion if any positional args are supplied.
"""
# Reject positional arguments up front; inputs are matched by name only.
if len(args) > 0:
raise _user_exceptions.FlyteAssertion(
"When adding a task as a node in a workflow, all inputs must be specified with kwargs only. We "
"detected {} positional args.".format(len(args))
)
# Turn the keyword inputs into bindings plus the upstream nodes they reference.
bindings, upstream_nodes = self.interface.create_bindings_for_inputs(input_map)
# TODO: Remove DEADBEEF
return _nodes.SdkNode(
@_exception_scopes.system_entry_point
def fetch(cls, project, domain, name, version):
    """
    Use the engine loader to fetch a task and create a hydrated SDK task from it.

    NOTE(review): the first parameter is ``cls``; presumably a ``@classmethod`` decorator sits outside this
    excerpt -- confirm.

    :param Text project:
    :param Text domain:
    :param Text name:
    :param Text version:
    :rtype: SdkTask
    """
    identifier = _identifier.Identifier(
        _identifier_model.ResourceType.TASK, project, domain, name, version
    )
    engine = _engine_loader.get_engine()
    remote_task = engine.fetch_task(task_id=identifier)
    # Only the compiled template is promoted; the identifier is attached afterwards.
    template = remote_task.closure.compiled_task.template
    hydrated = cls.promote_from_model(template)
    hydrated._id = identifier
    return hydrated
@_exception_scopes.system_entry_point
def __exit__(self, exc_type, exc_val, exc_tb):
    """
    Leave the context: drop the held IO object and delegate to the wrapped ``_mp_blob``.

    :param exc_type: exception type raised inside the ``with`` body, if any
    :param exc_val: exception instance raised inside the ``with`` body, if any
    :param exc_tb: traceback of that exception, if any
    :returns: whatever ``self._mp_blob.__exit__`` returns for the same arguments
    """
    # Clear our reference first, then let the underlying blob finish its own teardown.
    self._io_object = None
    blob_result = self._mp_blob.__exit__(exc_type, exc_val, exc_tb)
    return blob_result
@_exception_scopes.system_entry_point
def __call__(self, *args, **input_map):
"""
Binds keyword inputs, merged over this workflow's default input values, against the workflow interface.
:param list[T] args: Do not specify. Only kwargs are supported for this function.
:param dict[Text, T] input_map: Map of inputs; entries here override the defaults collected below.
:raises: FlyteAssertion if any positional args are supplied.
"""
# Reject positional arguments up front; inputs are matched by name only.
if len(args) > 0:
raise _user_exceptions.FlyteAssertion(
"When adding a workflow as a node in a workflow, all inputs must be specified with kwargs only. We "
"detected {} positional args.".format(len(args))
)
# Take the default values from the Inputs
# Only inputs that are not marked sdk_required contribute a default.
compiled_inputs = {
v.name: v.sdk_default
for v in self.user_inputs if not v.sdk_required
}
# Caller-supplied values win over the defaults gathered above.
compiled_inputs.update(input_map)
bindings, upstream_nodes = self.interface.create_bindings_for_inputs(compiled_inputs)
@_exception_scopes.system_entry_point
def download(self, local_path=None, overwrite=False):
    """
    Download this object's data by delegating to its multipart blob.

    NOTE(review): no value is returned by the visible body, so callers receive None even though the return
    type is documented as Schema -- confirm the intended return.

    :param Text local_path: [Optional] A local path to which to download the object. If specified, the object
        will not be managed and might not be cleaned up by the system upon exiting the context.
    :param bool overwrite: If True, objects will be overwritten at the provided local_path in order to fetch this
        object. Default is False.
    :rtype: Schema
    """
    blob = self.multipart_blob
    blob.download(local_path=local_path, overwrite=overwrite)
@system_entry_point
def serialize_workflows(pkgs):
# Walks the registerable entities found in pkgs and, for every workflow, builds a workflow closure
# (the workflow plus the tasks its nodes reference) and converts it to its Flyte IDL form.
# Create map to look up tasks by their unique identifier. This is so we can compile them into the workflow closure.
tmap = {}
for _, _, t in iterate_registerable_entities_in_order(pkgs, include_entities={_sdk_task.SdkTask}):
tmap[t.id] = t
for m, k, w in iterate_registerable_entities_in_order(pkgs, include_entities={_workflow.SdkWorkflow}):
click.echo('Serializing {}'.format(_utils.fqdn(m.__name__, k, entity_type=w.resource_type)))
task_templates = []
for n in w.nodes:
# Only nodes that wrap a task contribute to the closure; other node kinds are skipped here.
if n.task_node is not None:
# Plain dict indexing: a reference_id that was not collected into tmap above raises KeyError.
task_templates.append(tmap[n.task_node.reference_id])
wc = _WorkflowClosure(workflow=w, tasks=task_templates)
wc_pb = wc.to_flyte_idl()
@_exception_scopes.system_entry_point
def from_python_std(cls, t_value, schema_type=None):
"""
Builds (or passes through) a schema object from a plain Python value.
:param T t_value:
:param SchemaType schema_type: [Optional] If specified, we will ensure
:rtype: Schema
"""
# NOTE(review): the schema_type description above is an unfinished sentence in the original; complete it
# once the intended guarantee is confirmed.
if isinstance(t_value, (str, _six.text_type)):
if _os.path.isdir(t_value):
# String naming an existing local directory: create a schema, point its multipart blob at that
# directory, and upload it.
schema = cls.create_at_any_location(schema_type=schema_type)
schema.multipart_blob._directory = _utils.Directory(t_value)
schema.upload()
else:
# Any other string is handed to create_at_known_location as the location.
schema = cls.create_at_known_location(t_value, schema_type=schema_type)
return schema
elif isinstance(t_value, cls):
# Already an instance of this class: returned unchanged.
return t_value
@_exception_scopes.system_entry_point
def __call__(self, *args, **input_map):
"""
:param list[T] args: Do not specify. Kwargs only are supported for this function.
:param dict[Text,T] input_map: Map of inputs. Can be statically defined or OutputReference links.
:rtype: flytekit.common.nodes.SdkNode
"""
if len(args) > 0:
raise _user_exceptions.FlyteAssertion(
"When adding a launchplan as a node in a workflow, all inputs must be specified with kwargs only. We "
"detected {} positional args.".format(self, len(args))
)
# Take the default values from the launch plan
default_inputs = {
k: v.sdk_default
for k, v in _six.iteritems(self.default_inputs.parameters) if not v.required