Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_too_many_tries(self):
retry = Retry(delay=0)
self.assertRaises(RetryFailedError, retry, self._fail(times=999))
self.assertEqual(retry._attempts, 1)
def test_maximum_delay(self):
retry = Retry(delay=10, max_tries=100)
retry(self._fail(times=10))
self.assertTrue(retry._cur_delay < 4000, retry._cur_delay)
# gevent's sleep function is picky about the type
self.assertEqual(type(retry._cur_delay), float)
def query(self, sql, *params, **kwargs):
if not kwargs.get('retry', False):
return self.server.query(sql, *params)
retry = Retry(delay=1, retry_exceptions=PostgresConnectionException)
return retry(self.server.query, sql, *params)
self.__thread_ident = current_thread().ident
self.slots_handler = SlotsHandler(self)
self._callback_executor = CallbackExecutor()
self.__cb_called = False
self.__cb_pending = None
self.cancellable = CancellableSubprocess()
self._sysid = None
self.retry = Retry(max_tries=-1, deadline=config['retry_timeout']/2.0, max_delay=1,
retry_exceptions=PostgresConnectionException)
# Retry 'pg_is_in_recovery()' only once
self._is_leader_retry = Retry(max_tries=1, deadline=config['retry_timeout']/2.0, max_delay=1,
retry_exceptions=PostgresConnectionException)
self._role_lock = Lock()
self.set_role(self.get_postgres_role_from_data_directory())
self._state_entry_timestamp = None
self._cluster_info_state = {}
self._cached_replica_timeline = None
# Last known running process
self._postmaster_proc = None
if self.is_running():
self.set_state('running')
self.set_role('master' if self.is_leader() else 'replica')
self.config.write_postgresql_conf() # we are "joining" already running postgres
def copy(self):
"""Return a clone of this retry manager"""
return Retry(max_tries=self.max_tries, delay=self.delay, backoff=self.backoff,
max_jitter=self.max_jitter / 100.0, max_delay=self.max_delay, sleep_func=self.sleep_func,
deadline=self.deadline, retry_exceptions=self.retry_exceptions)
def __init__(self, config):
super(Consul, self).__init__(config)
self._scope = config['scope']
self._session = None
self.__do_not_watch = False
self._retry = Retry(deadline=config['retry_timeout'], max_delay=1, max_tries=-1,
retry_exceptions=(ConsulInternalError, HTTPException,
HTTPError, socket.error, socket.timeout))
kwargs = {}
if 'url' in config:
r = urlparse(config['url'])
config.update({'scheme': r.scheme, 'host': r.hostname, 'port': r.port or 8500})
elif 'host' in config:
host, port = split_host_port(config.get('host', '127.0.0.1:8500'), 8500)
config['host'] = host
if 'port' not in config:
config['port'] = int(port)
if config.get('cacert'):
config['ca_cert'] = config.pop('cacert')
def __init__(self, cluster_name):
self.available = False
self.cluster_name = cluster_name if cluster_name is not None else 'unknown'
self._retry = Retry(deadline=300, max_delay=30, max_tries=-1, retry_exceptions=(boto.exception.StandardError,))
try:
# get the instance id
r = requests.get('http://169.254.169.254/latest/dynamic/instance-identity/document', timeout=2.1)
except RequestException:
logger.error('cannot query AWS meta-data')
return
if r.ok:
try:
content = r.json()
self.instance_id = content['instanceId']
self.region = content['region']
except Exception:
logger.exception('unable to fetch instance id and region from AWS meta-data')
return
self.available = True