Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def dstat_get_jobs(statuses=None,
                   job_ids=None,
                   task_ids=None,
                   labels=None,
                   create_time_min=None,
                   create_time_max=None):
  """Fetch the first page of dstat job results for this test run.

  Args:
    statuses: set of job status strings to match; defaults to {'*'} (all).
    job_ids: set of job ids to match, or None for all.
    task_ids: set of task ids to match, or None for all.
    labels: dict of label name -> value to match; the test token and test
      name labels are always added so only this test run's jobs are returned.
    create_time_min: earliest job create time to include, or None.
    create_time_max: latest job create time to include, or None.

  Returns:
    The first yielded batch of task results from dstat_job_producer.
  """
  statuses = statuses or {'*'}
  # Copy rather than mutate: augmenting the caller's dict in place would
  # leak the test-token/test-name labels back into the caller's state.
  labels = dict(labels) if labels else {}
  labels['test-token'] = test_setup.TEST_TOKEN
  labels['test-name'] = test_setup.TEST_NAME
  labels_set = {job_model.LabelParam(k, v) for (k, v) in labels.items()}
  # advance_iterator pulls the first (full) page from the producer generator.
  return six.advance_iterator(
      dstat.dstat_job_producer(
          provider=get_dsub_provider(),
          statuses=statuses,
          job_ids=job_ids,
          task_ids=task_ids,
          labels=labels_set,
          create_time_min=create_time_min,
          create_time_max=create_time_max,
          full_output=True))
inputs: list of file input parameters
inputs_recursive: list of recursive directory input parameters
outputs: list of file output parameters
outputs_recursive: list of recursive directory output parameters
mounts: list of gcs buckets to mount
input_file_param_util: Utility for producing InputFileParam objects.
output_file_param_util: Utility for producing OutputFileParam objects.
mount_param_util: Utility for producing MountParam objects.
Returns:
job_params: a dictionary of 'envs', 'inputs', and 'outputs' that defines the
set of parameters and data for a job.
"""
# Parse environmental variables and labels.
env_data = parse_pair_args(envs, job_model.EnvParam)
label_data = parse_pair_args(labels, job_model.LabelParam)
# For input files, we need to:
# * split the input into name=uri pairs (name optional)
# * get the environmental variable name, or automatically set if null.
# * create the input file param
input_data = set()
for (recursive, args) in ((False, inputs), (True, inputs_recursive)):
for arg in args:
name, value = split_pair(arg, '=', nullable_idx=0)
name = input_file_param_util.get_variable_name(name)
input_data.add(input_file_param_util.make_param(name, value, recursive))
# For output files, we need to:
# * split the input into name=uri pairs (name optional)
# * get the environmental variable name, or automatically set if null.
# * create the output file param
if not input_list:
return []
output_list = []
for zone in input_list:
if zone.endswith('*'):
prefix = zone[:-1]
output_list.extend([z for z in _ZONES if z.startswith(prefix)])
else:
output_list.append(zone)
return output_list
class Label(job_model.LabelParam):
  """A name/value label attached to a Google Genomics pipeline.

  Attributes:
    name (str): label name.
    value (str): label value; may be omitted.
  """
  __slots__ = ()
  # Permit labels whose names collide with provider-reserved keys.
  _allow_reserved_keys = True
def build_pipeline_labels(job_metadata, task_metadata, task_id_pattern=None):
"""Build a set() of standard job and task labels.
Args:
job_metadata: Job metadata, such as job-id, job-name, and user-id.
task_metadata: Task metadata, such as the task-id.
len(row), len(job_params), path, reader.line_num))
# Each row can contain "envs", "inputs", "outputs"
envs = set()
inputs = set()
outputs = set()
labels = set()
for i in range(0, len(job_params)):
param = job_params[i]
name = param.name
if isinstance(param, job_model.EnvParam):
envs.add(job_model.EnvParam(name, row[i]))
elif isinstance(param, job_model.LabelParam):
labels.add(job_model.LabelParam(name, row[i]))
elif isinstance(param, job_model.InputFileParam):
inputs.add(
input_file_param_util.make_param(name, row[i], param.recursive))
elif isinstance(param, job_model.OutputFileParam):
outputs.add(
output_file_param_util.make_param(name, row[i], param.recursive))
task_descriptors.append(
job_model.TaskDescriptor({
'task-id': task_id,
'task-attempt': 1 if retries else None
}, {
'labels': labels,
'envs': envs,
'Unexpected number of fields {} vs {}: in {} line {}'.format(
len(row), len(job_params), path, reader.line_num))
# Each row can contain "envs", "inputs", "outputs"
envs = set()
inputs = set()
outputs = set()
labels = set()
for i in range(0, len(job_params)):
param = job_params[i]
name = param.name
if isinstance(param, job_model.EnvParam):
envs.add(job_model.EnvParam(name, row[i]))
elif isinstance(param, job_model.LabelParam):
labels.add(job_model.LabelParam(name, row[i]))
elif isinstance(param, job_model.InputFileParam):
inputs.add(
input_file_param_util.make_param(name, row[i], param.recursive))
elif isinstance(param, job_model.OutputFileParam):
outputs.add(
output_file_param_util.make_param(name, row[i], param.recursive))
task_descriptors.append(
job_model.TaskDescriptor({
'task-id': task_id,
'task-attempt': 1 if retries else None
}, {
'labels': labels,
else:
formatter = output_formatter.TextOutput(args.full)
# Set up the Genomics Pipelines service interface
provider = provider_base.get_provider(args, resources)
with dsub_util.replace_print():
provider_base.emit_provider_message(provider)
# Set poll interval to zero if --wait is not set.
poll_interval = args.poll_interval if args.wait else 0
# Make sure users were provided, or try to fill from OS user. This cannot
# be made into a default argument since some environments lack the ability
# to provide a username automatically.
user_ids = set(args.users) if args.users else {dsub_util.get_os_user()}
labels = param_util.parse_pair_args(args.label, job_model.LabelParam)
job_producer = dstat_job_producer(
provider=provider,
statuses=set(args.status) if args.status else None,
user_ids=user_ids,
job_ids=set(args.jobs) if args.jobs else None,
job_names=set(args.names) if args.names else None,
task_ids=set(args.tasks) if args.tasks else None,
task_attempts=set(args.attempts) if args.attempts else None,
labels=labels if labels else None,
create_time_min=create_time_min,
max_tasks=args.limit,
full_output=args.full,
summary_output=args.summary,
poll_interval=poll_interval,
raw_format=bool(args.format == 'provider-json'))
dstat_params['statuses'] = {
job_statuses.api_to_dsub(s)
for s in query.status
} if query.status else {'*'}
if query.name:
dstat_params['job_names'] = {query.name}
if query.labels:
if query.labels.get('job-id'):
dstat_params['job_ids'] = {query.labels['job-id']}
if query.labels.get('task-id'):
dstat_params['task_ids'] = {query.labels['task-id']}
if query.labels.get('attempt'):
dstat_params['task_attempts'] = {query.labels['attempt']}
dstat_params['labels'] = {
job_model.LabelParam(k, v)
for (k, v) in query.labels.items()
if k not in ['job-id', 'task-id', 'attempt']
}
if query.submission:
dstat_params['create_time'] = query.submission
if query.extensions:
if query.extensions.user_id:
dstat_params['user_ids'] = {query.extensions.user_id}
return dstat_params
# Parse args and validate
args = _parse_arguments()
# Compute the age filter (if any)
create_time = param_util.age_to_create_time(args.age)
# Set up the Genomics Pipelines service interface
provider = provider_base.get_provider(args, resources)
# Make sure users were provided, or try to fill from OS user. This cannot
# be made into a default argument since some environments lack the ability
# to provide a username automatically.
user_ids = set(args.users) if args.users else {dsub_util.get_os_user()}
# Process user labels.
labels = param_util.parse_pair_args(args.label, job_model.LabelParam)
# Let the user know which jobs we are going to look up
with dsub_util.replace_print():
provider_base.emit_provider_message(provider)
_emit_search_criteria(user_ids, args.jobs, args.tasks, args.label)
# Delete the requested jobs
deleted_tasks = ddel_tasks(
provider,
user_ids=user_ids,
job_ids=set(args.jobs) if args.jobs else None,
task_ids=set(args.tasks) if args.tasks else None,
labels=labels,
create_time_min=create_time)
# Emit the count of deleted jobs.
# Only emit anything about tasks if any of the jobs contains a task-id.
job_params = []
for col in header:
# Reserve the "-" and "--" namespace.
# If the column has no leading "-", treat it as an environment variable
col_type = '--env'
col_value = col
if col.startswith('-'):
col_type, col_value = split_pair(col, ' ', 1)
if col_type == '--env':
job_params.append(job_model.EnvParam(col_value))
elif col_type == '--label':
job_params.append(job_model.LabelParam(col_value))
elif col_type == '--input' or col_type == '--input-recursive':
name = input_file_param_util.get_variable_name(col_value)
job_params.append(
job_model.InputFileParam(
name, recursive=(col_type.endswith('recursive'))))
elif col_type == '--output' or col_type == '--output-recursive':
name = output_file_param_util.get_variable_name(col_value)
job_params.append(
job_model.OutputFileParam(
name, recursive=(col_type.endswith('recursive'))))
else:
raise ValueError('Unrecognized column header: %s' % col)