Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def test_pool_worker(self):
    """Push a single task through a ``PoolWorker`` and check its result.

    A worker is started on a fresh pair of queues, one task built around
    ``mapper`` is submitted, and after a short pause the test expects
    ``(1, 10)`` on the result queue and a worker that is no longer alive.
    """
    task_queue, result_queue = context.Queue(), context.Queue()
    proc = PoolWorker(task_queue, result_queue, 1)
    proc.start()
    self.assertTrue(proc.is_alive())

    # Presumably (task id, callable, args, kwargs) -- confirm against PoolWorker.
    task = (1, mapper, (5,), {})
    task_queue.put_nowait(task)

    # Fixed pause before the non-blocking read below.
    # NOTE(review): looks timing-sensitive on a slow machine -- confirm.
    await asyncio.sleep(0.5)

    outcome = result_queue.get_nowait()
    self.assertEqual(outcome, (1, 10))
    self.assertFalse(proc.is_alive())  # maxtasks == 1
async def test_pool_worker(self):
    """Push a single task through a ``PoolWorker`` and check its result.

    A worker is started on a fresh pair of queues, one task built around
    ``mapper`` is submitted, and after a short pause the test expects
    ``(1, 10)`` on the result queue and a worker that is no longer alive.
    """
    task_queue, result_queue = context.Queue(), context.Queue()
    proc = PoolWorker(task_queue, result_queue, 1)
    proc.start()
    self.assertTrue(proc.is_alive())

    # Presumably (task id, callable, args, kwargs) -- confirm against PoolWorker.
    task = (1, mapper, (5,), {})
    task_queue.put_nowait(task)

    # Fixed pause before the non-blocking read below.
    # NOTE(review): looks timing-sensitive on a slow machine -- confirm.
    await asyncio.sleep(0.5)

    outcome = result_queue.get_nowait()
    self.assertEqual(outcome, (1, 10))
    self.assertFalse(proc.is_alive())  # maxtasks == 1