# Secure your code as it's written. Use Snyk Code to scan source code in minutes — no build needed — and fix issues immediately.
def test_interrupt_tasks_fails_to_interrupt_another_worker_task(self):
    """interrupt_tasks raises Task.DoesNotExist for a task held by a different worker."""
    # The task is attached to a worker created here, not to self._worker.
    other_worker = Worker.objects.create(worker_key='other-worker', name='other-worker')
    task_fields = {
        'worker': other_worker,
        'arch': self._arch,
        'channel': self._channel,
        'owner': self._user,
        'state': TASK_STATES['OPEN'],
    }
    foreign_task = Task.objects.create(**task_fields)

    # The request is built from self._worker, which does not own the task.
    request = _make_request(self._worker)
    self.assertRaises(
        Task.DoesNotExist,
        worker.interrupt_tasks,
        request,
        [foreign_task.id],
    )
def test_worker_count(self):
    """Arch.worker_count goes from 0 to 1 once a worker is linked to the arch."""
    arch = Arch.objects.create(name='i386', pretty_name='32 bit')
    # A freshly created arch has no worker attached yet.
    self.assertEqual(arch.worker_count, 0)

    worker = Worker.objects.create(
        worker_key='mock-worker',
        name='mock-worker',
    )
    worker.arches.add(arch)
    worker.save()

    # After linking, the same arch instance must report the new worker.
    self.assertEqual(arch.worker_count, 1)
def test_export_with_arch_and_channel(self):
    """Worker.export() returns the expected dict for a worker with one arch and one channel."""
    worker = Worker.objects.create(worker_key='worker', name='Worker')
    worker.arches.add(self._arch)
    worker.channels.add(self._channel)
    worker.save()

    data = worker.export()

    # The exact id is not known in advance; only require a positive one.
    self.assertGreater(data['id'], 0)
    self.assertEqual(data['name'], 'Worker')
    # Related objects are exported as lists of dicts.
    self.assertEqual(data['arches'][0]['name'], 'i386')
    self.assertEqual(data['channels'][0]['name'], 'test')
    # Remaining fields were not set explicitly above.
    self.assertEqual(data['enabled'], True)
    self.assertEqual(data['max_load'], 1)
    self.assertEqual(data['ready'], True)
    self.assertEqual(data['task_count'], 0)
    self.assertEqual(data['current_load'], 0)
    self.assertEqual(data['version'], '1.0.0')
def test_fail_task_fails_to_fail_another_worker_task(self):
    """fail_task raises Task.DoesNotExist for a task held by a different worker."""
    # Create a separate worker and give it an OPEN task.
    task_owner = Worker.objects.create(worker_key='other-worker', name='other-worker')
    open_task = Task.objects.create(
        worker=task_owner,
        arch=self._arch,
        channel=self._channel,
        owner=self._user,
        state=TASK_STATES['OPEN'],
    )

    # The request is built from self._worker, not from the task's worker.
    request = _make_request(self._worker)
    empty_result = ''
    self.assertRaises(
        Task.DoesNotExist,
        worker.fail_task,
        request,
        open_task.id,
        empty_result,
    )
def test_ready(self):
    """Worker.objects.ready() returns only the worker that has no open task."""
    busy_worker = Worker.objects.create(worker_key='worker-1', name='Worker 1', enabled=True)
    Task.objects.create(
        worker=busy_worker,
        arch=self._arch,
        channel=self._channel,
        owner=self._user,
        method='DummyTask',
        state=TASK_STATES['OPEN'],
        weight=100,
    )
    # Re-save once the task exists.
    # NOTE(review): presumably Worker.save() recomputes load/readiness from the
    # worker's tasks, so the weight-100 task marks this worker not ready — confirm.
    busy_worker.save()

    idle_worker = Worker.objects.create(worker_key='worker-2', name='Worker 2', enabled=True)

    workers = Worker.objects.ready()

    # Without these checks the test could never fail; same shape as test_enabled.
    self.assertEqual(len(workers), 1)
    self.assertEqual(workers[0].id, idle_worker.id)
def test_enabled(self):
    """Worker.objects.enabled() returns only workers created with enabled=True."""
    # The disabled worker must not appear in the result.
    Worker.objects.create(worker_key='worker-disabled', name='Worker disabled', enabled=False)
    enabled_worker = Worker.objects.create(
        worker_key='worker-enabled',
        name='Worker enabled',
        enabled=True,
    )

    workers = Worker.objects.enabled()

    self.assertEqual(len(workers), 1)
    self.assertEqual(workers[0].id, enabled_worker.id)
def test_assign_task_fails_to_assing_another_worker_task(self):
    """assign_task raises for a FREE task that already has a different worker set."""
    other_worker = Worker.objects.create(worker_key='other-worker', name='other-worker')
    task_fields = dict(
        worker=other_worker,
        arch=self._arch,
        channel=self._channel,
        owner=self._user,
        state=TASK_STATES['FREE'],
    )
    free_task = Task.objects.create(**task_fields)

    request = _make_request(self._worker)
    # NOTE(review): only the broad Exception type is checked here; narrow it
    # once the concrete exception raised by assign_task is confirmed.
    self.assertRaises(Exception, worker.assign_task, request, free_task.id)
def test_save_creates_worker_key_if_empty(self):
    """A worker created without a worker_key ends up with a 64-character key."""
    # No worker_key is passed, so any key present afterwards was generated.
    worker = Worker.objects.create(name='Worker')

    self.assertIsNotNone(worker.worker_key)
    self.assertEqual(len(worker.worker_key), 64)