"""
logger.info('Command starting: scale_download_file')
file_id = options.get('file_id')
local_path = options.get('local_path')
# Validate the file paths
if os.path.exists(local_path):
logger.exception('Local file already exists: %s', local_path)
sys.exit(1)
# Attempt to fetch the file model
try:
scale_file = ScaleFile.objects.get(pk=file_id)
except ScaleFile.DoesNotExist:
logger.exception('Stored file does not exist: %s', file_id)
sys.exit(1)
try:
ScaleFile.objects.download_files([FileDownload(scale_file, local_path)])
except:
logger.exception('Unknown error occurred, exit code 1 returning')
sys.exit(1)
logger.info('Command completed: scale_download_file')
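
# A hedged usage sketch for the command above: the handler reads 'file_id' and
# 'local_path' from its options dict, so it can be driven through Django's
# management API. The ID and path below are made up for illustration.
#
#     from django.core.management import call_command
#     call_command('scale_download_file', file_id=1234, local_path='/tmp/scale/file.dat')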
if not os.path.exists(local_path):
    logger.error('Local file does not exist: %s', local_path)
    sys.exit(1)

# Attempt to fetch the workspace model
try:
    workspace = Workspace.objects.get(name=workspace_name)
except Workspace.DoesNotExist:
    logger.exception('Workspace does not exist: %s', workspace_name)
    sys.exit(1)

# Attempt to set up a file model, creating a new one if no record exists yet
try:
    scale_file = ScaleFile.objects.get(file_name=file_name)
except ScaleFile.DoesNotExist:
    scale_file = ScaleFile()
    scale_file.update_uuid(file_name)
scale_file.file_path = remote_path

# Upload the file, exiting non-zero on any failure
try:
    ScaleFile.objects.upload_files(workspace, [FileUpload(scale_file, local_path)])
except Exception:
    logger.exception('Unknown error occurred, exiting with code 1')
    sys.exit(1)

logger.info('Command completed: scale_upload_file')
upload_dir = get_job_exe_output_data_dir(job_exe.id)
upload_work_dir = get_job_exe_output_work_dir(job_exe.id)

logger.info('Cleaning up download directory')
ScaleFile.objects.cleanup_download_dir(download_dir, download_work_dir)

logger.info('Cleaning up upload directories')
workspace_ids = job_exe.job.get_job_data().get_output_workspace_ids()
for workspace in Workspace.objects.filter(id__in=workspace_ids):
    logger.info('Cleaning up upload directory for workspace %s', workspace.name)
    ScaleFile.objects.cleanup_upload_dir(upload_dir, upload_work_dir, workspace)

move_work_dir = os.path.join(upload_work_dir, 'move_source_file_in_workspace')
if os.path.exists(move_work_dir):
    logger.info('Cleaning up work directory for moving parsed source files')
    ScaleFile.objects.cleanup_move_dir(move_work_dir)
    logger.info('Deleting %s', move_work_dir)
    os.rmdir(move_work_dir)

delete_normal_job_exe_dir_tree(job_exe.id)
def list_impl(self, request, product_id=None):
    """Retrieves the source files for a given product ID and returns them in JSON form

    :param request: the HTTP GET request
    :type request: :class:`rest_framework.request.Request`
    :rtype: :class:`rest_framework.response.Response`
    :returns: the HTTP response to send back to the user
    """
    # Ensure the product exists before querying for its sources
    try:
        ScaleFile.objects.get(id=product_id, file_type='PRODUCT')
    except ScaleFile.DoesNotExist:
        raise Http404

    started = rest_util.parse_timestamp(request, 'started', required=False)
    ended = rest_util.parse_timestamp(request, 'ended', required=False)
    rest_util.check_time_range(started, ended)
    time_field = rest_util.parse_string(request, 'time_field', required=False,
                                        accepted_values=SourceFile.VALID_TIME_FIELDS)
    is_parsed = rest_util.parse_bool(request, 'is_parsed', required=False)
    file_name = rest_util.parse_string(request, 'file_name', required=False)
    order = rest_util.parse_string_list(request, 'order', required=False)

    sources = ProductFile.objects.get_product_sources(product_id, started, ended, time_field, is_parsed,
                                                      file_name, order)
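
# A hypothetical request against this endpoint; the route is an assumption, but
# the query parameters mirror the rest_util.parse_* calls above:
#
#     GET /products/42/sources/?started=2016-01-01T00:00:00Z&ended=2016-02-01T00:00:00Z
#         &time_field=data&is_parsed=true&order=-file_name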
# Pay attention to file name collisions and update file name if needed
local_paths = set()
counter = 0
for scale_file in files:
    partial = data_files[scale_file.id][1]
    local_path = os.path.join(data_files[scale_file.id][0], scale_file.file_name)
    while local_path in local_paths:
        # Path collision, try a different file name
        counter += 1
        new_file_name = '%i_%s' % (counter, scale_file.file_name)
        local_path = os.path.join(data_files[scale_file.id][0], new_file_name)
    local_paths.add(local_path)
    file_downloads.append(FileDownload(scale_file, local_path, partial))
    results[scale_file.id] = local_path

ScaleFile.objects.download_files(file_downloads)
return results
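
# A standalone sketch of the collision-avoidance naming above, using made-up
# file names; the real loop applies the same scheme to ScaleFile models.
#
#     local_paths = set()
#     counter = 0
#     for file_name in ['out.tif', 'out.tif', 'out.tif']:
#         local_path = os.path.join('/tmp/downloads', file_name)
#         while local_path in local_paths:
#             counter += 1
#             local_path = os.path.join('/tmp/downloads', '%i_%s' % (counter, file_name))
#         local_paths.add(local_path)
#     # local_paths now holds out.tif, 1_out.tif, and 2_out.tif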
"""Validates the files with the given IDs against the given file description. If invalid, a
:class:`job.configuration.data.exceptions.InvalidData` will be thrown.
:param file_ids: List of file IDs
:type file_ids: [long]
:param file_desc: The description of the required file meta-data for validation
:type file_desc: :class:`job.configuration.interface.scale_file.ScaleFileDescription`
:returns: A list of warnings discovered during validation.
:rtype: [:class:`job.configuration.data.job_data.ValidationWarning`]
:raises :class:`job.configuration.data.exceptions.InvalidData`: If any of the files are missing.
"""
warnings = []
found_ids = set()
for scale_file in ScaleFile.objects.filter(id__in=file_ids):
found_ids.add(scale_file.id)
media_type = scale_file.media_type
if not file_desc.is_media_type_allowed(media_type):
warn = ValidationWarning('media_type',
'Invalid media type for file: %i -> %s' % (scale_file.id, media_type))
warnings.append(warn)
# Check if there were any file IDs that weren't found in the query
for file_id in file_ids:
if file_id not in found_ids:
raise InvalidData('Invalid job data: Data file for ID %i does not exist' % file_id)
return warnings
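
# A hedged usage sketch for the validator above. Its enclosing function name is
# not shown in this excerpt, so 'validate_files' is a stand-in, and the
# ScaleFileDescription setup call is an assumption about that class's API.
#
#     file_desc = ScaleFileDescription()
#     file_desc.add_allowed_media_type('image/tiff')  # hypothetical setup call
#     try:
#         for warning in validate_files([1234, 5678], file_desc):
#             logger.warning('Validation warning: %s', warning)
#     except InvalidData:
#         logger.exception('At least one data file ID does not exist')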
product.source_sensor = entry.source_sensor if entry.source_sensor else source_sensor
product.source_collection = entry.source_collection if entry.source_collection else source_collection
product.source_task = entry.source_task if entry.source_task else source_task
# Update product model with details derived from the job_type
product.meta_data['url'] = product.url
product.meta_data['job_name'] = job_exe.job_type.name
product.meta_data['job_version'] = job_exe.job_type.get_job_version()
product.meta_data['package_version'] = job_exe.job_type.get_package_version()
products_to_save.append(FileUpload(product, entry.local_path))
return ScaleFile.objects.upload_files(workspace, products_to_save)
class ProductFile(ScaleFile):
    """Represents a product file that has been created by Scale. This is a proxy model of the
    :class:`storage.models.ScaleFile` model. It has the same set of fields, but a different manager that provides
    functionality specific to product files.
    """

    VALID_TIME_FIELDS = ['source', 'data', 'last_modified']

    @classmethod
    def create(cls):
        """Creates a new product file

        :returns: The new product file
        :rtype: :class:`product.models.ProductFile`
        """
        product_file = ProductFile()
        # Assumption: tag the model as a product so queries filtering on
        # file_type='PRODUCT' (see list_impl above) can find it
        product_file.file_type = 'PRODUCT'
        return product_file
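
# Background sketch: a Django proxy model reuses its parent's database table and
# columns, overriding only Python-level behavior such as the default manager.
# The declaration details below are illustrative, not Scale's exact code:
#
#     class ProductFile(ScaleFile):
#         objects = ProductFileManager()
#
#         class Meta(object):
#             proxy = True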
def _get_source_file(file_name):
    """Returns an existing or new (un-saved) source file model for the given file name

    :param file_name: The name of the source file
    :type file_name: string
    :returns: The source file model
    :rtype: :class:`source.models.SourceFile`
    """
    try:
        src_file = SourceFile.objects.get_source_file_by_name(file_name)
    except ScaleFile.DoesNotExist:
        src_file = SourceFile.create()  # New file
        src_file.file_name = file_name
        src_file.is_deleted = True

    return src_file
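
# Usage sketch for the helper above; the file name is made up. A model with no
# primary key is the un-saved, never-ingested case described in the docstring.
#
#     src_file = _get_source_file('sample_source.h5')
#     if src_file.pk is None:
#         src_file.save()  # persist the new record before linking it elsewhere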
"""Populates each of the given jobs with models for its input files.

:param jobs: The list of jobs to augment with input files.
:type jobs: [:class:`job.models.Job`]
"""
# Build a unique set of all input file identifiers and a mapping of each job
# to its input file identifiers
file_ids = set()
job_file_map = dict()
for job in jobs:
    input_file_ids = job.get_job_data().get_input_file_ids()
    job_file_map[job.id] = input_file_ids
    file_ids.update(input_file_ids)
    job.input_files = []

# Fetch all the required source files in a single query
input_files = ScaleFile.objects.filter(id__in=file_ids)
input_files = input_files.select_related('workspace').defer('workspace__json_config')
input_files = input_files.order_by('id').distinct('id')

# Build a mapping of input file identifiers to input file models
input_file_map = {input_file.id: input_file for input_file in input_files}

# Update each job with its input file models
for job in jobs:
    input_file_ids = job_file_map[job.id]
    for input_file_id in input_file_ids:
        if input_file_id in input_file_map:
            job.input_files.append(input_file_map[input_file_id])
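
# The pattern above trades one query per job for a single batched query, with
# the id -> model dict providing constant-time lookups when files are
# re-attached. A hedged illustration of what callers can then do:
#
#     for job in jobs:
#         logger.info('Job %i has %i input files', job.id, len(job.input_files))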
local_path = os.path.join(SCALE_JOB_EXE_INPUT_PATH, 'tmp', file_name)
with open(local_path, 'w') as metadata_file:
    json.dump(input_metadata, metadata_file)

try:
    scale_file = ScaleFile.objects.get(file_name=file_name)
except ScaleFile.DoesNotExist:
    scale_file = ScaleFile()
    scale_file.update_uuid(file_name)
remote_path = self._calculate_remote_path(job_exe)
scale_file.file_path = remote_path

# Try each workspace in turn until the manifest uploads successfully
for workspace in workspace_models:
    try:
        if not input_metadata_id:
            ScaleFile.objects.upload_files(workspace, [FileUpload(scale_file, local_path)])
            input_metadata_id = ScaleFile.objects.get(file_name=file_name).id
            data = job_exe.job.get_job_data()
            data.add_file_input('INPUT_METADATA_MANIFEST', input_metadata_id)
            job_exe.job.input = data.get_dict()
            job_exe.job.save()
    except Exception:
        # Upload failed in this workspace; move on and try the next one
        continue

if not input_metadata_id:
    logger.error('Error uploading input_metadata manifest for job_exe %d', job_exe.job.id)