How to use the dsub.lib.param_util module in dsub

To help you get started, we've selected a few dsub examples that show popular ways param_util is used in public projects.

From DataBiosphere/dsub: test/integration/e2e_python_api.py (view on GitHub)
inputs_recursive=None,
                   outputs=None,
                   outputs_recursive=None,
                   wait=False):

  envs = envs or {}
  labels = labels or {}
  inputs = inputs or {}
  inputs_recursive = inputs_recursive or {}
  outputs = outputs or {}
  outputs_recursive = outputs_recursive or {}

  labels['test-token'] = test_setup.TEST_TOKEN
  labels['test-name'] = test_setup.TEST_NAME

  logging = param_util.build_logging_param(test.LOGGING)
  job_resources = job_model.Resources(
      image='ubuntu', logging=logging, zones=['us-central1-*'])

  env_data = {job_model.EnvParam(k, v) for (k, v) in envs.items()}
  label_data = {job_model.LabelParam(k, v) for (k, v) in labels.items()}

  input_file_param_util = param_util.InputFileParamUtil('input')
  input_data = set()
  for (recursive, items) in ((False, inputs.items()),
                             (True, inputs_recursive.items())):
    for (name, value) in items:
      name = input_file_param_util.get_variable_name(name)
      input_data.add(input_file_param_util.make_param(name, value, recursive))

  output_file_param_util = param_util.OutputFileParamUtil('output')
  output_data = set()
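
This excerpt shows the core pattern for building dsub parameter sets in Python. Below is a minimal sketch of the same pattern, kept to the param_util calls that appear above; the GCS paths and variable names are placeholders, not part of the original test.

from dsub.lib import param_util

# Logging destination for the job (placeholder path).
logging = param_util.build_logging_param('gs://my-bucket/logging/')

# Input file parameters, mirroring the loop in the excerpt.
input_file_param_util = param_util.InputFileParamUtil('input')
input_data = set()
for name, value in {'INPUT_VCF': 'gs://my-bucket/input/sample.vcf'}.items():
  name = input_file_param_util.get_variable_name(name)
  input_data.add(input_file_param_util.make_param(name, value, False))

# Output file parameters follow the same pattern.
output_file_param_util = param_util.OutputFileParamUtil('output')
output_data = set()
for name, value in {'OUTPUT_TXT': 'gs://my-bucket/output/*.txt'}.items():
  name = output_file_param_util.get_variable_name(name)
  output_data.add(output_file_param_util.make_param(name, value, False))
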
From DataBiosphere/dsub: test/unit/test_param_util.py (view on GitHub)
# limitations under the License.
"""Tests for dsub.lib.param_util."""

from __future__ import absolute_import
from __future__ import print_function

import datetime
import doctest
import os
import re
import unittest
from dsub.lib import job_util
from dsub.lib import param_util
import parameterized

PL = param_util.P_LOCAL
PG = param_util.P_GCS


class ParamUtilTest(unittest.TestCase):

  def testEnvParam(self):
    env_param = param_util.EnvParam('my_name', 'my_value')
    self.assertEqual('my_name', env_param.name)
    self.assertEqual('my_value', env_param.value)

  @parameterized.parameterized.expand([
      ('gl1', 'genre', 'jazz'),
      ('gl2', 'underscores_are', 'totally_ok'),
      ('gl3', 'dashes-are', 'also-ok'),
      ('gl4', 'num_123', 'good_456'),
      ('gl5', 'final_underscore_', 'ok_too_'),
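
The parameterized cases above feed label-name validation tests. As a standalone illustration (hypothetical, not part of the original test file), the EnvParam and LabelParam constructors exercised by these tests can be used directly:

from dsub.lib import param_util

# Both parameter types are simple name/value objects; LabelParam is assumed
# to expose .name/.value the same way EnvParam does in the test above.
env_param = param_util.EnvParam('my_name', 'my_value')
label_param = param_util.LabelParam('genre', 'jazz')

assert env_param.name == 'my_name' and env_param.value == 'my_value'
assert label_param.name == 'genre' and label_param.value == 'jazz'
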
From DataBiosphere/dsub: dsub/commands/dsub.py (view on GitHub)
def _get_job_resources(args):
  """Extract job-global resources requirements from input args.

  Args:
    args: parsed command-line arguments

  Returns:
    Resources object containing the requested resources for the job
  """
  logging = param_util.build_logging_param(
      args.logging) if args.logging else None
  timeout = param_util.timeout_in_seconds(args.timeout)
  log_interval = param_util.log_interval_in_seconds(args.log_interval)

  return job_model.Resources(
      min_cores=args.min_cores,
      min_ram=args.min_ram,
      machine_type=args.machine_type,
      disk_size=args.disk_size,
      disk_type=args.disk_type,
      boot_disk_size=args.boot_disk_size,
      image=args.image,
      regions=args.regions,
      zones=args.zones,
      logging=logging,
      logging_path=None,
      service_account=args.service_account,
      scopes=args.scopes,
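
Outside of argument parsing, the same conversion helpers can be called directly. A small sketch using only the calls shown above; the duration strings follow dsub's '<number><unit>' flag format and are placeholders:

from dsub.lib import param_util

# Convert duration flag strings into seconds, as _get_job_resources does.
timeout = param_util.timeout_in_seconds('7d')            # placeholder: 7 days
log_interval = param_util.log_interval_in_seconds('1m')  # placeholder: 1 minute

# Logging is optional; build_logging_param takes the --logging path.
logging = param_util.build_logging_param('gs://my-bucket/logging/')
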
From DataBiosphere/dsub: dsub/commands/dsub.py (view on GitHub)
  # Set up job parameters and job data from a tasks file or flags.
  input_file_param_util = param_util.InputFileParamUtil(
      DEFAULT_INPUT_LOCAL_PATH)
  output_file_param_util = param_util.OutputFileParamUtil(
      DEFAULT_OUTPUT_LOCAL_PATH)
  mount_param_util = param_util.MountParamUtil(DEFAULT_MOUNT_LOCAL_PATH)

  # Get job arguments from the command line
  job_params = param_util.args_to_job_params(
      args.env, args.label, args.input, args.input_recursive, args.output,
      args.output_recursive, args.mount, input_file_param_util,
      output_file_param_util, mount_param_util)
  # If --tasks is on the command-line, then get task-specific data
  if args.tasks:
    task_descriptors = param_util.tasks_file_to_task_descriptors(
        args.tasks, args.retries, input_file_param_util, output_file_param_util)

    # Validate job data + task data
    _validate_job_and_task_arguments(job_params, task_descriptors)
  else:
    # Create the implicit task
    task_metadata = {'task-id': None}
    if args.retries:
      task_metadata['task-attempt'] = 1
    task_descriptors = [
        job_model.TaskDescriptor(task_metadata, {
            'labels': set(),
            'envs': set(),
            'inputs': set(),
            'outputs': set()
        }, job_model.Resources())
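
The helpers above can also be driven outside the dsub CLI. A hypothetical sketch, assuming the 'KEY=VALUE' strings that the corresponding dsub flags accept and placeholder local prefixes for the three *ParamUtil objects:

from dsub.lib import param_util

input_file_param_util = param_util.InputFileParamUtil('input')
output_file_param_util = param_util.OutputFileParamUtil('output')
mount_param_util = param_util.MountParamUtil('mount')

# Mirror the args_to_job_params call above with literal flag values.
job_params = param_util.args_to_job_params(
    ['SAMPLE_ID=NA12878'],                      # --env
    ['team=genomics'],                          # --label
    ['INPUT_VCF=gs://my-bucket/input.vcf'],     # --input
    [],                                         # --input-recursive
    ['OUTPUT_TXT=gs://my-bucket/output/*.txt'], # --output
    [],                                         # --output-recursive
    [],                                         # --mount
    input_file_param_util, output_file_param_util, mount_param_util)
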
From DataBiosphere/dsub: dsub/commands/dsub.py (view on GitHub)
def _parse_arguments(prog, argv):
  """Parses command line arguments.

  Args:
    prog: The path of the program (dsub.py) or an alternate program name to
    display in usage.
    argv: The list of program arguments to parse.

  Returns:
    A Namespace of parsed arguments.
  """
  # Handle version flag and exit if it was passed.
  param_util.handle_version_flag()

  parser = provider_base.create_parser(prog)

  # Add dsub core job submission arguments
  parser.add_argument(
      '--version', '-v', default=False, help='Print the dsub version and exit.')

  parser.add_argument(
      '--unique-job-id',
      default=False,
      action='store_true',
      help="""Experimental: create a unique 32 character UUID for the dsub
          job-id using https://docs.python.org/3/library/uuid.html.""")
  parser.add_argument(
      '--name',
      help="""Name for pipeline. Defaults to the script name or
From DataBiosphere/dsub: dsub/providers/local.py (view on GitHub)
        recursive_localize_command=self._localize_inputs_recursive_command(
            task_dir, job_params['inputs'] | task_params['inputs']),
        localize_command=self._localize_inputs_command(
            task_dir, job_params['inputs'] | task_params['inputs'],
            job_metadata['user-project']),
        export_output_dirs=providers_util.build_recursive_gcs_delocalize_env(
            task_dir, job_params['outputs'] | task_params['outputs']),
        recursive_delocalize_command=self._delocalize_outputs_recursive_command(
            task_dir, job_params['outputs'] | task_params['outputs']),
        delocalize_command=self._delocalize_outputs_commands(
            task_dir, job_params['outputs'] | task_params['outputs'],
            job_metadata['user-project']),
        delocalize_logs_command=self._delocalize_logging_command(
            task_resources.logging_path, job_metadata['user-project']),
        export_mount_dirs=providers_util.build_mount_env(
            task_dir, param_util.get_local_mounts(job_params['mounts'])),
    )

    # Write the runner script and data file to the task_dir
    script_path = os.path.join(task_dir, 'runner.sh')
    script_data_path = os.path.join(task_dir, 'data.sh')
    self._write_source_file(script_path,
                            self._resources.get_resource(_RUNNER_SH_RESOURCE))
    self._write_source_file(script_data_path, script_data.encode())

    # Write the environment variables
    env_vars = set(env.items()) | job_params['envs'] | task_params['envs'] | {
        job_model.EnvParam('DATA_ROOT', providers_util.DATA_MOUNT_POINT),
        job_model.EnvParam('TMPDIR', providers_util.DATA_MOUNT_POINT + '/tmp')
    }
    env_fname = task_dir + '/docker.env'
    with open(env_fname, 'wt') as f:
From DataBiosphere/job-manager: servers/dsub/jobs/controllers/dsub_client.py (view on GitHub)
        dstat_params = {
            'create_time': query.start,
            'end_time': query.end,
            'job_name_list': [query.name] if query.name else None,
            'statuses': None,
            'label_list': None
        }

        if query.statuses:
            status_set = set(
                [job_statuses.api_to_dsub(s) for s in query.statuses])
            dstat_params['statuses'] = list(status_set)

        if query.labels:
            dstat_params['label_list'] = [
                param_util.LabelParam(k, v) for (k, v) in query.labels.items()
            ]

        return dstat_params
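
The label_list entries are param_util.LabelParam objects built from the query's key/value pairs, which dstat then matches against job labels. The conversion in isolation (the labels dict is a placeholder):

from dsub.lib import param_util

labels = {'team': 'genomics', 'batch': 'b001'}  # placeholder query labels
label_list = [param_util.LabelParam(k, v) for (k, v) in labels.items()]
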
From DataBiosphere/dsub: dsub/providers/google_v2.py (view on GitHub)
    # Set up the task labels
    labels = {
        label.name: label.value if label.value else '' for label in
        google_base.build_pipeline_labels(job_metadata, task_metadata)
        | job_params['labels'] | task_params['labels']
    }

    # Set local variables for the core pipeline values
    script = task_view.job_metadata['script']
    user_project = task_view.job_metadata['user-project'] or ''

    envs = job_params['envs'] | task_params['envs']
    inputs = job_params['inputs'] | task_params['inputs']
    outputs = job_params['outputs'] | task_params['outputs']
    mounts = job_params['mounts']
    gcs_mounts = param_util.get_gcs_mounts(mounts)

    persistent_disk_mount_params = param_util.get_persistent_disk_mounts(mounts)

    persistent_disks = [
        google_v2_pipelines.build_disk(
            name=disk.name.replace('_', '-'),  # Underscores not allowed
            size_gb=disk.disk_size or job_model.DEFAULT_MOUNTED_DISK_SIZE,
            source_image=disk.value,
            disk_type=disk.disk_type or job_model.DEFAULT_DISK_TYPE)
        for disk in persistent_disk_mount_params
    ]
    persistent_disk_mounts = [
        google_v2_pipelines.build_mount(
            disk=persistent_disk.get('name'),
            path=os.path.join(providers_util.DATA_MOUNT_POINT,
                              persistent_disk_mount_param.docker_path),
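
param_util also provides the helpers used here and in the local-provider excerpt to partition a job's mounts by backing store. A hedged sketch; the mounts set would normally be job_params['mounts'] as built by args_to_job_params:

from dsub.lib import param_util

# An empty set keeps the sketch self-contained; in practice this is
# job_params['mounts'].
mounts = set()

gcs_mounts = param_util.get_gcs_mounts(mounts)               # gs:// bucket mounts
local_mounts = param_util.get_local_mounts(mounts)           # local directory mounts
disk_mounts = param_util.get_persistent_disk_mounts(mounts)  # image-backed disk mounts
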
From DataBiosphere/dsub: dsub/providers/google_v2.py (view on GitHub)
"""Submit the job (or tasks) to be executed.

    Args:
      job_descriptor: all parameters needed to launch all job tasks
      skip_if_output_present: (boolean) if true, skip tasks whose output
        is present (see --skip flag for more explanation).

    Returns:
      A dictionary containing the 'user-id', 'job-id', and 'task-id' list.
      For jobs that are not task array jobs, the task-id list should be empty.

    Raises:
      ValueError: if job resources or task data contain illegal values.
    """
    # Validate task data and resources.
    param_util.validate_submit_args_or_fail(
        job_descriptor,
        provider_name=_PROVIDER_NAME,
        input_providers=_SUPPORTED_INPUT_PROVIDERS,
        output_providers=_SUPPORTED_OUTPUT_PROVIDERS,
        logging_providers=_SUPPORTED_LOGGING_PROVIDERS)

    # Prepare and submit jobs.
    launched_tasks = []
    requests = []
    for task_view in job_model.task_view_generator(job_descriptor):

      job_params = task_view.job_params
      task_params = task_view.task_descriptors[0].task_params

      outputs = job_params['outputs'] | task_params['outputs']
      if skip_if_output_present: