def _maybe_update_failure_count(self, is_healthy, reason):
  if not is_healthy:
    log.warning('Health check failure: %s' % reason)
    self._current_consecutive_failures += 1
    if self._current_consecutive_failures > self._max_consecutive_failures:
      log.warning('Reached consecutive failure limit.')
      self._healthy = False
      self._reason = reason
  else:
    if self._current_consecutive_failures > 0:
      log.debug('Reset consecutive failures counter.')
    self._current_consecutive_failures = 0
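
# A minimal, self-contained sketch of the consecutive-failure pattern above.
# FailureTracker and its threshold are made-up names for illustration only;
# they are not part of the original health checker.
class FailureTracker(object):
  def __init__(self, threshold):
    self.threshold = threshold
    self.failures = 0
    self.healthy = True
    self.reason = None

  def record(self, is_healthy, reason=None):
    if not is_healthy:
      self.failures += 1
      if self.failures > self.threshold:
        self.healthy = False
        self.reason = reason
    else:
      self.failures = 0  # any healthy check resets the streak

# With threshold=2, two consecutive failures are tolerated; the third trips it.
tracker = FailureTracker(threshold=2)
for outcome in (False, False, True, False, False, False):
  tracker.record(outcome, reason='timeout')
assert tracker.healthy is False
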
def run(self):
  self.runner._run_plan(self.runner._finalizing_plan)
  log.debug('TaskRunnerStage[FINALIZING]: Finalization remaining: %s' %
            self.runner._finalization_remaining())
  if self.runner.deadlocked(self.runner._finalizing_plan):
    log.warning('Finalizing plan deadlocked.')
    return None
  if self.runner._finalization_remaining() > 0 and not self.runner._finalizing_plan.is_complete():
    return min(self.runner._finalization_remaining(), self.MAX_ITERATION_WAIT.as_(Time.SECONDS))
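
# Hedged sketch of the "return a bounded wait" idiom used by the stage above:
# None means the caller should stop (deadlock), otherwise the stage asks to be
# polled again after a wait capped at a maximum iteration interval. The names
# next_wait and MAX_ITERATION_WAIT_SECS are illustrative, not from the runner.
MAX_ITERATION_WAIT_SECS = 0.1

def next_wait(remaining_secs, plan_complete, deadlocked):
  if deadlocked:
    return None
  if remaining_secs > 0 and not plan_complete:
    return min(remaining_secs, MAX_ITERATION_WAIT_SECS)

assert next_wait(5.0, plan_complete=False, deadlocked=False) == MAX_ITERATION_WAIT_SECS
assert next_wait(5.0, plan_complete=False, deadlocked=True) is None
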
# Fragment: fast-forwards a checkpoint record stream for process_name up to its
# recorded high watermark (current_watermark, records, fp and rr are set up
# earlier in the enclosing function).
while current_watermark < self._watermarks[process_name]:
  last_pos = fp.tell()
  record = rr.try_read()
  if record is None:
    break
  new_watermark = record.process_status.seq
  if new_watermark > self._watermarks[process_name]:
    log.debug('Over-seeked %s [watermark = %s, high watermark = %s], rewinding.' % (
      process_name, new_watermark, self._watermarks[process_name]))
    fp.seek(last_pos)
    break
  current_watermark = new_watermark
  records += 1
if current_watermark < self._watermarks[process_name]:
  log.warning('Only able to fast forward to %s@sequence=%s, high watermark is %s' % (
    process_name, current_watermark, self._watermarks[process_name]))
if records:
  log.debug('Fast forwarded %s %s record(s) to seq=%s.' % (process_name, records,
    current_watermark))
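
# Standalone sketch of the tell()/seek() rewind idiom above, applied to a plain
# line-oriented stream instead of the record reader (rr) used above, which is
# not shown in this snippet. read_up_to and the sample data are illustrative.
import io

def read_up_to(fp, high_watermark):
  """Consume records with seq <= high_watermark; rewind on over-seek."""
  current = -1
  while current < high_watermark:
    last_pos = fp.tell()
    line = fp.readline()
    if not line:
      break  # nothing more to read yet
    seq = int(line)
    if seq > high_watermark:
      fp.seek(last_pos)  # rewind so the next reader sees this record
      break
    current = seq
  return current

stream = io.StringIO('1\n3\n5\n')
assert read_up_to(stream, 2) == 1        # could only fast-forward to seq=1
assert stream.readline().strip() == '3'  # the over-seeked record was put back
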
# Fragment: drains newly appended runner checkpoint records and applies them to
# the local runner state; the `break`s and the OSError handler imply an
# enclosing read loop and try block, restored here so the fragment parses.
try:
  while True:
    runner_update = rr.try_read()
    if not runner_update:
      break
    try:
      self._dispatcher.dispatch(self._runnerstate, runner_update)
    except CheckpointDispatcher.InvalidSequenceNumber as e:
      log.error('Checkpoint stream is corrupt: %s' % e)
      break
  new_ckpt_head = fp.tell()
  updated = self._ckpt_head != new_ckpt_head
  self._ckpt_head = new_ckpt_head
  return updated
except OSError as e:
  if e.errno == errno.ENOENT:
    # The log doesn't yet exist, will retry later.
    log.warning('Could not read from discovered task %s.' % self._task_id)
    return False
  else:
    raise
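
# Sketch of the "remember the checkpoint head offset and report whether it
# moved" idiom that ends the refresh above, with record dispatch replaced by a
# plain read; HeadTracker is an illustrative name, not the original class.
import io

class HeadTracker(object):
  def __init__(self):
    self._head = 0

  def refresh(self, fp):
    fp.seek(self._head)
    fp.read()                # stand-in for dispatching each new record
    new_head = fp.tell()
    updated = self._head != new_head
    self._head = new_head
    return updated

stream = io.StringIO('record-1\n')
tracker = HeadTracker()
assert tracker.refresh(stream) is True   # new data past the last head offset
assert tracker.refresh(stream) is False  # nothing new, head did not move
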
def __call__(self, endpoint, use_post_method=False, expected_response=None):
  """Returns a (boolean, string|None) tuple of (call success, failure reason)"""
  try:
    response = self.query(endpoint, '' if use_post_method else None).strip().lower()
    if expected_response is not None and response != expected_response:
      def shorten(string):
        return (string if len(string) < self.FAILURE_REASON_LENGTH
                else "%s..." % string[:self.FAILURE_REASON_LENGTH - 3])
      reason = 'Response differs from expected response (expected "%s", got "%s")'
      log.warning(reason % (expected_response, response))
      return (False, reason % (shorten(str(expected_response)), shorten(str(response))))
    else:
      return (True, None)
  except self.QueryError as e:
    return (False, str(e))
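
# Hedged usage sketch of the (ok, reason) contract returned by __call__ above.
# StubChecker is a made-up stand-in so the example is self-contained; only the
# return shape matches the real signaler.
class StubChecker(object):
  def __call__(self, endpoint, use_post_method=False, expected_response=None):
    response = 'not ok'  # pretend the endpoint answered this
    if expected_response is not None and response != expected_response:
      return (False, 'Response differs from expected response (expected "%s", got "%s")'
                     % (expected_response, response))
    return (True, None)

ok, reason = StubChecker()('/health', expected_response='ok')
assert ok is False and 'expected "ok"' in reason
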
def synthesize_url(scheduler_client, role=None, env=None, job=None):
  scheduler_url = scheduler_client.url
  if not scheduler_url:
    log.warning("Unable to find scheduler web UI!")
    return None
  if env and not role:
    die('If env specified, must specify role')
  if job and not (role and env):
    die('If job specified, must specify role and env')
  scheduler_url = urljoin(scheduler_url, 'scheduler')
  if role:
    scheduler_url += '/' + role
  if env:
    scheduler_url += '/' + env
  if job:
    scheduler_url += '/' + job
  return scheduler_url
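
# Standalone sketch of the same path-building order (scheduler/role/env/job) so
# it can be exercised without a scheduler client. build_job_url and the example
# host/role/env/job values are made up for illustration.
try:
  from urlparse import urljoin      # Python 2
except ImportError:
  from urllib.parse import urljoin  # Python 3

def build_job_url(base, role=None, env=None, job=None):
  url = urljoin(base, 'scheduler')
  for segment in (role, env, job):
    if segment:
      url += '/' + segment
  return url

assert build_job_url('http://aurora.example.com/', 'www-data', 'prod', 'hello') == \
    'http://aurora.example.com/scheduler/www-data/prod/hello'
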
def terminate_runner(self, as_loss=False):
  """Terminate the underlying runner process, if it exists."""
  if self._kill_signal.is_set():
    log.warning('Duplicate kill/lose signal received, ignoring.')
    return
  self._kill_signal.set()
  if self.is_alive:
    sig = 'SIGUSR2' if as_loss else 'SIGUSR1'
    log.info('Runner is alive, sending %s' % sig)
    try:
      self._popen.send_signal(getattr(signal, sig))
    except OSError as e:
      log.error('Got OSError sending %s: %s' % (sig, e))
  else:
    log.info('Runner is dead, skipping kill.')
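
# Minimal POSIX-only sketch of the "pick a signal, then send it defensively"
# idiom above: the child here is just `sleep`, so either signal terminates it;
# in the real runner SIGUSR1/SIGUSR2 distinguish a kill from a loss.
import signal
import subprocess

child = subprocess.Popen(['sleep', '30'])
as_loss = False
sig = 'SIGUSR2' if as_loss else 'SIGUSR1'
try:
  child.send_signal(getattr(signal, sig))
except OSError as e:
  print('Got OSError sending %s: %s' % (sig, e))
child.wait()
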
def _reconnect(self):
  """Reconnect to ZK and update endpoints once complete."""
  for _ in range(self._retries):
    try:
      self._zk.restart()
      self._start()
      break
    except ZooKeeper.ConnectionTimeout:
      log.warning('Connection establishment to %r timed out, retrying.' % self._zk)
  else:
    raise ServerSetClient.ReconnectFailed('Re-establishment of connection to ZK servers failed')
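
# Sketch of the for/else retry idiom used by _reconnect above: the else clause
# runs only if the loop never hit `break`, i.e. every attempt failed. The names
# connect_with_retries, flaky_connect and RetriesExhausted are illustrative.
class RetriesExhausted(Exception):
  pass

def connect_with_retries(connect, retries=3):
  result = None
  for _ in range(retries):
    try:
      result = connect()
      break                     # success: skip the else clause
    except IOError:
      continue                  # failed attempt: try again
  else:
    raise RetriesExhausted('all %d attempts failed' % retries)
  return result

attempts = {'count': 0}

def flaky_connect():
  attempts['count'] += 1
  if attempts['count'] < 3:
    raise IOError('connection timed out')
  return 'connected'

assert connect_with_retries(flaky_connect) == 'connected'
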
def _construct_scheduler(self):
  """Populates self._scheduler and self._client."""
  self._scheduler = SchedulerClient.get(self.cluster, verbose=self.verbose)
  assert self._scheduler, "Could not find scheduler (cluster = %s)" % self.cluster
  start = time.time()
  while (time.time() - start) < self.CONNECT_MAXIMUM_WAIT.as_(Time.SECONDS):
    try:
      self._client = self._scheduler.get_thrift_client()
      break
    except SchedulerClient.CouldNotConnect as e:
      log.warning('Could not connect to scheduler: %s' % e)
  if not self._client:
    raise self.TimeoutError('Timed out trying to connect to scheduler at %s' % self.cluster)
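
# Sketch of the bounded "keep trying until a deadline" loop above, with the
# Amount/Time wrapper replaced by a plain float of seconds; try_connect and
# MAX_WAIT_SECS are illustrative names, not part of the original client. A
# short sleep between attempts (absent above) keeps the loop from spinning.
import time

MAX_WAIT_SECS = 2.0

def connect_before_deadline(try_connect, max_wait_secs=MAX_WAIT_SECS):
  client = None
  start = time.time()
  while (time.time() - start) < max_wait_secs:
    try:
      client = try_connect()
      break
    except IOError:
      time.sleep(0.1)  # back off briefly before the next attempt
  if not client:
    raise RuntimeError('Timed out trying to connect after %.1fs' % max_wait_secs)
  return client

assert connect_before_deadline(lambda: 'thrift-client') == 'thrift-client'
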
def synthesize_url(scheduler_url, role=None, env=None, job=None, update_id=None):
  if not scheduler_url:
    log.warning("Unable to find scheduler web UI!")
    return None
  if env and not role:
    die('If env specified, must specify role')
  if job and not (role and env):
    die('If job specified, must specify role and env')
  scheduler_url = urljoin(scheduler_url, 'scheduler')
  if role:
    scheduler_url += '/' + role
  if env:
    scheduler_url += '/' + env
  if job:
    scheduler_url += '/' + job
  if update_id:
    scheduler_url += '/update/' + update_id
  return scheduler_url