Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def testParseTasksFileHeader(self):
header = '--env SAMPLE_ID\t--input VCF_FILE\t--output-recursive OUTPUT_PATH'
header = header.split('\t')
input_file_param_util = param_util.InputFileParamUtil('input')
output_file_param_util = param_util.OutputFileParamUtil('output')
job_params = param_util.parse_tasks_file_header(
header, input_file_param_util, output_file_param_util)
self.assertEqual(3, len(job_params))
# The first one is the SAMPLE env param.
self.assertTrue(isinstance(job_params[0], param_util.EnvParam))
self.assertEqual('SAMPLE_ID', job_params[0].name)
self.assertTrue(isinstance(job_params[1], param_util.InputFileParam))
self.assertEqual('VCF_FILE', job_params[1].name)
self.assertFalse(job_params[1].recursive)
self.assertTrue(isinstance(job_params[2], param_util.OutputFileParam))
self.assertEqual('OUTPUT_PATH', job_params[2].name)
self.assertTrue(job_params[2].recursive)
def testTasksFileToJobData(self):
expected_tsv_file = 'test/testdata/params_tasks.tsv'
input_file_param_util = param_util.InputFileParamUtil('input')
output_file_param_util = param_util.OutputFileParamUtil('output')
all_job_data = param_util.tasks_file_to_job_data({
'path': expected_tsv_file
}, input_file_param_util, output_file_param_util)
self.assertEqual(4, len(all_job_data))
for i in range(4):
job_data = all_job_data[i]
self.assertEqual('SAMPLE_ID', job_data['envs'][0].name)
self.assertEqual('sid-00%d' % i, job_data['envs'][0].value)
self.assertEqual('VCF_FILE', job_data['inputs'][0].name)
self.assertEqual('input/gs/inputs/sid-00%d.vcf' % i,
job_data['inputs'][0].docker_path)
self.assertEqual('OUTPUT_PATH', job_data['outputs'][0].name)
self.assertEqual('output/gs/outputs/results-00%d/' % i,
job_data['outputs'][0].docker_path)
def test_uri_rewrite_out(self, _, recursive, raw_uri, path, bn, provider):
# perpare the path if local.
if provider == PL:
path = os.path.abspath(path).rstrip('/') + '/'
out_util = param_util.OutputFileParamUtil('')
out_param = out_util.make_param('TEST', raw_uri, recursive=recursive)
self.assertEqual(path, out_param.uri.path)
self.assertEqual(bn, out_param.uri.basename)
self.assertEqual(path + bn, out_param.uri)
self.assertEqual(provider, out_param.file_provider)
logging = param_util.build_logging_param(test.LOGGING)
job_resources = job_model.Resources(
image='ubuntu', logging=logging, zones=['us-central1-*'])
env_data = {job_model.EnvParam(k, v) for (k, v) in envs.items()}
label_data = {job_model.LabelParam(k, v) for (k, v) in labels.items()}
input_file_param_util = param_util.InputFileParamUtil('input')
input_data = set()
for (recursive, items) in ((False, inputs.items()),
(True, inputs_recursive.items())):
for (name, value) in items:
name = input_file_param_util.get_variable_name(name)
input_data.add(input_file_param_util.make_param(name, value, recursive))
output_file_param_util = param_util.OutputFileParamUtil('output')
output_data = set()
for (recursive, items) in ((False, outputs.items()),
(True, outputs_recursive.items())):
for (name, value) in items:
name = output_file_param_util.get_variable_name(name)
output_data.add(output_file_param_util.make_param(name, value, recursive))
job_params = {
'envs': env_data,
'inputs': input_data,
'outputs': output_data,
'labels': label_data,
}
task_descriptors = [
job_model.TaskDescriptor({
'task-id': None
def run_main(args):
"""Execute job/task submission from command-line arguments."""
if args.command and args.script:
raise ValueError('Cannot supply both a --command and --script flag')
provider_base.check_for_unsupported_flag(args)
# Set up job parameters and job data from a tasks file or flags.
input_file_param_util = param_util.InputFileParamUtil(
DEFAULT_INPUT_LOCAL_PATH)
output_file_param_util = param_util.OutputFileParamUtil(
DEFAULT_OUTPUT_LOCAL_PATH)
mount_param_util = param_util.MountParamUtil(DEFAULT_MOUNT_LOCAL_PATH)
# Get job arguments from the command line
job_params = param_util.args_to_job_params(
args.env, args.label, args.input, args.input_recursive, args.output,
args.output_recursive, args.mount, input_file_param_util,
output_file_param_util, mount_param_util)
# If --tasks is on the command-line, then get task-specific data
if args.tasks:
task_descriptors = param_util.tasks_file_to_task_descriptors(
args.tasks, args.retries, input_file_param_util, output_file_param_util)
# Validate job data + task data
_validate_job_and_task_arguments(job_params, task_descriptors)
else:
def build_logging_param(logging_uri, util_class=OutputFileParamUtil):
"""Convenience function simplifies construction of the logging uri."""
if not logging_uri:
return job_model.LoggingParam(None, None)
recursive = not logging_uri.endswith('.log')
oututil = util_class('')
_, uri, provider = oututil.parse_uri(logging_uri, recursive)
if '*' in uri.basename:
raise ValueError('Wildcards not allowed in logging URI: %s' % uri)
return job_model.LoggingParam(uri, provider)