def get_cluster_initialized_without_leader(leader=False, failover=None, sync=None, cluster_config=None):
    m1 = Member(0, 'leader', 28, {'conn_url': 'postgres://replicator:rep-pass@127.0.0.1:5435/postgres',
                                  'api_url': 'http://127.0.0.1:8008/patroni', 'xlog_location': 4})
    leader = Leader(0, 0, m1) if leader else None
    m2 = Member(0, 'other', 28, {'conn_url': 'postgres://replicator:rep-pass@127.0.0.1:5436/postgres',
                                 'api_url': 'http://127.0.0.1:8011/patroni',
                                 'state': 'running',
                                 'pause': True,
                                 'tags': {'clonefrom': True},
                                 'scheduled_restart': {'schedule': "2100-01-01 10:53:07.560445+00:00",
                                                       'postgres_version': '99.0.0'}})
    syncstate = SyncState(0 if sync else None, sync and sync[0], sync and sync[1])
    return get_cluster(SYSID, leader, [m1, m2], failover, syncstate, cluster_config)
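# A small standalone sketch (hypothetical names, not from the test suite) of the
# short-circuit idiom used for `syncstate` above: when `sync` is falsy, both
# `sync and sync[0]` and `sync and sync[1]` evaluate to `sync` itself, so no
# IndexError is raised and the state is built from Nones.
def sync_fields(sync=None):
    return (0 if sync else None, sync and sync[0], sync and sync[1])

assert sync_fields() == (None, None, None)
assert sync_fields(('leader', 'other')) == (0, 'leader', 'other')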
def test_process_healthy_standby_cluster_as_cascade_replica(self):
    self.p.is_leader = false  # 'false' is presumably a stub callable defined in the test module; is_leader is invoked as a method
    self.p.name = 'replica'
    self.ha.cluster = get_standby_cluster_initialized_with_only_leader()
    self.assertEqual(self.ha.run_cycle(), 'no action. i am a secondary and i am following a standby leader')
    with patch.object(Leader, 'conn_url', PropertyMock(return_value='')):
        self.assertEqual(self.ha.run_cycle(), 'continue following the old known standby leader')
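# A self-contained sketch (not Patroni code) of the PropertyMock pattern used in
# the test above: patch.object temporarily replaces a read-only property on the
# class, so every instance sees the mocked value inside the `with` block.
from unittest.mock import patch, PropertyMock

class Node:
    @property
    def conn_url(self):
        return 'postgres://10.0.0.1:5432/postgres'

node = Node()
with patch.object(Node, 'conn_url', PropertyMock(return_value='')):
    assert node.conn_url == ''                                    # mocked while patched
assert node.conn_url == 'postgres://10.0.0.1:5432/postgres'       # restored afterwards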
    history = history and TimelineHistory.from_node(history.modifiedIndex, history.value)
    # get last leader operation
    last_leader_operation = nodes.get(self._LEADER_OPTIME)
    last_leader_operation = 0 if last_leader_operation is None else int(last_leader_operation.value)
    # get list of members
    members = [self.member(n) for k, n in nodes.items() if k.startswith(self._MEMBERS) and k.count('/') == 1]
    # get leader
    leader = nodes.get(self._LEADER)
    if leader:
        member = Member(-1, leader.value, None, {})
        member = ([m for m in members if m.name == leader.value] or [member])[0]
        index = result.etcd_index if result.etcd_index > leader.modifiedIndex else leader.modifiedIndex + 1
        leader = Leader(index, leader.ttl, member)
    # failover key
    failover = nodes.get(self._FAILOVER)
    if failover:
        failover = Failover.from_node(failover.modifiedIndex, failover.value)
    # get synchronization state
    sync = nodes.get(self._SYNC)
    sync = SyncState.from_node(sync and sync.modifiedIndex, sync and sync.value)
    cluster = Cluster(initialize, config, leader, last_leader_operation, members, failover, sync, history)
except etcd.EtcdKeyNotFound:
    cluster = Cluster(None, None, None, None, [], None, None, None)
except Exception as e:
    self._handle_exception(e, 'get_cluster', raise_ex=EtcdError('Etcd is not responding properly'))
self._has_failed = False
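# A minimal sketch of the lookup-with-fallback idiom that resolves the leader
# above: prefer the member whose name matches the leader key, otherwise fall
# back to a placeholder. The names here are illustrative only.
def resolve_leader_member(leader_name, members, placeholder):
    return ([m for m in members if m == leader_name] or [placeholder])[0]

assert resolve_leader_member('node-a', ['node-a', 'node-b'], '<placeholder>') == 'node-a'
assert resolve_leader_member('node-c', ['node-a', 'node-b'], '<placeholder>') == '<placeholder>'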
def rewind(self, leader):
    if self.is_running() and not self.stop(checkpoint=False):
        return logger.warning('Can not run pg_rewind because postgres is still running')
    # prepare pg_rewind connection
    r = leader.conn_kwargs(self._rewind_credentials)
    # 1. make sure that we are really trying to rewind from the master
    # 2. make sure that pg_control contains the new timeline by:
    #    running a checkpoint or
    #    waiting until Patroni on the master exposes checkpoint_after_promote=True
    checkpoint_status = leader.checkpoint_after_promote if isinstance(leader, Leader) else None
    if checkpoint_status is None:  # master still runs the old Patroni
        leader_status = self.checkpoint(leader.conn_kwargs(self._superuser))
        if leader_status:
            return logger.warning('Can not use %s for rewind: %s', leader.name, leader_status)
    elif not checkpoint_status:
        return logger.info('Waiting for checkpoint on %s before rewind', leader.name)
    elif not self.check_leader_is_not_in_recovery(**r):
        return
    if self.pg_rewind(r):
        self._rewind_state = REWIND_STATUS.SUCCESS
    elif not self.check_leader_is_not_in_recovery(**r):
        logger.warning('Failed to rewind because master %s became unreachable', leader.name)
    else:
        logger.error('Failed to rewind from healthy master: %s', leader.name)
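# A standalone sketch of the `return logger.warning(...)` idiom used in rewind()
# above: logging calls return None, so one statement both records the reason and
# exits the function with None. Function and flag names are hypothetical.
import logging
logger = logging.getLogger(__name__)

def maybe_rewind(postgres_running):
    if postgres_running:
        return logger.warning('Can not run pg_rewind because postgres is still running')
    return 'rewind attempted'

assert maybe_rewind(True) is None            # warning logged, None returned
assert maybe_rewind(False) == 'rewind attempted'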
    # get last leader operation
    last_leader_operation = nodes.get(self._LEADER_OPTIME)
    last_leader_operation = 0 if last_leader_operation is None else int(last_leader_operation['value'])
    # get list of members
    members = [self.member(n) for k, n in nodes.items() if k.startswith(self._MEMBERS) and k.count('/') == 1]
    # get leader
    leader = nodes.get(self._LEADER)
    if not self._ctl and leader and leader['value'] == self._name and self._lease != leader.get('lease'):
        logger.warning('I am the leader but not owner of the lease')
    if leader:
        member = Member(-1, leader['value'], None, {})
        member = ([m for m in members if m.name == leader['value']] or [member])[0]
        leader = Leader(leader['mod_revision'], leader['lease'], member)
    # failover key
    failover = nodes.get(self._FAILOVER)
    if failover:
        failover = Failover.from_node(failover['mod_revision'], failover['value'])
    # get synchronization state
    sync = nodes.get(self._SYNC)
    sync = SyncState.from_node(sync and sync['mod_revision'], sync and sync['value'])
    cluster = Cluster(initialize, config, leader, last_leader_operation, members, failover, sync, history)
except UnsupportedEtcdVersion:
    raise
except Exception as e:
    self._handle_exception(e, 'get_cluster', raise_ex=EtcdError('Etcd is not responding properly'))
self._has_failed = False
    history = nodes.get(self._HISTORY)
    history = history and TimelineHistory.from_node(history['index'], history['value'])
    # get last leader operation
    last_leader_operation = nodes.get(self._LEADER_OPTIME)
    last_leader_operation = 0 if last_leader_operation is None else int(last_leader_operation['value'])
    # get list of members
    members = [self.member(k, n) for k, n in nodes.items() if k.startswith(self._MEMBERS) and k.count('/') == 1]
    # get leader
    leader = nodes.get(self._LEADER)
    if leader:
        member = Member(-1, leader['value'], None, {})
        member = ([m for m in members if m.name == leader['value']] or [member])[0]
        leader = Leader(leader['index'], None, member)
    # failover key
    failover = nodes.get(self._FAILOVER)
    if failover:
        failover = Failover.from_node(failover['index'], failover['value'])
    # get synchronization state
    sync = nodes.get(self._SYNC)
    sync = SyncState.from_node(sync and sync['index'], sync and sync['value'])
    return Cluster(initialize, config, leader, last_leader_operation, members, failover, sync, history)
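# A hedged sketch, not Patroni's actual definition: an 8-field record mirroring
# the positional arguments every loader above passes to Cluster(), including the
# "empty cluster" value built in the not-found branches.
from collections import namedtuple

ClusterView = namedtuple('ClusterView', ('initialize', 'config', 'leader', 'last_leader_operation',
                                         'members', 'failover', 'sync', 'history'))

empty_cluster = ClusterView(None, None, None, None, [], None, None, None)
assert empty_cluster.members == [] and empty_cluster.leader is None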
    # get list of members
    members = [self.member(n) for k, n in nodes.items() if k.startswith(self._MEMBERS) and k.count('/') == 1]
    # get leader
    leader = nodes.get(self._LEADER)
    if not self._ctl and leader and leader['Value'] == self._name \
            and self._session != leader.get('Session', 'x'):
        logger.info('I am leader but not owner of the session. Removing leader node')
        self._client.kv.delete(self.leader_path, cas=leader['ModifyIndex'])
        leader = None
    if leader:
        member = Member(-1, leader['Value'], None, {})
        member = ([m for m in members if m.name == leader['Value']] or [member])[0]
        leader = Leader(leader['ModifyIndex'], leader.get('Session'), member)
    # failover key
    failover = nodes.get(self._FAILOVER)
    if failover:
        failover = Failover.from_node(failover['ModifyIndex'], failover['Value'])
    # get synchronization state
    sync = nodes.get(self._SYNC)
    sync = SyncState.from_node(sync and sync['ModifyIndex'], sync and sync['Value'])
    return Cluster(initialize, config, leader, last_leader_operation, members, failover, sync, history)
except NotFound:
    return Cluster(None, None, None, None, [], None, None, None)
except Exception:
    logger.exception('get_cluster')
    raise ConsulError('Consul is not responding properly')
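# A minimal sketch of the exception-translation pattern shown above: low-level
# store errors are logged once and re-raised as a single backend-specific error.
# `StoreError` and `fetch` are illustrative names, not the library's.
import logging
logger = logging.getLogger(__name__)

class StoreError(Exception):
    pass

def load_cluster(fetch):
    try:
        return fetch()
    except KeyError:                      # "not found" maps to an empty result
        return None
    except Exception:
        logger.exception('get_cluster')
        raise StoreError('store is not responding properly')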
    members = self.load_members(sync_standby) if self._MEMBERS[:-1] in nodes else []
    # get leader
    leader = self.get_node(self.leader_path) if self._LEADER in nodes else None
    if leader:
        client_id = self._client.client_id
        if not self._ctl and leader[0] == self._name and client_id is not None \
                and client_id[0] != leader[1].ephemeralOwner:
            logger.info('I am leader but not owner of the session. Removing leader node')
            self._client.delete(self.leader_path)
            leader = None
        if leader:
            member = Member(-1, leader[0], None, {})
            member = ([m for m in members if m.name == leader[0]] or [member])[0]
            leader = Leader(leader[1].version, leader[1].ephemeralOwner, member)
            self._fetch_cluster = member.index == -1
    # failover key
    failover = self.get_node(self.failover_path, watch=self.cluster_watcher) if self._FAILOVER in nodes else None
    failover = failover and Failover.from_node(failover[1].version, failover[0])
    self._cluster = Cluster(initialize, config, leader, last_leader_operation, members, failover, sync, history)
    self._leader_observed_record = leader_record
    self._leader_observed_time = time.time()
    leader = leader_record.get(self._LEADER)
    try:
        ttl = int(leader_record.get('ttl')) or self._ttl
    except (TypeError, ValueError):
        ttl = self._ttl
    if not metadata or not self._leader_observed_time or self._leader_observed_time + ttl < time.time():
        leader = None
    if metadata:
        member = Member(-1, leader, None, {})
        member = ([m for m in members if m.name == leader] or [member])[0]
        leader = Leader(metadata.resource_version, None, member)
    # failover key
    failover = nodes.get(self.failover_path)
    metadata = failover and failover.metadata
    failover = Failover.from_node(metadata and metadata.resource_version,
                                  metadata and (metadata.annotations or {}).copy())
    # get synchronization state
    sync = nodes.get(self.sync_path)
    metadata = sync and sync.metadata
    sync = SyncState.from_node(metadata and metadata.resource_version, metadata and metadata.annotations)
    return Cluster(initialize, config, leader, last_leader_operation, members, failover, sync, history)
except Exception:
    logger.exception('get_cluster')
    raise KubernetesError('Kubernetes API is not responding properly')
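# A standalone sketch of the TTL staleness check applied to the observed leader
# record above: a leader last seen more than `ttl` seconds ago is treated as
# expired. `observed_at` and `leader_if_fresh` are hypothetical names.
import time

def leader_if_fresh(leader, observed_at, ttl):
    if not observed_at or observed_at + ttl < time.time():
        return None
    return leader

assert leader_if_fresh('node-a', time.time() - 5, ttl=30) == 'node-a'
assert leader_if_fresh('node-a', time.time() - 60, ttl=30) is None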