Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
requests = prot.parse_request("""[
{"jsonrpc": "2.0", "method": "sum", "params": [1,2,4], "id": "1"},
{"jsonrpc": "2.0", "method": "notify_hello", "params": [7]},
{"jsonrpc": "2.0", "method": "subtract", "params": [42,23], "id": "2"},
{"foo": "boo"},
{"jsonrpc": "2.0", "method": "foo.get", "params": {"name": "myself"}, "id": "5"},
{"jsonrpc": "2.0", "method": "get_data", "id": "9"}
]""")
assert isinstance(requests[3], JSONRPCInvalidRequestError)
responses = requests.create_batch_response()
responses.append(requests[0].respond(7))
responses.append(requests[2].respond(19))
responses.append(requests[3].error_respond())
responses.append(requests[4].error_respond(MethodNotFoundError('foo.get')))
responses.append(requests[5].respond(['hello', 5]))
assert _json_equal("""[
{"jsonrpc": "2.0", "result": 7, "id": "1"},
{"jsonrpc": "2.0", "result": 19, "id": "2"},
{"jsonrpc": "2.0", "error": {"code": -32600, "message": "Invalid Request"}, "id": null},
{"jsonrpc": "2.0", "error": {"code": -32601, "message": "Method not found"}, "id": "5"},
{"jsonrpc": "2.0", "result": ["hello", 5], "id": "9"}
]""",
responses.serialize())
def test_jsonrpc_spec_v2_example4(prot):
    """Check a ``foobar`` request and the "Method not found" reply to it.

    A request for method ``foobar`` is created through ``prot``, its id is
    pinned to ``"1"``, and both the serialized request and the serialized
    error response built from it are compared with literal JSON documents.
    """
    expected_request = """{"jsonrpc": "2.0", "method": "foobar", "id": "1"}"""
    # The literal deliberately contains a line break; it is kept verbatim.
    expected_error = """{"jsonrpc": "2.0", "error": {"code": -32601, "message":
"Method not found"}, "id": "1"}"""

    req = prot.create_request('foobar')
    # Pin the id so it matches the "id" field of the expected documents.
    req.unique_id = str(1)
    assert _json_equal(expected_request, req.serialize())

    reply = req.error_respond(MethodNotFoundError('foobar'))
    assert _json_equal(expected_error, reply.serialize())
def _get_code_and_message(error):
assert isinstance(error, (Exception, six.string_types))
if isinstance(error, Exception):
if hasattr(error, "msgpackrpc_error_code"):
code = error.msgpackrpc_error_code
msg = str(error)
elif isinstance(error, InvalidRequestError):
code = MSGPACKRPCInvalidRequestError.msgpackrpc_error_code
msg = MSGPACKRPCInvalidRequestError.message
elif isinstance(error, MethodNotFoundError):
code = MSGPACKRPCMethodNotFoundError.msgpackrpc_error_code
msg = MSGPACKRPCMethodNotFoundError.message
else:
# allow exception message to propagate
code = MSGPACKRPCServerError.msgpackrpc_error_code
msg = str(error)
else:
code = -32000
msg = error
return code, msg
def _get_code_message_and_data(error: Union[Exception, str]
) -> Tuple[int, str, Any]:
assert isinstance(error, (Exception, str))
data = None
if isinstance(error, Exception):
if hasattr(error, 'jsonrpc_error_code'):
code = error.jsonrpc_error_code
msg = str(error)
try:
data = error.data
except AttributeError:
pass
elif isinstance(error, InvalidRequestError):
code = JSONRPCInvalidRequestError.jsonrpc_error_code
msg = JSONRPCInvalidRequestError.message
elif isinstance(error, MethodNotFoundError):
code = JSONRPCMethodNotFoundError.jsonrpc_error_code
msg = JSONRPCMethodNotFoundError.message
elif isinstance(error, InvalidParamsError):
code = JSONRPCInvalidParamsError.jsonrpc_error_code
msg = JSONRPCInvalidParamsError.message
else:
# allow exception message to propagate
code = JSONRPCServerError.jsonrpc_error_code
if len(error.args) == 2:
msg = str(error.args[0])
data = error.args[1]
else:
msg = str(error)
else:
code = -32000
msg = error