Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def join_rendezvous(self, expected_version):
# Use compare-and-swap to add self to rendezvous state:
while True:
cas_delay()
active_version, state = self.get_rdzv_state()
if state["status"] != "joinable":
raise EtcdRendezvousRetryableFailure(
"Rendezvous state became non-joinable before we could join. "
"Must join next one."
)
if state["version"] != expected_version:
raise EtcdRendezvousRetryImmediately(
"Rendezvous version changed. Must try join the new one."
)
assert (
len(state["participants"]) < self._num_max_workers
), "Logic error: joinable rendezvous should always have space left"
this_rank = len(state["participants"])
state["participants"].append(this_rank)
# When reaching min workers, or changing state to frozen, we'll set
# the active_version node to be ephemeral.
if len(state["participants"]) == self._num_max_workers:
state["status"] = "frozen"
state["keep_alives"] = []
set_ttl = CONST_ETCD_FROZEN_TTL
def announce_self_waiting(self, expected_version):
while True:
cas_delay()
active_version, state = self.get_rdzv_state()
if state["status"] != "final" or state["version"] != expected_version:
raise EtcdRendezvousRetryImmediately()
# Increment counter to signal an additional waiting worker.
state["num_workers_waiting"] += 1
try:
active_version = self.client.test_and_set(
key=self.get_path("/rdzv/active_version"),
value=json.dumps(state),
prev_value=active_version.value,
)
return active_version
except etcd.EtcdCompareFailed:
log.info("Announce self as waiting CAS unsuccessful, retrying")
def confirm_membership(self, expected_version, this_rank):
# Compare-and-swap loop
while True:
cas_delay()
active_version, state = self.get_rdzv_state()
if state["status"] != "frozen":
raise EtcdRendezvousRetryImmediately(
"Rendezvous no longer frozen, before we confirmed. "
"Must join next one"
)
if state["version"] != expected_version:
raise EtcdRendezvousRetryImmediately(
"Rendezvous version changed. Must try join the new one."
)
this_lease_key = self.get_path(
"/rdzv/v_{}/rank_{}".format(expected_version, this_rank)
)
self.client.set(this_lease_key, value=None, ttl=CONST_WORKER_KEEPALIVE_TTL)
state["keep_alives"].append(this_lease_key)
if len(state["keep_alives"]) == len(state["participants"]):
# Everyone confirmed (this rank is last to do so)
def confirm_membership(self, expected_version, this_rank):
# Compare-and-swap loop
while True:
cas_delay()
active_version, state = self.get_rdzv_state()
if state["status"] != "frozen":
raise EtcdRendezvousRetryImmediately(
"Rendezvous no longer frozen, before we confirmed. "
"Must join next one"
)
if state["version"] != expected_version:
raise EtcdRendezvousRetryImmediately(
"Rendezvous version changed. Must try join the new one."
)
this_lease_key = self.get_path(
"/rdzv/v_{}/rank_{}".format(expected_version, this_rank)
)
self.client.set(this_lease_key, value=None, ttl=CONST_WORKER_KEEPALIVE_TTL)
state["keep_alives"].append(this_lease_key)
if len(state["keep_alives"]) == len(state["participants"]):
# Everyone confirmed (this rank is last to do so)
state["status"] = "final"
state["num_workers_waiting"] = 0
finalize = True
else:
finalize = False
log.info("New rendezvous state created: " + str(state))
except etcd.EtcdAlreadyExist:
active_version, state = self.get_rdzv_state()
# Note: it is possible for above query to fail (etcd.EtcdKeyNotFound),
# but this is ok for us - just means we'll restart from beginning.
log.info("Observed existing rendezvous state: " + str(state))
if state["status"] == "closed":
raise RendezvousClosedException()
if state["status"] == "joinable":
return self.join_phase(state["version"])
if state["status"] == "final":
self.handle_existing_rendezvous(state["version"])
raise EtcdRendezvousRetryImmediately()
# We observed some intermediate state, which is best handled by retrying later
self.try_wait_for_state_change(etcd_index=active_version.etcd_index + 1)
raise EtcdRendezvousRetryableFailure()