Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def test_order(self):
    # Feed values into the async side of a priority queue out of order and
    # check that they are handed back in ascending order.
    queue = janus.PriorityQueue()
    async_side = queue.async_q
    for value in (1, 3, 2):
        async_side.put_nowait(value)

    drained = []
    for _ in range(3):
        drained.append(async_side.get_nowait())
    self.assertEqual([1, 2, 3], drained)

    # The sync-side mutex must not be left held once the operations finish.
    self.assertFalse(queue._sync_mutex.locked())

    queue.close()
    await queue.wait_closed()
q = _q.sync_q
self.assertEqual(q.maxsize, 5)
_q.close()
await _q.wait_closed()
class QueueTest(BaseQueueTestMixin, unittest.TestCase):
    # Queue class bound for the tests inherited from BaseQueueTestMixin.
    type2test = janus.Queue
class LifoQueueTest(BaseQueueTestMixin, unittest.TestCase):
    # Queue class bound for the tests inherited from BaseQueueTestMixin.
    type2test = janus.LifoQueue
class PriorityQueueTest(BaseQueueTestMixin, unittest.TestCase):
    # Queue class bound for the tests inherited from BaseQueueTestMixin.
    type2test = janus.PriorityQueue
# A Queue subclass that can provoke failure at a moment's notice :)
class FailingQueueException(Exception):
    """Exception type dedicated to the failing-queue test helpers."""
class FailingQueue(janus.Queue):
def __init__(self, *args, **kwargs):
    # Both failure switches start disabled; every positional and keyword
    # argument is passed through unchanged to the parent constructor.
    self.fail_next_put = False
    self.fail_next_get = False
    super().__init__(*args, **kwargs)
def _put(self, item):
if self.fail_next_put:
self.fail_next_put = False
self.assertFalse(_q._sync_mutex.locked())
_q.close()
await _q.wait_closed()
class QueueJoinTests(_QueueJoinTestMixin, unittest.TestCase):
    # Queue class bound for the tests inherited from _QueueJoinTestMixin.
    q_class = janus.Queue
class LifoQueueJoinTests(_QueueJoinTestMixin, unittest.TestCase):
    # Queue class bound for the tests inherited from _QueueJoinTestMixin.
    q_class = janus.LifoQueue
class PriorityQueueJoinTests(_QueueJoinTestMixin, unittest.TestCase):
    # Queue class bound for the tests inherited from _QueueJoinTestMixin.
    q_class = janus.PriorityQueue