Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def rendezvous_barrier(self):
    """Drop the current process group and obtain the next rendezvous.

    Calls ``self._destroy_process_group()``, then asks
    ``self.rendezvous.next_rendezvous()`` for a new ``(store, rank,
    world_size)`` triple and stores it on ``self.store``, ``self.rank`` and
    ``self.world_size``.

    Raises:
        StopException: if ``next_rendezvous`` raises
            ``RendezvousClosedException``. ``self.stop_training`` is set to
            ``True`` before raising. The original exception is chained as the
            cause.
        NonRetryableException: if ``next_rendezvous`` raises any other
            ``Exception``. The original exception is chained as the cause.
    """
    self._destroy_process_group()
    try:
        self.store, self.rank, self.world_size = self.rendezvous.next_rendezvous()
    except RendezvousClosedException as e:
        # Sets the local variable to True so the caller knows to stop training.
        self.stop_training = True
        raise StopException(
            "Rank {0} received RendezvousClosedException."
            " Raising a StopException".format(self.rank)
        ) from e
    except Exception as e:
        # A single ``Exception`` clause also covers ``RuntimeError`` (its
        # subclass). The unpacking assignment above did not complete here, so
        # ``self.rank`` still holds whatever it held before this call.
        raise NonRetryableException(
            "Rank {0} received an Exception."
            " Detailed message: {1}".format(self.rank, str(e))
        ) from e
    # Lazy %-style args: the message is only built if INFO is enabled.
    log.info(
        "Got next rendezvous: rank %s, world size %s", self.rank, self.world_size
    )
def init_phase(self):
    """Make one pass over the etcd rendezvous state machine.

    First calls ``self.try_create_rendezvous()``. If that raises
    ``etcd.EtcdAlreadyExist``, the current state is read with
    ``self.get_rdzv_state()`` instead. The resulting state dict is then
    dispatched on its ``"status"`` field:

    - ``"closed"``: raise ``RendezvousClosedException``.
    - ``"joinable"``: return ``self.join_phase(state["version"])``.
    - ``"final"``: call ``self.handle_existing_rendezvous(state["version"])``,
      then raise ``EtcdRendezvousRetryImmediately``.
    - any other value: call ``self.try_wait_for_state_change`` with the etcd
      index one past the observed one, then raise
      ``EtcdRendezvousRetryableFailure``.

    Returns:
        The result of ``self.join_phase`` (only on the ``"joinable"`` path;
        every other path raises).

    Raises:
        RendezvousClosedException, EtcdRendezvousRetryImmediately,
        EtcdRendezvousRetryableFailure: as described above. Errors from the
        helper calls (e.g. ``etcd.EtcdKeyNotFound`` from ``get_rdzv_state``)
        are not caught here and propagate to the caller.
    """
    try:
        active_version = self.try_create_rendezvous()
        rdzv_state = json.loads(active_version.value)
    except etcd.EtcdAlreadyExist:
        # The rendezvous key apparently already exists; read its state instead.
        active_version, rdzv_state = self.get_rdzv_state()
        # Note: it is possible for above query to fail (etcd.EtcdKeyNotFound),
        # but this is ok for us - just means we'll restart from beginning.
        log.info("Observed existing rendezvous state: " + str(rdzv_state))
    else:
        log.info("New rendezvous state created: " + str(rdzv_state))

    status = rdzv_state["status"]
    if status == "closed":
        raise RendezvousClosedException()
    if status == "joinable":
        return self.join_phase(rdzv_state["version"])
    if status == "final":
        self.handle_existing_rendezvous(rdzv_state["version"])
        raise EtcdRendezvousRetryImmediately()

    # We observed some intermediate state, which is best handled by retrying later
    self.try_wait_for_state_change(etcd_index=active_version.etcd_index + 1)
    raise EtcdRendezvousRetryableFailure()
# NOTE(review): the lines below look like the body and handlers of a retry
# loop around ``self.init_phase()`` -- something like
# ``while True: try: return self.init_phase() except ...`` -- whose header
# (enclosing ``def``, loop and ``try:``) is not present in this view. As laid
# out here, the ``except`` clauses have no matching ``try:``. Confirm against
# the original file.
# Attempt one pass of the rendezvous state machine; its result is returned
# directly on success.
return self.init_phase()
except EtcdRendezvousRetryImmediately:
# The type of failure suggests we can retry without delay
pass
except EtcdRendezvousRetryableFailure:
# In case of retryable failure, wait a small delay
# to avoid spamming etcd
time.sleep(1)
except RendezvousTimeoutException:
# Timeouts are logged and re-raised, not retried.
# NOTE(review): "occured" is misspelled in the log text; left unchanged in
# case anything matches on the exact message.
log.info("Rendezvous timeout occured in EtcdRendezvousHandler")
raise
except RendezvousClosedException:
# A closed rendezvous is logged with this handler's run id and re-raised.
log.info(
f"Rendezvous for run_id={self._run_id} was observed to be closed"
)
raise
except RendezvousNonRetryableError:
# Re-raised unchanged. It is listed before the generic handler below so that
# ``except Exception`` does not swallow it (presumably it is an Exception
# subclass -- confirm).
raise
except Exception as e:
# In case of a general exception, wait a small delay
# to avoid spamming etcd
# FIXME: there are a few things that fall under this like
# etcd.EtcdKeyNotFound, etc, which could be handled more explicitly.
log.info("Rendezvous attempt failed, will retry. Reason: " + str(e))
time.sleep(1)