Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_ReceivedErrorResponse(self):
    """ReceivedErrorResponse can be raised with a code and a message."""
    error_cls = exceptions.ReceivedErrorResponse
    with self.assertRaises(error_cls):
        raise error_cls(1, 'Member not found')
def test_ReceivedErrorResponse(self):
    """Check that ReceivedErrorResponse propagates when raised with code and message."""
    expected = exceptions.ReceivedErrorResponse
    with self.assertRaises(expected):
        raise expected(1, 'Member not found')
def test_handle_response_notification_with_error_without_data(self):
    """An error response without a "data" member raises ReceivedErrorResponse.

    The raised exception must expose the code and message taken from the
    response, and its ``data`` attribute must be None.
    """
    response = '{"jsonrpc": "2.0", "error": {"code": -32000, "message": "Not Found"}, "id": null}'
    with self.assertRaises(exceptions.ReceivedErrorResponse) as caught:
        self.server._handle_response(response)
    # Inspect the exception only after the context manager has exited:
    # statements placed after the raising call inside the ``with`` body
    # would never execute.
    self.assertEqual(caught.exception.code, -32000)
    self.assertEqual(caught.exception.message, 'Not Found')
    self.assertIsNone(caught.exception.data)
if not expected_response and 'result' in response_dict:
raise exceptions.InvalidResponse()
# Validate the response against the Response schema
try:
jsonschema.validate(
response_dict,
json.loads(pkgutil.get_data(
__name__, 'response-schema.json').decode('utf-8')))
except jsonschema.ValidationError:
raise exceptions.InvalidResponse()
# If the response was "error", raise it, to ensure it's handled
if 'error' in response_dict:
raise exceptions.ReceivedErrorResponse(
response_dict['error']['code'],
response_dict['error']['message'])
# Otherwise, surely we have a result to return
print(response_dict['result'])
return None
# NOTE(review): indentation was lost in this excerpt; the nesting described
# in the comments below is inferred from syntax — confirm against the file.
# Streaming branch: an async-generator result is written out block by block.
if isinstance(result, types.AsyncGeneratorType):
try:
stream = StreamResponse()
await stream.prepare(request)
# Forward every block the generator yields, then signal end of stream.
async for block in result:
await stream.write(block)
await stream.write_eof()
return stream
# Halt handling
except asyncio.CancelledError:
# NOTE(review): if `logger` is a stdlib logging.Logger, `warn` is a
# deprecated alias of `warning` — confirm and switch.
logger.warn('halted response')
return stream
# regular response
else:
return json_response(result, request=request)
# NOTE(review): the two handlers below appear to belong to a `try:` opened
# above this excerpt — confirm.
except ReceivedErrorResponse as e:
return json_response(response.error(e.message), request=request)
# Catch-all: record the failure via logger.exception and return the
# stringified error wrapped by response.error.
except Exception as e:
logger.exception("exc")
return json_response(response.error(str(e)), request=request)