inputs_recursive=None,
outputs=None,
outputs_recursive=None,
wait=False):
envs = envs or {}
labels = labels or {}
inputs = inputs or {}
inputs_recursive = inputs_recursive or {}
outputs = outputs or {}
outputs_recursive = outputs_recursive or {}
labels['test-token'] = test_setup.TEST_TOKEN
labels['test-name'] = test_setup.TEST_NAME
logging = param_util.build_logging_param(test.LOGGING)
job_resources = job_model.Resources(
image='ubuntu', logging=logging, zones=['us-central1-*'])
env_data = {job_model.EnvParam(k, v) for (k, v) in envs.items()}
label_data = {job_model.LabelParam(k, v) for (k, v) in labels.items()}
input_file_param_util = param_util.InputFileParamUtil('input')
input_data = set()
for (recursive, items) in ((False, inputs.items()),
(True, inputs_recursive.items())):
for (name, value) in items:
name = input_file_param_util.get_variable_name(name)
input_data.add(input_file_param_util.make_param(name, value, recursive))
output_file_param_util = param_util.OutputFileParamUtil('output')
output_data = set()
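For reference, here is a minimal standalone sketch of the two file-param helpers used above. Only the calls already present in this snippet (get_variable_name and make_param) are assumed; the bucket paths and variable names are invented.
input_util = param_util.InputFileParamUtil('input')
output_util = param_util.OutputFileParamUtil('output')

in_name = input_util.get_variable_name('INPUT_BAM')
in_param = input_util.make_param(in_name, 'gs://example-bucket/sample.bam', False)

out_name = output_util.get_variable_name('OUTPUT_VCF')
out_param = output_util.make_param(out_name, 'gs://example-bucket/out/*.vcf', False)

# The resulting params are hashable, which is why they are collected in sets above.
params = {in_param, out_param}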
"""Tests for dsub.lib.param_util."""
from __future__ import absolute_import
from __future__ import print_function
import datetime
import doctest
import os
import re
import unittest
from dsub.lib import job_util
from dsub.lib import param_util
import parameterized
PL = param_util.P_LOCAL
PG = param_util.P_GCS
class ParamUtilTest(unittest.TestCase):
def testEnvParam(self):
env_param = param_util.EnvParam('my_name', 'my_value')
self.assertEqual('my_name', env_param.name)
self.assertEqual('my_value', env_param.value)
@parameterized.parameterized.expand([
('gl1', 'genre', 'jazz'),
('gl2', 'underscores_are', 'totally_ok'),
('gl3', 'dashes-are', 'also-ok'),
('gl4', 'num_123', 'good_456'),
('gl5', 'final_underscore_', 'ok_too_'),
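The cases above feed a parameterized label-name test whose body is cut off here. As a standalone illustration only, and assuming param_util.LabelParam exposes name and value the same way EnvParam does in testEnvParam, a single case might be checked like this:
label_param = param_util.LabelParam('genre', 'jazz')
assert label_param.name == 'genre'
assert label_param.value == 'jazz'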
# Set up job parameters and job data from a tasks file or flags.
input_file_param_util = param_util.InputFileParamUtil(
DEFAULT_INPUT_LOCAL_PATH)
output_file_param_util = param_util.OutputFileParamUtil(
DEFAULT_OUTPUT_LOCAL_PATH)
mount_param_util = param_util.MountParamUtil(DEFAULT_MOUNT_LOCAL_PATH)
# Get job arguments from the command line
job_params = param_util.args_to_job_params(
args.env, args.label, args.input, args.input_recursive, args.output,
args.output_recursive, args.mount, input_file_param_util,
output_file_param_util, mount_param_util)
# If --tasks is on the command-line, then get task-specific data
if args.tasks:
task_descriptors = param_util.tasks_file_to_task_descriptors(
args.tasks, args.retries, input_file_param_util, output_file_param_util)
# Validate job data + task data
_validate_job_and_task_arguments(job_params, task_descriptors)
else:
# Create the implicit task
task_metadata = {'task-id': None}
if args.retries:
task_metadata['task-attempt'] = 1
task_descriptors = [
job_model.TaskDescriptor(task_metadata, {
'labels': set(),
'envs': set(),
'inputs': set(),
'outputs': set()
}, job_model.Resources())
]
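The args_to_job_params call near the top of this snippet can also be exercised with literal flag values. The sketch below keeps the same positional order as that call; the NAME=VALUE string format and all paths are assumptions, not taken from this snippet.
job_params = param_util.args_to_job_params(
    ['SAMPLE_ID=NA12878'],                      # --env (assumed NAME=VALUE strings)
    ['team=genomics'],                          # --label
    ['INPUT_VCF=gs://example-bucket/in.vcf'],   # --input
    [],                                         # --input-recursive
    ['OUTPUT_DIR=gs://example-bucket/out/*'],   # --output
    [],                                         # --output-recursive
    [],                                         # --mount
    input_file_param_util, output_file_param_util, mount_param_util)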
def _parse_arguments(prog, argv):
"""Parses command line arguments.
Args:
prog: The path of the program (dsub.py) or an alternate program name to
display in usage.
argv: The list of program arguments to parse.
Returns:
A Namespace of parsed arguments.
"""
# Handle version flag and exit if it was passed.
param_util.handle_version_flag()
parser = provider_base.create_parser(prog)
# Add dsub core job submission arguments
parser.add_argument(
'--version', '-v', default=False, help='Print the dsub version and exit.')
parser.add_argument(
'--unique-job-id',
default=False,
action='store_true',
help="""Experimental: create a unique 32 character UUID for the dsub
job-id using https://docs.python.org/3/library/uuid.html.""")
parser.add_argument(
'--name',
help="""Name for pipeline. Defaults to the script name or
recursive_localize_command=self._localize_inputs_recursive_command(
task_dir, job_params['inputs'] | task_params['inputs']),
localize_command=self._localize_inputs_command(
task_dir, job_params['inputs'] | task_params['inputs'],
job_metadata['user-project']),
export_output_dirs=providers_util.build_recursive_gcs_delocalize_env(
task_dir, job_params['outputs'] | task_params['outputs']),
recursive_delocalize_command=self._delocalize_outputs_recursive_command(
task_dir, job_params['outputs'] | task_params['outputs']),
delocalize_command=self._delocalize_outputs_commands(
task_dir, job_params['outputs'] | task_params['outputs'],
job_metadata['user-project']),
delocalize_logs_command=self._delocalize_logging_command(
task_resources.logging_path, job_metadata['user-project']),
export_mount_dirs=providers_util.build_mount_env(
task_dir, param_util.get_local_mounts(job_params['mounts'])),
)
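Throughout this provider code, job-level and task-level parameters are combined with the set-union operator because each param is a hashable value object stored in a set. A tiny illustration, using EnvParam as a stand-in for any of the param types merged above (values invented):
job_envs = {job_model.EnvParam('REF', 'gs://example-bucket/ref.fa')}
task_envs = {job_model.EnvParam('SAMPLE', 'NA12878')}
merged = job_envs | task_envs  # both job-wide and per-task values are kept
assert len(merged) == 2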
# Write the runner script and data file to the task_dir
script_path = os.path.join(task_dir, 'runner.sh')
script_data_path = os.path.join(task_dir, 'data.sh')
self._write_source_file(script_path,
self._resources.get_resource(_RUNNER_SH_RESOURCE))
self._write_source_file(script_data_path, script_data.encode())
# Write the environment variables
env_vars = set(env.items()) | job_params['envs'] | task_params['envs'] | {
job_model.EnvParam('DATA_ROOT', providers_util.DATA_MOUNT_POINT),
job_model.EnvParam('TMPDIR', providers_util.DATA_MOUNT_POINT + '/tmp')
}
env_fname = task_dir + '/docker.env'
with open(env_fname, 'wt') as f:
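The body of this with-block is truncated. Presumably it serializes the merged env_vars into docker.env as NAME=value lines; the sketch below is an assumption about that behavior, not the provider's actual code.
with open(env_fname, 'wt') as env_file:
    for var in sorted(env_vars, key=lambda e: e.name):
        env_file.write('%s=%s\n' % (var.name, var.value))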
def _get_job_resources(args):
"""Extract job-global resources requirements from input args.
Args:
args: parsed command-line arguments
Returns:
Resources object containing the requested resources for the job
"""
logging = param_util.build_logging_param(
args.logging) if args.logging else None
timeout = param_util.timeout_in_seconds(args.timeout)
log_interval = param_util.log_interval_in_seconds(args.log_interval)
return job_model.Resources(
min_cores=args.min_cores,
min_ram=args.min_ram,
machine_type=args.machine_type,
disk_size=args.disk_size,
disk_type=args.disk_type,
boot_disk_size=args.boot_disk_size,
image=args.image,
regions=args.regions,
zones=args.zones,
logging=logging,
logging_path=None,
service_account=args.service_account,
scopes=args.scopes,
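The param_util helpers used at the top of _get_job_resources can be called directly. A small sketch; the string formats (a gs:// logging path, '1h', '10m') are assumptions based on dsub's command-line flags rather than anything shown in this snippet.
logging = param_util.build_logging_param('gs://example-bucket/logs/')
timeout = param_util.timeout_in_seconds('1h')            # assumed duration format
log_interval = param_util.log_interval_in_seconds('10m')  # assumed duration format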
dstat_params = {
'create_time': query.start,
'end_time': query.end,
'job_name_list': [query.name] if query.name else None,
'statuses': None,
'label_list': None
}
if query.statuses:
status_set = set(
[job_statuses.api_to_dsub(s) for s in query.statuses])
dstat_params['statuses'] = list(status_set)
if query.labels:
dstat_params['label_list'] = [
param_util.LabelParam(k, v) for (k, v) in query.labels.items()
]
return dstat_params
# Set up the task labels
labels = {
label.name: label.value if label.value else '' for label in
google_base.build_pipeline_labels(job_metadata, task_metadata)
| job_params['labels'] | task_params['labels']
}
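The comprehension above flattens several sets of label params into a plain name-to-value dict, substituting an empty string when a label has no value. A standalone illustration with invented labels:
label_params = {job_model.LabelParam('team', 'genomics'),
                job_model.LabelParam('run-date', '2019-01-01')}
labels = {l.name: l.value if l.value else '' for l in label_params}
# -> {'team': 'genomics', 'run-date': '2019-01-01'}; a label with no value maps to ''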
# Set local variables for the core pipeline values
script = task_view.job_metadata['script']
user_project = task_view.job_metadata['user-project'] or ''
envs = job_params['envs'] | task_params['envs']
inputs = job_params['inputs'] | task_params['inputs']
outputs = job_params['outputs'] | task_params['outputs']
mounts = job_params['mounts']
gcs_mounts = param_util.get_gcs_mounts(mounts)
persistent_disk_mount_params = param_util.get_persistent_disk_mounts(mounts)
persistent_disks = [
google_v2_pipelines.build_disk(
name=disk.name.replace('_', '-'), # Underscores not allowed
size_gb=disk.disk_size or job_model.DEFAULT_MOUNTED_DISK_SIZE,
source_image=disk.value,
disk_type=disk.disk_type or job_model.DEFAULT_DISK_TYPE)
for disk in persistent_disk_mount_params
]
persistent_disk_mounts = [
google_v2_pipelines.build_mount(
disk=persistent_disk.get('name'),
path=os.path.join(providers_util.DATA_MOUNT_POINT,
persistent_disk_mount_param.docker_path),
"""Submit the job (or tasks) to be executed.
Args:
job_descriptor: all parameters needed to launch all job tasks
skip_if_output_present: (boolean) if true, skip tasks whose output
is present (see --skip flag for more explanation).
Returns:
A dictionary containing the 'user-id', 'job-id', and 'task-id' list.
For jobs that are not task array jobs, the task-id list should be empty.
Raises:
ValueError: if job resources or task data contain illegal values.
"""
# Validate task data and resources.
param_util.validate_submit_args_or_fail(
job_descriptor,
provider_name=_PROVIDER_NAME,
input_providers=_SUPPORTED_INPUT_PROVIDERS,
output_providers=_SUPPORTED_OUTPUT_PROVIDERS,
logging_providers=_SUPPORTED_LOGGING_PROVIDERS)
# Prepare and submit jobs.
launched_tasks = []
requests = []
for task_view in job_model.task_view_generator(job_descriptor):
job_params = task_view.job_params
task_params = task_view.task_descriptors[0].task_params
outputs = job_params['outputs'] | task_params['outputs']
if skip_if_output_present: