Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _process_output(res, parse_json=True):
    """Turn a response object into a parsed result, or raise on error.

    The payload is decoded as UTF-8 and stripped of surrounding
    whitespace before anything else happens.

    Args:
        res: Response object exposing ``payload`` (bytes) and ``code``
            (comparable to integers, with an ``is_successful()`` method).
        parse_json: When true (the default) the payload text is parsed
            with ``json.loads``; otherwise the stripped text is returned.

    Returns:
        ``None`` when the stripped payload is empty, otherwise the parsed
        JSON value or the raw text depending on ``parse_json``.

    Raises:
        ClientError: Unsuccessful code in the range ``128 <= code < 160``.
        ServerError: Unsuccessful code in the range ``160 <= code < 192``.
    """
    text = res.payload.decode('utf-8').strip()
    _LOGGER.debug('Status: %s, Received: %s', res.code, text)

    # NOTE: the empty-payload check runs before the status check, so an
    # error response that carries no body yields None instead of raising.
    if not text:
        return None

    failed = not res.code.is_successful()
    # Presumably the CoAP 4.xx / 5.xx code classes -- confirm against the
    # CoAP library in use. Unsuccessful codes outside both ranges fall
    # through and are handled like a successful response.
    if failed and 128 <= res.code < 160:
        raise ClientError(text)
    if failed and 160 <= res.code < 192:
        raise ServerError(text)

    return json.loads(text) if parse_json else text
def _process_output(output, parse_json=True):
    """Interpret the textual output of a request.

    Args:
        output: Raw text to interpret; surrounding whitespace is
            stripped first.
        parse_json: When true (the default) the text is parsed with
            ``json.loads``; otherwise the stripped text is returned.

    Returns:
        ``None`` for empty output, otherwise the parsed JSON value or
        the stripped text depending on ``parse_json``.

    Raises:
        RequestError: The output contains ``'decrypt_verify'``.
        ClientError: The output starts with ``CLIENT_ERROR_PREFIX``.
        ServerError: The output starts with ``SERVER_ERROR_PREFIX``.
    """
    text = output.strip()
    _LOGGER.debug('Received: %s', text)

    if not text:
        return None

    # Checked before the error prefixes, so this message wins whenever
    # the marker appears anywhere in the output.
    if 'decrypt_verify' in text:
        raise RequestError(
            'Please compile coap-client without debug output. See '
            'instructions at '
            'https://github.com/ggravlingen/pytradfri#installation')

    if text.startswith(CLIENT_ERROR_PREFIX):
        raise ClientError(text)

    if text.startswith(SERVER_ERROR_PREFIX):
        raise ServerError(text)

    return json.loads(text) if parse_json else text
async def _get_response(self, msg):
    """Perform the request and wait for its response.

    Obtains a protocol via ``self._get_protocol()``, issues ``msg`` on
    it and awaits the ``response`` attribute of the resulting request.

    Args:
        msg: The message passed to ``protocol.request``.

    Returns:
        A ``(request, response)`` tuple: the object returned by
        ``protocol.request(msg)`` and the awaited response.

    Raises:
        ClientError: ``ConstructionRenderableError`` was raised while
            performing the request.
        RequestTimeout: ``RequestTimedOut`` was raised.
        ServerError: ``OSError``, ``socket.gaierror`` or ``Error`` was
            raised.
        asyncio.CancelledError: Re-raised unchanged on cancellation.

    Except for the ``ClientError`` case, ``self._reset_protocol`` is
    awaited with the caught exception before raising. Translated
    exceptions keep the original as their second argument and are
    chained to it with ``from`` so the traceback shows the real cause.
    """
    try:
        protocol = await self._get_protocol()
        pr = protocol.request(msg)
        r = await pr.response
        return pr, r
    except ConstructionRenderableError as e:
        raise ClientError("There was an error with the request.", e) from e
    except RequestTimedOut as e:
        await self._reset_protocol(e)
        raise RequestTimeout('Request timed out.', e) from e
    except (OSError, socket.gaierror, Error) as e:
        # aiocoap sometimes raises an OSError/socket.gaierror too.
        # aiocoap issue #124
        await self._reset_protocol(e)
        raise ServerError("There was an error with the request.", e) from e
    except asyncio.CancelledError as e:
        await self._reset_protocol(e)
        # Bare raise re-raises the cancellation with its original
        # traceback intact.
        raise