Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
elif f_settings.endswith('.csv'):
with open(up_file, 'rb') as csvfileread:
csvreader = csv.reader(csvfileread, delimiter=',')
for index, row in (csvreader):
if len(row) < 4:
raise DaxError("error: could not read the csv row. \
Missing args. 4 needed, %s found at line %s." % (str(len(row)), str(index)))
else:
if row != DEFAULT_HEADER:
host_projs.append(dict(list(zip(DEFAULT_HEADER,
row[:4]))))
elif f_settings.endswith('.yaml'):
doc = utilities.read_yaml(f_settings)
host_projs = doc.get('settings')
else:
raise DaxError("error: doesn't recognize the file format for the \
settings file. Please use either JSON/PYTHON/CSV format.")
else: # if not file, use the environment variables and options
_host = os.environ['XNAT_HOST']
if host:
_host = host
if projects:
projects = projects.split(',')
else:
projects = []
if username:
username = username
if not password:
MSG = "Please provide the password for user <%s> on xnat(%s):"
password = getpass.getpass(prompt=MSG % (username, _host))
if not password:
raise DaxError('empty password entered')
settings file. Please use either JSON/PYTHON/CSV format.")
else: # if not file, use the environment variables and options
_host = os.environ['XNAT_HOST']
if host:
_host = host
if projects:
projects = projects.split(',')
else:
projects = []
if username:
username = username
if not password:
MSG = "Please provide the password for user <%s> on xnat(%s):"
password = getpass.getpass(prompt=MSG % (username, _host))
if not password:
raise DaxError('empty password entered')
else:
netrc_obj = DAX_Netrc()
username, password = netrc_obj.get_login(_host)
host_projs.append(dict(list(zip(DEFAULT_HEADER, [_host, username,
password,
projects]))))
return host_projs
:param projects: XNAT list of projects
:return: list of dictionaries info_dict
info_dict for the host [key:value]:
host : string for XNAT host
username : string for XNAT username
password : string for XNAT password
(can be the environment variable containing the value)
projects : list of projects to upload for the host
"""
host_projs = list()
# If settings file given, load it and use it:
if f_settings is not None:
up_file = os.path.abspath(f_settings)
if not os.path.isfile(up_file):
raise DaxError('No upload settings file found: %s' % up_file)
if f_settings.endswith('.json'):
with open(up_file) as data_file:
host_projs = json.load(data_file)
elif f_settings.endswith('.py'):
settings = imp.load_source('settings', up_file)
host_projs = settings.host_projects
elif f_settings.endswith('.csv'):
with open(up_file, 'rb') as csvfileread:
csvreader = csv.reader(csvfileread, delimiter=',')
for index, row in (csvreader):
if len(row) < 4:
raise DaxError("error: could not read the csv row. \
Missing args. 4 needed, %s found at line %s." % (str(len(row)), str(index)))
else:
if row != DEFAULT_HEADER:
host_projs.append(dict(list(zip(DEFAULT_HEADER,
def load_from_file(filepath, args, logger, singularity_imagedir=None):
"""
Check if a file exists and if it's a python file
:param filepath: path to the file to test
:return: True the file pass the test, False otherwise
"""
if args is not None:
_tmp = 'test.{}(**args)'
else:
_tmp = 'test.{}()'
if not os.path.isfile(filepath):
raise DaxError('File %s does not exists.' % filepath)
if filepath.endswith('.py'):
test = imp.load_source('test', filepath)
try:
#print('evaling:' + _tmp.format(os.path.basename(filepath)[:-3]))
return eval(_tmp.format(os.path.basename(filepath)[:-3]))
except AttributeError as e:
#print('attribute error', str(e))
pass
err = '[ERROR] Module or processor NOT FOUND in the python file {}.'
logger.error(err.format(filepath))
elif filepath.endswith('.yaml'):
return processors.load_from_yaml(
XnatUtils, filepath, args, singularity_imagedir)
def raise_yaml_error_if_no_key(doc, yaml_file, key):
"""Method to raise an execption if the key is not in the dict
:param doc: dict to check
:param yaml_file: YAMLfile path
:param key: key to search
"""
if key not in list(doc.keys()):
err = 'YAML File {} does not have {} defined. See example.'
raise DaxError(err.format(yaml_file, key))
def read_yaml_settings(yaml_file, logger):
"""
Method to read the settings yaml file and generate the launcher object.
:param yaml_file: path to yaml file defining the settings
:return: launcher object
"""
if not os.path.isfile(yaml_file):
err = 'Path not found for {}'
raise DaxError(err.format(yaml_file))
doc = utilities.read_yaml(yaml_file)
# Set Inputs from Yaml
check_default_keys(yaml_file, doc)
# Set attributes for settings
attrs = doc.get('attrs')
# Set singularity image dir
singularity_imagedir = doc.get('singularity_imagedir')
# Read modules
modulelib = doc.get('modulelib')
mods = dict()
modules = doc.get('modules', list())
class DaxXnatError(DaxError):
"""Basic exception for errors related to XNAT raised by dax."""
# DAX Spider error:
class DaxSpiderError(DaxError):
"""Basic exception for errors related to spider raised by dax."""
# DAX Processor error:
class DaxProcessorError(DaxError):
"""Basic exception for errors related to processor raised by dax."""
# dax_setup errors
class DaxSetupError(DaxError, ValueError):
"""DaxSetup exception."""
def __init__(self, message):
Exception.__init__(self, 'Error in dax setup: %s' % message)
# Launcher errors:
class DaxUploadError(DaxError):
"""Custom exception raised with dax upload."""
def __init__(self, message):
Exception.__init__(self, 'Error with dax upload: %s' % message)
# Dax netrc errors
class DaxNetrcError(netrc.NetrcParseError):
"""Basic exception for errors related to DAX_Netrc raised by dax."""
def read_settings(settings_path, logger, exe):
logger.info('Current Process ID: %s' % str(os.getpid()))
msg = 'Current Process Name: dax.bin.{}({})'
logger.info(msg.format(exe, settings_path))
# Load the settings file
logger.info('loading settings from: %s' % settings_path)
if settings_path.endswith('.py'):
settings = imp.load_source('settings', settings_path)
dax_launcher = settings.myLauncher
elif settings_path.endswith('.yaml'):
dax_launcher = read_yaml_settings(settings_path, logger)
else:
raise DaxError('Wrong type of settings file given. Please use a \
python file describing the Launcher object or a YAML file.')
# Run the updates
logger.info('running launcher, Start Time: %s' % str(datetime.now()))
return dax_launcher
'DaxSetupError', 'DaxNetrcError', 'DaxUploadError',
'XnatAuthentificationError', 'XnatUtilsError', 'XnatAccessError',
'XnatToolsError', 'XnatToolsUserError',
'ClusterLaunchException', 'ClusterCountJobsException',
'ClusterJobIDException',
'SpiderError', 'AutoSpiderError',
'AutoProcessorError']
# DAX error:
class DaxError(Exception):
"""Basic exception for errors raised by dax."""
# DAX XNAT error:
class DaxXnatError(DaxError):
"""Basic exception for errors related to XNAT raised by dax."""
# DAX Spider error:
class DaxSpiderError(DaxError):
"""Basic exception for errors related to spider raised by dax."""
# DAX Processor error:
class DaxProcessorError(DaxError):
"""Basic exception for errors related to processor raised by dax."""
# dax_setup errors
class DaxSetupError(DaxError, ValueError):
"""DaxSetup exception."""