Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
anew['age'] = sess_id2mod[asse['session_ID']][5]
anew['last_modified'] =\
sess_id2mod[asse['session_ID']][6]
anew['last_updated'] =\
sess_id2mod[asse['session_ID']][7]
anew['resources'] = [asse['%s/out/file/label' % pfix]]
assessors_dict[key] = anew
if has_genproc_datatypes(self):
# Then add genProcData
post_uri = SE_ARCHIVE_URI
post_uri += ASSESSOR_PR_PROJ_POST_URI.format(
project=projectid, pstype=DEFAULT_DATATYPE)
assessor_list = self._get_json(post_uri)
pfix = DEFAULT_DATATYPE.lower()
for asse in assessor_list:
if asse['label']:
key = asse['label']
if assessors_dict.get(key):
res = '%s/out/file/label' % pfix
assessors_dict[key]['resources'].append(asse[res])
else:
anew = {}
anew['ID'] = asse['ID']
anew['label'] = asse['label']
anew['uri'] = asse['URI']
anew['assessor_id'] = asse['ID']
anew['assessor_label'] = asse['label']
anew['assessor_uri'] = asse['URI']
anew['project_id'] = projectid
anew['project_label'] = projectid
def get_status(self):
"""
Get the procstatus of an assessor
:return: The string of the procstatus of the assessor.
DOES_NOT_EXIST if the assessor does not exist
"""
if not self.assessor.exists():
xnat_status = DOES_NOT_EXIST
elif self.atype.lower() in [DEFAULT_DATATYPE.lower(),
DEFAULT_FS_DATATYPE.lower()]:
xnat_status = self.assessor.attrs.get('%s/procstatus'
% self.atype.lower())
else:
xnat_status = 'UNKNOWN_xsiType: %s' % self.atype
return xnat_status
assr_info['xsiType'] = self.get(xmltype).lower()
if assr_info['xsiType'].lower() == DEFAULT_FS_DATATYPE.lower():
# FreeSurfer
assr_info['procstatus'] = self.get('fs:procstatus')
assr_info['qcstatus'] = self.get('xnat:validation/status')
assr_info['qcnotes'] = self.get('xnat:validation/notes')
assr_info['version'] = self.get('fs:procversion')
assr_info['jobid'] = self.get('fs:jobid')
assr_info['jobstartdate'] = self.get('fs:jobstartdate')
assr_info['memused'] = self.get('fs:memused')
assr_info['walltimeused'] = self.get('fs:walltimeused')
assr_info['jobnode'] = self.get('fs:jobnode')
assr_info['proctype'] = 'FreeSurfer'
elif assr_info['xsiType'].lower() == DEFAULT_DATATYPE.lower():
# genProcData
assr_info['procstatus'] = self.get('proc:procstatus')
assr_info['proctype'] = self.get('proc:proctype')
assr_info['qcstatus'] = self.get('xnat:validation/status')
assr_info['qcnotes'] = self.get('xnat:validation/notes')
assr_info['version'] = self.get('proc:procversion')
assr_info['jobid'] = self.get('proc:jobid')
assr_info['jobstartdate'] = self.get('proc:jobstartdate')
assr_info['memused'] = self.get('proc:memused')
assr_info['walltimeused'] = self.get('proc:walltimeused')
assr_info['jobnode'] = self.get('proc:jobnode')
else:
msg = 'Warning:unknown xsitype for assessor: %s'
print((msg % assr_info['xsiType']))
self.assr_info_ = assr_info
def list_project_assessor_types(self, projectid):
"""
List all the assessors that you have access to based on passed project.
:param projectid: ID of a project on XNAT
:return: List of all the assessors for the project
"""
assr_types = []
if has_genproc_datatypes(self):
# genProcData
post_uri = SE_ARCHIVE_URI + '''?project={project}&xsiType={pstype}\
&columns=ID,xsiType,project,{pstype}/proctype'''
post_uri = post_uri.format(project=projectid, pstype=DEFAULT_DATATYPE)
assessor_list = self._get_json(post_uri)
proctype_field = '{}/proctype'.format(DEFAULT_DATATYPE.lower())
assr_types = set(x[proctype_field] for x in assessor_list)
if has_fs_datatypes(self):
# FreeSurfer
post_uri = SE_ARCHIVE_URI + '''?project={project}&xsiType={fstype}\
&columns=ID,xsiType,project'''
post_uri = post_uri.format(project=projectid, fstype=DEFAULT_FS_DATATYPE)
assessor_list = self._get_json(post_uri)
if len(assessor_list) > 0:
assr_types.add('FreeSurfer')
return list(assr_types)
qcstatus, then jobid.
"""
if cached_sessions:
for csess in cached_sessions:
for cassr in csess.assessors():
if cassr.label() == self.assessor_label:
pstatus = cassr.info()['procstatus']
qstatus = cassr.info()['qcstatus']
jobid = cassr.info()['jobid']
return pstatus, qstatus, jobid
if not self.assessor.exists():
xnat_status = DOES_NOT_EXIST
qcstatus = DOES_NOT_EXIST
jobid = ''
elif self.atype.lower() in [DEFAULT_DATATYPE.lower(),
DEFAULT_FS_DATATYPE.lower()]:
xnat_status, qcstatus, jobid = self.assessor.attrs.mget([
'%s/procstatus' % self.atype,
'%s/validation/status' % self.atype,
'%s/jobid' % self.atype
])
else:
xnat_status = 'UNKNOWN_xsiType: %s' % self.atype
qcstatus = 'UNKNOWN_xsiType: %s' % self.atype
jobid = ''
return xnat_status, qcstatus, jobid
def create_assessor(self, xnatsession, inputs, relabel=False):
attempts = 0
while attempts < 100:
guid = str(uuid4())
assessor = xnatsession.assessor(guid)
if not assessor.exists():
kwargs = {}
if self.xsitype.lower() == DEFAULT_FS_DATATYPE.lower():
fsversion = '{}/fsversion'.format(self.xsitype.lower())
kwargs[fsversion] = 0
elif self.xsitype.lower() == DEFAULT_DATATYPE.lower():
proctype = '{}/proctype'.format(self.xsitype.lower())
kwargs[proctype] = self.name
procversion = '{}/procversion'.format(self.xsitype.lower())
kwargs[procversion] = self.version
input_key = '{}/inputs'.format(self.xsitype.lower())
kwargs[input_key] = self._serialize_inputs(inputs)
if relabel:
_proj = assessor.parent().parent().parent().label()
_subj = assessor.parent().parent().label()
_sess = assessor.parent().label()
label = '-x-'.join([_proj, _subj, _sess, self.name, guid])
else:
label = guid
# Set creation date to today
date_key = '{}/date'.format(self.xsitype.lower())
anew['session_label'] = asse['session_label']
anew['procstatus'] = asse['%s/procstatus' % pfix]
anew['qcstatus'] = asse['%s/validation/status' % pfix]
anew['proctype'] = 'FreeSurfer'
anew['xsiType'] = asse['xsiType']
new_list.append(anew)
if has_genproc_datatypes(self):
# Then add genProcData
post_uri = ASSESSORS_URI.format(project=projectid,
subject=subjectid,
session=sessionid)
post_uri += ASSESSOR_PR_POST_URI.format(pstype=DEFAULT_DATATYPE)
assessor_list = self._get_json(post_uri)
pfix = DEFAULT_DATATYPE.lower()
for asse in assessor_list:
anew = {}
anew['ID'] = asse['ID']
anew['label'] = asse['label']
anew['uri'] = asse['URI']
anew['assessor_id'] = asse['ID']
anew['assessor_label'] = asse['label']
anew['assessor_uri'] = asse['URI']
anew['project_id'] = projectid
anew['project_label'] = projectid
anew['subject_id'] = asse['xnat:imagesessiondata/subject_id']
anew['session_id'] = asse['session_ID']
anew['session_label'] = asse['session_label']
anew['procstatus'] = asse['%s/procstatus' % pfix]
anew['proctype'] = asse['%s/proctype' % pfix]
anew['qcstatus'] = asse['%s/validation/status' % pfix]
def get_qcstatus(self):
"""
Get the qcstatus of the assessor
:return: A string of the qcstatus for the assessor if it exists.
If it does not, it returns DOES_NOT_EXIST.
The else case returns an UNKNOWN xsiType with the xsiType of the
assessor as stored on XNAT.
"""
qcstatus = ''
if not self.assessor.exists():
qcstatus = DOES_NOT_EXIST
elif self.atype.lower() in [DEFAULT_DATATYPE.lower(),
DEFAULT_FS_DATATYPE.lower()]:
qcstatus = self.assessor.attrs.get('%s/validation/status'
% self.atype)
else:
qcstatus = 'UNKNOWN_xsiType: %s' % self.atype
return qcstatus