Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
class BridgeBusy(pydeconzException):
"""The Bridge is busy, too many requests (more than 20)."""
ERRORS = {
1: Unauthorized, # Unauthorized user
2: BadRequest, # Body contains invalid JSON
3: ResourceNotFound, # Resource not available
4: RequestError, # Method not available for resource
5: BadRequest, # Missing parameters in body
6: RequestError, # Parameter not available
7: RequestError, # Invalid value for parameter
8: RequestError, # Parameter is not modifiable
901: BridgeBusy, # May occur when sending too fast
}
def raise_error(error):
if error:
cls = ERRORS.get(error["type"], pydeconzException)
raise cls("{} {}".format(error["address"], error["description"]))
async def async_set(self, field, data, tries=0):
"""Set state of device."""
self.cancel_retry()
try:
await self._request("put", field, json=data)
except BridgeBusy:
LOGGER.debug("BridgeBusy, schedule retry %s %s", field, str(data))
def retry_set():
"""Retry set state."""
self._cancel_retry = None
self._loop.create_task(self.async_set(field, data, tries + 1))
if tries < 3:
retry_delay = 2 ** (tries + 1)
self._cancel_retry = self._loop.call_later(retry_delay, retry_set)