Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# but this is ok for us - just means we'll restart from beginning.
log.info("Observed existing rendezvous state: " + str(state))
if state["status"] == "closed":
raise RendezvousClosedException()
if state["status"] == "joinable":
return self.join_phase(state["version"])
if state["status"] == "final":
self.handle_existing_rendezvous(state["version"])
raise EtcdRendezvousRetryImmediately()
# We observed some intermediate state, which is best handled by retrying later
self.try_wait_for_state_change(etcd_index=active_version.etcd_index + 1)
raise EtcdRendezvousRetryableFailure()
def wait_for_final(self, expected_version):
    """Wait until rendezvous ``expected_version`` reaches the "final" status.

    The current rendezvous state is read once up front. While that state is
    "frozen" at ``expected_version`` the method keeps watching for the next
    change, starting one past the last observed etcd index. As soon as the
    state is "final" at ``expected_version``, the latest ``active_version``
    is returned.

    Args:
        expected_version: rendezvous version this worker is waiting on.

    Returns:
        The ``active_version`` object that accompanied the final state.

    Raises:
        EtcdRendezvousRetryableFailure: the state became anything other than
            frozen/final at ``expected_version``; the caller must re-enter.
    """
    node, rdzv = self.get_rdzv_state()
    while True:
        status = rdzv["status"]

        if status == "final" and rdzv["version"] == expected_version:
            # Success: this rendezvous is final and we accept it.
            return node

        still_frozen = status == "frozen" and rdzv["version"] == expected_version
        if not still_frozen:
            # No valid transition is possible from here.
            raise EtcdRendezvousRetryableFailure(
                "Rendezvous state transition no longer possible. Must re-enter."
            )

        # Block for the next change after the index we already observed.
        node, rdzv = self.try_wait_for_state_change(
            etcd_index=node.etcd_index + 1
        )
def join_rendezvous(self, expected_version):
    """Try to add this worker to the rendezvous identified by ``expected_version``.

    Each loop iteration re-reads the current rendezvous state, validates that
    it is still joinable at ``expected_version``, and computes the rank this
    worker would take (the number of participants already registered).

    Raises:
        EtcdRendezvousRetryableFailure: the state is no longer "joinable".
        EtcdRendezvousRetryImmediately: the rendezvous version differs from
            ``expected_version``.
        AssertionError: a joinable state already holds
            ``self._num_max_workers`` participants (treated as a logic error).

    NOTE(review): in this excerpt the loop has no normal exit. The updated
    ``state`` is never written back and nothing is returned, so as shown it
    would spin until one of the checks above raises. The compare-and-swap
    write and the ``return`` are presumably in lines not shown here; confirm
    before relying on this body.
    """
    # Use compare-and-swap to add self to rendezvous state:
    while True:
        # Presumably a short back-off between CAS attempts (helper not shown here).
        cas_delay()
        active_version, state = self.get_rdzv_state()

        # Only a "joinable" rendezvous accepts new participants.
        if state["status"] != "joinable":
            raise EtcdRendezvousRetryableFailure(
                "Rendezvous state became non-joinable before we could join. "
                "Must join next one."
            )

        # The active rendezvous is no longer the version we meant to join.
        if state["version"] != expected_version:
            raise EtcdRendezvousRetryImmediately(
                "Rendezvous version changed. Must try join the new one."
            )

        # NOTE: `assert` is stripped under `python -O`, so this capacity check
        # is not guaranteed to run in optimized mode.
        assert (
            len(state["participants"]) < self._num_max_workers
        ), "Logic error: joinable rendezvous should always have space left"

        # Ranks follow join order: the new rank equals the current participant count.
        this_rank = len(state["participants"])
        state["participants"].append(this_rank)
def wait_for_peers(self, expected_version):
    """Wait until rendezvous ``expected_version`` moves from "joinable" to "frozen".

    The state is read once, then re-read only through
    ``self.try_wait_for_state_change`` (watching from one past the last seen
    etcd index) for as long as it stays "joinable" at ``expected_version``.

    Args:
        expected_version: rendezvous version this worker has joined.

    Returns:
        The ``active_version`` object observed together with the frozen state.

    Raises:
        EtcdRendezvousRetryableFailure: the state is neither joinable nor
            frozen at ``expected_version``; the caller must re-enter.
    """
    latest, snapshot = self.get_rdzv_state()
    while True:
        current_status = snapshot["status"]

        if current_status == "frozen" and snapshot["version"] == expected_version:
            # Success: all peers arrived.
            return latest

        keep_waiting = (
            current_status == "joinable" and snapshot["version"] == expected_version
        )
        if not keep_waiting:
            # No valid transition is possible from here.
            raise EtcdRendezvousRetryableFailure(
                "Rendezvous state transition no longer possible. Must re-enter."
            )

        # Still joinable at our version: wait for the next interesting event.
        next_index = latest.etcd_index + 1
        latest, snapshot = self.try_wait_for_state_change(etcd_index=next_index)
# Here we expect to see state <joinable, expected_version>.
# Exit gracefully if either:
# 1. state becomes <frozen, expected_version>, or
# 2. timeout happens (reaching deadline), in which case
#    we try the transition to <frozen, expected_version>.
# Exit with exception otherwise.
active_version, state = self.get_rdzv_state()
while True:
if state["status"] == "frozen" and state["version"] == expected_version:
# Worker set became frozen before last-call timeout. This is possible
# when num_max_workers is reached before the timeout.
return
if state["status"] != "joinable" or state["version"] != expected_version:
raise EtcdRendezvousRetryableFailure(
"Rendezvous state transition no longer possible. Must re-enter."
)
# If timeout occurred, attempt a state transition (joinable -> frozen)
if time.time() >= deadline:
state["status"] = "frozen"
state["keep_alives"] = []
try:
active_version = self.client.test_and_set(
key=self.get_path("/rdzv/active_version"),
value=json.dumps(state),
prev_value=active_version.value,
ttl=CONST_ETCD_FROZEN_TTL,
)
# We successfully made this rendezvous frozen.
return
if time.time() > self._rendezvous_deadline:
raise RendezvousTimeoutException()
log.info("Attempting to join next rendezvous")
try:
# Dis-own our lease in the previous rendezvous, if exists
if self._lease_this_rank_stop is not None:
self._lease_this_rank_stop.set()
return self.init_phase()
except EtcdRendezvousRetryImmediately:
# The type of failure suggests we can retry without delay
pass
except EtcdRendezvousRetryableFailure:
# In case of retryable failure, wait a small delay
# to avoid spamming etcd
time.sleep(1)
except RendezvousTimeoutException:
log.info("Rendezvous timeout occured in EtcdRendezvousHandler")
raise
except RendezvousClosedException:
log.info(
f"Rendezvous for run_id={self._run_id} was observed to be closed"
)
raise
except RendezvousNonRetryableError:
raise