def _retrieve_files(self, data_files):
"""Retrieves the given data files and writes them to the given local directories. If no file with a given ID
exists, it will not be retrieved and returned in the results.
:param data_files: Dict with each file ID mapping to an absolute directory path for downloading and
bool indicating if job supports partial file download (True).
:type data_files: {long: type(string, bool)}
:returns: Dict with each file ID mapping to its absolute local path
:rtype: {long: string}
:raises ArchivedWorkspace: If any of the files has an archived workspace (no longer active)
:raises DeletedFile: If any of the files has been deleted
"""
file_ids = data_files.keys()
files = ScaleFile.objects.filter(id__in=file_ids)
file_downloads = []
results = {}
local_paths = set() # Pay attention to file name collisions and update file name if needed
counter = 0
for scale_file in files:
partial = data_files[scale_file.id][1]
local_path = os.path.join(data_files[scale_file.id][0], scale_file.file_name)
while local_path in local_paths:
# Path collision, try a different file name
counter += 1
new_file_name = '%i_%s' % (counter, scale_file.file_name)
local_path = os.path.join(data_files[scale_file.id][0], new_file_name)
local_paths.add(local_path)
file_downloads.append(FileDownload(scale_file, local_path, partial))
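# Hedged completion (assumption, not part of the original snippet): after queuing the
# FileDownload entries, the files are presumably downloaded in bulk and the results
# dict is filled in from the resolved local paths before returning.
ScaleFile.objects.download_files(file_downloads)  # bulk download helper name is an assumption
for file_download in file_downloads:
    results[file_download.file.id] = file_download.local_path
return results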
try:
recipe_type_rev.get_definition().input_interface.validate_connection(dataset_parameters)
except InvalidInterfaceConnection as ex:
# No recipe inputs match the dataset
logger.info('None of the dataset parameters matched the recipe type inputs; No recipes will be created')
self.is_prev_batch_done = True
return messages
# Get previous recipes for dataset files:
ds_files = DataSetFile.objects.get_dataset_files(dataset.id).values_list('scale_file_id', flat=True)
recipe_ids = RecipeInputFile.objects.filter(input_file_id__in=ds_files).values_list('recipe_id', flat=True)
recipe_file_ids = RecipeInputFile.objects.filter(input_file_id__in=ds_files,
recipe__recipe_type=batch.recipe_type,
recipe__recipe_type_rev=batch.recipe_type_rev).values_list('input_file_id', flat=True)
extra_files_qry = ScaleFile.objects.filter(id__in=ds_files)
recipe_count = 0
# Reprocess previous recipes
if definition.supersedes:
if len(recipe_ids) > 0:
# Create re-process messages for all recipes
recipe_qry = Recipe.objects.filter(id__in=recipe_ids).order_by('-id')
if self.current_recipe_id:
recipe_qry = recipe_qry.filter(id__lt=self.current_recipe_id)
root_recipe_ids = []
for recipe in recipe_qry.defer('input')[:MAX_RECIPE_NUM]:
root_recipe_ids.append(recipe.id)
self.current_recipe_id = recipe.id
recipe_count = len(root_recipe_ids)
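# Hedged note (assumption): the collected root_recipe_ids would typically be turned into
# reprocess/supersede messages and appended to `messages`; that step is not shown in
# this fragment.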
elif filter_type == 'data-type':
list_of_lists = [scale_file.data_type_tags for scale_file in ScaleFile.objects.filter(id__in=param.file_ids)]
file_values = [item for sublist in list_of_lists for item in sublist]
# attempt to run condition on the whole list, e.g. in case we're checking 'contains'
filter_success |= ALL_CONDITIONS[cond](file_values, values)
file_success = all_files
for value in file_values:
if all_files:
# attempt to run condition on individual items, if any fail we fail the filter
file_success &= ALL_CONDITIONS[cond](value, values)
else:
# attempt to run condition on individual items, if any succeed we pass the filter
file_success |= ALL_CONDITIONS[cond](value, values)
filter_success |= file_success
elif filter_type == 'meta-data':
meta_data_list = [scale_file.meta_data for scale_file in ScaleFile.objects.filter(id__in=param.file_ids)]
if 'fields' in f:
if len(f['fields']) != len(values):
logger.error('Fields (%s) and values (%s) are not the same length' % (f['fields'], values))
return False
file_success = all_files
for meta_data in meta_data_list:
field_success = all_fields
for field_path, value in zip(f['fields'], values):
item = _getNestedDictField(meta_data, field_path)
if all_fields:
# attempt to run condition on individual items, if any fail we fail the filter
field_success &= ALL_CONDITIONS[cond](item, value)
else:
# attempt to run condition on individual items, if any succeed we pass the filter
field_success |= ALL_CONDITIONS[cond](item, value)
if all_files:
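    # Hedged completion (assumption, mirroring the 'data-type' branch above): when
    # all_files is set every file must satisfy its field conditions; otherwise a
    # single passing file is enough for the filter to succeed.
    file_success &= field_success
else:
    file_success |= field_success
filter_success |= file_success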
:keyword recipe_ids: Query product files produced by the given recipe ids
:type recipe_ids: list[int]
:keyword recipe_job: Query product files produced by a given recipe name
:type recipe_job: str
:keyword recipe_type_ids: Query product files produced by the given recipe types
:type recipe_type_ids: list[int]
:keyword batch_ids: Query product files produced by batches with the given identifiers.
:type batch_ids: list[int]
:param order: A list of fields to control the sort order.
:type order: list[str]
:returns: The product file query
:rtype: :class:`django.db.models.QuerySet`
"""
# Fetch a list of product files
products = ScaleFile.objects.filter(file_type='PRODUCT', has_been_published=True)
products = products.select_related('workspace', 'job_type', 'job', 'job_exe', 'recipe', 'recipe_type', 'batch')
products = products.defer('workspace__json_config', 'job__input', 'job__output', 'job_exe__environment',
'job_exe__configuration', 'job_exe__job_metrics', 'job_exe__stdout',
'job_exe__stderr', 'job_exe__results', 'job_exe__results_manifest',
'job_type__manifest', 'job_type__docker_params', 'job_type__configuration',
'job_type__error_mapping', 'recipe__input', 'recipe_type__definition',
'batch__definition')
products = products.prefetch_related('countries')
# Apply time range filtering
if started:
if time_field == 'source':
products = products.filter(source_started__gte=started)
elif time_field == 'data':
products = products.filter(data_started__gte=started)
else:
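    # Hedged completion (assumption): fall back to the files' last_modified timestamps
    # when time_field is neither 'source' nor 'data'.
    products = products.filter(last_modified__gte=started)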
:param product_id: The unique identifier of the product.
:type product_id: int
:returns: The product with extra related attributes: sources, ancestor/descendant products.
:rtype: :class:`storage.models.ScaleFile`
:raises :class:`storage.models.ScaleFile.DoesNotExist`: If the file does not exist
"""
# Attempt to fetch the requested product
product = ScaleFile.objects.all().select_related('workspace')
product = product.get(pk=product_id, file_type='PRODUCT')
# Attempt to fetch all ancestor files
sources = []
products = []
ancestors = ScaleFile.objects.filter(descendants__descendant_id=product.id)
ancestors = ancestors.select_related('job_type', 'workspace').defer('workspace__json_config')
ancestors = ancestors.prefetch_related('countries').order_by('created')
for ancestor in ancestors:
if ancestor.file_type == 'SOURCE':
sources.append(ancestor)
elif ancestor.file_type == 'PRODUCT':
products.append(ancestor)
product.sources = sources
product.ancestor_products = products
# Attempt to fetch all descendant products
descendants = ScaleFile.objects.filter(ancestors__ancestor_id=product.id)
descendants = descendants.select_related('job_type', 'workspace').defer('workspace__json_config')
descendants = descendants.prefetch_related('countries').order_by('created')
product.descendant_products = descendants
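# Hedged completion (assumption): per the docstring, the product is returned with its
# related sources and ancestor/descendant products attached.
return product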
def get_source_ancestors(self, file_ids):
"""Returns a list of the source file ancestors for the given file IDs. This will include any of the given files
that are source files themselves.
:param file_ids: The file IDs
:type file_ids: list[int]
:returns: The list of ancestor source files
:rtype: list[:class:`storage.models.ScaleFile`]
"""
potential_src_file_ids = set(file_ids)
# Get all ancestors to include as possible source files
for ancestor_link in self.filter(descendant_id__in=file_ids).iterator():
potential_src_file_ids.add(ancestor_link.ancestor_id)
return ScaleFile.objects.filter(id__in=list(potential_src_file_ids), file_type='SOURCE')
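# Example usage (hypothetical IDs; assumes this manager is attached to the
# FileAncestryLink model as its default manager):
# source_files = FileAncestryLink.objects.get_source_ancestors([1234, 5678])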