Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
{
'iteration': {...},
'change': 2
}
]
},
{
# worker_1 assignment
'assignment': {...},
'change': 2,
'iterations': []
}
],
}
"""
task = Task.objects.get(id=task_id)
revert_iteration = Iteration.objects.get(id=iteration_id)
first_iteration = (
revert_iteration.assignment.iterations
.order_by('start_datetime').first())
if revert_before and revert_iteration != first_iteration:
raise InvalidRevertError(
"Cannot revert before an iteration that isn't the first of "
"its assignment.")
task_audit = _build_revert_audit(task, revert_iteration, revert_before)
if commit:
_revert_task_from_audit(task_audit)
return task_audit
def get_task_overview_for_worker(task_id, worker):
    """
    Build a combined overview of a task and the assignment shown to `worker`.

    Args:
        task_id (int):
            The ID of the desired task object.
        worker (orchestra.models.Worker):
            The worker requesting the overview.

    Returns:
        task_assignment_details (dict):
            The assignment details updated with the task details. Task
            detail keys take precedence when both contain the same key.
            Always carries `is_project_admin`; additionally carries
            `is_read_only = True` when a project admin who is not
            assigned to the task is viewing it.

    Raises:
        TaskAssignmentError:
            The worker is neither assigned to the task nor a project
            admin.
    """
    task = Task.objects.get(id=task_id)
    overview = get_task_details(task_id)
    overview['is_project_admin'] = worker.is_project_admin()

    if task.is_worker_assigned(worker):
        # The worker sees their own assignment.
        assignment = TaskAssignment.objects.get(worker=worker, task=task)
    else:
        # Guard: only project admins may view a task they are not on.
        if not worker.is_project_admin():
            raise TaskAssignmentError('Worker is not associated with task')
        # Admins get the last assignment in the task's history, read-only.
        assignment = assignment_history(task).last()
        overview['is_read_only'] = True

    combined = get_task_assignment_details(assignment)
    combined.update(overview)
    return combined
A dict mapping task prerequisites onto their latest task
assignment information.
"""
# Find all prerequisite steps in graph
to_visit = list(step.creation_depends_on.all())
prerequisite_steps = set(to_visit)
while to_visit:
current_step = to_visit.pop()
for step in current_step.creation_depends_on.all():
if step not in prerequisite_steps:
to_visit.append(step)
prerequisite_steps.add(step)
prerequisite_data = {}
for step in prerequisite_steps:
required_task = Task.objects.get(step=step, project=project)
if required_task.status != Task.Status.COMPLETE:
raise TaskDependencyError('Task depenency is not satisfied')
assignment = assignment_history(required_task).last()
prerequisite_data[step.slug] = assignment.in_progress_task_data
return prerequisite_data
def staff(self, task_id,
request_cause=StaffBotRequest.RequestCause.USER.value):
"""
This function handles staffing a request for the given task_id.
"""
command = 'staff {}'.format(task_id)
try:
task = Task.objects.get(id=task_id)
required_role_counter = role_counter_required_for_new_task(task)
error_msg = None
assert_new_task_status_valid(task.status)
except TaskStatusError:
error_msg = self.staffing_is_not_allowed.format(task_id)
except Task.DoesNotExist:
error_msg = self.task_does_not_exist_error.format(task_id)
except TaskAssignmentError as error:
error_msg = self.task_assignment_error.format(task_id, error)
if error_msg is not None:
logger.exception(error_msg)
return format_slack_message(
command,
attachments=[{
'color': 'danger',
def update_todos_from_todolist_template(request):
    """
    Apply a todolist template to a task and return the project's todos.

    Reads `todolist_template` (a template slug) and `task` (a task ID)
    from `request.data`, passes both to `add_todolist_template`, and then
    responds with every todo belonging to the task's project, ordered by
    `created_at` descending.

    Args:
        request:
            The incoming request; its `data` mapping supplies the
            `todolist_template` and `task` values.

    Returns:
        Response:
            Serialized todos (via `TodoSerializer`) for the task's project.

    Raises:
        BadRequest:
            `TodoListTemplate.DoesNotExist` was raised while processing
            the request.
    """
    template_slug = request.data.get('todolist_template')
    task_id = request.data.get('task')
    try:
        add_todolist_template(template_slug, task_id)
        task = Task.objects.get(id=task_id)
        project_id = int(task.project.id)
        project_todos = Todo.objects.filter(task__project__id=project_id)
        newest_first = project_todos.order_by('-created_at')
        serialized = TodoSerializer(newest_first, many=True)
        return Response(serialized.data)
    except TodoListTemplate.DoesNotExist:
        raise BadRequest('TodoList Template not found for the given slug.')
task_id (int):
The ID of the task to be assigned.
Returns:
task (orchestra.models.Task):
The newly assigned task.
Raises:
orchestra.core.errors.TaskAssignmentError:
The specified worker is already assigned to the given task
or the task status is not compatible with new assignment.
orchestra.core.errors.WorkerCertificationError:
The specified worker is not certified for the given task.
"""
worker = Worker.objects.get(id=worker_id)
task = Task.objects.get(id=task_id)
required_role = role_required_for_new_task(task)
required_role_counter = role_counter_required_for_new_task(task)
assignment = current_assignment(task)
if not is_worker_certified_for_task(worker, task, required_role):
raise WorkerCertificationError('Worker not certified for this task.')
if task.is_worker_assigned(worker):
raise TaskAssignmentError('Worker already assigned to this task.')
assignment_counter = task.assignments.count()
in_progress_task_data = {}
if assignment:
# In-progress task data is the latest
# submission by a previous worker
in_progress_task_data = assignment.in_progress_task_data
def worker_task_recent_todo_qas(request):
    """
    Return TodoQA recommendations for the requesting user and task.

    The task ID is read from the `task` query parameter. The TodoQAs are
    chosen as follows:
        1. If the given task already has TodoQAs, all of them are used.
        2. Otherwise, the most recently created TodoQA is looked up among
           tasks that the requesting user has an assignment on and whose
           step slug matches the given task's step slug. The unapproved
           (`approved=False`) TodoQAs of that TodoQA's task are used.
        3. If no such TodoQA exists, nothing is recommended.

    Returns:
        Response:
            A mapping from todo description to the serialized TodoQA.
            TodoQAs sharing a description collapse to a single entry
            (the last one iterated wins).
    """
    task_id = request.query_params.get('task')
    todo_qas = TodoQA.objects.filter(todo__task=task_id)

    if not todo_qas.exists():
        # Fall back to the user's most recent task on the same step.
        task = Task.objects.get(pk=task_id)
        candidates = TodoQA.objects.filter(
            todo__task__assignments__worker__user=request.user,
            todo__task__step__slug=task.step.slug)
        latest_todo_qa = candidates.order_by('-created_at').first()
        if not latest_todo_qa:
            todo_qas = TodoQA.objects.none()
        else:
            todo_qas = TodoQA.objects.filter(
                todo__task=latest_todo_qa.todo.task,
                approved=False)

    recommendations = {}
    for todo_qa in todo_qas:
        description = todo_qa.todo.description
        recommendations[description] = TodoQASerializer(todo_qa).data
    return Response(recommendations)