How to use the dax.errors.XnatUtilsError exception class in dax

To help you get started, we’ve selected a few dax examples, based on popular ways XnatUtilsError is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github VUIIS / dax / dax / XnatUtils.py View on Github external
def upload_assessor_snapshots(assessor_obj, original, thumbnail):
    """
    Upload the snapshots of the assessor PDF.
     (both the original and the thumbnail)

    Each file is stored in the 'SNAPSHOTS' out-resource of the assessor,
    under its own base name, with the upper-cased file extension passed
    as the format and 'THUMBNAIL' / 'ORIGINAL' passed as the content.

    :param assessor_obj: pyxnat EObject of the assessor to upload the snapshots
    :param original: The original file (full size)
    :param thumbnail: The thumbnail of the original file
    :return: True once both files have been handed to the upload call
    :raises XnatUtilsError: if the original or the thumbnail is not a file

    """
    if not os.path.isfile(original) or not os.path.isfile(thumbnail):
        err = "%s: original or thumbnail snapshots don't exist."
        raise XnatUtilsError(err % ('upload_assessor_snapshots'))

    # The thumbnail is uploaded first, then the full-size image.
    uploads = ((thumbnail, 'THUMBNAIL'), (original, 'ORIGINAL'))
    for snapshot, content in uploads:
        # Derive the format from the real extension of the file name.
        # Splitting the whole path on '.' picks the wrong piece as soon as
        # a directory or the file name holds another dot, and fails when
        # there is no dot at all.
        file_format = os.path.splitext(snapshot)[1].lstrip('.').upper()
        assessor_obj.out_resource('SNAPSHOTS')\
                    .file(os.path.basename(snapshot))\
                    .put(snapshot, file_format, content,
                         overwrite=True,
                         params={"event_reason": "DAX uploading file"})
    return True
github VUIIS / dax / dax / XnatUtils.py View on Github external
def get_scan_status(sessions, scan_path):
    """
    Return the quality of the scan that a scan path points to.

    The path is split on '/'; element 6 is used as the session label and
    element 8 as the scan label. The first scan whose session and scan
    labels both match provides the result.

    :param sessions: iterable of session objects exposing label() and
     scans(); each scan must expose label() and info()
    :param scan_path: '/'-separated path holding the session label at
     position 6 and the scan label at position 8
    :return: the value stored under 'quality' in the matching scan's info()
    :raises XnatUtilsError: if the path is too short to hold both labels
     or if no session/scan matches them

    """
    path_parts = scan_path.split('/')
    # A truncated path used to escape as a bare IndexError; report it with
    # the same error as an unmatched path instead.
    if len(path_parts) < 9:
        raise XnatUtilsError('Invalid scan path:' + scan_path)
    sess_label = path_parts[6]
    scan_label = path_parts[8]

    for csess in sessions:
        if csess.label() != sess_label:
            continue
        for cscan in csess.scans():
            if cscan.label() == scan_label:
                return cscan.info()['quality']

    raise XnatUtilsError('Invalid scan path:' + scan_path)
github VUIIS / dax / dax / XnatUtils.py View on Github external
if self.folder_exists(folderpath):
            if not resource_name:
                res = os.path.basename(os.path.abspath(folderpath))
            else:
                res = resource_name
            dest = os.path.join(self.directory, res)

            try:
                shutil.copytree(folderpath, dest)
                self.print_copying_statement(res, folderpath, dest)
            # Directories are the same
            except shutil.Error as excep:
                raise XnatUtilsError('Directory not copied. Error: %s' % excep)
            # Any error saying that the directory doesn't exist
            except OSError as excep:
                raise XnatUtilsError('Directory not copied. Error: %s' % excep)
github VUIIS / dax / dax / XnatUtils.py View on Github external
else:
            if alabel:
                assessor_label = alabel
            else:
                xnat_info = [project, subject, experiment]
                if scan:
                    xnat_info.append(scan)
                xnat_info.append(proctype)
                if None in xnat_info:
                    err = 'A label is not defined for SpiderProcessHandler: %s'
                    raise XnatUtilsError(err % str(xnat_info))
                assessor_label = '-x-'.join(xnat_info)
            self.assr_handler = AssessorHandler(assessor_label)
        if not self.assr_handler.is_valid():
            err = 'SpiderProcessHandler:invalid assessor handler. Wrong label.'
            raise XnatUtilsError(err)

        # Create the upload directory
        self.directory = os.path.join(DAX_SETTINGS.get_results_dir(),
                                      self.assr_handler.assessor_label)
        # if the folder already exists : remove it
        if not os.path.exists(self.directory):
            os.mkdir(self.directory)
        else:
            # Remove files in directories
            utilities.clean_directory(self.directory)

        self.print_msg("INFO: Handling results ...")
        self.print_msg('-Creating folder %s for %s'
                       % (self.directory, self.assr_handler.assessor_label))
github VUIIS / dax / dax / XnatUtils.py View on Github external
"""
    Upload a file to a pyxnat EObject

    :param filepath: Full path to the file to upload
    :param resource_obj: pyxnat EObject to upload the file to.
                         Note this should be a resource
    :param remove: Remove the file if it exists
    :param removeall: Remove all of the files
    :param fname: save the file on disk with this value as file name
    :return: True if upload was OK, False otherwise

    """
    if not os.path.isfile(filepath):  # Check existence of the file
        err = "%s: file %s doesn't exist."
        raise XnatUtilsError(err % ('upload_file_to_obj', filepath))
    elif os.path.getsize(filepath) == 0:  # Check for empty file
        err = "%s: empty file, not uploading %s."
        raise XnatUtilsError(err % ('upload_file_to_obj', filepath))
    else:
        # Remove previous resource to upload the new one
        if removeall and resource_obj.exists():
            resource_obj.delete()
        filepath = utilities.check_image_format(filepath)
        if fname:
            filename = fname
            if filepath.endswith('.gz') and not fname.endswith('.gz'):
                filename += '.gz'
        else:
            filename = os.path.basename(filepath)
        if resource_obj.file(str(filename)).exists():
            if remove:
github VUIIS / dax / dax / XnatUtils.py View on Github external
LOGGER.debug('retry {} of {}'.format(
                    str(i + 1), str(self.xnat_retries)))
                LOGGER.debug('_exec:{}:{}'.format(method, uri))
                try:
                    result = super()._exec(
                        uri, method, body, headers, force_preemptive_auth,
                        timeout=self.xnat_timeout, **kwargs)
                    break
                except (requests.Timeout, requests.ConnectionError):
                    # Do nothing
                    LOGGER.debug('retry {} timed out'.format(str(i + 1)))
                    pass

        if result is None:
            # None of the retries worked
            raise XnatUtilsError('XNAT timeout')

        return result
github VUIIS / dax / dax / XnatUtils.py View on Github external
Upload all of the files in a folder based on the pyxnat EObject passed

    :param directory: Full path of the directory to upload
    :param resource_obj: pyxnat EObject to upload the data to
    :param resource_label: label of where you want the contents of the
     directory to be stored under on XNAT. Note this is a level down
     from resource_obj
    :param remove: Remove the file if it exists if True
    :param removeall: Remove all of the files if they exist if True
    :param extract: extract the files if it's a zip
    :return: True if upload was OK, False otherwise

    """
    if not os.path.exists(directory):
        err = '%s: directory %s does not exist.'
        raise XnatUtilsError(err % ('upload_folder_to_obj', directory))

    if resource_obj.exists():
        if removeall:
            resource_obj.delete()
        elif not remove:
            # check if any files already exists on XNAT, if yes return FALSE
            for fpath in get_files_in_folder(directory):
                if resource_obj.file(fpath).exists():
                    print(("Warning: upload_folder_to_obj in XnatUtils: file \
%s already found on XNAT. No upload. Use remove/removeall." % fpath))
                    return False

    fzip = '%s.zip' % resource_label
    initdir = os.getcwd()
    # Zip all the files in the directory
    os.chdir(directory)
github VUIIS / dax / dax / XnatUtils.py View on Github external
:param key: key from dictionary to filter using regex
    :param expressions: list of regex expressions to use (OR)
    :param full_regex: use full regex
    :return: list of items from the list_dicts that match the regex
    """
    flist = list()
    if nor:
        flist = list_dicts
    if isinstance(expressions, str):
        expressions = [expressions]
    elif isinstance(expressions, list):
        pass
    else:
        err = "Wrong type for 'expressions' in filter_list_dicts_regex: %s \
found,  or  required." % type(expressions)
        raise XnatUtilsError(err)

    for exp in expressions:
        regex = extract_exp(exp, full_regex)
        if nor:
            flist = [d for d in flist if not regex.match(d[key])]
        else:
            flist.extend([d for d in list_dicts if regex.match(d[key])])

    return flist