Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def request_coroutine():
async with aiohttp.ClientSession(auth=auth, loop=loop) as session:
server = AiohttpClient(session, server_url)
f = getattr(server, endpoint)
response = await f(*args)
return response.data.result
try:
async def request_coroutine():
async with aiohttp.ClientSession(auth=auth) as session:
server = AiohttpClient(session, server_url, timeout=timeout)
f = getattr(server, endpoint)
response = await f(*args)
return response.data.result
try:
@ignore_exceptions
@log_exceptions
async def sync_with_remote_watchtower(self):
import aiohttp
from jsonrpcclient.clients.aiohttp_client import AiohttpClient
class myAiohttpClient(AiohttpClient):
async def request(self, *args, **kwargs):
r = await super().request(*args, **kwargs)
return r.data.result
while True:
await asyncio.sleep(5)
watchtower_url = self.config.get('watchtower_url')
if not watchtower_url:
continue
try:
async with make_aiohttp_session(proxy=self.network.proxy) as session:
watchtower = myAiohttpClient(session, watchtower_url)
for chan in self.channels.values():
await self.sync_channel_with_watchtower(chan, watchtower)
except aiohttp.client_exceptions.ClientConnectorError:
self.logger.info(f'could not contact remote watchtower {watchtower_url}')
async def init_client(self):
self.session = ClientSession(loop=self.loop)
self.api_client = AiohttpClient(
self.session,
self.endpoint,
ssl=context
)
async def main(loop):
async with aiohttp.ClientSession(loop=loop) as session:
client = AiohttpClient(session, "http://localhost:5000")
requests = [Request("ping"), Notification("ping"), Request("ping")]
response = await client.send(requests)
for data in response.data:
if data.ok:
print("{}: {}".format(data.id, data.result))
else:
logging.error("%d: %s", data.id, data.message)
async def main(loop):
async with aiohttp.ClientSession(loop=loop) as session:
client = AiohttpClient(session, "http://localhost:5000")
response = await client.request("ping")
print(response.data.result)