Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_manual_failover_from_leader_in_pause(self):
    """Check that a paused leader holding the lock takes no failover action.

    Two requests are tried: one carrying a (naive) scheduled time that names
    this node as the third ``Failover`` argument, and one unscheduled request
    that names this node as the second argument.  In both cases
    ``run_cycle`` must report the PAUSE no-action status.
    """
    # NOTE(review): ``true`` is a name provided by the enclosing module, not
    # the builtin ``True``; presumably a stub callable -- confirm.
    self.ha.has_lock = true
    self.ha.is_paused = true
    paused_no_action = 'PAUSE: no action. i am the leader with the lock'

    # Request with a scheduled time.
    scheduled = datetime.datetime.now()
    self.ha.cluster = get_cluster_initialized_with_leader(Failover(0, 'blabla', self.p.name, scheduled))
    self.assertEqual(paused_no_action, self.ha.run_cycle())

    # Unscheduled request naming this node.
    self.ha.cluster = get_cluster_initialized_with_leader(Failover(0, self.p.name, '', None))
    self.assertEqual(paused_no_action, self.ha.run_cycle())
def test_manual_failover_process_no_leader(self):
    """Walk through manual failover handling when the cluster has no leader.

    Each step installs a new cluster view and/or node-status stub and checks
    the status string returned by ``run_cycle``: either this node promotes
    itself, or it follows a different leader because it is not the
    healthiest node.
    """
    promoted = 'promoted self to leader by acquiring session lock'
    not_healthiest = 'following a different leader because i am not the healthiest node'

    # NOTE(review): ``false`` is a name provided by the enclosing module, not
    # the builtin ``False``; presumably a stub callable -- confirm.
    self.p.is_leader = false

    # This node is named in the failover request.
    self.ha.cluster = get_cluster_initialized_without_leader(failover=Failover(0, '', self.p.name, None))
    self.assertEqual(self.ha.run_cycle(), promoted)

    # A node called 'leader' is named instead; this node still promotes.
    self.ha.cluster = get_cluster_initialized_without_leader(failover=Failover(0, '', 'leader', None))
    self.p.set_role('replica')
    self.assertEqual(self.ha.run_cycle(), promoted)

    self.ha.fetch_node_status = get_node_status()  # accessible, in_recovery
    self.assertEqual(self.ha.run_cycle(), not_healthiest)

    self.ha.cluster = get_cluster_initialized_without_leader(failover=Failover(0, self.p.name, '', None))
    self.assertEqual(self.ha.run_cycle(), not_healthiest)

    self.ha.fetch_node_status = get_node_status(reachable=False)  # inaccessible, in_recovery
    self.p.set_role('replica')
    self.assertEqual(self.ha.run_cycle(), promoted)

    # set failover flag to True for all members of the cluster
    # this should elect the current member, as we are not going to call the API for it.
    self.ha.cluster = get_cluster_initialized_without_leader(failover=Failover(0, '', 'other', None))
    self.ha.fetch_node_status = get_node_status(nofailover=True)  # accessible, in_recovery
    self.p.set_role('replica')
    # The scenario above was previously set up but never exercised; assert
    # the outcome that the comment describes (the current member is elected).
    self.assertEqual(self.ha.run_cycle(), promoted)
def test_manual_failover_while_starting(self):
    """Check manual failover handling on a lock-holding leader.

    First, with ``check_for_startup`` stubbed, a request naming this node as
    the second ``Failover`` argument must make the leader demote itself.
    Then a series of requests with different scheduled times (naive,
    timezone-aware now, 30 seconds later, 570 seconds in the past, and
    ``None``) is run through ``run_cycle``.
    """
    # NOTE(review): ``true`` is a name provided by the enclosing module, not
    # the builtin ``True``; presumably a stub callable -- confirm.
    self.ha.has_lock = true
    self.p.check_for_startup = true
    f = Failover(0, self.p.name, '', None)
    self.ha.cluster = get_cluster_initialized_with_leader(f)
    self.ha.fetch_node_status = get_node_status()  # accessible, in_recovery
    self.assertEqual(self.ha.run_cycle(), 'manual failover: demoting myself')

    no_action = 'no action. i am the leader with the lock'

    # Failover scheduled time must include timezone
    # (a naive timestamp is passed on purpose; the result is not asserted).
    scheduled = datetime.datetime.now()
    self.ha.cluster = get_cluster_initialized_with_leader(Failover(0, 'blabla', self.p.name, scheduled))
    self.ha.run_cycle()

    # Timezone-aware "now".  ``datetime.now(tz)`` yields the same instant as
    # the deprecated ``utcnow().replace(tzinfo=tz)`` for a UTC tzinfo.
    scheduled = datetime.datetime.now(tzutc)
    self.ha.cluster = get_cluster_initialized_with_leader(Failover(0, 'blabla', self.p.name, scheduled))
    self.assertEqual(no_action, self.ha.run_cycle())

    # 30 seconds after the previous timestamp.
    scheduled = scheduled + datetime.timedelta(seconds=30)
    self.ha.cluster = get_cluster_initialized_with_leader(Failover(0, 'blabla', self.p.name, scheduled))
    self.assertEqual(no_action, self.ha.run_cycle())

    # 600 seconds before the previous timestamp, i.e. well in the past.
    scheduled = scheduled + datetime.timedelta(seconds=-600)
    self.ha.cluster = get_cluster_initialized_with_leader(Failover(0, 'blabla', self.p.name, scheduled))
    self.assertEqual(no_action, self.ha.run_cycle())

    # No scheduled time at all.
    scheduled = None
    self.ha.cluster = get_cluster_initialized_with_leader(Failover(0, 'blabla', self.p.name, scheduled))
    self.assertEqual(no_action, self.ha.run_cycle())
def test_manual_failover_from_leader(self):
    """Check how a lock-holding leader reacts to manual failover requests.

    Requests that do not name this node as the second ``Failover`` argument
    must leave the leader in place.  A request that does name it must make
    the leader demote itself, unless the node-status stub reports
    ``nofailover``, ``watchdog_failed``, ``timeline=1`` or
    ``wal_position=1``, in which case no action is expected.
    """
    # NOTE(review): this file defines test_manual_failover_from_leader more
    # than once; if the definitions share a scope only the last one runs.
    no_action = 'no action. i am the leader with the lock'
    demoting = 'manual failover: demoting myself'

    self.ha.fetch_node_status = get_node_status()
    # NOTE(review): ``true`` is a name provided by the enclosing module, not
    # the builtin ``True``; presumably a stub callable -- confirm.
    self.ha.has_lock = true

    self.ha.cluster = get_cluster_initialized_with_leader(Failover(0, 'blabla', '', None))
    self.assertEqual(self.ha.run_cycle(), no_action)

    self.ha.cluster = get_cluster_initialized_with_leader(Failover(0, '', self.p.name, None))
    self.assertEqual(self.ha.run_cycle(), no_action)

    self.ha.cluster = get_cluster_initialized_with_leader(Failover(0, '', 'blabla', None))
    self.assertEqual(self.ha.run_cycle(), no_action)

    # Request naming this node: the leader steps down.
    f = Failover(0, self.p.name, '', None)
    self.ha.cluster = get_cluster_initialized_with_leader(f)
    self.assertEqual(self.ha.run_cycle(), demoting)

    # Same outcome with the rewind check stubbed.
    self.ha._rewind.rewind_or_reinitialize_needed_and_possible = true
    self.assertEqual(self.ha.run_cycle(), demoting)

    # Node statuses for which the leader is expected to stay put.
    self.ha.fetch_node_status = get_node_status(nofailover=True)
    self.assertEqual(self.ha.run_cycle(), no_action)

    self.ha.fetch_node_status = get_node_status(watchdog_failed=True)
    self.assertEqual(self.ha.run_cycle(), no_action)

    self.ha.fetch_node_status = get_node_status(timeline=1)
    self.assertEqual(self.ha.run_cycle(), no_action)

    self.ha.fetch_node_status = get_node_status(wal_position=1)
    self.assertEqual(self.ha.run_cycle(), no_action)
def test_manual_failover_from_leader(self):
    """Check how a lock-holding leader reacts to manual failover requests.

    Requests that do not name this node as the second ``Failover`` argument
    must leave the leader in place.  A request that does name it must make
    the leader demote itself, unless the node-status stub reports
    ``nofailover``, ``watchdog_failed`` or ``timeline=1``, in which case no
    action is expected.
    """
    # NOTE(review): this file defines test_manual_failover_from_leader more
    # than once; if the definitions share a scope only the last one runs.
    no_action = 'no action. i am the leader with the lock'
    demoting = 'manual failover: demoting myself'

    self.ha.fetch_node_status = get_node_status()
    # NOTE(review): ``true`` is a name provided by the enclosing module, not
    # the builtin ``True``; presumably a stub callable -- confirm.
    self.ha.has_lock = true

    self.ha.cluster = get_cluster_initialized_with_leader(Failover(0, 'blabla', '', None))
    self.assertEqual(self.ha.run_cycle(), no_action)

    self.ha.cluster = get_cluster_initialized_with_leader(Failover(0, '', self.p.name, None))
    self.assertEqual(self.ha.run_cycle(), no_action)

    self.ha.cluster = get_cluster_initialized_with_leader(Failover(0, '', 'blabla', None))
    self.assertEqual(self.ha.run_cycle(), no_action)

    # Request naming this node: the leader steps down.
    f = Failover(0, self.p.name, '', None)
    self.ha.cluster = get_cluster_initialized_with_leader(f)
    self.assertEqual(self.ha.run_cycle(), demoting)

    # Same outcome with the rewind check stubbed.
    self.ha._rewind.rewind_or_reinitialize_needed_and_possible = true
    self.assertEqual(self.ha.run_cycle(), demoting)

    # Node statuses for which the leader is expected to stay put.
    self.ha.fetch_node_status = get_node_status(nofailover=True)
    self.assertEqual(self.ha.run_cycle(), no_action)

    self.ha.fetch_node_status = get_node_status(watchdog_failed=True)
    self.assertEqual(self.ha.run_cycle(), no_action)

    self.ha.fetch_node_status = get_node_status(timeline=1)
    self.assertEqual(self.ha.run_cycle(), no_action)
def test_manual_failover_from_leader(self):
    """Check how a lock-holding leader reacts to manual failover requests.

    Requests that do not name this node as the second ``Failover`` argument
    must leave the leader in place.  A request that does name it must make
    the leader demote itself, unless the node-status stub reports
    ``nofailover``, ``watchdog_failed``, ``timeline=1`` or
    ``wal_position=1``, in which case no action is expected.  Finally, a
    request from a previous leader to this node must also result in no
    action while the ``nofailover`` condition from the comment below holds.
    """
    # NOTE(review): this file defines test_manual_failover_from_leader more
    # than once; if the definitions share a scope only the last one runs.
    no_action = 'no action. i am the leader with the lock'
    demoting = 'manual failover: demoting myself'

    self.ha.fetch_node_status = get_node_status()
    # NOTE(review): ``true`` is a name provided by the enclosing module, not
    # the builtin ``True``; presumably a stub callable -- confirm.
    self.ha.has_lock = true

    self.ha.cluster = get_cluster_initialized_with_leader(Failover(0, 'blabla', '', None))
    self.assertEqual(self.ha.run_cycle(), no_action)

    self.ha.cluster = get_cluster_initialized_with_leader(Failover(0, '', self.p.name, None))
    self.assertEqual(self.ha.run_cycle(), no_action)

    self.ha.cluster = get_cluster_initialized_with_leader(Failover(0, '', 'blabla', None))
    self.assertEqual(self.ha.run_cycle(), no_action)

    # Request naming this node: the leader steps down.
    f = Failover(0, self.p.name, '', None)
    self.ha.cluster = get_cluster_initialized_with_leader(f)
    self.assertEqual(self.ha.run_cycle(), demoting)

    # Same outcome with the rewind check stubbed.
    self.ha._rewind.rewind_or_reinitialize_needed_and_possible = true
    self.assertEqual(self.ha.run_cycle(), demoting)

    # Node statuses for which the leader is expected to stay put.
    self.ha.fetch_node_status = get_node_status(nofailover=True)
    self.assertEqual(self.ha.run_cycle(), no_action)

    self.ha.fetch_node_status = get_node_status(watchdog_failed=True)
    self.assertEqual(self.ha.run_cycle(), no_action)

    self.ha.fetch_node_status = get_node_status(timeline=1)
    self.assertEqual(self.ha.run_cycle(), no_action)

    self.ha.fetch_node_status = get_node_status(wal_position=1)
    self.assertEqual(self.ha.run_cycle(), no_action)

    # manual failover from the previous leader to us won't happen if we hold the nofailover flag
    self.ha.cluster = get_cluster_initialized_with_leader(Failover(0, 'blabla', self.p.name, None))
    self.assertEqual(self.ha.run_cycle(), no_action)
# NOTE(review): header-less fragment with its indentation lost. The enclosing
# function and the definitions of `leader`, `members`, `nodes`, `initialize`,
# `config`, `last_leader_operation`, `sync` and `history` are outside this
# excerpt, so statement nesting below is presumed, not shown.
client_id = self._client.client_id
# When the leader entry names this node but its `ephemeralOwner` differs from
# the first element of our client id (and `self._ctl` is falsy), the leader
# node is deleted and the leader is treated as absent.
if not self._ctl and leader[0] == self._name and client_id is not None \
and client_id[0] != leader[1].ephemeralOwner:
logger.info('I am leader but not owner of the session. Removing leader node')
self._client.delete(self.leader_path)
leader = None
if leader:
# Placeholder member (first argument -1), used only when no entry in `members`
# carries the leader's name.
member = Member(-1, leader[0], None, {})
member = ([m for m in members if m.name == leader[0]] or [member])[0]
leader = Leader(leader[1].version, leader[1].ephemeralOwner, member)
# Set when the selected member has index -1 -- presumably meaning the
# placeholder was used; confirm that real members never carry index -1.
self._fetch_cluster = member.index == -1
# failover key
# Read only when the failover key is present in `nodes`; otherwise None.
failover = self.get_node(self.failover_path, watch=self.cluster_watcher) if self._FAILOVER in nodes else None
failover = failover and Failover.from_node(failover[1].version, failover[0])
self._cluster = Cluster(initialize, config, leader, last_leader_operation, members, failover, sync, history)
# NOTE(review): header-less fragment with its indentation lost. The enclosing
# function and the definitions of `nodes`, `initialize`, `config`, `history`
# and the incoming `last_leader_operation` are outside this excerpt.
# Convert the last leader operation record to an int, using 0 when it is None.
last_leader_operation = 0 if last_leader_operation is None else int(last_leader_operation['value'])
# get list of members
# Only keys that start with the members prefix and contain exactly one '/'.
members = [self.member(k, n) for k, n in nodes.items() if k.startswith(self._MEMBERS) and k.count('/') == 1]
# get leader
leader = nodes.get(self._LEADER)
if leader:
# Placeholder member (first argument -1), used only when no entry in `members`
# carries the leader's name.
member = Member(-1, leader['value'], None, {})
member = ([m for m in members if m.name == leader['value']] or [member])[0]
leader = Leader(leader['index'], None, member)
# failover key
failover = nodes.get(self._FAILOVER)
if failover:
failover = Failover.from_node(failover['index'], failover['value'])
# get synchronization state
# `sync and ...` passes the falsy `sync` value through when the key is absent.
sync = nodes.get(self._SYNC)
sync = SyncState.from_node(sync and sync['index'], sync and sync['value'])
return Cluster(initialize, config, leader, last_leader_operation, members, failover, sync, history)
# NOTE(review): header-less fragment with its indentation lost. The enclosing
# function, the opening `try:` statement(s) and the definitions of
# `leader_record`, `metadata`, `leader`, `members`, `nodes`, `initialize`,
# `config`, `last_leader_operation` and `history` are outside this excerpt.
# Use the record's 'ttl' as an int; fall back to self._ttl when it parses to 0
# or cannot be converted (TypeError / ValueError).
ttl = int(leader_record.get('ttl')) or self._ttl
except (TypeError, ValueError):
ttl = self._ttl
# The leader is discarded when there is no metadata, no observed time, or the
# observed time plus ttl is already in the past.
if not metadata or not self._leader_observed_time or self._leader_observed_time + ttl < time.time():
leader = None
if metadata:
# Placeholder member (first argument -1), used only when no entry in `members`
# carries the leader's name.
member = Member(-1, leader, None, {})
member = ([m for m in members if m.name == leader] or [member])[0]
leader = Leader(metadata.resource_version, None, member)
# failover key
failover = nodes.get(self.failover_path)
metadata = failover and failover.metadata
# Annotations are copied (or replaced by an empty dict) before being handed over.
failover = Failover.from_node(metadata and metadata.resource_version,
metadata and (metadata.annotations or {}).copy())
# get synchronization state
sync = nodes.get(self.sync_path)
metadata = sync and sync.metadata
sync = SyncState.from_node(metadata and metadata.resource_version, metadata and metadata.annotations)
return Cluster(initialize, config, leader, last_leader_operation, members, failover, sync, history)
# Any failure is logged with its traceback and re-raised as KubernetesError.
except Exception:
logger.exception('get_cluster')
raise KubernetesError('Kubernetes API is not responding properly')