from contextlib import contextmanager
from pathlib import Path
from tempfile import mkdtemp

from pkg_resources import resource_filename as pkgrf
from toml import loads


@contextmanager
def mock_config():
    """Create a mock config for documentation and testing purposes."""
    from . import config

    filename = Path(pkgrf("mriqc", "data/config-example.toml"))
    settings = loads(filename.read_text())
    for sectionname, configs in settings.items():
        if sectionname != "environment":
            section = getattr(config, sectionname)
            section.load(configs, init=False)
    config.nipype.init()
    config.loggers.init()

    config.execution.work_dir = Path(mkdtemp())
    config.execution.bids_dir = Path(pkgrf("mriqc", "data/tests/ds000005")).absolute()
    config.execution.init()

    yield
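
# Usage sketch: because mock_config() yields under ``contextmanager``, tests
# and documentation can temporarily build workflows against the example
# settings, e.g.:
#
#     from mriqc.testing import mock_config
#
#     with mock_config():
#         ...  # build workflows against the mocked configuration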
# Imports and signature below are reconstructed so the excerpt is
# self-contained; the original fragment referenced ``name`` and ``resolution``
# as free variables.
from nipype.interfaces import utility as niu
from nipype.pipeline import engine as pe
from niworkflows.interfaces.registration import RobustMNINormalizationRPT as RobustMNINormalization
from templateflow.api import get as get_template

from mriqc import config


def spatial_normalization(name='SpatialNormalization', resolution=2):
    """Build the workflow for ANTs-based spatial normalization."""
    # Have the template id handy
    tpl_id = config.workflow.template_id

    # Define workflow interface
    workflow = pe.Workflow(name=name)
    inputnode = pe.Node(niu.IdentityInterface(fields=[
        'moving_image', 'moving_mask', 'modality']), name='inputnode')
    outputnode = pe.Node(niu.IdentityInterface(fields=[
        'inverse_composite_transform', 'out_report']), name='outputnode')

    # Spatial normalization
    norm = pe.Node(RobustMNINormalization(
        flavor=['testing', 'fast'][config.execution.debug],
        num_threads=config.nipype.omp_nthreads,
        float=config.execution.ants_float,
        template=tpl_id,
        template_resolution=resolution,
        generate_report=True,),
        name='SpatialNormalization',
        # Request all MultiProc processes when ants_nthreads > n_procs
        num_threads=config.nipype.omp_nthreads,
        mem_gb=3)
    norm.inputs.reference_mask = str(
        get_template(tpl_id, resolution=resolution, desc='brain', suffix='mask'))

    workflow.connect([
        (inputnode, norm, [('moving_image', 'moving_image'),
                           ('moving_mask', 'moving_mask'),
                           ('modality', 'reference')]),
        (norm, outputnode, [('inverse_composite_transform', 'inverse_composite_transform'),
                            ('out_report', 'out_report')]),
    ])
    return workflow
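
# Aside (assumes the templateflow client is installed): ``get_template`` above
# resolves the template's brain mask to a local file path, fetching it on
# first use. For example:
#
#     from templateflow.api import get
#     get('MNI152NLin2009cAsym', resolution=2, desc='brain', suffix='mask')
#     # PosixPath('.../tpl-MNI152NLin2009cAsym_res-02_desc-brain_mask.nii.gz')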
def individual_reports(name='ReportsWorkflow'):
    """
    Generate the components of the individual report.

    .. workflow::

        from mriqc.workflows.anatomical import individual_reports
        from mriqc.testing import mock_config
        with mock_config():
            wf = individual_reports()

    """
    from ..interfaces import PlotMosaic
    from ..interfaces.reports import IndividualReport

    verbose = config.execution.verbose_reports
    pages = 2
    extra_pages = int(verbose) * 7

    workflow = pe.Workflow(name=name)
    inputnode = pe.Node(niu.IdentityInterface(fields=[
        'in_ras', 'brainmask', 'headmask', 'airmask', 'artmask', 'rotmask',
        'segmentation', 'inu_corrected', 'noisefit', 'in_iqms',
        'mni_report', 'api_id']),
        name='inputnode')

    mosaic_zoom = pe.Node(PlotMosaic(
        out_file='plot_anat_mosaic1_zoomed.svg',
        cmap='Greys_r'), name='PlotMosaicZoomed')
    mosaic_noise = pe.Node(PlotMosaic(
        out_file='plot_anat_mosaic2_noise.svg',
        only_noise=True,  # trailing arguments reconstructed; the excerpt truncated this call
        cmap='viridis_r'), name='PlotMosaicNoise')
    getqi2 = pe.Node(ComputeQI2(), name='ComputeQI2')

    # Compute python-coded measures
    measures = pe.Node(StructuralQC(), 'measures')

    # Project MNI segmentation to T1 space
    invt = pe.MapNode(ants.ApplyTransforms(
        dimension=3, default_value=0, interpolation='Linear',
        float=True),
        iterfield=['input_image'], name='MNItpms2t1')
    invt.inputs.input_image = [str(p) for p in get_template(
        config.workflow.template_id, suffix='probseg', resolution=1,
        label=['CSF', 'GM', 'WM'])]

    datasink = pe.Node(IQMFileSink(
        out_dir=config.execution.output_dir,
        dataset=config.execution.dsname),
        name='datasink', run_without_submitting=True)

    def _getwm(inlist):
        return inlist[-1]
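
    # Aside: the template probability maps above are requested in the order
    # [CSF, GM, WM], so ``_getwm`` simply picks the white-matter map:
    #
    #     _getwm(['csf.nii.gz', 'gm.nii.gz', 'wm.nii.gz'])  # -> 'wm.nii.gz'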
    workflow.connect([
        (inputnode, meta, [('in_file', 'in_file')]),
        (inputnode, datasink, [('in_file', 'in_file'),
                               (('in_file', _get_mod), 'modality')]),
        (inputnode, addprov, [(('in_file', _get_mod), 'modality')]),
        (meta, datasink, [('subject', 'subject_id'),
                          ('session', 'session_id'),
                          ('task', 'task_id'),
                          ('acquisition', 'acq_id'),
                          ('reconstruction', 'rec_id')]),
    ])
def get(flat=False):
    """Get config as a dict."""
    settings = {
        "environment": environment.get(),
        "execution": execution.get(),
        "workflow": workflow.get(),
        "nipype": nipype.get(),
    }
    if not flat:
        return settings

    return {
        ".".join((section, k)): v
        for section, configs in settings.items()
        for k, v in configs.items()
    }
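
# Illustration (hypothetical values): flat=True replaces the nested sections
# with dotted "section.key" pairs, which is handy for TOML/CLI round-trips:
#
#     get()["nipype"]["omp_nthreads"]         # -> 8
#     get(flat=True)["nipype.omp_nthreads"]   # -> 8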
    mosaic_mean = pe.Node(PlotMosaic(  # opening reconstructed; the excerpt was truncated
        out_file='plot_func_mean_mosaic1.svg',
        cmap='Greys_r'),
        name='PlotMosaicMean')
    mosaic_stddev = pe.Node(PlotMosaic(
        out_file='plot_func_stddev_mosaic2_stddev.svg',
        cmap='viridis'), name='PlotMosaicSD')

    mplots = pe.Node(niu.Merge(pages + extra_pages + int(
        config.workflow.fft_spikes_detector) + int(
        config.workflow.ica)), name='MergePlots')
    rnode = pe.Node(IndividualReport(), name='GenerateReport')

    # Link images that should be reported
    dsplots = pe.Node(nio.DataSink(
        base_directory=str(config.execution.output_dir),
        parameterization=False),
        name='dsplots', run_without_submitting=True)

    workflow.connect([
        (inputnode, rnode, [('in_iqms', 'in_iqms')]),
        (inputnode, mosaic_mean, [('epi_mean', 'in_file')]),
        (inputnode, mosaic_stddev, [('in_stddev', 'in_file')]),
        (mosaic_mean, mplots, [('out_file', 'in1')]),
        (mosaic_stddev, mplots, [('out_file', 'in2')]),
        (bigplot, mplots, [('out_file', 'in3')]),
        (mplots, rnode, [('out', 'in_plots')]),
        (rnode, dsplots, [('out_file', '@html_report')]),
    ])
    if config.workflow.fft_spikes_detector:
        mosaic_spikes = pe.Node(PlotSpikes(
            out_file='plot_spikes.svg', cmap='viridis',
            title='High-Frequency spikes'),
            name='PlotSpikes')  # trailing arguments reconstructed; the excerpt was truncated
    if config.workflow.ica:
        from niworkflows.interfaces import segmentation as nws
        melodic = pe.Node(nws.MELODICRPT(no_bet=True,
                                         no_mask=True,
                                         no_mm=True,
                                         compress_report=False,
                                         generate_report=True),
                          name="ICA", mem_gb=max(mem_gb * 5, 8))
        workflow.connect([
            (sanitize, melodic, [('out_file', 'in_files')]),
            (skullstrip_epi, melodic, [('outputnode.out_file', 'report_mask')]),
            (melodic, repwf, [('out_report', 'inputnode.ica_report')])
        ])

    # Upload metrics
    if not config.execution.no_sub:
        from ..interfaces.webapi import UploadIQMs
        upldwf = pe.Node(UploadIQMs(), name='UploadMetrics')
        upldwf.inputs.url = config.execution.webapi_url
        upldwf.inputs.strict = config.execution.upload_strict
        if config.execution.webapi_port:
            upldwf.inputs.port = config.execution.webapi_port

        workflow.connect([
            (iqmswf, upldwf, [('outputnode.out_file', 'in_iqms')]),
        ])

    return workflow
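
# Note on the ICA node's memory request above: MELODIC is granted five times
# ``mem_gb`` (presumably the estimated footprint of the BOLD series), with an
# 8 GB floor, e.g. max(1.2 * 5, 8) -> 8, while max(2.0 * 5, 8) -> 10.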
if config.execution.participant_label is None:
    config.execution.participant_label = all_subjects

participant_label = set(config.execution.participant_label)
missing_subjects = participant_label - set(all_subjects)
if missing_subjects:
    parser.error(
        "One or more participant labels were not found in the BIDS directory: "
        f"{', '.join(missing_subjects)}."
    )
config.execution.participant_label = sorted(participant_label)
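
# Illustration (hypothetical labels): the set difference above flags requested
# participants that are absent from the dataset:
#
#     all_subjects = ['01', '02', '03']
#     {'02', '04'} - set(all_subjects)  # -> {'04'}, triggering parser.error()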
# Handle analysis_level
analysis_level = set(config.workflow.analysis_level)
if not config.execution.participant_label:
    analysis_level.add("group")
config.workflow.analysis_level = list(analysis_level)
# List of files to be run
bids_filters = {
    "participant_label": config.execution.participant_label,
    "session": config.execution.session_id,
    "run": config.execution.run_id,
    "task": config.execution.task_id,
    "bids_type": config.execution.modalities,
}
config.workflow.inputs = {
    mod: files
    for mod, files in collect_bids_data(
        config.execution.layout, **bids_filters
    ).items()
}
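
# Illustration (hypothetical paths): the resulting mapping groups the collected
# files by modality, e.g.
#
#     {'T1w': ['sub-01/anat/sub-01_T1w.nii.gz'],
#      'bold': ['sub-01/func/sub-01_task-rest_bold.nii.gz']}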
# (Guard reconstructed: monitoring is only switched on when requested.)
if cls.resource_monitor:
    ncfg.update_config(
        {
            "monitoring": {
                "enabled": cls.resource_monitor,
                "sample_frequency": "0.5",
                "summary_append": True,
            }
        }
    )
    ncfg.enable_resource_monitor()
# Nipype config (logs and execution)
ncfg.update_config(
    {
        "execution": {
            "crashdump_dir": str(execution.log_dir),
            "crashfile_format": cls.crashfile_format,
            "get_linked_libs": cls.get_linked_libs,
            "stop_on_first_crash": cls.stop_on_first_crash,
        }
    }
)
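
# Quick check (sketch; assumes nipype is importable): the updated settings are
# visible through nipype's global config object.
#
#     from nipype import config as ncfg
#     ncfg.get('execution', 'stop_on_first_crash')  # -> value set above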