Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Round-trip check for an error response carrying a custom message and data.
# `response`, `custom_msg`, `data` and `prot` are defined outside this excerpt.

# Server side: serialize the response and inspect the JSON error object.
jmsg = response.serialize()
# serialize() is expected to return bytes; decode them before JSON parsing.
assert isinstance(jmsg, bytes)
jmsg = jmsg.decode()
decoded = json.loads(jmsg)
print("decoded=", decoded)
# The error object must carry the expected code, message and data payload.
assert decoded['error']['code'] == -32000
assert decoded['error']['message'] == custom_msg
assert decoded['error']['data'] == data
# Client side: parse the serialized reply back into a response object.
parsed_reply = prot.parse_reply(jmsg)
# Re-serialize the parsed reply so it can be compared with the original text.
serialized_reply = parsed_reply.serialize().decode("utf-8")
decoded_reply = json.loads(serialized_reply)
print("decoded_reply=", decoded_reply)
# The parsed reply must be an error response that kept its `data` attribute.
assert isinstance(parsed_reply, JSONRPCErrorResponse)
assert hasattr(parsed_reply, "data")
# The round trip must be lossless, both as text and as decoded JSON.
assert serialized_reply == jmsg
assert decoded_reply == decoded
# Round-trip check for a parse-error response carrying additional data.
# `response`, `data` and `prot` are defined outside this excerpt.

# Server side: serialize the response and inspect the JSON error object.
jmsg = response.serialize()
# serialize() is expected to return bytes; decode them before JSON parsing.
assert isinstance(jmsg, bytes)
jmsg = jmsg.decode()
decoded = json.loads(jmsg)
print("decoded=", decoded)
# The code must be -32700 and the message must match JSONRPCParseError.message.
assert decoded['error']['code'] == -32700
assert decoded['error']['message'] == JSONRPCParseError.message
assert decoded['error']['data'] == data
# Client side: parse the serialized reply back into a response object.
parsed_reply = prot.parse_reply(jmsg)
# Re-serialize the parsed reply so it can be compared with the original text.
serialized_reply = parsed_reply.serialize().decode("utf-8")
decoded_reply = json.loads(serialized_reply)
print("decoded_reply=", decoded_reply)
# The parsed reply must be an error response that kept its `data` attribute.
assert isinstance(parsed_reply, JSONRPCErrorResponse)
assert hasattr(parsed_reply, "data")
# The round trip must be lossless, both as text and as decoded JSON.
assert serialized_reply == jmsg
assert decoded_reply == decoded
def error_respond(self) -> 'JSONRPCErrorResponse':
    """Build the error response that represents this error.

    The message, request id and JSON-RPC error code are copied from this
    object onto a new response. ``data`` is copied as well, but only when
    this object actually has a ``data`` attribute.

    :return: An error response object ready to be serialized and sent to
        the client.
    :rtype: :py:class:`JSONRPCErrorResponse`
    """
    reply = JSONRPCErrorResponse()
    reply.error = self.message
    reply.unique_id = self.request_id
    reply._jsonrpc_error_code = self.jsonrpc_error_code

    # ``data`` is optional: leave the response without it when absent.
    if not hasattr(self, 'data'):
        return reply

    reply.data = self.data
    return reply
# Validate the decoded reply `rep` (produced before this excerpt) and turn it
# into a response object. Any structural problem raises InvalidReplyError.

# The version member must be present and equal to the protocol's version.
if 'jsonrpc' not in rep:
raise InvalidReplyError('Missing jsonrpc (version) in response.')
if rep['jsonrpc'] != self.JSON_RPC_VERSION:
raise InvalidReplyError('Wrong JSONRPC version')
# The id member is required; it is copied onto the response further down.
if 'id' not in rep:
raise InvalidReplyError('Missing id in response')
# A reply carrying both `error` and `result` is rejected.
if ('error' in rep) and ('result' in rep):
raise InvalidReplyError(
'Reply must contain exactly one of result and error.'
)
# Error reply: copy message and code, plus the data member when present.
if 'error' in rep:
response = JSONRPCErrorResponse()
error = rep['error']
response.error = error["message"]
response._jsonrpc_error_code = error["code"]
if "data" in error:
response.data = error["data"]
# Otherwise build a success reply; a missing `result` becomes None.
else:
response = JSONRPCSuccessResponse()
response.result = rep.get('result', None)
# Both response kinds carry the id taken from the reply.
response.unique_id = rep['id']
return response
def __init__(
    self, error: Union['JSONRPCErrorResponse', Dict[str, Any]]
) -> None:
    """Create the exception from an error response or a raw error mapping.

    :param error: Either a :py:class:`JSONRPCErrorResponse`, whose
        ``error``, ``_jsonrpc_error_code`` and optional ``data`` attributes
        are copied, or a mapping with ``'message'`` and ``'code'`` keys and
        an optional ``'data'`` key.
    """
    if not isinstance(error, JSONRPCErrorResponse):
        # Mapping form: the base initializer runs without arguments and
        # the message is assigned afterwards.
        super(JSONRPCError, self).__init__()
        self.message = error['message']
        self._jsonrpc_error_code = error['code']
        if 'data' in error:
            self.data = error['data']
        return

    # Response form: the response's error text goes to the base initializer.
    # NOTE(review): ``self.message`` is not assigned on this path -- confirm
    # a base class provides it if callers rely on it.
    super(JSONRPCError, self).__init__(error.error)
    self._jsonrpc_error_code = error._jsonrpc_error_code
    if hasattr(error, 'data'):
        self.data = error.data