Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def reject_staffing_request_inquiry(request,
                                    staffing_request_inquiry_id):
    """
    Record that the requesting worker declines a staffing request inquiry.

    Args:
        request:
            The incoming HTTP request. `request.user` is used to look up
            the Worker, and the optional `next` query parameter names the
            location to redirect to afterwards.
        staffing_request_inquiry_id:
            Identifier forwarded to `handle_staffing_response`.

    Returns:
        The "not permitted" page if `handle_staffing_response` raises
        StaffingResponseException; a redirect to `next` when it is
        provided and points at this host; otherwise the "request
        rejected" page.

    Raises:
        Http404:
            `handle_staffing_response` returned None.
    """
    # Imported locally so this block does not depend on the file's import
    # header. Newer Django exposes `url_has_allowed_host_and_scheme`;
    # older releases only ship the equivalent `is_safe_url`.
    try:
        from django.utils.http import (
            url_has_allowed_host_and_scheme as is_safe_redirect)
    except ImportError:
        from django.utils.http import is_safe_url as is_safe_redirect

    worker = Worker.objects.get(user=request.user)
    try:
        response = handle_staffing_response(
            worker, staffing_request_inquiry_id, is_available=False)
    except StaffingResponseException:
        return render(request,
                      'communication/staffing_response_not_permitted.html',
                      {})
    if response is None:
        raise Http404

    # `next` comes straight from the query string, so it is untrusted.
    # Only follow it when it stays on this host; otherwise a crafted link
    # could bounce the worker to an arbitrary external site.
    next_path = request.GET.get('next')
    if next_path and is_safe_redirect(
            next_path,
            allowed_hosts={request.get_host()},
            require_https=request.is_secure()):
        return HttpResponseRedirect(next_path)
    return render(request, 'communication/staffing_request_rejected.html',
                  {})
def start_timer(request):
    """
    Start a work timer for the requesting worker.

    On a POST request the body is decoded with `load_encoded_json`; if it
    contains an `assignment` key, that value is passed to
    `time_tracking.start_timer` as the assignment id (otherwise None).

    Args:
        request:
            The incoming HTTP request; `request.user` is used to look up
            the Worker.

    Returns:
        The serialized timer (`TaskTimerSerializer(timer).data`) for POST
        requests. Other methods fall through and return None.

    Raises:
        BadRequest:
            `TaskAssignment.DoesNotExist` or `TimerError` was raised while
            starting the timer.
        Exception:
            Any other error is logged and then re-raised.
    """
    worker = Worker.objects.get(user=request.user)
    try:
        if request.method == 'POST':
            time_entry_data = load_encoded_json(request.body)
            assignment_id = (time_entry_data['assignment']
                             if 'assignment' in time_entry_data else None)
            timer = time_tracking.start_timer(worker,
                                              assignment_id=assignment_id)
            serializer = TaskTimerSerializer(timer)
            return serializer.data
    except TaskAssignment.DoesNotExist:
        raise BadRequest('Worker is not assigned to this task id.')
    except TimerError as e:
        raise BadRequest(e)
    except Exception as e:
        logger.exception(e)
        # Re-raise after logging: swallowing the error here would make the
        # function return None, which the caller cannot tell apart from a
        # successful call. This matches how get_timer handles failures.
        raise
worker_id (int):
The ID of the worker to be assigned.
assignment_id (int):
The ID of the assignment to be assigned.
Returns:
assignment (orchestra.models.TaskAssignment):
The newly assigned assignment.
Raises:
orchestra.core.errors.TaskAssignmentError:
The specified worker is already assigned to the given task.
orchestra.core.errors.WorkerCertificationError:
The specified worker is not certified for the given assignment.
"""
worker = Worker.objects.get(id=worker_id)
assignment = TaskAssignment.objects.get(id=assignment_id)
if assignment.assignment_counter > 0:
role = WorkerCertification.Role.REVIEWER
else:
role = WorkerCertification.Role.ENTRY_LEVEL
if not is_worker_certified_for_task(worker, assignment.task, role):
raise WorkerCertificationError(
'Worker not certified for this assignment.')
if assignment.task.is_worker_assigned(worker):
raise TaskAssignmentError('Worker already assigned to this task.')
assignment.worker = worker
assignment.save()
mark_worker_as_winner(worker, assignment.task,
def save_task_assignment(request):
    """
    Save in-progress task data submitted by the requesting worker.

    The request body is decoded with `load_encoded_json` and must contain
    the keys `task_id` and `task_data`, which are handed to `save_task`
    together with the Worker looked up from `request.user`.

    Returns:
        An empty dict on success.

    Raises:
        BadRequest:
            `save_task` raised `Task.DoesNotExist` or `TaskAssignmentError`.
    """
    payload = load_encoded_json(request.body)
    current_worker = Worker.objects.get(user=request.user)
    task_id = payload['task_id']
    task_data = payload['task_data']
    try:
        save_task(task_id, task_data, current_worker)
    except Task.DoesNotExist:
        raise BadRequest('No task for given id')
    except TaskAssignmentError as error:
        raise BadRequest(error)
    else:
        return {}
def _can_email(communication_type, email):
    """
    Decide whether a message of `communication_type` may be sent to `email`.

    Looks up the Worker whose user has the given email address and asks
    that worker's CommunicationPreference for this communication type via
    `can_email()`. If no such Worker exists, emailing is allowed.
    """
    try:
        recipient = Worker.objects.get(user__email=email)
        preference = CommunicationPreference.objects.get(
            communication_type=communication_type,
            worker=recipient,
        )
        allowed = preference.can_email()
    except Worker.DoesNotExist:
        # No Worker is registered for this address, so there is no
        # preference to consult; default to allowing the email.
        allowed = True
    return allowed
def perform_update(self, serializer):
old_todo = self.get_object()
todo = serializer.save()
sender = Worker.objects.get(
user=self.request.user).formatted_slack_username()
if old_todo.completed != todo.completed:
todo_change = 'complete' if todo.completed else 'incomplete'
elif old_todo.skipped_datetime != todo.skipped_datetime:
todo_change = 'not relevant' \
if todo.skipped_datetime else 'relevant'
else:
# When activity_log is updated, `todo_change = None`
# to avoid triggering any slack messages
todo_change = None
# To avoid Slack noise, only send updates for changed TODOs with
# depth 0 (no parent) or 1 (no grantparent).
if todo_change and \
(not (todo.parent_todo and todo.parent_todo.parent_todo)):
def get_timer(request):
    """
    Return the requesting worker's current timer as serialized data.

    For GET requests the timer object is serialized with
    `TaskTimerSerializer`; when the current duration is truthy it is added
    to the result as a string under `time_worked`. Other request methods
    return None.

    Raises:
        Exception:
            Any error raised while handling the request is logged with
            its traceback and then re-raised.
    """
    worker = Worker.objects.get(user=request.user)
    try:
        if request.method != 'GET':
            return None
        timer_object = time_tracking.get_timer_object(worker)
        elapsed = time_tracking.get_timer_current_duration(worker)
        serializer = TaskTimerSerializer(timer_object)
        payload = serializer.data
        if elapsed:
            payload['time_worked'] = str(elapsed)
        return payload
    except Exception as error:
        logger.error(error, exc_info=True)
        raise error
def available_staffing_requests(request):
    """
    Render the page listing staffing requests available to the worker.

    The Worker is looked up from `request.user`, and the result of
    `get_available_requests(worker)` is exposed to the template as
    `requests`.
    """
    current_worker = Worker.objects.get(user=request.user)
    context = {'requests': get_available_requests(current_worker)}
    template_name = 'communication/available_staffing_requests.html'
    return render(request, template_name, context)
def specified_worker(task, username, **kwargs):
    """
    Assign a task to the worker with the given username.

    Args:
        task:
            The task to assign; only its `id` is used here.
        username (str):
            Username of the worker to assign.
        **kwargs:
            Accepted but not used by this function.

    Returns:
        task (orchestra.models.Task): The modified task object, as
        returned by `assign_task`.
    """
    assignee = Worker.objects.get(user__username=username)
    worker_id = assignee.id
    return assign_task(worker_id, task.id)
def submit_task_assignment(request):
assignment_information = load_encoded_json(request.body)
worker = Worker.objects.get(user=request.user)
command_type = assignment_information['command_type']
if command_type in ('submit', 'accept'):
iteration_status = Iteration.Status.REQUESTED_REVIEW
elif command_type == 'reject':
iteration_status = Iteration.Status.PROVIDED_REVIEW
else:
raise BadRequest('Illegal command')
try:
submit_task(assignment_information['task_id'],
assignment_information['task_data'],
iteration_status,
worker)
return {}
except TaskStatusError: