Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
training_initial_job_id = int(request.query_params['training_job'])
logger.info("Creating replay_prediction task")
try:
training_initial_job = Job.objects.get(pk=training_initial_job_id)
replay_job = Job.objects.filter(pk=job_id)[0]
replay_prediction_job = duplicate_orm_row(replay_job)
replay_prediction_job.parent_job = Job.objects.filter(pk=job_id)[0]
replay_prediction_job.type = JobTypes.REPLAY_PREDICT.value
replay_prediction_job.status = JobStatuses.CREATED.value
replay_prediction_job.save()
except Job.DoesNotExist:
return Response({'error': 'Job ' + str(job_id) + ' not in database'}, status=status.HTTP_404_NOT_FOUND)
logger.info("Enqueuing replay_prediction task ID {}".format(replay_prediction_job.id))
log = import_log_from_string(request.data.decode('utf-8'))
django_rq.enqueue(replay_prediction_task, replay_prediction_job, training_initial_job, log)
serializer = JobSerializer(jobs, many=True)
return Response(serializer.data, status=status.HTTP_201_CREATED)