# Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
literal_map = _type_helpers.pack_python_std_map_to_literal_map(
{'a': 9}, _type_map_from_variable_map(_task_defs.add_one.interface.inputs))
input_file = os.path.join(input_dir.name, "inputs.pb")
_utils.write_proto_to_file(literal_map.to_flyte_idl(), input_file)
with _utils.AutoDeletingTempDir("out") as output_dir:
_execute_task(
_task_defs.add_one.task_module,
_task_defs.add_one.task_function_name,
input_file,
output_dir.name,
False
)
p = _utils.load_proto_from_file(
_literals_pb2.LiteralMap,
os.path.join(output_dir.name, _constants.OUTPUT_FILE_NAME)
)
raw_map = _type_helpers.unpack_literal_map_to_sdk_python_std(
_literal_models.LiteralMap.from_flyte_idl(p),
_type_map_from_variable_map(_task_defs.add_one.interface.outputs)
)
assert raw_map['b'] == 10
assert len(raw_map) == 1
orig_env_array_index = os.environ.get('AWS_BATCH_JOB_ARRAY_INDEX')
os.environ['BATCH_JOB_ARRAY_INDEX_VAR_NAME'] = 'AWS_BATCH_JOB_ARRAY_INDEX'
os.environ['AWS_BATCH_JOB_ARRAY_INDEX'] = '0'
_execute_task(
_task_defs.add_one.task_module,
_task_defs.add_one.task_function_name,
dir.name,
dir.name,
False
)
raw_map = _type_helpers.unpack_literal_map_to_sdk_python_std(
_literal_models.LiteralMap.from_flyte_idl(
_utils.load_proto_from_file(
_literals_pb2.LiteralMap,
os.path.join(input_dir, _constants.OUTPUT_FILE_NAME)
)
),
_type_map_from_variable_map(_task_defs.add_one.interface.outputs)
)
assert raw_map['b'] == 10
assert len(raw_map) == 1
# reset the env vars
if orig_env_index_var_name:
os.environ['BATCH_JOB_ARRAY_INDEX_VAR_NAME'] = orig_env_index_var_name
if orig_env_array_index:
os.environ['AWS_BATCH_JOB_ARRAY_INDEX'] = orig_env_array_index
"{} {} {}".format(
_random.random(),
_datetime.datetime.utcnow(),
job_index
)
)
# If an ArrayTask is discoverable, the original job index may be different than the one specified in
# the environment variable. Look up the correct input/outputs in the index lookup mapping file.
job_index = _map_job_index_to_child_index(input_dir, inputs, job_index)
inputs = _os.path.join(inputs, str(job_index), 'inputs.pb')
output_prefix = _os.path.join(output_prefix, str(job_index))
_data_proxy.Data.get_data(inputs, local_inputs_file)
input_proto = _utils.load_proto_from_file(_literals_pb2.LiteralMap, local_inputs_file)
_engine_loader.get_engine().get_task(task_def).execute(
_literal_models.LiteralMap.from_flyte_idl(input_proto),
context={'output_prefix': output_prefix}
)
def _pack_output_references(self, context, _):
    """
    Read the serialized output LiteralMap written by the task into the context's
    working directory, and register it on the context's output protos so it can
    be uploaded by the engine.

    :param context: execution context exposing ``working_directory`` (a temp-dir
        object with a ``name`` attribute) and an ``output_protos`` dict keyed by
        output file name.
    :param _: unused positional argument (kept for interface compatibility).
    """
    # Protobuf ParseFromString expects bytes; the file must be opened in binary
    # mode ('rb'). Text mode ('r') yields str, which ParseFromString rejects
    # (and decoding could corrupt the serialized payload).
    with open(_os.path.join(context.working_directory.name, _constants.OUTPUT_FILE_NAME), 'rb') as r:
        lm_pb2 = _literals_pb2.LiteralMap()
        lm_pb2.ParseFromString(r.read())
        context.output_protos[_constants.OUTPUT_FILE_NAME] = _literals.LiteralMap.from_flyte_idl(lm_pb2)
def _fetch_and_stringify_literal_map(path, verbose=False):
    """
    Fetch a serialized LiteralMap from a (possibly remote) path and render it
    as a human-readable string.

    :param Text path: location of the serialized ``LiteralMap`` protobuf.
    :param bool verbose: if True, render with additional detail.
    :rtype: Text
    """
    with _utils.AutoDeletingTempDir("flytecli") as tmp:
        try:
            fname = tmp.get_named_tempfile("literalmap.pb")
            _data_proxy.Data.get_data(path, fname)
            literal_map = _literals.LiteralMap.from_flyte_idl(
                _utils.load_proto_from_file(_literals_pb2.LiteralMap, fname)
            )
            return _get_io_string(literal_map, verbose=verbose)
        except Exception:
            # Best-effort rendering: the data may be inaccessible (permissions,
            # expired URL, etc.). Catch Exception rather than a bare except so
            # KeyboardInterrupt/SystemExit still propagate.
            return "Failed to pull data from {}. Do you have permissions?".format(path)
def get_inputs(self):
    """
    Download and deserialize the input LiteralMap recorded for this task execution.

    :rtype: flytekit.models.literals.LiteralMap
    """
    client = _FlyteClientManager(_platform_config.URL.get(), insecure=_platform_config.INSECURE.get()).client
    execution_data = client.get_task_execution_data(self.sdk_task_execution.id)
    # No input bytes recorded for this execution -> nothing to fetch.
    if execution_data.inputs.bytes <= 0:
        return _literals.LiteralMap({})
    with _common_utils.AutoDeletingTempDir() as scratch_dir:
        local_path = _os.path.join(scratch_dir.name, "inputs.pb")
        _data_proxy.Data.get_data(execution_data.inputs.url, local_path)
        proto = _common_utils.load_proto_from_file(_literals_pb2.LiteralMap, local_path)
    return _literals.LiteralMap.from_flyte_idl(proto)
"""
This method forwards necessary context into the notebook Kernel. Ideally, this code shouldn't be duplicating what
is in the underlying engine, but for now...
:param bytes variable_map_bytes:
:param bytes input_bytes:
:param Text working_directory:
:rtype: dict[Text,Any]
"""
if not _os.path.exists(working_directory):
tmpdir = _utils.AutoDeletingTempDir("nb_made_")
tmpdir.__enter__()
working_directory = tmpdir.name
_data_proxy.LocalWorkingDirectoryContext(working_directory).__enter__()
_data_proxy.RemoteDataContext()
lm_pb2 = _literals_pb2.LiteralMap()
lm_pb2.ParseFromString(input_bytes)
vm_pb2 = _interface_pb2.VariableMap()
vm_pb2.ParseFromString(variable_map_bytes)
# TODO: Inject vargs and wf_params
return _type_helpers.unpack_literal_map_to_sdk_python_std(
_literals.LiteralMap.from_flyte_idl(lm_pb2),
{
k: _type_helpers.get_sdk_type_from_literal_type(v.type)
for k, v in _six.iteritems(_interface.VariableMap.from_flyte_idl(vm_pb2).variables)
}
def get_outputs(self):
    """
    Download and deserialize the output LiteralMap produced by this workflow execution.

    :rtype: flytekit.models.literals.LiteralMap
    """
    client = _FlyteClientManager(_platform_config.URL.get(), insecure=_platform_config.INSECURE.get()).client
    execution_data = client.get_execution_data(self.sdk_workflow_execution.id)
    # No output bytes recorded -> the execution produced an empty map.
    if execution_data.outputs.bytes <= 0:
        return _literals.LiteralMap({})
    with _common_utils.AutoDeletingTempDir() as scratch_dir:
        local_path = _os.path.join(scratch_dir.name, "outputs.pb")
        _data_proxy.Data.get_data(execution_data.outputs.url, local_path)
        proto = _common_utils.load_proto_from_file(_literals_pb2.LiteralMap, local_path)
    return _literals.LiteralMap.from_flyte_idl(proto)