Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Fragment of a command-line script (settings are read with
# exe='patch assessors'): set up settings, logging and argument parsing,
# validate them, then walk the projects found in the settings file.
# NOTE(review): indentation was lost in this copy, so the nesting described
# in the comments below is inferred from the statements themselves.
dax_settings = dax.DAX_Settings()
logger = setup_info_logger()
parser = configure_parser()
parsed_args = parser.parse_args()
logger.info('testing logger')
#argument checks do their own logging
if not check_arguments(logger, parsed_args):
exit(-1)
# Stop as well when the cluster settings do not validate.
if not dax_settings.is_cluster_valid(logger):
exit(-1)
launcher = bin.read_settings(parsed_args.settings, logger, exe='patch assessors')
with XnatUtils.get_interface(launcher.xnat_host, launcher.xnat_user,
launcher.xnat_pass) as intf:
# NOTE(review): the two prints below use the Python 2 print statement and
# look like leftover debugging output.
print 'intf=', intf
print 'launcher process dict=', launcher.project_process_dict
# Project names are the keys of the launcher's project -> processors mapping.
project_list = set(launcher.project_process_dict.keys())
if parsed_args.projects:
# NOTE(review): this loop iterates project_list and then tests membership in
# that same set, so the warning can never be logged. Presumably the projects
# requested via parsed_args.projects were meant to be checked - confirm.
for p in project_list:
if p not in project_list:
msg = "project {} is not in the list of projects defined in {} ({})"
logger.warning(msg.format(p, parsed_args.settings, project_list))
for p in project_list:
project_processors = launcher.project_process_dict.get(p, None)
if project_processors is None:
# NOTE(review): no .format(p) is applied, so the literal '{}' is logged
# instead of the project name; 'Procject' is also misspelled.
logger.info("Procject '{}' has no processors - skipping")
"""
Main function to upload the results / PBS / OUTLOG of assessors
from the queue folder
:param upload_settings: dictionary defining the upload information
:return: None
"""
# Body fragment of the upload routine (its def line is outside this view and
# the fragment is cut off inside the last call).
# Nothing to do when RESULTS_DIR holds no entries: warn and exit.
if len(os.listdir(RESULTS_DIR)) == 0:
# NOTE(review): Logger.warn is a deprecated alias of Logger.warning in the
# standard logging module - confirm LOGGER is a stdlib logger.
LOGGER.warn('No data need to be uploaded.\n')
sys.exit()
warnings = list()
# Each upload_dict is read through the keys 'host', 'username', 'password'
# and 'projects'.
for upload_dict in upload_settings:
try:
with XnatUtils.get_interface(host=upload_dict['host'],
user=upload_dict['username'],
pwd=upload_dict['password']) as intf:
LOGGER.info('='*50)
# Log label: the configured projects, or 'all' when that value is falsy.
proj_str = (upload_dict['projects'] if upload_dict['projects']
else 'all')
LOGGER.info('Connecting to XNAT <%s>, upload for projects:%s' %
(upload_dict['host'], proj_str))
# Abort this host when the dax datatypes check fails.
if not XnatUtils.has_dax_datatypes(intf):
msg = 'dax datatypes are not installed on xnat <%s>.'
raise DaxUploadError(msg % (upload_dict['host']))
# 1) Upload the assessor data
# For each assessor label that needs to be uploaded:
LOGGER.info('Uploading results for assessors')
if DAX_SETTINGS.get_use_reference():
LOGGER.info('using upload by reference, dir is:{}'.format(
import pandas
import pyxnat
from dax import XnatUtils
from dax import utilities
# Script fragment: list the assessors of one project and keep a fixed set of
# fields for those matching a given proctype. The fragment is cut off inside
# the main loop.
# Exactly three command-line arguments are required (project, proctype and
# outfile, read just below).
if len(sys.argv)!=4 :
print('Usage:')
# NOTE(review): the usage text names no arguments although three are
# required; the placeholders may have been lost - confirm.
print('python Xnatreport_assessor.py ')
sys.exit()
project = sys.argv[1]
proctype = sys.argv[2]
outfile = sys.argv[3]
# No explicit host/user is passed here.
xnat = XnatUtils.get_interface()
Assrs = xnat.list_project_assessors(project)
R = list()
for assr in Assrs :
# Skip assessors whose 'proctype' differs from the requested one.
if assr.get('proctype') != proctype : continue
#print(assr['assessor_label'])
# Get desired fields
thisR = {}
for key in ('project_label','subject_label','session_label','proctype',
'assessor_id','procstatus','qcstatus','assessor_label') :
thisR[key] = assr[key]
host=_host, user=_user)))
# Fragment of a dax test runner: register the test object, prepare the temp
# directory and logging, run the tests against XNAT, then print a summary.
# set test object:
tests.set_tobj(test_obj)
# Make the temp dir:
if not os.path.isdir(DAX_TEST_DIR):
os.makedirs(DAX_TEST_DIR)
# Set the log of any dax function to a temp file for user:
if hide:
logfile = os.path.join(DAX_TEST_DIR, 'dax_test.log')
else:
# No log file path is passed to the logger setup in this case.
logfile = None
log.setup_debug_logger('dax', logfile)
with XnatUtils.get_interface(host=_host, user=username) as intf:
tests.set_xnat(intf)
tests.run_test(project, sessions, nb_sess)
# Summary line: number of tests, elapsed time (3 decimals) and final state.
print((TD_END.format(nb_test=tests.get_number(),
time="%.3f" % tests.get_time(),
state=tests.get_test_state())))
# NOTE(review): the temp dir is removed when do_not_remove is truthy and the
# state starts with 'OK'; given the flag's name the condition may be
# inverted - confirm how the flag is defined by the argument parser.
if do_not_remove:
if 'OK' == tests.get_test_state()[:2]:
shutil.rmtree(DAX_TEST_DIR)
# Fragment of the upload-settings loader; the opening 'if' of this chain is
# outside this view. The CSV branch fills host_projs with one dict per row.
elif f_settings.endswith('.csv'):
# NOTE(review): the branch tests f_settings but opens up_file - confirm both
# names refer to the same path. Binary mode ('rb') suits the Python 2 csv
# module; under Python 3 csv.reader expects a text-mode file.
with open(up_file, 'rb') as csvfileread:
csvreader = csv.reader(csvfileread, delimiter=',')
# NOTE(review): csv.reader yields one list per row, so unpacking into
# (index, row) only works for two-column rows; presumably
# enumerate(csvreader) was intended - confirm.
for index, row in (csvreader):
if len(row) < 4:
raise DaxError("error: could not read the csv row. \
Missing args. 4 needed, %s found at line %s." % (str(len(row)), str(index)))
else:
# Skip the header row; map the first four columns onto DEFAULT_HEADER.
if row != DEFAULT_HEADER:
host_projs.append(dict(list(zip(DEFAULT_HEADER,
row[:4]))))
elif f_settings.endswith('.yaml'):
doc = utilities.read_yaml(f_settings)
host_projs = doc.get('settings')
else:
# NOTE(review): the message lists JSON/PYTHON/CSV although a .yaml branch
# exists just above.
raise DaxError("error: doesn't recognize the file format for the \
settings file. Please use either JSON/PYTHON/CSV format.")
else: # if not file, use the environment variables and options
# NOTE(review): XNAT_HOST is read before the host option is considered, so
# an unset variable raises KeyError even when host is given.
_host = os.environ['XNAT_HOST']
if host:
_host = host
# projects is split on commas into a list; an empty list is used otherwise.
if projects:
projects = projects.split(',')
else:
projects = []
if username:
# NOTE(review): self-assignment, has no effect.
username = username
# Prompt for the password when none was supplied; an empty answer is an error.
if not password:
MSG = "Please provide the password for user <%s> on xnat(%s):"
password = getpass.getpass(prompt=MSG % (username, _host))
if not password:
raise DaxError('empty password entered')
settings file. Please use either JSON/PYTHON/CSV format.")
# Tail of the upload-settings loader: build a single host entry from the
# environment and the command-line options, then return the list.
else: # if not file, use the environment variables and options
# NOTE(review): XNAT_HOST is read before the host option is considered, so
# an unset variable raises KeyError even when host is given.
_host = os.environ['XNAT_HOST']
if host:
_host = host
# projects is split on commas into a list; an empty list is used otherwise.
if projects:
projects = projects.split(',')
else:
projects = []
if username:
# NOTE(review): self-assignment, has no effect.
username = username
# Prompt for the password when none was supplied; an empty answer is an error.
if not password:
MSG = "Please provide the password for user <%s> on xnat(%s):"
password = getpass.getpass(prompt=MSG % (username, _host))
if not password:
raise DaxError('empty password entered')
# NOTE(review): presumably this else pairs with 'if username:' (indentation
# is lost in this copy) - credentials then come from DAX_Netrc for _host.
else:
netrc_obj = DAX_Netrc()
username, password = netrc_obj.get_login(_host)
# One entry: DEFAULT_HEADER names zipped with host, username, password and
# the project list.
host_projs.append(dict(list(zip(DEFAULT_HEADER, [_host, username,
password,
projects]))))
return host_projs
def download_xnat_file(self, src, dst):
    """Download a single file from XNAT.

    :param src: XNAT path of the file. It must contain '/files/', which
        separates the resource path (handed to ``select``) from the file
        name inside that resource.
    :param dst: destination handed to the XNAT file object's ``get``
    :return: whatever ``get`` returns for the downloaded file
    :raises AutoSpiderError: if src can not be resolved on XNAT, if the
        resource does not exist, or if the download itself fails
    """
    with XnatUtils.get_interface(host=self.host, user=self.user,
                                 pwd=self.pwd) as xnat:
        try:
            _res, _file = src.split('/files/')
            res = xnat.select(_res)
            found = res.exists()
        except Exception:
            # The message now carries a %s placeholder: the previous text
            # had none, so 'msg % src' raised TypeError instead of the
            # intended AutoSpiderError.
            msg = ('resources can not be checked because the path given '
                   '(%s) is wrong for XNAT. Please check '
                   'https://wiki.xnat.org/display/XNAT16/'
                   'XNAT+REST+API+Directory for the path.')
            raise AutoSpiderError(msg % src)

        # Raised outside the try so the broad handler above can not replace
        # the more precise "not found" error.
        if not found:
            msg = 'resources specified by %s not found on XNAT.'
            raise AutoSpiderError(msg % src)

        try:
            return res.file(_file).get(dst)
        except Exception:
            raise AutoSpiderError('downloading files from XNAT failed.')
pwd=self.pwd) as xnat:
# Body fragment of download_xnat_file (the def line and the start of the
# enclosing 'with' statement are outside this fragment).
# Split src at '/files/' into the resource path and the file name, then make
# sure the resource exists.
try:
_res, _file = src.split('/files/')
res = xnat.select(_res)
if not res.exists():
msg = 'resources specified by %s not found on XNAT.'
raise AutoSpiderError(msg % src)
# NOTE(review): this handler also catches the AutoSpiderError raised just
# above, and the message below has no %s placeholder, so 'msg % src' raises
# TypeError rather than AutoSpiderError.
except Exception:
msg = 'resources can not be checked because the path given is \
wrong for XNAT. Please check https://wiki.xnat.org/display/XNAT16/\
XNAT+REST+API+Directory for the path.'
raise AutoSpiderError(msg % src)
# Fetch the file to dst; any failure is reported as AutoSpiderError.
try:
results = res.file(_file).get(dst)
except Exception:
raise AutoSpiderError('downloading files from XNAT failed.')
return results
def download_xnat_file(self, src, dst):
    """Download a single file from XNAT.

    :param src: XNAT path of the file. It must contain '/files/', which
        separates the resource path (handed to ``select``) from the file
        name inside that resource.
    :param dst: destination handed to the XNAT file object's ``get``
    :return: whatever ``get`` returns for the downloaded file
    :raises AutoSpiderError: if src can not be resolved on XNAT, if the
        resource does not exist, or if the download itself fails
    """
    with XnatUtils.get_interface(host=self.host, user=self.user,
                                 pwd=self.pwd) as xnat:
        try:
            _res, _file = src.split('/files/')
            res = xnat.select(_res)
            found = res.exists()
        except Exception:
            # The message now carries a %s placeholder: the previous text
            # had none, so 'msg % src' raised TypeError instead of the
            # intended AutoSpiderError.
            msg = ('resources can not be checked because the path given '
                   '(%s) is wrong for XNAT. Please check '
                   'https://wiki.xnat.org/display/XNAT16/'
                   'XNAT+REST+API+Directory for the path.')
            raise AutoSpiderError(msg % src)

        # Raised outside the try so the broad handler above can not replace
        # the more precise "not found" error.
        if not found:
            msg = 'resources specified by %s not found on XNAT.'
            raise AutoSpiderError(msg % src)

        try:
            # The result is returned to the caller; previously it was
            # stored in a local and then dropped.
            return res.file(_file).get(dst)
        except Exception:
            raise AutoSpiderError('downloading files from XNAT failed.')
def upload_assessor_snapshots(assessor_obj, original, thumbnail):
    """
    Upload the snapshots of the assessor PDF.
     (both the original and the thumbnail)

    :param assessor_obj: pyxnat EObject of the assessor to upload the snapshots
    :param original: The original file (full size)
    :param thumbnail: The thumbnail of the original file
    :return: True if it uploaded OK
    :raises XnatUtilsError: if either file does not exist
    """
    if not os.path.isfile(original) or not os.path.isfile(thumbnail):
        err = "%s: original or thumbnail snapshots don't exist."
        raise XnatUtilsError(err % ('upload_assessor_snapshots'))

    # The thumbnail goes first, then the original.
    _put_snapshot(assessor_obj, thumbnail, 'THUMBNAIL')
    _put_snapshot(assessor_obj, original, 'ORIGINAL')
    return True


def _put_snapshot(assessor_obj, path, content):
    """Put one snapshot file into the assessor's SNAPSHOTS resource.

    :param assessor_obj: object exposing out_resource(...).file(...).put(...)
    :param path: local path of the snapshot file
    :param content: content label passed to put ('ORIGINAL' or 'THUMBNAIL')
    """
    # The format is the upper-cased file extension. splitext only looks at
    # the last path component, so dots in directory names (for instance a
    # versioned folder such as 'proc_v1.0.0') no longer corrupt the value,
    # and a file without any extension yields '' instead of an IndexError.
    file_format = os.path.splitext(path)[1][1:].upper()
    assessor_obj.out_resource('SNAPSHOTS')\
                .file(os.path.basename(path))\
                .put(path, file_format, content,
                     overwrite=True,
                     params={"event_reason": "DAX uploading file"})