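# Imports assumed by these snippets; the fragments come from a larger module.
# CheckResult, wfr_utils, and the other helpers (interpolate_query_check_timestamps,
# get_reference_files, gather_processedfiles_for_expset) come from the surrounding
# foursight codebase and are not defined here.
import itertools
import re
import time
from datetime import datetime

from dcicutils import ff_utils
from dcicutils.s3_utils import s3Utils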
# 'ignore' holds two-element lists: pairs of user @id values that should be ignored
check.full_output = {'result': [], 'ignore': []}
check.brief_output = []
check.status = 'PASS'
query = ('/search/?type=User&sort=display_title'
         '&field=display_title&field=contact_email&field=preferred_email&field=email')
# if the check was limited to certain emails
if kwargs.get('emails'):
    emails = kwargs['emails'].split(',')
    for an_email in emails:
        an_email = an_email.strip()
        if an_email:
            query += '&email=' + an_email
# get users
all_users = ff_utils.search_metadata(query, key=connection.ff_keys)
# combine all emails for each user
mail_fields = ['email', 'contact_email', 'preferred_email']
for a_user in all_users:
    user_mails = []
    for f in mail_fields:
        if a_user.get(f):
            user_mails.append(a_user[f].lower())
    a_user['all_mails'] = list(set(user_mails))
# go through each pairwise combination of users
combs = itertools.combinations(all_users, 2)
cases = []
for comb in combs:
    us1 = comb[0]
    us2 = comb[1]
    # is there a common email between the 2 users?
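    # The fragment ends at the question above. A minimal sketch of the intended
    # comparison, assuming 'cases' collects pairs of users sharing an email:
    common_mails = set(us1['all_mails']) & set(us2['all_mails'])
    if common_mails:
        cases.append({'users': [us1['@id'], us2['@id']],
                      'common_mails': list(common_mails)})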
def biosource_cell_line_value(connection, **kwargs):
    '''
    Checks cell line biosources to make sure they have an associated ontology term.
    '''
    check = CheckResult(connection, 'biosource_cell_line_value')
    cell_line_types = ["primary cell", "primary cell line", "immortalized cell line",
                       "in vitro differentiated cells", "induced pluripotent stem cell line",
                       "stem cell", "stem cell derived cell line"]
    biosources = ff_utils.search_metadata('search/?type=Biosource&frame=object',
                                          key=connection.ff_keys, page_limit=200)
    missing = []
    for biosource in biosources:
        # check whether the biosource type is a cell/cell line
        if biosource.get('biosource_type') in cell_line_types:
            # flag the biosource if its cell_line field is missing
            if not biosource.get('cell_line'):
                missing.append({'uuid': biosource['uuid'],
                                '@id': biosource['@id'],
                                'biosource_type': biosource.get('biosource_type'),
                                'description': biosource.get('description'),
                                'error': 'Missing cell_line metadata'})
    check.full_output = missing
    check.brief_output = [item['uuid'] for item in missing]
    if missing:
        check.status = 'WARN'
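    # the original function presumably ends by returning the populated result;
    # added here so the snippet stands alone:
    return check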
# Build the query (space in 'upload failed' percent-encoded, matching the other queries)
query = '/search/?status=uploading&status=upload%20failed'
# add file type; guard against a missing kwarg so the concatenation cannot fail
f_type = kwargs.get('file_type')
if f_type:
    query += '&type=' + f_type
# add date
s_date = kwargs.get('start_date')
if s_date:
    query += '&date_created.from=' + s_date
# add lab
lab = kwargs.get('lab_title')
if lab:
    query += '&lab.display_title=' + lab
# The search
res = ff_utils.search_metadata(query, key=my_auth)
if not res:
    check.summary = 'All Good!'
    return check
# if there are files, make sure they are not on s3
no_s3_file = []
running = []
missing_md5 = []
not_switched_status = []
# multiple failed runs
problems = []
my_s3_util = s3Utils(env=connection.ff_env)
raw_bucket = my_s3_util.raw_file_bucket
out_bucket = my_s3_util.outfile_bucket
for a_file in res:
    # lambda has a time limit (300 sec); stop before it is reached so we still get partial results
    now = datetime.utcnow()
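    # a sketch of the time-limit guard the comment above describes; 'start' and
    # 'lambda_limit' are assumed to be defined earlier in the original function:
    if (now - start).seconds > lambda_limit:
        break  # report whatever was collected so far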
def check_help_page_urls(connection, **kwargs):
    check = CheckResult(connection, 'check_help_page_urls')
    server = connection.ff_keys['server']
    results = ff_utils.search_metadata('search/?type=StaticSection&q=help&status!=draft&field=body&field=options',
                                       key=connection.ff_keys)
    sections_w_broken_links = {}
    for result in results:
        broken_links = []
        body = result.get('body', '')
        urls = []
        if result.get('options', {}).get('filetype') == 'md':
            # look for markdown links - e.g. [text](link); raw string avoids invalid escape warnings
            links = re.findall(r'\[[^\]]+\]\([^\)]+\)', body)
            for link in links:
                # test only the link part of the match (not the text part, even if it looks like a link)
                idx = link.index(']')
                url = link[link.index('(', idx) + 1:-1]
                urls.append(url)  # collect it; 'urls' was initialized above but never filled in the fragment
                # remove these from body so body can be checked for other types of links
                body = body[:body.index(link)] + body[body.index(link) + len(link):]
        # looks for links starting with http (full) or / (relative) inside parentheses or brackets
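        # a hedged sketch of that follow-up search; the exact pattern in the
        # original code may differ:
        urls += re.findall(r'[\(\[=]["\']?(http[^\s\)\]"\']+|/[^\s\)\]"\']+)', body)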
check = CheckResult(connection, 'identify_files_without_qc_summary_pairs')
# must set this to be the function name of the action
check.action = 'patch_quality_metric_summary_pairs'
t0 = time.time()  # assumed: start time for the loop's time limit below (undefined in the fragment)
time_limit = 270  # assumed: seconds, leaving headroom under the 300-sec lambda cap
default_filetype = 'FileProcessed'  # skip fastq
default_stati = 'released%20to%20project&status=released&status=uploaded&status=pre-release'
filetype = kwargs.get('file_type') or default_filetype
stati = 'status=' + (kwargs.get('status') or default_stati)
search_query = 'search/?type={}&{}&frame=object'.format(filetype, stati)
fileformat = 'pairs'  # assumed from the '_pairs' suffix of this check; undefined in the fragment
search_query += '&file_format.file_format=' + fileformat
addon = kwargs.get('search_add_on')
if addon is not None:
    if not addon.startswith('&'):
        addon = '&' + addon
    search_query += addon
problem_files = []
file_hits = ff_utils.search_metadata(search_query, key=connection.ff_keys, page_limit=200)
for hit in file_hits:
    if round(time.time() - t0, 2) > time_limit:
        break
    if hit.get('quality_metric') and not hit.get('quality_metric_summary', ''):
        hit_dict = {
            'accession': hit.get('accession'),
            'uuid': hit.get('uuid'),
            '@type': hit.get('@type'),
            'upload_key': hit.get('upload_key'),
            'file_format': hit.get('file_format'),
            'quality_metric': hit.get('quality_metric')
        }
        problem_files.append(hit_dict)
check.summary = '{} files with no quality metric summary'.format(len(problem_files))
check.full_output = problem_files
if problem_files:
    # the fragment is cut off here; a plausible completion flags the check and
    # enables the patch action named above:
    check.status = 'WARN'
    check.allow_action = True

# --- separate fragment (a guard from a Higlass check); the 'if not
# search_queries:' condition is reconstructed from the lines that follow ---
if not search_queries:
    check.summary = check.description = "No search query provided, nothing to update."
    check.status = 'PASS'
    check.allow_action = False
    return check
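# (assumed context: 'search_queries' is a list of query-string fragments built
#  earlier in the original function, e.g. from a comma-separated kwarg)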
expsets_by_accession = {}
# Use all of the search queries to make a list of the ExpSets we will work on.
for query in search_queries:
    # Interpolate the timestamps, if needed
    query = interpolate_query_check_timestamps(connection, query, action_name, check, minutes_leeway)
    # Add to base search
    processed_expsets_query = "/search/?type=ExperimentSetReplicate" + query + fields_to_include
    # Query the Experiment Sets
    search_res = ff_utils.search_metadata(processed_expsets_query, key=connection.ff_keys)
    # Collate the results into a dict of ExpSets, keyed by accession
    for expset in search_res:
        expsets_by_accession[expset["accession"]] = expset
# Get the reference files
reference_files_by_ga = get_reference_files(connection)
check.full_output['reference_files'] = reference_files_by_ga
# Collate all of the Higlass Items that need to be updated. Store them by genome assembly, then accession.
target_files_by_ga = {}
for expset_accession, expset in expsets_by_accession.items():
    # Get all of the processed files. Stop if there is an error.
    file_info = gather_processedfiles_for_expset(expset)
    if file_info["error"]:
def compare_badges_and_messages(obj_id_dict, item_type, badge, ff_keys):
    '''
    Compares items that should have a given badge to items that do have the given badge.
    Also compares badge messages to see if each message is the right one or needs to be updated.
    Input (first argument) should be a dictionary mapping each item's @id to the badge
    messages it should have.
    '''
    search_url = 'search/?type={}&badges.badge.@id=/badges/{}/'.format(item_type, badge)
    has_badge = ff_utils.search_metadata(search_url + '&frame=object', key=ff_keys)
    needs_badge = {}
    badge_edit = {}
    badge_ok = []
    remove_badge = {}
    for item in has_badge:
        if item['@id'] in obj_id_dict.keys():
            # handle differences in badge messages
            for a_badge in item['badges']:
                if a_badge['badge'].endswith(badge + '/'):
                    if a_badge.get('messages') == obj_id_dict[item['@id']]:
                        badge_ok.append(item['@id'])
                    else:
                        # replace a stale singular 'message' field with 'messages'
                        if a_badge.get('message'):
                            del a_badge['message']
                        a_badge['messages'] = obj_id_dict[item['@id']]
                        badge_edit[item['@id']] = item['badges']
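        else:
            # the fragment ends above; the rest is a hedged sketch of the likely
            # continuation (collection names from the fragment, return keys assumed).
            # Remove the badge from items that should no longer carry it:
            item['badges'] = [b for b in item['badges']
                              if not b['badge'].endswith(badge + '/')]
            remove_badge[item['@id']] = item['badges']
    # items that should have the badge but were not in the search results need it added
    for key, val in obj_id_dict.items():
        if key not in [item['@id'] for item in has_badge]:
            needs_badge[key] = val
    return {'needs_badge': needs_badge, 'badge_edit': badge_edit,
            'badge_ok': badge_ok, 'remove_badge': remove_badge}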
"&status=pre-release&status=released&status=released%20to%20project&status=uploaded")
# fastqc not properly reporting for long reads
skip_instruments = ['PromethION', 'GridION', 'MinION', 'PacBio RS II']
skip_add = "".join(['&instrument!=' + i for i in skip_instruments])
query += skip_add
# add date
s_date = kwargs.get('start_date')
if s_date:
query += '&date_created.from=' + s_date
# add lab
lab = kwargs.get('lab_title')
if lab:
query += '&lab.display_title=' + lab
# The search
res = ff_utils.search_metadata(query, key=my_auth)
if not res:
check.summary = 'All Good!'
return check
check = wfr_utils.check_runs_without_output(res, check, 'fastqc-0-11-4-1', my_auth, start)
return check