# Standard-library imports used by the code in this excerpt; the project's own classes
# (Logger, CmdExecutor, GridEngine, FileSystemHostStorage, PipelineAPI, ...) are
# imported elsewhere in the original module.
import multiprocessing
import os
from datetime import datetime

# Resolve instance and autoscaling settings from the run environment, falling back to
# defaults where the optional variables are not set.
instance_type = os.environ['instance_size']
instance_image = os.environ['docker_image']
price_type = os.environ['price_type']
region_id = os.environ['CLOUD_REGION_ID']
instance_cores = int(os.environ['CLOUD_PIPELINE_NODE_CORES']) \
    if 'CLOUD_PIPELINE_NODE_CORES' in os.environ else multiprocessing.cpu_count()
max_additional_hosts = int(os.environ['CP_CAP_AUTOSCALE_WORKERS']) \
    if 'CP_CAP_AUTOSCALE_WORKERS' in os.environ else 3
log_verbose = os.environ['CP_CAP_AUTOSCALE_VERBOSE'].strip().lower() == "true" \
    if 'CP_CAP_AUTOSCALE_VERBOSE' in os.environ else False
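
# Example (not part of the original script): a minimal sketch of the same
# environment-variable fallback pattern using os.environ.get; the helper names
# below are illustrative only.
def read_int_env(name, default):
    """Reads an integer environment variable, falling back to a default."""
    raw = os.environ.get(name)
    return int(raw) if raw else default

def read_bool_env(name, default=False):
    """Treats the literal string 'true' (case-insensitive) as True."""
    raw = os.environ.get(name)
    return raw.strip().lower() == 'true' if raw is not None else default

# e.g. read_int_env('CP_CAP_AUTOSCALE_WORKERS', 3)
#      read_bool_env('CP_CAP_AUTOSCALE_VERBOSE')
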
Logger.init(cmd=args.debug, log_file='/common/workdir/.autoscaler.log', task='GridEngineAutoscaling',
            verbose=log_verbose)
cmd_executor = CmdExecutor()
grid_engine = GridEngine(cmd_executor=cmd_executor)
host_storage = FileSystemHostStorage(cmd_executor=cmd_executor, storage_file='/common/workdir/.autoscaler.storage')
pipe = PipelineAPI(api_url=pipeline_api, log_dir='/common/workdir/.pipe.log')
scale_up_timeout = int(_retrieve_preference(pipe, 'ge.autoscaling.scale.up.timeout', default_value=30))
scale_down_timeout = int(_retrieve_preference(pipe, 'ge.autoscaling.scale.down.timeout', default_value=30))
scale_up_polling_timeout = int(_retrieve_preference(pipe, 'ge.autoscaling.scale.up.polling.timeout',
                                                    default_value=600))
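
# Example (not part of the original script): _retrieve_preference itself is not shown in
# this excerpt. A common shape for such a helper is sketched below; the get_preference
# call and the 'value' key are assumptions about the API client, not the confirmed API.
def _retrieve_preference_sketch(api, preference, default_value):
    """Returns a system preference value, or the default if the lookup fails."""
    try:
        return api.get_preference(preference)['value']
    except Exception:
        return default_value
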
scale_up_handler = GridEngineScaleUpHandler(cmd_executor=cmd_executor, pipe=pipe, grid_engine=grid_engine,
                                            host_storage=host_storage, parent_run_id=master_run_id,
                                            default_hostfile=default_hostfile, instance_disk=instance_disk,
                                            instance_type=instance_type, instance_image=instance_image,
                                            price_type=price_type, region_id=region_id,
                                            instance_cores=instance_cores, polling_timeout=scale_up_polling_timeout)
scale_down_handler = GridEngineScaleDownHandler(cmd_executor=cmd_executor, grid_engine=grid_engine,
                                                default_hostfile=default_hostfile, instance_cores=instance_cores)
worker_validator = GridEngineWorkerValidator(cmd_executor=cmd_executor, host_storage=host_storage,
                                             grid_engine=grid_engine, scale_down_handler=scale_down_handler)
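
# Example (not part of the original script): the handlers above are typically driven by a
# polling loop that grows the cluster while jobs are waiting and shrinks it when workers
# are idle. The loop below is a simplified, hypothetical illustration; has_waiting_jobs,
# scale_up and scale_down are placeholders, not the actual Cloud Pipeline autoscaler API.
import time

def autoscaling_loop(has_waiting_jobs, scale_up, scale_down, poll_interval=10):
    while True:
        if has_waiting_jobs():
            scale_up()
        else:
            scale_down()
        time.sleep(poll_interval)
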
# Excerpted keyword arguments from the GridEngine host-removal helpers: each step pairs
# the shell command to run (action) with a log message (msg) for an internal command
# wrapper that executes the action and reports the outcome.
action=lambda: self.cmd_executor.execute(GridEngine._SHUTDOWN_HOST_EXECUTION_DAEMON % host),
msg='Shutdown GE host execution daemon.',
action=lambda: self.cmd_executor.execute(GridEngine._REMOVE_HOST_FROM_HOST_GROUP % (host, hostgroup)),
msg='Remove host from host group.',
action=lambda: self.cmd_executor.execute(GridEngine._REMOVE_HOST_FROM_QUEUE_SETTINGS % (queue, host)),
msg='Remove host from queue settings.',
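
# Example (not part of the original script): a generic sketch of the command-wrapper
# pattern those action/msg pairs feed. The function name and the skip_on_failure flag
# are illustrative assumptions, not the project's actual helper.
def perform_command_sketch(action, msg, skip_on_failure=False):
    """Runs a shell-command callable, logging the step and tolerating failures if asked."""
    print(msg)
    try:
        action()
    except Exception:
        if not skip_on_failure:
            raise
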
def kill_jobs(self, jobs, force=False):
    """
    Kills jobs in GE.
    :param jobs: Grid engine jobs.
    :param force: Specifies if this command should be performed with the -f flag.
    """
    job_ids = []
    for job in jobs:
        job_id = str(job.id)
        # Single-task array jobs are addressed as <job_id>.<task_id>; whole arrays and
        # non-array jobs are addressed by the bare job id.
        job_array_index = '' if not job.array or len(job.array) > 1 else ('.%s' % job.array[0])
        job_ids.append(job_id + job_array_index)
    self.cmd_executor.execute((GridEngine._FORCE_KILL_JOBS if force else GridEngine._KILL_JOBS) % ' '.join(job_ids))
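
# Example (not part of the original script): how the job id strings above are built for
# array and non-array jobs before being joined into the kill command. The Job tuple is a
# stand-in for the grid engine job objects used by kill_jobs.
from collections import namedtuple

Job = namedtuple('Job', ['id', 'array'])

def format_job_ids(jobs):
    ids = []
    for job in jobs:
        suffix = '' if not job.array or len(job.array) > 1 else ('.%s' % job.array[0])
        ids.append(str(job.id) + suffix)
    return ' '.join(ids)

# format_job_ids([Job(2, None), Job(11, [3]), Job(12, [1, 2, 3])]) returns '2 11.3 12'
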
def _parse_date(self, line, indentations):
    return datetime.strptime(self._by_indent(line, indentations, 5), GridEngine._QSTAT_DATETIME_FORMAT)
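
# Example (not part of the original script): GridEngine._QSTAT_DATETIME_FORMAT is not
# shown in this excerpt; to match the sample qstat output below (e.g. '12/21/2018 11:48:00')
# it would need to resolve to '%m/%d/%Y %H:%M:%S'.
from datetime import datetime

assert datetime.strptime('12/21/2018 11:48:00', '%m/%d/%Y %H:%M:%S') == datetime(2018, 12, 21, 11, 48)
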
def get_parallel_environment_slots(self, pe=_PARALLEL_ENVIRONMENT):
    """
    Returns the number of slots configured for the parallel environment.
    :param pe: Parallel environment to return the number of slots for.
    """
    return int(self.cmd_executor.execute(GridEngine._SHOW_PARALLEL_ENVIRONMENT_SLOTS % pe).strip())
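
# Example (not part of the original script): the command template behind
# GridEngine._SHOW_PARALLEL_ENVIRONMENT_SLOTS is not shown here. With plain SGE tooling,
# the slot count of a parallel environment can be read from the 'slots' line of
# 'qconf -sp <pe_name>', as sketched below.
import subprocess

def pe_slots(pe_name):
    output = subprocess.check_output(['qconf', '-sp', pe_name]).decode()
    for config_line in output.splitlines():
        if config_line.startswith('slots'):
            return int(config_line.split()[1])
    raise ValueError('No slots entry found for parallel environment %s' % pe_name)
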
def get_jobs(self):
    """
    Executes the qstat command and parses its output. The expected output is something like the following:

    job-ID  prior    name   user  state  submit/start at      queue                  slots  ja-task-ID
    -----------------------------------------------------------------------------------------------------------------
         2  0.75000  sleep  root  r      12/21/2018 11:48:00  main.q@pipeline-38415  1
         9  0.25000  sleep  root  qw     12/21/2018 12:39:38                         1
        11  0.25000  sleep  root  qw     12/21/2018 14:34:43                         1      1-10:1

    :return: Grid engine jobs list.
    """
    lines = self.cmd_executor.execute_to_lines(GridEngine._QSTAT)
    if len(lines) == 0:
        return []
    jobs = []
    # Column start positions are taken from the header line and used to slice each job row.
    indentations = [lines[0].index(column) for column in GridEngine._QSTAT_COLUMNS]
    for line in lines[2:]:
        jobs.append(GridEngineJob(
            id=self._by_indent(line, indentations, 0),
            name=self._by_indent(line, indentations, 2),
            user=self._by_indent(line, indentations, 3),
            state=GridEngineJobState.from_letter_code(self._by_indent(line, indentations, 4)),
            datetime=self._parse_date(line, indentations),
            host=self._parse_host(line, indentations),
            array=self._parse_array(line, indentations)
        ))
    return jobs
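
# Example (not part of the original script): _by_indent is not shown in this excerpt. The
# column-slicing technique it supports (cutting each row at the column start positions
# taken from the qstat header) can be illustrated with a small stand-in:
def by_indent_sketch(line, indentations, index):
    """Returns the stripped cell that starts at the given header column position."""
    start = indentations[index]
    end = indentations[index + 1] if index + 1 < len(indentations) else len(line)
    return line[start:end].strip()

# header = 'job-ID  prior    name   user'
# row    = '     2  0.75000  sleep  root'
# columns = [header.index(name) for name in ['job-ID', 'prior', 'name', 'user']]
# by_indent_sketch(row, columns, 2) returns 'sleep'
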
def disable_host(self, host, queue=_MAIN_Q):
    """
    Disables the host to prevent it from receiving new jobs from the given queue.
    This command does not abort currently running jobs.
    :param host: Host to be disabled.
    :param queue: Queue that the host is a part of.
    """
    self.cmd_executor.execute(GridEngine._QMOD_DISABLE % (queue, host))
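
# Example (not part of the original script): the template behind GridEngine._QMOD_DISABLE
# is not shown in this excerpt. Its (queue, host) argument order matches the standard SGE
# call for disabling a single queue instance, 'qmod -d <queue>@<host>', sketched below;
# running jobs on the host keep running, the instance just stops accepting new ones.
import subprocess

def disable_queue_instance(queue, host):
    subprocess.check_call(['qmod', '-d', '%s@%s' % (queue, host)])

# disable_queue_instance('main.q', 'pipeline-38415')
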