Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
aio_pool = ThreadPoolExecutor(1)
self.loop = asyncio.new_event_loop()
self.run_until_complete = partial(run_in_thread, aio_pool, self.loop)
else:
self.loop = loop
self.run_until_complete = self.loop.run_until_complete
asyncio.set_event_loop(self.loop)
# Setup queues
self.http_queue = asyncio.Queue(loop=self.loop)
self.http_tokens = asyncio.Queue(maxsize=self.max_conn, loop=self.loop)
self.ftp_queue = asyncio.Queue(loop=self.loop)
self.ftp_tokens = asyncio.Queue(maxsize=self.max_conn, loop=self.loop)
for i in range(self.max_conn):
self.http_tokens.put_nowait(Token(i + 1))
self.ftp_tokens.put_nowait(Token(i + 1))
self.loop = asyncio.new_event_loop()
self.run_until_complete = partial(run_in_thread, aio_pool, self.loop)
else:
self.loop = loop
self.run_until_complete = self.loop.run_until_complete
asyncio.set_event_loop(self.loop)
# Setup queues
self.http_queue = asyncio.Queue(loop=self.loop)
self.http_tokens = asyncio.Queue(maxsize=self.max_conn, loop=self.loop)
self.ftp_queue = asyncio.Queue(loop=self.loop)
self.ftp_tokens = asyncio.Queue(maxsize=self.max_conn, loop=self.loop)
for i in range(self.max_conn):
self.http_tokens.put_nowait(Token(i + 1))
self.ftp_tokens.put_nowait(Token(i + 1))