log.debug('Schedule pass:')

running = list(plan.running)
log.debug('running: %s', ' '.join(plan.running))
log.debug('finished: %s', ' '.join(plan.finished))

launched = []
for process_name in plan.running:
  if self.is_process_lost(process_name):
    self._set_process_status(process_name, ProcessState.LOST)

now = self._clock.time()
runnable = list(plan.runnable_at(now))
waiting = list(plan.waiting_at(now))
log.debug('runnable: %s', ' '.join(runnable))
log.debug('waiting: %s', ' '.join(
    '%s[T-%.1fs]' % (process, plan.get_wait(process)) for process in waiting))

def pick_processes(process_list):
  if self._task.max_concurrency().get() == 0:
    return process_list
  num_to_pick = max(self._task.max_concurrency().get() - len(running), 0)
  return process_list[:num_to_pick]

for process_name in pick_processes(runnable):
  tp = self._task_processes.get(process_name)
  if tp:
    current_run = self._current_process_run(process_name)
    assert current_run.state == ProcessState.WAITING
  else:
    self._set_process_status(process_name, ProcessState.WAITING)
    tp = self._task_processes[process_name]
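
# Standalone sketch of the concurrency cap applied by pick_processes above, using plain
# lists in place of the planner's objects (an illustration, not part of the runner):
# max_concurrency == 0 is treated as "no limit", otherwise only enough processes are
# picked to fill the remaining concurrency slots.
def _pick_with_cap(runnable, running, max_concurrency):
  if max_concurrency == 0:
    return runnable
  return runnable[:max(max_concurrency - len(running), 0)]

assert _pick_with_cap(['a', 'b', 'c'], ['x'], 0) == ['a', 'b', 'c']  # unlimited
assert _pick_with_cap(['a', 'b', 'c'], ['x'], 2) == ['a']            # one slot free
assert _pick_with_cap(['a', 'b'], ['x', 'y'], 2) == []               # saturated
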
self._task_monitor = task_monitor # exposes PIDs, sandbox
self._task_id = task_monitor._task_id
log.debug('Initialising resource collection for task %s' % self._task_id)
self._process_collectors = dict() # ProcessStatus => ProcessTreeCollector
# TODO(jon): sandbox is also available through task_monitor, but typically the first checkpoint
# isn't written (and hence the header is not available) by the time we initialise here
self._sandbox = sandbox
self._process_collector_factory = process_collector
self._disk_collector = disk_collector(self._sandbox)
self._process_collection_interval = process_collection_interval.as_(Time.SECONDS)
self._disk_collection_interval = disk_collection_interval.as_(Time.SECONDS)
min_collection_interval = min(self._process_collection_interval, self._disk_collection_interval)
history_length = int(history_time.as_(Time.SECONDS) / min_collection_interval)
if history_length > self.MAX_HISTORY:
  raise ValueError("Requested history length too large")
log.debug("Initialising ResourceHistory of length %s" % history_length)
self._history = ResourceHistory(history_length)
self._kill_signal = threading.Event()
threading.Thread.__init__(self)
self.daemon = True
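
# Worked example of the history sizing above, assuming a 20-second process-collection
# interval, a 1-minute disk-collection interval, and 1 hour of history (these numbers are
# illustrative assumptions, not values taken from the snippet):
#   min_collection_interval = min(20, 60) = 20 seconds
#   history_length = int(3600 / 20) = 180 samples kept in ResourceHistory
assert int(3600 / min(20, 60)) == 180
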
  'USER': username,
  'PATH': os.environ['PATH']
})

wrapped_cmdline = self.wrapped_cmdline(cwd)
log.debug('Wrapped cmdline: %s', wrapped_cmdline)

real_thermos_profile_path = os.path.join(
    os.environ['MESOS_DIRECTORY'],
    TASK_FILESYSTEM_MOUNT_POINT,
    thermos_profile.lstrip('/')) if taskfs_isolated else thermos_profile

if os.path.exists(real_thermos_profile_path):
  env.update(BASH_ENV=thermos_profile)

log.debug('ENV is: %s', env)

subprocess_args = {
  'args': wrapped_cmdline,
  'close_fds': self.FD_CLOEXEC,
  'cwd': subprocess_cwd,
  'env': env,
  'pathspec': self._pathspec
}

log_destination_resolver = LogDestinationResolver(
    self._pathspec,
    destination=self._logger_destination,
    mode=self._logger_mode,
    rotate_log_size=self._rotate_log_size,
    rotate_log_backups=self._rotate_log_backups)
stdout, stderr, handlers_are_files = log_destination_resolver.get_handlers()
if handlers_are_files:
def _get_tasks_by_instance_id(self, instance_ids):
  log.debug('Querying instance statuses.')
  query = TaskQuery()
  query.owner = Identity(role=self._job_key.role)
  query.environment = self._job_key.environment
  query.jobName = self._job_key.name
  query.statuses = set([ScheduleStatus.RUNNING])
  query.instanceIds = instance_ids
  try:
    resp = self._scheduler.getTasksStatus(query)
  except IOError as e:
    log.error('IO Exception during scheduler call: %s' % e)
    return []
  tasks = []
  if resp.responseCode == ResponseCode.OK:
    tasks = resp.result.scheduleStatusResult.tasks
  return tasks
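
# Minimal standalone sketch of the response-handling pattern above, using a stand-in dict
# instead of the Aurora thrift types (purely illustrative): only an OK responseCode yields
# tasks, anything else falls back to an empty list.
def _tasks_from_response(resp, ok_code='OK'):
  if resp is not None and resp.get('responseCode') == ok_code:
    return resp.get('tasks', [])
  return []

assert _tasks_from_response({'responseCode': 'OK', 'tasks': ['t0', 't1']}) == ['t0', 't1']
assert _tasks_from_response({'responseCode': 'ERROR'}) == []
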
def stats_uploader_daemon(self, stats):
  """
  Starts the StatsUploader as a daemon process if it is not already running.
  """
  log.debug("Checking if the statsUploaderDaemon is already running")
  stats_pid = os.path.join("/tmp", self._user, ".pid_stats")
  stats_uploader_dir = os.path.join("/tmp", self._user)
  dirutil.safe_mkdir(stats_uploader_dir)
  if not os.path.exists(stats_pid):
    log.debug("Starting the daemon")
    stats_log_file = os.path.join("/tmp", self._user, "buildtime_uploader")
    log.debug("The logs are written to %s" % stats_log_file)
    if spawn_daemon(pidfile=stats_pid, quiet=True):
      su = StatsUploader(STATS_COLLECTION_URL, STATS_COLLECTION_PORT, STATS_COLLECTION_ENDPOINT,
                         self._max_delay, self._get_default_stats_file(), self._user,
                         self._force_upload)
      su.upload_sync(stats)
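
# Hedged sketch of the pidfile guard used above (paths are illustrative; spawn_daemon and
# StatsUploader come from the surrounding codebase and are not redefined here): the
# uploader is only spawned when no ".pid_stats" file exists for the user.
import os.path

def _uploader_already_running(user, base_dir='/tmp'):
  return os.path.exists(os.path.join(base_dir, user, '.pid_stats'))
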
def wait_start(self, timeout=MAX_WAIT):
  log.debug('Waiting for task to start.')

  def is_started():
    return self._monitor and (self._monitor.active or self._monitor.finished)

  waited = Amount(0, Time.SECONDS)
  while waited < timeout:
    if not is_started():
      log.debug(' - sleeping...')
      self._clock.sleep(self.POLL_INTERVAL.as_(Time.SECONDS))
      waited += self.POLL_INTERVAL
    else:
      break

    if not self.is_alive:
      if self._popen_rc != 0:
        raise TaskError('Task failed: %s' % self.compute_status().reason)
      else:
        # We can end up here if the process exited between the call to Popen and
        # waitpid (in is_alive), which is fine.
        log.info('Task runner exited: %s' % self.compute_status().reason)
        break

  if not is_started():
    log.error('Task did not start within deadline, forcing loss.')
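
# Generic sketch of the poll-until-started loop above, stripped of the executor's types
# (timeouts in plain seconds, time.sleep in place of the injected clock; purely
# illustrative, not the executor's actual helper):
import time

def wait_until(predicate, timeout_secs, poll_interval_secs=0.5, clock=time):
  waited = 0.0
  while waited < timeout_secs:
    if predicate():
      return True
    clock.sleep(poll_interval_secs)
    waited += poll_interval_secs
  return False
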
def is_healthy(self):
  """Check whether the TaskRunner is healthy. A healthy TaskRunner is not deadlocked and has not
  reached its max_failures count."""
  max_failures = self._task.max_failures().get()
  deadlocked = self.deadlocked()
  under_failure_limit = max_failures == 0 or len(self._regular_plan.failed) < max_failures
  log.debug('max_failures:%d failed:%d under_failure_limit:%s deadlocked:%s ==> health:%s',
            max_failures, len(self._regular_plan.failed), under_failure_limit, deadlocked,
            not deadlocked and under_failure_limit)
  return not deadlocked and under_failure_limit
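
# Standalone restatement of the failure-limit rule in is_healthy (illustrative only):
# max_failures == 0 means "no limit", otherwise health requires failed < max_failures.
def _under_failure_limit(failed_count, max_failures):
  return max_failures == 0 or failed_count < max_failures

assert _under_failure_limit(5, 0)       # 0 => unlimited failures allowed
assert _under_failure_limit(2, 3)
assert not _under_failure_limit(3, 3)
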
def enqueue_request(self, event=None):
  request = self.request()
  if event:
    request.update(event=event)
  url = '%s?%s' % (self._tracker, urllib.urlencode(request))
  log.debug('Sending tracker request: %s' % url)
  self._http_client.fetch(url, self.handle_response)
  log.debug('Tracker request sent')
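
# Standalone sketch of the URL composition above. urllib.urlencode is the Python 2
# spelling; under Python 3 the same function lives at urllib.parse.urlencode. The tracker
# address and parameters below are made up for illustration.
try:
  from urllib import urlencode        # Python 2
except ImportError:
  from urllib.parse import urlencode  # Python 3

tracker = 'http://tracker.example.com/announce'
request = {'info_hash': 'abc123', 'event': 'started'}
url = '%s?%s' % (tracker, urlencode(request))
# e.g. http://tracker.example.com/announce?info_hash=abc123&event=started
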
def _restart_instances(self, instance_data):
  """Instructs the scheduler to batch-restart instances.

  Arguments:
  instance_data -- list of InstanceData to restart.
  """
  instance_ids = [data.instance_id for data in instance_data]
  log.debug('Batch restarting instances: %s' % instance_ids)
  resp = self._scheduler.restartShards(self._job_key, instance_ids, self._lock)
  self._check_and_log_response(resp)
  log.debug('Done batch restarting instances: %s' % instance_ids)
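
# Hedged usage sketch (hypothetical driver, not part of the updater itself): restarting a
# large job in fixed-size chunks by grouping InstanceData before each _restart_instances
# call.
def _restart_in_batches(updater, instance_data, batch_size=5):
  for start in range(0, len(instance_data), batch_size):
    updater._restart_instances(instance_data[start:start + batch_size])
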