Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def test_request(self):
req = AsyncRequest({"jsonrpc": "2.0", "method": "foo", "id": 1})
response = await req.call(methods)
self.assertEqual("bar", response["result"])
def __missing__(self, key):
methods.add(self.d.create_passthrough(key), key)
return self[key]
@methods.add
async def ShowNodeList(params):
from gateway import gateway_singleton
return gateway_singleton.handle_jsonrpc_request('ShowNodeList', params)
@methods.add
async def icx_getScoreApi(**request_params):
Logger.debug(f'json_rpc_server icx_getScoreApi!', TBEARS_LOG_TAG)
method = 'icx_getScoreApi'
request = {'method': method, 'params': request_params}
response = await get_icon_inner_task().query(request)
return response_to_json_query(response)
@methods.add
async def GetRouterInfo(params):
from gateway import gateway_singleton
return gateway_singleton.handle_wallet_request('GetRouterInfo', params)
def __init__(self):
self.app = web.Application()
self.chain = BlockchainClient(self.app)
methods._items = HandlerCreator(self)
if config.BLOCKCHAIN_ENABLED:
self.app.on_startup.append(self.setup_db)
self.app.on_startup.append(self.process_completions)
self.app.on_startup.append(self.create_event_task)
self.app.on_cleanup.append(self.cancel_event_task)
self.app.on_cleanup.append(self.cleanup_db)
cors = aiohttp_cors.setup(self.app)
resource = cors.add(self.app.router.add_resource("/"))
cors.add(
resource.add_route("POST", self.handle), {
"*": aiohttp_cors.ResourceOptions(
allow_credentials=False,
allow_headers=("X-Requested-With", "Content-Type"),
max_age=3600