Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_check_wait_without_subtasks(self):
    # A single FREE task is created and no other task is attached to it,
    # so check_wait is expected to report nothing in either result list.
    task_fields = {
        'worker': self._worker,
        'arch': self._arch,
        'channel': self._channel,
        'owner': self._user,
        'state': TASK_STATES['FREE'],
    }
    parent_task = Task.objects.create(**task_fields)

    request = _make_request(self._worker)
    finished, unfinished = worker.check_wait(request, parent_task.id)

    # Both lists must be empty; finished is checked first, then unfinished.
    for subtask_ids in (finished, unfinished):
        self.assertEqual(len(subtask_ids), 0)
def test_cancel_task(self):
    # Create a task for the test user, cancel it through the client API,
    # then reload it from the database to verify the stored state.
    new_task_id = Task.create_task(self._user.username, 'task', 'method')

    # cancel_task is expected to return nothing.
    self.assertIsNone(client.cancel_task(_make_request(), new_task_id))

    reloaded = Task.objects.get(id=new_task_id)
    self.assertEqual(reloaded.state, TASK_STATES['CANCELED'])
def test_get_query_search_filter_authenticated(self):
task = Task.objects.create(
worker=self._worker,
arch=self._arch,
channel=self._channel,
owner=self._user,
method='TaskMethod',
label='Task 1',
state=TASK_STATES['FREE'],
)
Task.objects.create(
worker=self._worker,
arch=self._arch,
channel=self._channel,
owner=self._user,
method='TaskMethod',
label='Task 2',
def test_time_task_not_started(self):
    # An in-memory task with neither a start nor a finish timestamp
    # must expose no elapsed time at all.
    unstarted_task = Task(dt_started=None, dt_finished=None)
    elapsed = unstarted_task.time
    self.assertIsNone(elapsed)
def _create_task(self, **kwargs):
    # Helper: create and return a Task built from the fixture objects on
    # self, letting any keyword argument override the matching default.
    defaults = dict(
        worker=self._worker,
        arch=self._arch,
        channel=self._channel,
        owner=self._user,
        method='DummyTask',
        state=TASK_STATES['FREE'],
    )
    # Caller-supplied values win over the defaults above.
    task_fields = dict(defaults, **kwargs)
    return Task.objects.create(**task_fields)
def test_get_time_display_task_not_started(self):
    # A task with neither a start nor a finish timestamp must render its
    # time as an empty string.
    t = Task(dt_started=None, dt_finished=None)
    # Use assertEqual: the assertEquals alias is deprecated and was removed
    # from unittest in Python 3.12, where it raises AttributeError.
    self.assertEqual(t.get_time_display(), '')
query = Q()
if search:
query |= Q(method__icontains=search)
query |= Q(owner__username__icontains=search)
query |= Q(label__icontains=search)
if my and call_if_callable(request.user.is_authenticated):
query &= Q(owner=request.user)
if self.state is not None:
query &= Q(state__in=self.state)
#if self.kwargs:
# query &= Q(self.kwargs)
return Task.objects.filter(parent__isnull=True).filter(query).order_by(*self.order_by).defer("result", "args").select_related("owner", "worker")
def shutdown_worker(request, worker_name, kill=False):
    """shutdown_worker(worker_name, kill): None

    Send a shutdown request to a worker.

    The request is recorded by calling ``models.Task.create_shutdown_task``
    with the username of ``request.user``; the result of that call is
    returned unchanged.
    NOTE(review): the signature line above advertises ``None`` -- confirm
    what ``create_shutdown_task`` actually returns.

    If ``kill`` is True the worker is killed immediately, otherwise it
    waits until all tasks assigned to it are finished.
    """
    create_shutdown_task = models.Task.create_shutdown_task
    return create_shutdown_task(request.user.username, worker_name, kill=kill)
def __init__(self, *args, **kwargs):
    """Initialise the task, diverting log-like keyword arguments.

    The optional ``traceback`` and ``stdout`` keyword arguments are removed
    from ``kwargs`` before the base constructor runs. Each truthy value is
    stored in ``self.logs`` under ``traceback.log`` / ``stdout.log``;
    falsy values (``None``, empty string) are dropped.
    """
    # The log container must exist before anything is written into it.
    self.logs = TaskLogs(self)

    # (keyword argument, log file name) pairs, handled in this order.
    log_kwargs = (
        ("traceback", "traceback.log"),
        ("stdout", "stdout.log"),
    )
    for kwarg_name, log_name in log_kwargs:
        content = kwargs.pop(kwarg_name, None)
        if content:
            self.logs[log_name] = content

    super(Task, self).__init__(*args, **kwargs)