def handle(self, *args, **options):
    """See :meth:`django.core.management.base.BaseCommand.handle`.

    This method starts the file delete process.
    """
    logger.info('Command starting: scale_delete_file')

    file_id = options.get('file_id')

    # Attempt to fetch the file model
    try:
        scale_file = ScaleFile.objects.get(pk=file_id)
    except ScaleFile.DoesNotExist:
        logger.exception('Stored file does not exist: %s', file_id)
        sys.exit(1)

    try:
        ScaleFile.objects.delete_files([scale_file])
    except Exception:
        logger.exception('Unknown error occurred, exiting with code 1')
        sys.exit(1)

    logger.info('Command completed: scale_delete_file')
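
# Example invocation (hypothetical ID; assumes the command is registered in a
# Django project and accepts file_id as an argument, which this snippet does
# not show):
#
#   python manage.py scale_delete_file 1234
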
def list_impl(self, request, source_id=None):
    """Retrieves the products for a given source file ID and returns them in JSON form

    :param request: the HTTP GET request
    :type request: :class:`rest_framework.request.Request`
    :param source_id: The id of the source
    :type source_id: int encoded as a string
    :rtype: :class:`rest_framework.response.Response`
    :returns: the HTTP response to send back to the user
    """
    try:
        ScaleFile.objects.get(id=source_id, file_type='SOURCE')
    except ScaleFile.DoesNotExist:
        raise Http404

    started = rest_util.parse_timestamp(request, 'started', required=False)
    ended = rest_util.parse_timestamp(request, 'ended', required=False)
    rest_util.check_time_range(started, ended)
    time_field = rest_util.parse_string(request, 'time_field', required=False,
                                        accepted_values=ProductFile.VALID_TIME_FIELDS)

    batch_ids = rest_util.parse_int_list(request, 'batch_id', required=False)
    job_type_ids = rest_util.parse_int_list(request, 'job_type_id', required=False)
    job_type_names = rest_util.parse_string_list(request, 'job_type_name', required=False)
    job_type_categories = rest_util.parse_string_list(request, 'job_type_category', required=False)
    job_ids = rest_util.parse_int_list(request, 'job_id', required=False)
    is_operational = rest_util.parse_bool(request, 'is_operational', required=False)
    is_published = rest_util.parse_bool(request, 'is_published', required=False)
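
# For illustration only: a minimal sketch of what a parse_int_list-style helper
# typically does (hypothetical; not Scale's actual rest_util implementation).
# It reads every value of a repeated query parameter and converts each to int.
def parse_int_list_sketch(request, name, required=True):
    """Read a repeated query parameter and convert each value to an int."""
    values = request.query_params.getlist(name)
    if not values and required:
        raise BadParameter('Missing required parameter: %s' % name)  # hypothetical error type
    try:
        return [int(value) for value in values]
    except ValueError:
        raise BadParameter('Parameter %s must contain only integers' % name)
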
# Attempt to get related recipe
# Use a localized import to make higher level application dependencies optional
try:
    from recipe.models import RecipeJob
    recipe_jobs = RecipeJob.objects.filter(job=job).order_by('recipe__last_modified')
    recipe_jobs = recipe_jobs.select_related('recipe', 'recipe__recipe_type', 'recipe__recipe_type_rev',
                                             'recipe__recipe_type_rev__recipe_type', 'recipe__event',
                                             'recipe__event__rule')
    job.recipes = [recipe_job.recipe for recipe_job in recipe_jobs]
except Exception:
    job.recipes = []
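
# Note on the pattern above: importing recipe.models inside the try block keeps
# the dependency optional, so deployments without the recipe app fall through
# to the except branch and simply report no recipes. Catching Exception (rather
# than only ImportError) also swallows query errors, trading accuracy for
# availability.
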
# Fetch all the associated input files
input_file_ids = job.get_job_data().get_input_file_ids()
input_files = ScaleFile.objects.filter(id__in=input_file_ids)
input_files = input_files.select_related('workspace', 'job_type', 'job', 'job_exe')
input_files = input_files.defer('workspace__json_config', 'job__data', 'job__results', 'job_exe__environment',
                                'job_exe__configuration', 'job_exe__job_metrics', 'job_exe__stdout',
                                'job_exe__stderr', 'job_exe__results', 'job_exe__results_manifest',
                                'job_type__interface', 'job_type__docker_params', 'job_type__configuration',
                                'job_type__error_mapping')
input_files = input_files.prefetch_related('countries')
input_files = input_files.order_by('id').distinct('id')
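
# Query-tuning notes (standard Django behavior): select_related() joins the
# single-valued relations into one SQL query, defer() leaves the listed wide
# columns unfetched unless accessed, prefetch_related() batches the
# many-to-many 'countries' lookup into a second query, and
# order_by('id').distinct('id') collapses duplicate rows via PostgreSQL's
# DISTINCT ON.
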
# Attempt to get related products
output_files = ScaleFile.objects.filter(job=job)
output_files = output_files.select_related('workspace', 'job_type', 'job', 'job_exe')
output_files = output_files.defer('workspace__json_config', 'job__data', 'job__results', 'job_exe__environment',
                                  'job_exe__configuration', 'job_exe__job_metrics', 'job_exe__stdout',
                                  'job_exe__stderr', 'job_exe__results', 'job_exe__results_manifest',
                                  'job_type__interface', 'job_type__docker_params', 'job_type__configuration',
                                  'job_type__error_mapping')
if self.request.version != 'v6':
    content = 'This endpoint is supported with REST API v6+'
    return Response(status=status.HTTP_400_BAD_REQUEST, data=content)

file_id = rest_util.parse_int(request, 'file_id')

try:
    file_id = int(file_id)
except ValueError:
    content = 'The given file_id is not valid: %s' % file_id
    return Response(status=status.HTTP_400_BAD_REQUEST, data=content)

# Attempt to fetch the ScaleFile model
try:
    source_file = ScaleFile.objects.get(id=file_id)
except ScaleFile.DoesNotExist:
    content = 'No file record exists for the given file_id: %i' % file_id
    return Response(status=status.HTTP_400_BAD_REQUEST, data=content)

# Inspect the file to ensure it will purge correctly
if source_file.file_type != 'SOURCE':
    content = 'The given file_id does not correspond to a SOURCE file_type: %i' % file_id
    return Response(status=status.HTTP_400_BAD_REQUEST, data=content)

event = TriggerEvent.objects.create_trigger_event('USER', None, {'user': 'Anonymous'}, now())
PurgeResults.objects.create(source_file_id=file_id, trigger_event=event)
CommandMessageManager().send_messages([create_purge_source_file_message(source_file_id=file_id,
                                                                        trigger_id=event.id)])

return Response(status=status.HTTP_204_NO_CONTENT)
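
# The purge flow above is fire-and-forget: it records a trigger event and a
# PurgeResults row, queues a purge message through CommandMessageManager, and
# returns 204 immediately; the deletion itself happens later in an
# asynchronous message handler (not shown in this snippet).
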
def list_impl(self, request, source_id=None):
    """Retrieves the jobs for a given source file ID and returns them in JSON form

    :param request: the HTTP GET request
    :type request: :class:`rest_framework.request.Request`
    :param source_id: The id of the source
    :type source_id: int encoded as a string
    :rtype: :class:`rest_framework.response.Response`
    :returns: the HTTP response to send back to the user
    """
    try:
        ScaleFile.objects.get(id=source_id, file_type='SOURCE')
    except ScaleFile.DoesNotExist:
        raise Http404

    started = rest_util.parse_timestamp(request, 'started', required=False)
    ended = rest_util.parse_timestamp(request, 'ended', required=False)
    rest_util.check_time_range(started, ended)

    statuses = rest_util.parse_string_list(request, 'status', required=False)
    job_ids = rest_util.parse_int_list(request, 'job_id', required=False)
    job_type_ids = rest_util.parse_int_list(request, 'job_type_id', required=False)
    job_type_names = rest_util.parse_string_list(request, 'job_type_name', required=False)
    job_type_categories = rest_util.parse_string_list(request, 'job_type_category', required=False)
    batch_ids = rest_util.parse_int_list(request, 'batch_id', required=False)
    error_categories = rest_util.parse_string_list(request, 'error_category', required=False)
    include_superseded = rest_util.parse_bool(request, 'include_superseded', required=False)
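
# Example request (hypothetical route and values) exercising these filters:
#
#   GET /sources/42/jobs/?started=2016-01-01T00:00:00Z&ended=2016-02-01T00:00:00Z
#       &status=COMPLETED&job_type_name=landsat-parse&include_superseded=true
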
# Supplemental source metadata
product.source_sensor_class = entry.source_sensor_class if entry.source_sensor_class else source_sensor_class
product.source_sensor = entry.source_sensor if entry.source_sensor else source_sensor
product.source_collection = entry.source_collection if entry.source_collection else source_collection
product.source_task = entry.source_task if entry.source_task else source_task

# Update product model with details derived from the job_type
product.meta_data['url'] = product.url
product.meta_data['job_name'] = job_exe.job_type.name
product.meta_data['job_version'] = job_exe.job_type.get_job_version()
product.meta_data['package_version'] = job_exe.job_type.get_package_version()

products_to_save.append(FileUpload(product, entry.local_path))

return ScaleFile.objects.upload_files(workspace, products_to_save)
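
# FileUpload above appears to pair a file model with the local path to upload
# from. A minimal stand-in for illustration (an assumption, not Scale's actual
# definition):
#
#   from collections import namedtuple
#   FileUpload = namedtuple('FileUpload', ['file', 'local_path'])
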
source_file.is_parsed = False
source_file.deleted = None
source_file.parsed = None

if ingest.new_workspace:
    # We need a local path to copy the file. Try to get a direct path from the broker; if that fails, we must
    # download the file and copy from there.
    # TODO: a future refactor should make the brokers work off of file objects instead of paths so the extra
    # download is not necessary
    paths = ingest.workspace.get_file_system_paths([source_file])
    if paths:
        local_path = paths[0]
    else:
        local_path = os.path.join('/tmp', file_name)
        file_download = FileDownload(source_file, local_path, False)
        ScaleFile.objects.download_files([file_download])
    source_file.file_path = ingest.new_file_path if ingest.new_file_path else ingest.file_path
    logger.info('Copying %s in workspace %s to %s in workspace %s', ingest.file_path, ingest.workspace.name,
                source_file.file_path, ingest.new_workspace.name)
    file_upload = FileUpload(source_file, local_path)
    ScaleFile.objects.upload_files(ingest.new_workspace, [file_upload])
elif ingest.new_file_path:
    logger.info('Moving %s to %s in workspace %s', ingest.file_path, ingest.new_file_path,
                ingest.workspace.name)
    file_move = FileMove(source_file, ingest.new_file_path)
    ScaleFile.objects.move_files([file_move])
else:
    logger.info('Registering %s in workspace %s', ingest.file_path, ingest.workspace.name)

_save_source_file(source_file)

if ingest.new_workspace:
    # Copied file to new workspace, so delete file in old workspace (if workspace provides local path to do so)
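
# Recap of the branch logic above: a new workspace means copy (downloading to
# a local path first when the broker cannot expose one directly), a new path
# within the same workspace means move, and otherwise the file is simply
# registered in place. In every branch the source file record is saved before
# any old-workspace cleanup runs.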