Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
headers.update(MYQ_HEADERS)
# The MyQ API can time out if multiple concurrent requests are made, so
# ensure that only one gets through at a time:
async with self._lock:
for attempt in (0, DEFAULT_REQUEST_RETRIES):
try:
async with self._websession.request(
method, url, headers=headers, params=params, json=json, **kwargs
) as resp:
data = await resp.json(content_type=None)
resp.raise_for_status()
return data
except ClientError as err:
if "401" in str(err) and login_request:
raise InvalidCredentialsError(
"Invalid username/password"
)
if attempt == DEFAULT_REQUEST_RETRIES - 1:
raise RequestError(
"Error requesting data from {0}: {1}".format(
url, data.get("description", str(err))
)
)
wait_for = min(2 ** attempt, 5)
_LOGGER.warning(
"Device update failed; trying again in %s seconds", wait_for
)