envs = envs or {}
labels = labels or {}
inputs = inputs or {}
inputs_recursive = inputs_recursive or {}
outputs = outputs or {}
outputs_recursive = outputs_recursive or {}
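# A note on the "x = x or {}" normalization above: the enclosing helper
# presumably declares envs=None, labels=None, etc. (the usual way to avoid
# Python's shared mutable-default pitfall), so each argument gets a fresh
# dict here. A minimal standalone illustration (names are ours, not dsub's):
def add_default_label(labels=None):
  labels = labels or {}  # a new dict per call, never a shared default
  labels['added-by'] = 'example'
  return labels

assert add_default_label() == {'added-by': 'example'}
assert add_default_label() is not add_default_label()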
labels['test-token'] = test_setup.TEST_TOKEN
labels['test-name'] = test_setup.TEST_NAME
logging = param_util.build_logging_param(test.LOGGING)
job_resources = job_model.Resources(
image='ubuntu', logging=logging, zones=['us-central1-*'])
env_data = {job_model.EnvParam(k, v) for (k, v) in envs.items()}
label_data = {job_model.LabelParam(k, v) for (k, v) in labels.items()}
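# Why sets work for param data: job_model's param types behave like hashable
# (name, value) records, so duplicates collapse and groups can be merged
# with "|" (the env-file code near the end of this file relies on that).
# A stand-in sketch with a namedtuple, not dsub's actual class:
from collections import namedtuple

FakeEnvParam = namedtuple('FakeEnvParam', ['name', 'value'])

base = {FakeEnvParam(k, v) for (k, v) in {'REF': 'hg38'}.items()}
merged = base | {FakeEnvParam('TMPDIR', '/tmp'), FakeEnvParam('REF', 'hg38')}
assert len(merged) == 2  # the duplicate ('REF', 'hg38') collapsed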
input_file_param_util = param_util.InputFileParamUtil('input')
input_data = set()
for (recursive, items) in ((False, inputs.items()),
(True, inputs_recursive.items())):
for (name, value) in items:
name = input_file_param_util.get_variable_name(name)
input_data.add(input_file_param_util.make_param(name, value, recursive))
output_file_param_util = param_util.OutputFileParamUtil('output')
output_data = set()
for (recursive, items) in ((False, outputs.items()),
(True, outputs_recursive.items())):
    for (name, value) in items:
      name = output_file_param_util.get_variable_name(name)
      output_data.add(output_file_param_util.make_param(name, value, recursive))
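# The two loops above share one shape: walk a plain dict and a recursive
# dict, tagging each item with a flag. The same pattern standalone (function
# and variable names are illustrative only, not dsub's API):
def flatten_params(plain, recursive):
  """Yield (name, value, is_recursive) across both {name: value} dicts."""
  for is_recursive, mapping in ((False, plain), (True, recursive)):
    for name, value in mapping.items():
      yield name, value, is_recursive

assert list(flatten_params({'IN': 'gs://b/in.vcf'}, {'DIR': 'gs://b/d/'})) == [
    ('IN', 'gs://b/in.vcf', False), ('DIR', 'gs://b/d/', True)]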
def parse_tasks_file_header(header, input_file_param_util,
                            output_file_param_util):
  """Parse a tasks-file header row into typed parameter definitions.

  Raises:
    ValueError: If a header contains a ":" and the prefix is not supported.
  """
job_params = []
for col in header:
# Reserve the "-" and "--" namespace.
# If the column has no leading "-", treat it as an environment variable
col_type = '--env'
col_value = col
if col.startswith('-'):
col_type, col_value = split_pair(col, ' ', 1)
if col_type == '--env':
job_params.append(job_model.EnvParam(col_value))
elif col_type == '--label':
job_params.append(job_model.LabelParam(col_value))
    elif col_type in ('--input', '--input-recursive'):
name = input_file_param_util.get_variable_name(col_value)
job_params.append(
job_model.InputFileParam(
name, recursive=(col_type.endswith('recursive'))))
    elif col_type in ('--output', '--output-recursive'):
name = output_file_param_util.get_variable_name(col_value)
job_params.append(
job_model.OutputFileParam(
              name, recursive=(col_type.endswith('recursive'))))
    else:
      raise ValueError('Unrecognized column header: %s' % col)

  return job_params
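# split_pair() is called above but not defined in this excerpt. A plausible
# stand-in (an assumption; the real helper may differ): split on the first
# separator, always return two fields, and let the field at `nullable_idx`
# default to None when the separator is absent:
def split_pair(text, sep, nullable_idx):
  parts = text.split(sep, 1)
  if len(parts) == 1:
    parts = [parts[0], None] if nullable_idx == 1 else [None, parts[0]]
  return parts

# So a header row maps column-by-column to parameter definitions:
#   "sample_id"                  -> EnvParam (bareword columns imply --env)
#   "--input INPUT_VCF"          -> InputFileParam('INPUT_VCF')
#   "--output-recursive OUT_DIR" -> OutputFileParam('OUT_DIR', recursive=True)
assert split_pair('--input INPUT_VCF', ' ', 1) == ['--input', 'INPUT_VCF']
assert split_pair('--input', ' ', 1) == ['--input', None]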
    # Below: per-row processing from the tasks-file reader loop (the reader
    # setup and loop header are elided in this excerpt).
    if len(row) != len(job_params):
      raise ValueError(
          'Unexpected number of fields {} vs {} in {}, line {}'.format(
              len(row), len(job_params), path, reader.line_num))
# Each row can contain "envs", "inputs", "outputs"
envs = set()
inputs = set()
outputs = set()
labels = set()
    for i, param in enumerate(job_params):
name = param.name
if isinstance(param, job_model.EnvParam):
envs.add(job_model.EnvParam(name, row[i]))
elif isinstance(param, job_model.LabelParam):
labels.add(job_model.LabelParam(name, row[i]))
elif isinstance(param, job_model.InputFileParam):
inputs.add(
input_file_param_util.make_param(name, row[i], param.recursive))
elif isinstance(param, job_model.OutputFileParam):
outputs.add(
output_file_param_util.make_param(name, row[i], param.recursive))
    task_descriptors.append(
        job_model.TaskDescriptor({
            'task-id': task_id,
            'task-attempt': 1 if retries else None
        }, {
            'labels': labels,
            'envs': envs,
            'inputs': inputs,
            'outputs': outputs
        }, job_model.Resources()))
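# Standalone walkthrough of the positional contract used above: the header
# fixes each column's kind and name, and every data row supplies values in
# that same order (all values below are made up):
header_kinds = [('env', 'SAMPLE_ID'), ('input', 'INPUT_VCF'),
                ('output', 'OUTPUT_MD5')]
row = ['NA12878', 'gs://bucket/na12878.vcf', 'gs://bucket/out/na12878.md5']
assert len(row) == len(header_kinds)  # mirrors the len() check above

task = {'env': {}, 'input': {}, 'output': {}}
for (kind, name), value in zip(header_kinds, row):
  task[kind][name] = value
assert task['env'] == {'SAMPLE_ID': 'NA12878'}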
# Below: from the local provider's task launcher. These keyword arguments
# are the tail of the call that builds script_data; the head of the call is
# elided in this excerpt.
        delocalize_logs_command=self._delocalize_logging_command(
task_resources.logging_path, job_metadata['user-project']),
export_mount_dirs=providers_util.build_mount_env(
task_dir, param_util.get_local_mounts(job_params['mounts'])),
)
# Write the runner script and data file to the task_dir
script_path = os.path.join(task_dir, 'runner.sh')
script_data_path = os.path.join(task_dir, 'data.sh')
self._write_source_file(script_path,
self._resources.get_resource(_RUNNER_SH_RESOURCE))
self._write_source_file(script_data_path, script_data.encode())
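# _write_source_file() is provider-internal and not shown here. A plausible
# stand-in (an assumption; the real helper may do more): write the bytes,
# then mark the file executable, since runner.sh is exec'd directly by the
# Popen() call below:
import os

def write_source_file(path, data):
  with open(path, 'wb') as f:
    f.write(data)
  os.chmod(path, 0o755)  # must be executable to run as [script_path, ...]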
# Write the environment variables
env_vars = set(env.items()) | job_params['envs'] | task_params['envs'] | {
job_model.EnvParam('DATA_ROOT', providers_util.DATA_MOUNT_POINT),
job_model.EnvParam('TMPDIR', providers_util.DATA_MOUNT_POINT + '/tmp')
}
env_fname = task_dir + '/docker.env'
with open(env_fname, 'wt') as f:
for e in env_vars:
f.write(e[0] + '=' + e[1] + '\n')
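# The file written above is a Docker env-file: one KEY=value per line, no
# quoting or export keywords, suitable for `docker run --env-file`. A
# self-contained round-trip of that format (paths and values illustrative):
env_pairs = {('DATA_ROOT', '/mnt/data'), ('TMPDIR', '/mnt/data/tmp')}
with open('/tmp/docker.env.example', 'wt') as f:
  for name, value in sorted(env_pairs):
    f.write(name + '=' + value + '\n')

with open('/tmp/docker.env.example') as f:
  parsed = dict(line.rstrip('\n').split('=', 1) for line in f)
assert parsed['TMPDIR'] == '/mnt/data/tmp'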
# Execute the local runner script.
# Redirecting the output to a file ensures that
# JOBID=$(dsub ...) doesn't block until docker returns.
runner_log = open(task_dir + '/runner-log.txt', 'wt')
runner = subprocess.Popen(
[script_path, script_data_path], stderr=runner_log, stdout=runner_log)
pid = runner.pid
    # Record the runner's PID so the task can be checked or killed later.
    with open(task_dir + '/task.pid', 'wt') as f:
      f.write(str(pid) + '\n')
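# The launch above is fire-and-forget: output goes to a log file, the PID is
# saved, and nothing wait()s on the child. A self-contained sketch of the
# same pattern, plus how a saved PID can be liveness-checked later
# (POSIX-only; file paths are illustrative):
import os
import subprocess

log = open('/tmp/example-runner-log.txt', 'wt')
child = subprocess.Popen(['sleep', '5'], stdout=log, stderr=log)
with open('/tmp/example-task.pid', 'wt') as f:
  f.write(str(child.pid) + '\n')

pid = int(open('/tmp/example-task.pid').read())
try:
  os.kill(pid, 0)  # signal 0 checks existence; nothing is delivered
  running = True
except OSError:
  running = False
assert running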