Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def execute(self, command):
    """
    Run ``command`` through the shell and return what it wrote to standard output.

    The command string is handed to the shell as-is (``shell=True``), so callers must not
    interpolate untrusted input into it.

    :param command: Shell command line to run.
    :return: Captured standard output of the command.
    :raises ExecutionError: If the command exits with a non-zero code; a warning with the
        captured standard error is logged first.
    """
    child = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    stdout_data, stderr_data = child.communicate()
    # communicate() waits for the child to terminate, so returncode is already populated here.
    if child.returncode == 0:
        return stdout_data
    failure_message = 'Command \'%s\' execution has failed due to %s.' % (command, stderr_data)
    Logger.warn(failure_message)
    raise ExecutionError(failure_message)
def scale_down(self, child_host):
    """
    Try to remove an additional worker pipeline from the GE cluster.

    The host is first disabled in GE. If any job from ``self.grid_engine.get_jobs()`` is still
    assigned to it, the host is enabled again and nothing else is touched. Otherwise the
    parallel environment slots are decreased by ``self.instance_cores``, the host is removed
    from the GE configuration, its pipeline is stopped, and the host is removed from the
    master hosts and from ``self.default_hostfile``.

    :param child_host: Host name of the worker pipeline to remove.
    :return: False if the host still had jobs and was left in place, True once it was removed.
    """
    Logger.info('Disable additional worker with host=%s.' % child_host)
    self.grid_engine.disable_host(child_host)
    busy_jobs_count = sum(1 for job in self.grid_engine.get_jobs() if job.host == child_host)
    if busy_jobs_count:
        # Jobs are still assigned to the host: abort and put it back into service.
        Logger.warn('Disabled additional worker with host=%s has %s associated jobs. Scaling down is interrupted.'
                    % (child_host, busy_jobs_count))
        Logger.info('Enable additional worker with host=%s again.' % child_host)
        self.grid_engine.enable_host(child_host)
        return False

    self._decrease_parallel_environment_slots(self.instance_cores)
    self._remove_host_from_grid_engine_configuration(child_host)
    self._stop_pipeline(child_host)
    self._remove_host_from_hosts(child_host)
    self._remove_host_from_default_hostfile(child_host)
    Logger.info('Additional worker with host=%s has been stopped.' % child_host, crucial=True)
    return True
def is_valid(self, host):
    """
    Check whether GE still reports ``host`` as an execution host.

    :param host: Host name to look up.
    :return: True when the execution-host lookup command succeeds; False (after logging a
        warning) when it raises RuntimeError.
    """
    show_host_command = GridEngine._SHOW_EXECUTION_HOST % host
    try:
        self.cmd_executor.execute_to_lines(show_host_command)
    except RuntimeError:
        Logger.warn('Execution host %s in GE wasn\'t found.' % host)
        return False
    return True
def _try_stop_worker(self, run_id):
    """
    Best-effort stop of the pipeline run that backs an invalid additional worker.

    Ordinary failures are logged and swallowed so validation of the remaining hosts can
    continue. Only ``Exception`` subclasses are caught: KeyboardInterrupt and SystemExit
    propagate, so a manual interrupt still reaches the autoscaler's main loop.

    :param run_id: Run id of the pipeline to stop.
    """
    Logger.info('Stop pipeline with run_id=%s' % run_id)
    try:
        self.executor.execute(GridEngineWorkerValidator._STOP_PIPELINE % run_id)
    except Exception as e:
        Logger.warn('Invalid additional worker stopping has failed due to %s.' % e)
def start(self):
    """
    Run the autoscaler daemon loop until it is manually interrupted.

    Each iteration sleeps for ``self.timeout`` seconds, validates the additional hosts and then
    performs one scaling step. KeyboardInterrupt ends the loop; any other ``Exception`` is
    logged and the loop carries on with the next iteration.
    """
    keep_running = True
    while keep_running:
        try:
            time.sleep(self.timeout)
            self.worker_validator.validate_hosts()
            self.autoscaler.scale()
        except KeyboardInterrupt:
            Logger.warn('Manual stop the autoscaler daemon.')
            keep_running = False
        except Exception as error:
            Logger.fail('Scaling step has failed due to %s.' % error)
def validate_hosts(self):
    """
    Downscale every stored additional host that GE no longer accepts as a valid execution host.

    All stored hosts are checked with ``grid_engine.is_valid`` first. Then, for each invalid
    host, in order: its run id is resolved, ``_try_stop_worker``, ``_try_disable_worker`` and
    ``_try_kill_invalid_host_jobs`` are called, the host is deleted from GE with
    ``skip_on_failure=True``, ``_remove_worker_from_hosts`` is called, and the host is removed
    from host storage.
    """
    stored_hosts = self.host_storage.load_hosts()
    Logger.info('Validate %s additional workers.' % len(stored_hosts))
    # Evaluate validity for every host before tearing any of them down.
    invalid_hosts = [candidate for candidate in stored_hosts
                     if not self.grid_engine.is_valid(candidate)]
    for bad_host in invalid_hosts:
        Logger.warn('Invalid additional host %s was found. It will be downscaled.' % bad_host)
        # NOTE(review): relies on a private helper of the scale-down handler; an exception here
        # (or in the non-_try_ steps below) would abort validation of the remaining hosts.
        bad_run_id = self.scale_down_handler._get_run_id_from_host(bad_host)
        self._try_stop_worker(bad_run_id)
        self._try_disable_worker(bad_host, bad_run_id)
        self._try_kill_invalid_host_jobs(bad_host)
        self.grid_engine.delete_host(bad_host, skip_on_failure=True)
        self._remove_worker_from_hosts(bad_host)
        self.host_storage.remove_host(bad_host)
    Logger.info('Additional hosts validation has finished.')
def _try_disable_worker(self, host, run_id):
    """
    Best-effort disabling of an invalid additional worker's host in GE.

    Ordinary failures are logged (with their cause) and swallowed so validation can continue.
    Only ``Exception`` subclasses are caught: KeyboardInterrupt and SystemExit propagate.

    :param host: Host name to disable in GE.
    :param run_id: Run id of the worker; used only in the log message.
    """
    Logger.info('Disable worker in GE with run_id=%s' % run_id)
    try:
        self.grid_engine.disable_host(host)
    except Exception as e:
        Logger.warn('Invalid additional worker disabling has failed due to %s.' % e)
def _perform_command(self, action, msg, error_msg, skip_on_failure):
    """
    Log ``msg`` and run ``action``. On RuntimeError, log ``error_msg`` and optionally re-raise.

    :param action: Zero-argument callable to run.
    :param msg: Message logged at info level before running the action.
    :param error_msg: Message logged as a warning if the action raises RuntimeError.
    :param skip_on_failure: When true, a RuntimeError is only logged. Otherwise a new
        RuntimeError with args ``(error_msg, original_exception)`` is raised.
    """
    Logger.info(msg)
    try:
        action()
    except RuntimeError as cause:
        Logger.warn(error_msg)
        if skip_on_failure:
            return
        raise RuntimeError(error_msg, cause)