Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def try_wait_for_state_change(self, etcd_index, timeout=None):
    """Wait for a change on the active-version key, then re-read the state.

    Issues a watch on the ``/rdzv/active_version`` key (resolved through
    ``self.get_path``) starting at ``etcd_index``. The wait is capped so
    that it never extends more than one second past
    ``self._rendezvous_deadline``.

    Args:
        etcd_index: Index handed to the watch call as ``index``.
        timeout: Optional upper bound on the wait, in seconds. ``None``
            means "wait until the overall deadline (plus 1s of slack)".
            A caller-supplied value is clamped to that same limit.

    Returns:
        Whatever ``self.get_rdzv_state()`` returns.

    Raises:
        RendezvousTimeoutException: If the overall rendezvous deadline has
            passed by the time the watch returns.
    """
    # Don't sleep past the overall deadline (by more than 1s of slack).
    overall_timeout = max(self._rendezvous_deadline - time.time(), 0.0) + 1.0
    timeout = overall_timeout if timeout is None else min(timeout, overall_timeout)

    try:
        self.client.watch(
            self.get_path("/rdzv/active_version"), index=etcd_index, timeout=timeout
        )
    except (etcd.EtcdEventIndexCleared, etcd.EtcdWatchTimedOut):
        # Deliberately ignored: whether the watch timed out or the event
        # index was cleared, the state is re-fetched below anyway.
        pass

    if time.time() > self._rendezvous_deadline:
        raise RendezvousTimeoutException()

    # Unfortunately, we have to do another fetch in order to get last etcd_index.
    return self.get_rdzv_state()
# We just have to wait until something changes and re-check.
try:
overall_timeout = (
max(self._rendezvous_deadline - time.time(), 0.0) + 1.0
)
self.client.watch(
key=self.get_path("/rdzv"),
index=active_version.etcd_index + 1,
recursive=True,
timeout=overall_timeout,
)
except (etcd.EtcdEventIndexCleared, etcd.EtcdWatchTimedOut):
pass
if time.time() > self._rendezvous_deadline:
raise RendezvousTimeoutException()
active_version, state = self.get_rdzv_state()
# Dis-own our lease in the previous rendezvous, if exists
if self._lease_this_rank_stop is not None:
self._lease_this_rank_stop.set()
return self.init_phase()
except EtcdRendezvousRetryImmediately:
# The type of failure suggests we can retry without delay
pass
except EtcdRendezvousRetryableFailure:
# In case of retryable failure, wait a small delay
# to avoid spamming etcd
time.sleep(1)
except RendezvousTimeoutException:
log.info("Rendezvous timeout occured in EtcdRendezvousHandler")
raise
except RendezvousClosedException:
log.info(
f"Rendezvous for run_id={self._run_id} was observed to be closed"
)
raise
except RendezvousNonRetryableError:
raise
except Exception as e:
# In case of a general exception, wait a small delay
# to avoid spamming etcd
# FIXME: there are a few things that fall under this like