Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test(self, *_):
response = HTTPClient("http://test/").request("http://foo", "foo")
assert response.data.result == "bar"
def test(self):
responses.add(
responses.POST,
"http://foo",
status=200,
body='{"jsonrpc": "2.0", "result": 5, "id": 1}',
)
HTTPClient("http://foo").send_message(
str(Request("foo")), response_expected=True
)
def test(self, *_):
response = HTTPClient("http://test/").notify("http://foo", "foo")
assert response.data.result == "bar"
def test_init_default_headers(self):
client = HTTPClient("http://test/")
# Default headers
assert client.session.headers["Content-Type"] == "application/json"
assert client.session.headers["Accept"] == "application/json"
# Ensure the Requests default_headers are also there
assert "Connection" in client.session.headers
try:
self.logger.debug("calling url: {}".format(url))
request = str(Request(
method="whisper.validate",
senderId=sender_id,
receiverName=target_user_name,
))
request_and_hash = self.private_key + request
sign_hash = hashlib.md5(request_and_hash.encode('utf-8')).hexdigest()
self.logger.info("request_and_hash: {}".format(request_and_hash))
self.logger.info("sign_hash: {}".format(sign_hash))
client = HTTPClient(url)
client.session.headers.update({
"Content-Type": "application/json-rpc",
"X-RPC-SIGN": sign_hash
})
response = client.send(request).data
except Exception as e:
self.logger.error("could not call remote endpoint {}: {}".format(url, str(e)))
self.env.capture_exception(sys.exc_info())
self.logger.exception(e)
return True, ErrorCodes.REMOTE_ERROR
if response is None:
self.logger.error("received None response for jsonrpc call")
return True, ErrorCodes.OK
from jsonrpcclient.clients.http_client import HTTPClient
client = HTTPClient("http://localhost:5000")
response = client.request("ping")
print(response.data.result)
def __init__(self, node: str, devel=False):
self.client = HTTPClient(node)
self.account = Account(self.client)
self.chain = Chain(self.client)
self.engine = Engine(self.client)
self.mempool = Mempool(self.client)
self.net = Net(self.client)
if devel:
self.devel = Devel(self.client)
def send(endpoint: str, *args: list, **kwargs: dict) -> Response:
from .clients.http_client import HTTPClient
return HTTPClient(endpoint).send(*args, **kwargs)
def notify(endpoint: str, *args: list, **kwargs: dict) -> Response:
from .clients.http_client import HTTPClient
return HTTPClient(endpoint).request(*args, **kwargs)
from jsonrpcclient.clients.http_client import HTTPClient
response = HTTPClient("http://localhost:5000/").request("ping")
if response.result.ok:
print(response.data.result)
else:
print("Error: {}".format(response.data.message))