Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_enqueue_duplicate_with_same_kw_args(self):
    """Enqueue two tasks with equal kwargs; the unique one must not be stored."""

    def stored_task_count():
        # Reaches into the queue's name-mangled private storage attribute.
        return len(list(self.queue._TaskQueue__storage._all_tasks()))

    # Setup: a single task is enqueued and accounted for.
    original = Task(kwargs, kwargs={'foo': 1, 'bar': 2})
    self.queue.enqueue(original)
    self.assertEqual(1, stored_task_count())

    # Test: same callable and same kwargs, this time with unique=True.
    duplicate = Task(kwargs, kwargs={'foo': 1, 'bar': 2})
    self.queue.enqueue(duplicate, unique=True)

    # Verify: the storage still reports exactly one task.
    self.assertEqual(1, stored_task_count())
def test_at_time(self):
    """Check that a task scheduled 10 seconds ahead waits, then finishes.

    The state is sampled 8 seconds after enqueueing (expected: waiting) and
    again 3 seconds later, roughly 11 seconds in (expected: finished).
    """
    now = datetime.now()
    then = timedelta(seconds=10)
    at = AtScheduler(now + then)
    task = Task(noop, scheduler=at)
    self.queue.enqueue(task)
    time.sleep(8)
    # Compare states by value, like the sibling tests do. An identity check
    # only passes when both sides are the very same object, which is not
    # something this test is meant to verify.
    self.assertTrue(task.state == task_waiting, 'state is %s' % task.state)
    time.sleep(3)
    self.assertTrue(task.state == task_finished, 'state is %s' % task.state)
def test_enqueue_duplicate_with_different_schedulers(self):
    """Enqueue the same callable under two schedulers; the second call must succeed."""
    # First task: an AtScheduler set ten minutes from now.
    ten_minutes_ahead = datetime.now() + timedelta(minutes=10)
    deferred_task = Task(noop, scheduler=AtScheduler(ten_minutes_ahead))
    # Second task: same callable, but with an ImmediateScheduler.
    immediate_task = Task(noop, scheduler=ImmediateScheduler())

    # The second positional argument is presumably the `unique` flag -- confirm
    # against the queue's enqueue() signature.
    self.queue.enqueue(deferred_task, True)
    second_result = self.queue.enqueue(immediate_task, True)

    self.assertTrue(second_result)
def test_task_with_immediate_scheduler_deserialization(self):
    """A task rebuilt from its snapshot must carry an ImmediateScheduler."""
    original = Task(noop, scheduler=ImmediateScheduler())
    # Round trip: task -> snapshot -> task.
    restored = original.snapshot().to_task()
    scheduler_kind_kept = isinstance(restored.scheduler, ImmediateScheduler)
    self.assertTrue(scheduler_kind_kept)
def test_find_multiple_matching(self):
    """find(state=task_waiting) must include the second of two enqueued tasks."""
    # Setup: build both tasks first, then enqueue them in creation order.
    first = Task(noop)
    second = Task(noop)
    for queued in (first, second):
        self.queue.enqueue(queued)
    # Test
    matches = self.queue.find(state=task_waiting)
    # Verify: only the second task's presence is checked.
    second_was_found = second in matches
    self.assertTrue(second_was_found)
def test_task_result(self):
    """Run a task synchronously and check its final state and result."""
    executed = Task(result)
    executed.run()

    reached_finished = executed.state == task_finished
    self.assertTrue(reached_finished)

    # The result must be the True singleton, not merely a truthy value.
    result_is_true = executed.result is True
    self.assertTrue(result_is_true)
def _enqueue_three_tasks(self):
    """Put three noop tasks into the waiting storage.

    The tasks get scheduled_time values 3, 2 and 1 and are enqueued in that
    (creation) order, i.e. with descending scheduled_time.
    """
    # Build every task before enqueueing any of them.
    pending = []
    for scheduled_time in (3, 2, 1):
        task = Task(noop)
        task.scheduled_time = scheduled_time
        pending.append(task)

    for task in pending:
        self.storage.enqueue_waiting(task)
def test_task_dispatch_with_scheduled_time(self):
    """A task scheduled 10 seconds ahead must not finish before that delay.

    Waits up to 20 seconds for the task, then checks that it finished and
    that more than the scheduled delay elapsed.
    """
    delay_seconds = timedelta(seconds=10)
    # Start the clock before the schedule is computed. If it were started
    # after enqueueing, the measured interval could be shorter than the
    # delay even when the task ran exactly on time.
    start_time = datetime.now()
    scheduler = AtScheduler(datetime.now(dateutils.local_tz()) + delay_seconds)
    task = Task(noop, scheduler=scheduler)
    self.queue.enqueue(task)
    self._wait_for_task(task, timeout=timedelta(seconds=20))
    end_time = datetime.now()
    self.assertTrue(task.state == task_finished)
    # start_time and end_time are both naive, so the subtraction is valid.
    self.assertTrue(end_time - start_time > delay_seconds)
def test_find_multiple_matching(self):
    """With two tasks enqueued, a find by waiting state must return the later one."""
    # Setup: two noop tasks, enqueued in the order they were created.
    enqueued = [Task(noop), Task(noop)]
    for task in enqueued:
        self.queue.enqueue(task)

    # Test
    results = self.queue.find(state=task_waiting)

    # Verify: membership of the most recently enqueued task only.
    latest = enqueued[-1]
    self.assertTrue(latest in results)
def init_culling_task():
interval = datetime.timedelta(days=1)
tz = dateutils.local_tz()
now = datetime.datetime.now(tz)
if now.hour > 1:
now += interval
start_time = datetime.datetime(now.year, now.month, now.day, 1, tzinfo=tz)
scheduler = IntervalScheduler(interval, start_time)
task = Task(cull_history, scheduler=scheduler)
async.enqueue(task)