Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def build_input_node(self):
"""Build and connect an input node to the pipelines.
"""
import nipype.interfaces.utility as nutil
import nipype.pipeline.engine as npe
from clinica.utils.inputs import clinica_file_reader
from clinica.utils.stream import cprint
from clinica.utils.dwi import check_dwi_volume
from clinica.utils.exceptions import ClinicaBIDSError, ClinicaException
import clinica.utils.input_files as input_files
all_errors = []
try:
t1w_files = clinica_file_reader(self.subjects,
self.sessions,
self.bids_directory,
input_files.T1W_NII)
except ClinicaException as e:
all_errors.append(e)
try:
dwi_files = clinica_file_reader(self.subjects,
self.sessions,
self.bids_directory,
input_files.DWI_NII)
except ClinicaException as e:
all_errors.append(e)
# bval files
try:
bval_files = clinica_file_reader(self.subjects,
+ self._group_id + '_template.nii*',
'description': 'T1w template file of group ' + self._group_id,
'needed_pipeline': 't1-volume or t1-volume-create-dartel'})
except ClinicaException as e:
all_errors.append(e)
iterables_fwhm = self._fwhm
if not self._apply_pvc:
iterables_fwhm = [[]] * len(self.subjects)
if self._apply_pvc:
# pvc tissues input
pvc_tissues_input = []
for tissue_number in self.parameters['pvc_mask_tissues']:
try:
current_file = clinica_file_reader(self.subjects,
self.sessions,
self.caps_directory,
{'pattern': 't1/spm/segmentation/native_space/*_*_T1w_segm-'
+ tissue_names[tissue_number]
+ '_probability.nii*',
'description': 'SPM based probability of ' + tissue_names[tissue_number]
+ ' based on T1w-MRI in native space',
'needed_pipeline': 't1-volume'})
pvc_tissues_input.append(current_file)
except ClinicaException as e:
all_errors.append(e)
if len(all_errors) == 0:
pvc_tissues_input_final = []
for subject_tissue_list in zip(*pvc_tissues_input):
pvc_tissues_input_final.append(subject_tissue_list)
from clinica.utils.stream import cprint
all_errors = []
# DWI
try:
dwi_bids = clinica_file_reader(self.subjects,
self.sessions,
self.bids_directory,
input_files.DWI_NII)
except ClinicaException as e:
all_errors.append(e)
# DWI json
try:
dwi_json = clinica_file_reader(self.subjects,
self.sessions,
self.bids_directory,
input_files.DWI_JSON)
# Create list_eff_echo_spacings and list_enc_directions
list_eff_echo_spacings = []
list_enc_directions = []
for json in dwi_json:
[eff_echo_spacing, enc_direction] = utils.parameters_from_dwi_metadata(json)
list_eff_echo_spacings.append(eff_echo_spacing)
list_enc_directions.append(enc_direction)
except ClinicaException as e:
all_errors.append(e)
# bval files
input_files.DWI_PREPROC_NII)
except ClinicaException as e:
all_errors.append(e)
# B0 brainmask
try:
dwi_brainmask_files = clinica_file_reader(self.subjects,
self.sessions,
self.caps_directory,
input_files.DWI_PREPROC_BRAINMASK)
except ClinicaException as e:
all_errors.append(e)
# Preprocessed bvec
try:
bvec_files = clinica_file_reader(self.subjects,
self.sessions,
self.caps_directory,
input_files.DWI_PREPROC_BVEC)
except ClinicaException as e:
all_errors.append(e)
# Preprocessed bval
try:
bval_files = clinica_file_reader(self.subjects,
self.sessions,
self.caps_directory,
input_files.DWI_PREPROC_BVAL)
except ClinicaException as e:
all_errors.append(e)
if len(all_errors) > 0:
self.sessions,
self.caps_directory,
input_files.T1_FS_DESTRIEUX_PARC_L)
except ClinicaException as e:
all_errors.append(e)
try:
read_parameters_node.inputs.destrieux_right = clinica_file_reader(self.subjects,
self.sessions,
self.caps_directory,
input_files.T1_FS_DESTRIEUX_PARC_R)
except ClinicaException as e:
all_errors.append(e)
try:
read_parameters_node.inputs.desikan_left = clinica_file_reader(self.subjects,
self.sessions,
self.caps_directory,
input_files.T1_FS_DESIKAN_PARC_L)
except ClinicaException as e:
all_errors.append(e)
try:
read_parameters_node.inputs.desikan_right = clinica_file_reader(self.subjects,
self.sessions,
self.caps_directory,
input_files.T1_FS_DESIKAN_PARC_R)
except ClinicaException as e:
all_errors.append(e)
if len(all_errors) > 0:
error_message = 'Clinica faced errors while trying to read files in your BIDS or CAPS directories.\n'
5: 'softtissue',
6: 'background'
}
all_errors = []
read_input_node = npe.Node(name="LoadingCLIArguments",
interface=nutil.IdentityInterface(
fields=self.get_input_fields(),
mandatory_inputs=True))
# Segmented Tissues
# =================
tissues_input = []
for tissue_number in self.parameters['tissues']:
try:
current_file = clinica_file_reader(self.subjects,
self.sessions,
self.caps_directory,
{'pattern': 't1/spm/segmentation/native_space/*_*_T1w_segm-'
+ tissue_names[tissue_number] + '_probability.nii*',
'description': 'SPM based probability of ' + tissue_names[tissue_number]
+ ' based on T1w-MRI in native space',
'needed_pipeline': 't1-volume-tissue-segmentation'})
tissues_input.append(current_file)
except ClinicaException as e:
all_errors.append(e)
# tissues_input has a length of len(self.parameters['tissues']). Each of these elements has a size of
# len(self.subjects). We want the opposite: a list of size len(self.subjects) whose elements have a size of
# len(self.parameters['tissues']). The trick is to iterate over the elements with zip(*mylist).
tissues_input_rearranged = []
for subject_tissue_list in zip(*tissues_input):
tissues_input_rearranged.append(subject_tissue_list)
from clinica.utils.inputs import clinica_file_reader
from clinica.utils.stream import cprint
from clinica.utils.dwi import check_dwi_volume
from clinica.utils.exceptions import ClinicaBIDSError, ClinicaException
import clinica.utils.input_files as input_files
all_errors = []
try:
t1w_files = clinica_file_reader(self.subjects,
self.sessions,
self.bids_directory,
input_files.T1W_NII)
except ClinicaException as e:
all_errors.append(e)
try:
dwi_files = clinica_file_reader(self.subjects,
self.sessions,
self.bids_directory,
input_files.DWI_NII)
except ClinicaException as e:
all_errors.append(e)
# bval files
try:
bval_files = clinica_file_reader(self.subjects,
self.sessions,
self.bids_directory,
input_files.DWI_BVAL)
except ClinicaException as e:
all_errors.append(e)
# bvec files
input_files.DWI_PREPROC_BRAINMASK)
except ClinicaException as e:
all_errors.append(e)
# Preprocessed bvec
try:
bvec_files = clinica_file_reader(self.subjects,
self.sessions,
self.caps_directory,
input_files.DWI_PREPROC_BVEC)
except ClinicaException as e:
all_errors.append(e)
# Preprocessed bval
try:
bval_files = clinica_file_reader(self.subjects,
self.sessions,
self.caps_directory,
input_files.DWI_PREPROC_BVAL)
except ClinicaException as e:
all_errors.append(e)
if len(all_errors) > 0:
error_message = 'Clinica faced errors while trying to read files in your BIDS or CAPS directories.\n'
for msg in all_errors:
error_message += str(msg)
raise ClinicaCAPSError(error_message)
# Check space of DWI dataset
dwi_file_spaces = [re.search('.*_space-(.*)_preproc.nii.*', file, re.IGNORECASE).group(1) for file in dwi_files]
# Return an error if the DWI files are not all in the same space
input_files.DWI_PREPROC_BRAINMASK)
except ClinicaException as e:
all_errors.append(e)
# DWI preprocessing NIfTI
try:
dwi_caps = clinica_file_reader(self.subjects,
self.sessions,
self.caps_directory,
input_files.DWI_PREPROC_NII)
except ClinicaException as e:
all_errors.append(e)
# bval files
try:
bval_files = clinica_file_reader(self.subjects,
self.sessions,
self.caps_directory,
input_files.DWI_PREPROC_BVAL)
except ClinicaException as e:
all_errors.append(e)
# bvec files
try:
bvec_files = clinica_file_reader(self.subjects,
self.sessions,
self.caps_directory,
input_files.DWI_PREPROC_BVEC)
except ClinicaException as e:
all_errors.append(e)
if len(all_errors) > 0: