Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@tenacity.retry(stop=tenacity.stop_after_attempt(2),
retry=tenacity.retry_if_exception_type(CloudError),
reraise=True)
def _get_or_create_storage_account(self):
if self._storage_account:
return self._storage_account
else:
try:
self._storage_account = \
self.get_storage_account(self.storage_account)
except CloudError as cloud_error:
if cloud_error.error.error == "ResourceNotFound":
storage_account_params = {
'sku': {
'name': 'Standard_LRS'
},
'kind': 'storage',
@tenacity.retry(retry=tenacity.retry_if_exception_type(),
stop=tenacity.stop_after_attempt(
CONF.networking.max_retries),
wait=tenacity.wait_exponential(
multiplier=CONF.networking.retry_backoff,
min=CONF.networking.retry_interval,
max=CONF.networking.retry_max), reraise=True)
def execute(self, port_id, passive_failure=False):
"""Delete the network port."""
if port_id is None:
return
if self.execute.retry.statistics.get(constants.ATTEMPT_NUMBER, 1) == 1:
LOG.debug("Deleting network port %s", port_id)
else:
LOG.warning('Retrying network port %s delete attempt %s of %s.',
port_id,
self.execute.retry.statistics[
@retry(
wait=wait_exponential(multiplier=1, max=10),
stop=stop_after_attempt(3),
reraise=True,
retry=retry_if_exception_type(redis.ConnectionError),
after=log_retry_redis,
)
def _get_redis_rules(self):
""" Yields a traptor rule from redis. This function
expects that the redis keys are set up as follows:
traptor-::
For example,
traptor-follow:0:34
@tenacity.retry(
    stop=tenacity.stop_after_attempt(attempts),
    retry=tenacity.retry_if_exception_type(exception.UpdateInProgress),
    before=prepare_attempt,
    wait=tenacity.wait_random(max=2),
    reraise=True,
)
def create_replacement():
    """Return the result of ``Resource.replacement`` for this resource.

    The call is retried only when ``exception.UpdateInProgress`` is raised,
    for at most ``attempts`` attempts, with a random wait of up to 2 between
    attempts; ``prepare_attempt`` runs before every attempt.  Once the
    attempts are exhausted the last exception is re-raised as-is
    (``reraise=True``).
    """
    # Closure: context, id, update data, ``rs`` and the atomic key all come
    # from the enclosing scope.
    make_replacement = resource_objects.Resource.replacement
    return make_replacement(
        self.context, self.id, update_data, rs, self._atomic_key
    )
@retry(
reraise=True,
wait=wait_exponential(multiplier=2),
stop=stop_after_attempt(5))
def pull_from_url(dep, configs):
'''
:param dep: name of a dependency
:param configs: a dict from dep_urls_py.yaml
:return: boolean
It downloads files from URLs to a temp directory first in order to avoid
having to deal with any temp files. It helps keep the final directory clean.
'''
if dep in configs:
config = configs[dep]
dest_dir = '/'.join([LICENSE_DIR, dep])
cur_temp_dir = tempfile.mkdtemp()
@retry(wait=wait_fixed(2), stop=stop_after_attempt(3))
async def do_request_async(url: str, body, headers: dict):
    """POST ``body`` to ``url`` and return the response object.

    A new ``aiohttp.ClientSession`` is opened for each call and the request
    uses a total timeout of 25.  The response text is awaited before the
    response is returned.

    Per the decorator, a failing call is attempted up to 3 times with a
    fixed wait of 2 between attempts.
    NOTE(review): ``reraise`` is not set; if ``retry`` is tenacity's, the
    final failure surfaces as ``tenacity.RetryError`` -- confirm callers
    expect that.

    :param url: target URL of the POST request
    :param body: payload passed as ``data=`` to ``session.post``
    :param headers: HTTP headers sent with the request
    :return: the response object yielded by ``session.post``
    """
    request_timeout = aiohttp.ClientTimeout(total=25)
    async with aiohttp.ClientSession() as http, http.post(
        url, data=body, headers=headers, timeout=request_timeout
    ) as resp:
        # Consume the body while the connection is still open; the caller
        # only receives the response object after the session is closed.
        await resp.text()
        return resp
@retry(wait=wait_fixed(2), stop=stop_after_attempt(5))
def delete(self):
    """Delete the qemu entry ``self.id`` under node ``self.node``.

    A connection is obtained from ``connect_proxmox()`` on every call, so
    each retry attempt (up to 5, with a fixed wait of 2 between them, per
    the decorator) starts from a new connection object.
    """
    node_api = connect_proxmox().nodes(self.node)
    node_api.qemu(self.id).delete()
@retry
def get_auth(app):
    """Build and return an ``OIDCAuthentication`` object for ``app``.

    The issuer and client registration info are read from
    ``app.config['OIDC_ISSUER']`` and ``app.config['OIDC_CLIENT_CONFIG']``.

    NOTE(review): the decorator is applied bare; if ``retry`` is tenacity's,
    that means retrying on any exception with no stop condition and no wait
    -- confirm this is the intended behaviour.

    :param app: application object exposing a ``config`` mapping
    :return: the constructed ``OIDCAuthentication`` instance
    """
    issuer = app.config['OIDC_ISSUER']
    client_config = app.config['OIDC_CLIENT_CONFIG']
    return OIDCAuthentication(
        app,
        issuer=issuer,
        client_registration_info=client_config,
    )
@tenacity.retry(
    stop=tenacity.stop_after_attempt(10),
    wait=tenacity.wait_fixed(0.5),
    retry=tenacity.retry_if_exception(retry_if_operationaborted),
)
def create_bucket(conn, name, region_name):
    """Create bucket ``name`` through ``conn`` and return the call's result.

    When ``region_name`` is truthy it is sent as the ``LocationConstraint``
    of ``CreateBucketConfiguration``; otherwise no configuration is passed.

    The call is retried while the ``retry_if_operationaborted`` predicate
    accepts the raised exception, for at most 10 attempts with a fixed wait
    of 0.5 between them.

    :param conn: client object exposing ``create_bucket``
    :param name: bucket name, passed as ``Bucket``
    :param region_name: optional region used as the location constraint
    :return: whatever ``conn.create_bucket`` returns
    """
    if not region_name:
        # No region given: omit the configuration argument entirely.
        return conn.create_bucket(Bucket=name)

    location = {"LocationConstraint": region_name}
    return conn.create_bucket(Bucket=name, CreateBucketConfiguration=location)
@tenacity.retry(wait=tenacity.wait_exponential(multiplier=0.01),
reraise=True, stop=tenacity.stop_after_delay(60) |
tenacity.stop_after_attempt(100))
def neutron_agent_appears(neutron_client, binary):
"""Wait for Neutron agent to appear and return agent_id.
:param neutron_client: Neutron client
:type neutron_client: Pointer to Neutron client object
:param binary: Name of agent we want to appear
:type binary: str
:returns: result set from Neutron list_agents call
:rtype: dict
:raises: exceptions.NeutronAgentMissing
"""
result = neutron_client.list_agents(binary=binary)
for agent in result.get('agents', []):
agent_id = agent.get('id', None)